인프런 커뮤니티 질문&답변

연어님의 프로필 이미지
연어

작성한 질문수

카프카 완벽 가이드 - 코어편

Consumer에서 토픽 특정 파티션의 특정 offset 부터 읽어오기 구현 실습

토픽에 데이터가 없을 때 offset이 0이 되는 현상 문의

해결된 질문

작성

·

428

·

수정됨

0

- 학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요!
- 먼저 유사한 질문이 있었는지 검색해보세요.
- 서로 예의를 지키며 존중하는 문화를 만들어가요.
- 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요.

 

선생님 안녕하십니까!

강의를 열심히 듣고 있는 수강생입니다.

 

다름이아니라, 토픽에 데이터가 없을 때 offset이 0이 된다는 로그는 어디서부터 기인하는 것인지 궁금하여 질문 드립니다.

 

제가 이해하기로 AUTO_COMMIT false가 되어있으면 명시적으로 commit을 호출하지 않으면 commit이 되지 않는 것으로 알고있습니다 (테스트도 해보았습니다.)

 

하지만, 강의 6:00경 시나리오.

즉, 토픽에 아무런 데이터가 없는 상태에서 컨슈머만 켠 케이스에서 저는 커밋(commitSync)을 하지 않았는데 poll을 몇번 돌다보면 offset이 0이 되었다는 로그가 발생합니다.

(실제로 __consumer_offsets-* 파일에는 offset이 기록되진 않고, 그래서 컨슈머를 껐다 켜면 offset 0부터 읽습니다)

 

저는 commit을 한적이 없으므로 실제로 __consumer_offsets에는 0으로 기록되지 않는데, 저 로그에 있는 offset이 0은 어디서부터 오는 것일까요?

 

+) 코드상으로는 poll 시점에 updateAssignmentMetadataIfNeeded (maybeSeekUnvalidated) 에서 해당 로그가 찍히는데요, 이건 무슨 정책일까요? 관련 자료가 있으면 공유주셔도 감사합니다.

 

// offset=0 설정이 되어버림

[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-6-1, groupId=group-pizza-assign-seek-maintenance-6] Fetch position FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}} is out of range for partition pizza-topic-0, resetting offset
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-6-1, groupId=group-pizza-assign-seek-maintenance-6] Resetting offset for partition pizza-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.

 

테스트한 코드 일부 공유드립니다.

(필요할 설정들을 했으며, 정말 커밋이 안되고 있는지 확인하기 위해 records.count()>100일 때만 명시적으로 커밋을 한 코드입니다)

props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
TopicPartition topicPartition = new TopicPartition(topicName, 0);
kafkaConsumer.assign(List.of(topicPartition));
kafkaConsumer.seek(topicPartition, 10L);
while(true) {
    ConsumerRecords<String, String> records =  kafkaConsumer.poll(Duration.ofMillis(1_000L));
    for(ConsumerRecord<String, String> record: records) {
        log.info("record key: {}, partition: {}, recordOffset: {}",
            record.key(), record.partition(), record.offset());
    }

    if (records.count() > 100 ){
        kafkaConsumer.commitSync();
        log.info("commit sync called");
    }

}

 

감사합니다 ^^

 

답변 1

1

권 철민님의 프로필 이미지
권 철민
지식공유자

안녕하십니까,

제가 돌려 볼수 있도록 테스트 코드 일부 말고, 전체를 올려 주시겠습니까?

 

감사합니다.

연어님의 프로필 이미지
연어
질문자

선생님 안녕하세요.

죄송합니다. 잘 모르다보니 횡설수설 했습니다. 생각을 좀 정리하였고, 다시 한 번 말씀드리겠습니다.

 

조건 : seek(N)으로 데이터를 읽으려고 할 때 토픽에 N보다 적은 수의 데이터가 있는 경우 (즉 offset이 없거나 N보다 작을 경우)

현상 : 처음 FetchPosition의 offset은 N이었으나 이어서 consumer가 토픽의 마지막 offset으로 컨슈머의 FetchPosition을 덮어쓰고, topic에 데이터가 쌓이면 그 설정된 FetchPosition(토픽의 마지막 offset) 데이터를 읽어옵니다 (Fetcher.java의 resetOffsetIfNeeded() 에서 수행됩니다. )

왜 FetchPosition이 seek(N)했을 때 FetchPosition{offset=N}이 어느 순간 토픽의 마지막 offset으로 덮어씌워지는 정책을 가지는지 궁금합니다.

(FetchPosition의 offset을 seek에서 준 값을 그대로 가지고 있지, 왜 굳이 토픽의 마지막 오프셋으로 업데이트하지? 싶어서요)

// seek 값 10을 가지는 FetchPosition 로그 (FetchPosition{offset=10 ...)
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-11-1, groupId=group-pizza-assign-seek-maintenance-11] Fetch position FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}} is out of range for partition pizza-topic-0, resetting offset

// topic의 마지막 offset인 1로 덮어씌워지는 FetchPosition 로그 (FetchPosition{offset=1 ...)
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-11-1, groupId=group-pizza-assign-seek-maintenance-11] Resetting offset for partition pizza-topic-0 to position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.

 

테스트한 코드도 첨부드립니다.

(partiton3개인 pizzaProducer를 사용하였고 consumer도 선생님께서 강의에서 진행해주신 코드와 거의 똑같습니다)

  public static void main(String[] args) {

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-pizza-assign-seek-maintenance-11");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        String topicName = "pizza-topic";

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        TopicPartition topicPartition = new TopicPartition(topicName, 0);

        kafkaConsumer.assign(List.of(topicPartition));
        kafkaConsumer.seek(topicPartition, 10L);

        Thread mainThread = Thread.currentThread();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("main program starts to exit by calling wakeup");
            kafkaConsumer.wakeup();

            try{
                mainThread.join();
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
        }));


        pollCommitSync(kafkaConsumer);
    }

    private static void pollCommitSync(final KafkaConsumer<String, String> kafkaConsumer) {
        int loopCnt = 0;
        try {
            while(true) {
                ConsumerRecords<String, String> records =  kafkaConsumer.poll(Duration.ofMillis(1_000L));
                log.info("loopCnt: {}, consumerRecords count: {}", loopCnt++, records.count());
                for(ConsumerRecord<String, String> record: records) {
                    log.info("record key: {}, partition: {}, recordOffset: {}",
                        record.key(), record.partition(), record.offset());
                }
                
                try {
                    if (records.count() > 100 ){
                        kafkaConsumer.commitSync();
                        log.info("commit sync called");
                    }
                } catch(CommitFailedException e) {
                    log.error("CommitFailedException! ", e);
                }
            }
        } catch (WakeupException e) {
            log.error("wakeup exception has been called", e);
        } finally {
            log.info("finally consumer is closed");
            kafkaConsumer.close();
        }
    }

 

질문이 고르지 못했던 점 양해 부탁드립니다.

주말에도 답변 감사합니다 선생님!

권 철민님의 프로필 이미지
권 철민
지식공유자

오, 굉장히 재미있는(?) 부분을 테스트 해보셨군요.

seek(N)인데 아직 offset이 N까지 없는 경우 , (제 기억으로는) 과거 버전에서는 Out of range 오류를 내었던 걸로 기억합니다만,

근데 지금은 out of range 오류 시 default offset으로 Fetcher가 position을 잡는 걸로 보입니다. 만약 이렇게 처리하지 않으면 오류를 발생 시키던지, 아님 N 이 될 때 까지 Consumer가 계속 대기 해야 하는데, 이런 부분을 개선하기 위해서 default offset으로 resetting하는 걸로 추정됩니다.

연어님의 프로필 이미지
연어
질문자

아하!

이런 현상이 카프카 설계 원리로 인해 발생하는 것이 아니라,

현 버전의 정책정도일 뿐이고 계속해서 업데이트 될 수 있는 부분이군요!

 

답변 감사드립니다 선생님. 남은 강의도 열심히 듣겠습니다!

즐거운 한주 시작하십쇼!

연어님의 프로필 이미지
연어

작성한 질문수

질문하기