작성
·
119
0
package com.example;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
public class ConsumerPartitionAssignSeek {
public static final Logger logger = LoggerFactory.getLogger(ConsumerPartitionAssignSeek.class.getName());
public static void main(String[] args) {
String topicName = "pizza-topic";
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_pizza_assign_seek_v001");
//props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "6000");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
TopicPartition topicPartition = new TopicPartition(topicName, 0);
//kafkaConsumer.subscribe(List.of(topicName));
kafkaConsumer.assign(Arrays.asList(topicPartition));
kafkaConsumer.seek(topicPartition, 5L);
//main thread
Thread mainThread = Thread.currentThread();
//main thread 종료시 별도의 thread로 KafkaConsumer wakeup()메소드를 호출하게 함.
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
logger.info(" main program starts to exit by calling wakeup");
kafkaConsumer.wakeup();
try {
mainThread.join();
} catch(InterruptedException e) { e.printStackTrace();}
}
});
//kafkaConsumer.close();
//pollAutoCommit(kafkaConsumer);
//pollCommitSync(kafkaConsumer);
//pollCommitAsync(kafkaConsumer);
pollNoCommit(kafkaConsumer);
}
private static void pollNoCommit(KafkaConsumer<String, String> kafkaConsumer) {
int loopCnt = 0;
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
for (ConsumerRecord record : consumerRecords) {
logger.info("record key:{}, partition:{}, record offset:{} record value:{}",
record.key(), record.partition(), record.offset(), record.value());
}
}
}catch(WakeupException e) {
logger.error("wakeup exception has been called");
}catch(Exception e) {
logger.error(e.getMessage());
}finally {
logger.info("finally consumer is closing");
kafkaConsumer.close();
}
}
private static void pollCommitAsync(KafkaConsumer<String, String> kafkaConsumer) {
int loopCnt = 0;
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
for (ConsumerRecord record : consumerRecords) {
logger.info("record key:{}, partition:{}, record offset:{} record value:{}",
record.key(), record.partition(), record.offset(), record.value());
}
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if(exception != null) {
logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage());
}
}
});
}
}catch(WakeupException e) {
logger.error("wakeup exception has been called");
}catch(Exception e) {
logger.error(e.getMessage());
}finally {
logger.info("##### commit sync before closing");
kafkaConsumer.commitSync();
logger.info("finally consumer is closing");
kafkaConsumer.close();
}
}
private static void pollCommitSync(KafkaConsumer<String, String> kafkaConsumer) {
int loopCnt = 0;
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
for (ConsumerRecord record : consumerRecords) {
logger.info("record key:{}, partition:{}, record offset:{} record value:{}",
record.key(), record.partition(), record.offset(), record.value());
}
try {
if(consumerRecords.count() > 0 ) {
kafkaConsumer.commitSync();
logger.info("commit sync has been called");
}
} catch(CommitFailedException e) {
logger.error(e.getMessage());
}
}
}catch(WakeupException e) {
logger.error("wakeup exception has been called");
}catch(Exception e) {
logger.error(e.getMessage());
}finally {
logger.info("finally consumer is closing");
kafkaConsumer.close();
}
}
public static void pollAutoCommit(KafkaConsumer<String, String> kafkaConsumer) {
int loopCnt = 0;
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
for (ConsumerRecord record : consumerRecords) {
logger.info("record key:{}, partition:{}, record offset:{} record value:{}",
record.key(), record.partition(), record.offset(), record.value());
}
try {
logger.info("main thread is sleeping {} ms during while loop", 10000);
Thread.sleep(10000);
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}catch(WakeupException e) { logger.error("wakeup exception has been called");
}finally {
logger.info("finally consumer is closing");
kafkaConsumer.close();
}
}
}
해당 코드에 문제가 없는 것으로 보입니다.
java.lang.IllegalStateException: This consumer has already been closed.
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2456)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at com.example.ConsumerPartitionAssignSeek.pollNoCommit(ConsumerPartitionAssignSeek.java:63)
at com.example.ConsumerPartitionAssignSeek.main(ConsumerPartitionAssignSeek.java:53)
해당하는 에러가 띄는데 이유를 알 수 있을까요?
git 코드는 잘돌아가는 것을 확인했습니다. 차이점이 알 수가 없어서 질문드립니다,