• 카테고리

    질문 & 답변
  • 세부 분야

    데이터 엔지니어링

  • 해결 여부

    미해결

Kafka Offset 및 LAG 관련 질문

24.02.05 14:22 작성 24.02.05 21:38 수정 조회수 245

0

안녕하세요. 강의 수강 중 궁금한점이 있어 문의드립니다.  

 

[구성]

파티션 - 5개

컨슈머 - 5개  

 

[문의]

1. 컨슈머 offset과 lag 관련 문의

프로듀서에서 메시지를 생성해서 Kafka로 전송하면 Offset값이 2씩 증가합니다.

CURRENT-OFFSET - 2증가 (7 -> 9)

LOG-END-OFFSET - 2증가 (8 -> 10)

메시지가 소비되고 나면 LAG은 항상 1이 유지되고 있는데 어떤 부분을 체크해 봐야할까요? 
(추가확인 : LAG이 항상 1이 유지되는데 kafka-console.consumer.sh로 확인해보면 메시지는 없는데 LAG이 0으로 변경됩니다.)

 

2. 소비된 메시지가 다시 소비되는 현상

프로듀서로 메시지 생성 후 컨슈머에서 메시지를 소비하였는데 한참 시간이 지난 후 새벽시간(12시간 이후)에 이미 처리된 메시지가 컨슈머에서 다시 처리되는 현상이 발생하는데 설정값에 따라 발생할 수 있는 현상일까요? 

 

3. 이중화 (Active-Active) 구성일 경우 컨슈머 설정

이중화 구성이 되어 있는 경우 컨슈머를 @KafkaListener( concurrency = "2")로 설정하면 컨슈머는 총 4개로 운영되는 구조가 맞는지 궁금합니다. 

 

감사합니다.

답변 1

답변을 작성해보세요.

0

안녕하세요 답변드립니다.

 

1) 프로듀서에서 메시지를 1개 생성해서 카프카로 전송 할 경우 오프셋은 1씩 증가하는 것이 맞습니다. 왜냐면 레코드가 1개 추가되기 때문입니다. 만약 2개 이상으로 변경되는 것에 대한 이유를 알아보시려면 실제로 저장된 브로커에 들어가셔서 해당 레코드를 까서 확인해보는 수 밖에 없습니다. 파일을 확인하기 위한 스크립트는 kafka-dump-log.sh이고, 상세 사용 방법은 https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-dump-log-sh 를 참고하세요.

 

2) 일반적으로 컨슈머가 새벽 이후 이미 처리된 데이터를 다시 처리하지 않습니다. 컨슈머의 상세 옵션과 관련된 로직을 살펴보시는 것이 중요하겠습니다. 기본적으로 실제 시간이 지남과 상관없이 컨슈머는 지속적으로 들어오는 레코드를 파티션별로 차례대로 처리합니다.

 

3) 이중화 구성이라 함은 어떤 부분에서 구축하셨는지에 따라 다릅니다. 만약 컨슈머 애플리케이션을 2개를 운영하고 각 컨슈머 애플리케이션에 스프링 카프카로 concurrency=2로 설정하셨다면 각 컨슈머 애플리케이션에 2개의 컨슈머 스레드로 실행되는 구조라고 이해하시면 좋습니다.

yeon님의 프로필

yeon

질문자

2024.02.06

답변 감사드립니다.

 

kafka-dump-log.sh 를 통해 확인해보니 레코드 정보는 아래와 같습니다.

 

총 4건의 테스트 레코드를 생성하였는데 lastOffset은 7이고 확인결과

baseSequence: -1이 레코드 생성 시 쌍을 이루며 생기고 있는데 해당 부분은 옵션에 따른 문제일까요?

 

[정보]
Log starting offset: 0

baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 3 lastSequence: 3 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1707202069651 size: 405 magic: 2 compresscodec: snappy crc: 1417159843 isvalid: true

baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 405 CreateTime: 1707202069658 size: 78 magic: 2 compresscodec: none crc: 537964878 isvalid: true

baseOffset: 2 lastOffset: 2 count: 1 baseSequence: 4 lastSequence: 4 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 483 CreateTime: 1707202116492 size: 405 magic: 2 compresscodec: snappy crc: 1346530187 isvalid: true

baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 888 CreateTime: 1707202116497 size: 78 magic: 2 compresscodec: none crc: 3576694629 isvalid: true

baseOffset: 4 lastOffset: 4 count: 1 baseSequence: 5 lastSequence: 5 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 966 CreateTime: 1707208543183 size: 405 magic: 2 compresscodec: snappy crc: 655235739 isvalid: true

baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 1371 CreateTime: 1707208543200 size: 78 magic: 2 compresscodec: none crc: 3411664088 isvalid: true

baseOffset: 6 lastOffset: 6 count: 1 baseSequence: 6 lastSequence: 6 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 1449 CreateTime: 1707212758588 size: 405 magic: 2 compresscodec: snappy crc: 2032489199 isvalid: true

baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 1854 CreateTime: 1707212758595 size: 78 magic: 2 compresscodec: none crc: 3917563614 isvalid: true

보여주신 log dump를 보면, isTransactional: true 로 되어 있는 것이 보입니다. 해당 부분은 producer가 transaction producer로 동작했음을 뜻합니다. producer를 실행시킬 때 나오는 로그에 transaction이 켜져 있을 것입니다.

transaction producer는 transaction 처리를 위한 추가 레코드를 보내기 때문에 실제로 보낸 레코드 개수보다 오프셋의 번호가 더 높을 수도 있음을 참고해주세요~