nocommit 관련 질문
164
작성한 질문수 22
package com.example;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
/**
 * Kafka consumer example that explicitly assigns partition 0 of {@code pizza-topic},
 * seeks to offset 5 and then runs one of several poll loops (no commit, sync commit,
 * async commit, auto commit).
 *
 * <p>A JVM shutdown hook calls {@link KafkaConsumer#wakeup()} so that the blocking
 * {@code poll} throws {@link WakeupException}; every poll loop closes the consumer in
 * its {@code finally} block.
 */
public class ConsumerPartitionAssignSeek {

    public static final Logger logger = LoggerFactory.getLogger(ConsumerPartitionAssignSeek.class.getName());

    /**
     * Builds the consumer, assigns and seeks the partition, registers the shutdown hook
     * and starts the no-commit poll loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final String topicName = "pizza-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // NOTE(review): no group.id is configured; the commit-based poll variants below
        // presumably need the next line enabled - confirm before switching to them.
        //props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_pizza_assign_seek_v001");
        //props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "6000");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        final TopicPartition topicPartition = new TopicPartition(topicName, 0);

        // Manual assignment instead of group subscription.
        //kafkaConsumer.subscribe(List.of(topicName));
        kafkaConsumer.assign(Arrays.asList(topicPartition));
        kafkaConsumer.seek(topicPartition, 5L);

        // main thread
        final Thread mainThread = Thread.currentThread();

        // When the JVM shuts down, a separate thread calls KafkaConsumer.wakeup() and then
        // waits for the main thread so the poll loop can finish and close the consumer.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                logger.info(" main program starts to exit by calling wakeup");
                kafkaConsumer.wakeup();
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    logger.warn("shutdown hook interrupted while waiting for main thread", e);
                    Thread.currentThread().interrupt();
                }
            }
        });

        // Do NOT close the consumer here: the poll loop below still uses it, and a poll on a
        // closed consumer fails. Each poll method closes the consumer in its finally block.
        //kafkaConsumer.close();

        // Alternative poll strategies; enable exactly one.
        //pollAutoCommit(kafkaConsumer);
        //pollCommitSync(kafkaConsumer);
        //pollCommitAsync(kafkaConsumer);
        pollNoCommit(kafkaConsumer);
    }

    /**
     * Logs key, partition, offset and value of every record in the batch.
     *
     * @param consumerRecords the batch returned by a single {@code poll} call
     */
    private static void logRecords(ConsumerRecords<String, String> consumerRecords) {
        for (ConsumerRecord<String, String> record : consumerRecords) {
            logger.info("record key:{}, partition:{}, record offset:{} record value:{}",
                    record.key(), record.partition(), record.offset(), record.value());
        }
    }

    /**
     * Polls forever and logs the records without committing offsets. The loop ends when
     * {@code poll} throws; the consumer is always closed afterwards.
     *
     * @param kafkaConsumer the consumer to poll; closed by this method
     */
    private static void pollNoCommit(KafkaConsumer<String, String> kafkaConsumer) {
        int loopCnt = 0;
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
                logRecords(consumerRecords);
            }
        } catch (WakeupException e) {
            logger.error("wakeup exception has been called");
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            logger.error(e.getMessage(), e);
        } finally {
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }
    }

    /**
     * Polls forever, logs the records and commits offsets asynchronously after each batch.
     * On exit a final synchronous commit is attempted and the consumer is closed.
     *
     * @param kafkaConsumer the consumer to poll; closed by this method
     */
    private static void pollCommitAsync(KafkaConsumer<String, String> kafkaConsumer) {
        int loopCnt = 0;
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
                logRecords(consumerRecords);
                kafkaConsumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) -> {
                    if (exception != null) {
                        logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage());
                    }
                });
            }
        } catch (WakeupException e) {
            logger.error("wakeup exception has been called");
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            try {
                logger.info("##### commit sync before closing");
                kafkaConsumer.commitSync();
            } finally {
                // Nested finally: close the consumer even when the final commit throws.
                logger.info("finally consumer is closing");
                kafkaConsumer.close();
            }
        }
    }

    /**
     * Polls forever, logs the records and commits offsets synchronously after every
     * non-empty batch. A {@link CommitFailedException} is logged and the loop continues.
     *
     * @param kafkaConsumer the consumer to poll; closed by this method
     */
    private static void pollCommitSync(KafkaConsumer<String, String> kafkaConsumer) {
        int loopCnt = 0;
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
                logRecords(consumerRecords);
                try {
                    if (consumerRecords.count() > 0) {
                        kafkaConsumer.commitSync();
                        logger.info("commit sync has been called");
                    }
                } catch (CommitFailedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        } catch (WakeupException e) {
            logger.error("wakeup exception has been called");
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }
    }

    /**
     * Polls forever, logs the records and sleeps 10 seconds between polls. This method
     * never commits explicitly. If the sleep is interrupted the loop stops, the consumer
     * is closed and the thread's interrupt flag is restored.
     *
     * @param kafkaConsumer the consumer to poll; closed by this method
     */
    public static void pollAutoCommit(KafkaConsumer<String, String> kafkaConsumer) {
        int loopCnt = 0;
        boolean interrupted = false;
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
                logRecords(consumerRecords);
                try {
                    logger.info("main thread is sleeping {} ms during while loop", 10000);
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    logger.warn("interrupted while sleeping between polls; stopping poll loop", e);
                    interrupted = true;
                    break;
                }
            }
        } catch (WakeupException e) {
            logger.error("wakeup exception has been called");
        } finally {
            try {
                logger.info("finally consumer is closing");
                kafkaConsumer.close();
            } finally {
                // Restore the flag only after close() so it cannot disturb the close itself.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
해당 코드에 문제가 없는 것으로 보입니다.
java.lang.IllegalStateException: This consumer has already been closed.
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2456)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at com.example.ConsumerPartitionAssignSeek.pollNoCommit(ConsumerPartitionAssignSeek.java:63)
at com.example.ConsumerPartitionAssignSeek.main(ConsumerPartitionAssignSeek.java:53)
해당하는 에러가 뜨는데 이유를 알 수 있을까요?
git 코드는 잘 돌아가는 것을 확인했습니다. 차이점을 알 수가 없어서 질문드립니다.
답변 1
virtual box 설치 문제
0
62
2
카프카 서버 구축 관련 문의
0
67
2
카프카 토픽 Key 타입 변경에 관한 질문
0
70
2
Zookeeper에서 KRaft
0
78
1
카프카 학습과 관련하여 질문 드립니다
0
98
2
파티션 증가시 비용 증가 고려
0
72
2
Kafka 초기 Partition 개수 설정 관련 질문
0
83
2
VM 과 도커의 차이
0
139
2
vm 어댑터설정 문의
0
78
2
Cooperative Sticky Rebalancing 질문
0
58
2
consumer 설정 질문
0
68
2
consumer.poll 질문입니다.
1
69
2
mainThread.join() 관련 질문
0
57
2
문의
0
153
2
멀티 브로커 설정 중 포트 충돌 발생
0
120
2
Consumer Group 강의 Lag 질문있습니다!
0
107
2
강의 설명 및 코드 정리
0
174
2
실습 코드는 어디서 받나요.. 아무리 찾아도 엄네요..
0
151
1
java.nio.BufferUnderflowException 에러 발생합니다..
0
158
3
KafkaTimeoutError:
0
157
2
acks 1 이면 비동기가 아니지 않나요?!
0
185
2
Producer의 메시지 비동기화 전송 구현 강좌 내용 중 질문
0
109
2
자문자답: 데이터 누락된다고 하시는 분 참고하세요.
0
216
2
자문자답: kafka Error connecting to node utuntu-20.myguest.virtualbox.org:9092
0
203
2





