토픽에 데이터가 없을 때 offset이 0이 되는 현상 문의
542
작성자 없음
작성한 질문수 0
- 학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요!
- 먼저 유사한 질문이 있었는지 검색해보세요.
- 서로 예의를 지키며 존중하는 문화를 만들어가요.
- 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요.
선생님 안녕하십니까!
강의를 열심히 듣고 있는 수강생입니다.
다름이아니라, 토픽에 데이터가 없을 때 offset이 0이 된다는 로그는 어디서부터 기인하는 것인지 궁금하여 질문 드립니다.
제가 이해하기로 AUTO_COMMIT false가 되어있으면 명시적으로 commit을 호출하지 않으면 commit이 되지 않는 것으로 알고있습니다 (테스트도 해보았습니다.)
하지만, 강의 6:00경 시나리오.
즉, 토픽에 아무런 데이터가 없는 상태에서 컨슈머만 켠 케이스에서 저는 커밋(commitSync)을 하지 않았는데 poll을 몇번 돌다보면 offset이 0이 되었다는 로그가 발생합니다.
(실제로 __consumer_offsets-* 파일에는 offset이 기록되진 않고, 그래서 컨슈머를 껐다 켜면 offset 0부터 읽습니다)
저는 commit을 한적이 없으므로 실제로 __consumer_offsets에는 0으로 기록되지 않는데, 저 로그에 있는 offset이 0은 어디서부터 오는 것일까요?
+) 코드상으로는 poll 시점에 updateAssignmentMetadataIfNeeded (maybeSeekUnvalidated) 에서 해당 로그가 찍히는데요, 이건 무슨 정책일까요? 관련 자료가 있으면 공유주셔도 감사합니다.
// offset=0 설정이 되어버림
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-6-1, groupId=group-pizza-assign-seek-maintenance-6] Fetch position FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}} is out of range for partition pizza-topic-0, resetting offset
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-6-1, groupId=group-pizza-assign-seek-maintenance-6] Resetting offset for partition pizza-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
테스트한 코드 일부 공유드립니다.
(필요할 설정들을 했으며, 정말 커밋이 안되고 있는지 확인하기 위해 records.count()>100일 때만 명시적으로 커밋을 한 코드입니다)
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
TopicPartition topicPartition = new TopicPartition(topicName, 0);
kafkaConsumer.assign(List.of(topicPartition));
kafkaConsumer.seek(topicPartition, 10L);while(true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1_000L));
for(ConsumerRecord<String, String> record: records) {
log.info("record key: {}, partition: {}, recordOffset: {}",
record.key(), record.partition(), record.offset());
}
if (records.count() > 100 ){
kafkaConsumer.commitSync();
log.info("commit sync called");
}
}
감사합니다 ^^
답변 1
1
안녕하십니까,
제가 돌려 볼수 있도록 테스트 코드 일부 말고, 전체를 올려 주시겠습니까?
감사합니다.
0
선생님 안녕하세요.
죄송합니다. 잘 모르다보니 횡설수설 했습니다. 생각을 좀 정리하였고, 다시 한 번 말씀드리겠습니다.
조건 : seek(N)으로 데이터를 읽으려고 할 때 토픽에 N보다 적은 수의 데이터가 있는 경우 (즉 offset이 없거나 N보다 작을 경우)
현상 : 처음 FetchPosition의 offset은 N이었으나 이어서 consumer가 토픽의 마지막 offset으로 컨슈머의 FetchPosition을 덮어쓰고, topic에 데이터가 쌓이면 그 설정된 FetchPosition(토픽의 마지막 offset) 데이터를 읽어옵니다 (Fetcher.java의 resetOffsetIfNeeded() 에서 수행됩니다. )
왜 FetchPosition이 seek(N)했을 때 FetchPosition{offset=N}이 어느 순간 토픽의 마지막 offset으로 덮어씌워지는 정책을 가지는지 궁금합니다.
(FetchPosition의 offset을 seek에서 준 값을 그대로 가지고 있지, 왜 굳이 토픽의 마지막 오프셋으로 업데이트하지? 싶어서요)
// seek 값 10을 가지는 FetchPosition 로그 (FetchPosition{offset=10 ...)
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-11-1, groupId=group-pizza-assign-seek-maintenance-11] Fetch position FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}} is out of range for partition pizza-topic-0, resetting offset
// topic의 마지막 offset인 1로 덮어씌워지는 FetchPosition 로그 (FetchPosition{offset=1 ...)
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-11-1, groupId=group-pizza-assign-seek-maintenance-11] Resetting offset for partition pizza-topic-0 to position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
테스트한 코드도 첨부드립니다.
(partiton3개인 pizzaProducer를 사용하였고 consumer도 선생님께서 강의에서 진행해주신 코드와 거의 똑같습니다)
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-pizza-assign-seek-maintenance-11");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
String topicName = "pizza-topic";
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
TopicPartition topicPartition = new TopicPartition(topicName, 0);
kafkaConsumer.assign(List.of(topicPartition));
kafkaConsumer.seek(topicPartition, 10L);
Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("main program starts to exit by calling wakeup");
kafkaConsumer.wakeup();
try{
mainThread.join();
} catch(InterruptedException e) {
e.printStackTrace();
}
}));
pollCommitSync(kafkaConsumer);
}
private static void pollCommitSync(final KafkaConsumer<String, String> kafkaConsumer) {
int loopCnt = 0;
try {
while(true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1_000L));
log.info("loopCnt: {}, consumerRecords count: {}", loopCnt++, records.count());
for(ConsumerRecord<String, String> record: records) {
log.info("record key: {}, partition: {}, recordOffset: {}",
record.key(), record.partition(), record.offset());
}
try {
if (records.count() > 100 ){
kafkaConsumer.commitSync();
log.info("commit sync called");
}
} catch(CommitFailedException e) {
log.error("CommitFailedException! ", e);
}
}
} catch (WakeupException e) {
log.error("wakeup exception has been called", e);
} finally {
log.info("finally consumer is closed");
kafkaConsumer.close();
}
}
질문이 고르지 못했던 점 양해 부탁드립니다.
주말에도 답변 감사합니다 선생님!
1
오, 굉장히 재미있는(?) 부분을 테스트 해보셨군요.
seek(N)인데 아직 offset이 N까지 없는 경우 , (제 기억으로는) 과거 버전에서는 Out of range 오류를 내었던 걸로 기억합니다만,
근데 지금은 out of range 오류 시 default offset으로 Fetcher가 position을 잡는 걸로 보입니다. 만약 이렇게 처리하지 않으면 오류를 발생 시키던지, 아님 N 이 될 때 까지 Consumer가 계속 대기 해야 하는데, 이런 부분을 개선하기 위해서 default offset으로 resetting하는 걸로 추정됩니다.
0
아하!
이런 현상이 카프카 설계 원리로 인해 발생하는 것이 아니라,
현 버전의 정책정도일 뿐이고 계속해서 업데이트 될 수 있는 부분이군요!
답변 감사드립니다 선생님. 남은 강의도 열심히 듣겠습니다!
즐거운 한주 시작하십쇼!
virtual box 설치 문제
0
60
2
카프카 서버 구축 관련 문의
0
66
2
카프카 토픽 Key 타입 변경에 관한 질문
0
70
2
Zookeeper에서 KRaft
0
78
1
카프카 학습과 관련하여 질문 드립니다
0
96
2
파티션 증가시 비용 증가 고려
0
71
2
Kafka 초기 Partition 개수 설정 관련 질문
0
82
2
VM 과 도커의 차이
0
139
2
vm 어댑터설정 문의
0
78
2
Cooperative Sticky Rebalancing 질문
0
58
2
consumer 설정 질문
0
68
2
consumer.poll 질문입니다.
1
69
2
mainThread.join() 관련 질문
0
57
2
문의
0
153
2
멀티 브로커 설정 중 포트 충돌 발생
0
120
2
Consumer Group 강의 Lag 질문있습니다!
0
107
2
강의 설명 및 코드 정리
0
174
2
실습 코드는 어디서 받나요.. 아무리 찾아도 엄네요..
0
151
1
java.nio.BufferUnderflowException 에러 발생합니다..
0
156
3
KafkaTimeoutError:
0
157
2
acks 1 이면 비동기가 아니지 않나요?!
0
184
2
Producer의 메시지 비동기화 전송 구현 강좌 내용 중 질문
0
109
2
자문자답: 데이터 누락된다고 하시는 분 참고하세요.
0
215
2
자문자답: kafka Error connecting to node utuntu-20.myguest.virtualbox.org:9092
0
200
2





