inflearn logo
강의

강의

N
챌린지

챌린지

멘토링

멘토링

N
클립

클립

로드맵

로드맵

지식공유

[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!

리밸런싱

Kafka Offset 및 LAG 관련 질문

578

yeon

작성한 질문수 5

0

안녕하세요. 강의 수강 중 궁금한점이 있어 문의드립니다.  

 

[구성]

파티션 - 5개

컨슈머 - 5개  

 

[문의]

1. 컨슈머 offset과 lag 관련 문의

프로듀서에서 메시지를 생성해서 Kafka로 전송하면 Offset값이 2씩 증가합니다.

CURRENT-OFFSET - 2증가 (7 -> 9)

LOG-END-OFFSET - 2증가 (8 -> 10)

메시지가 소비되고 나면 LAG은 항상 1이 유지되고 있는데 어떤 부분을 체크해 봐야할까요? 
(추가확인 : LAG이 항상 1이 유지되는데 kafka-console.consumer.sh로 확인해보면 메시지는 없는데 LAG이 0으로 변경됩니다.)

 

2. 소비된 메시지가 다시 소비되는 현상

프로듀서로 메시지 생성 후 컨슈머에서 메시지를 소비하였는데 한참 시간이 지난 후 새벽시간(12시간 이후)에 이미 처리된 메시지가 컨슈머에서 다시 처리되는 현상이 발생하는데 설정값에 따라 발생할 수 있는 현상일까요? 

 

3. 이중화 (Active-Active) 구성일 경우 컨슈머 설정

이중화 구성이 되어 있는 경우 컨슈머를 @KafkaListener( concurrency = "2")로 설정하면 컨슈머는 총 4개로 운영되는 구조가 맞는지 궁금합니다. 

 

감사합니다.

kafka 데이터-엔지니어링

답변 1

0

데브원영 DVWY

안녕하세요 답변드립니다.

 

1) 프로듀서에서 메시지를 1개 생성해서 카프카로 전송 할 경우 오프셋은 1씩 증가하는 것이 맞습니다. 왜냐면 레코드가 1개 추가되기 때문입니다. 만약 2개 이상으로 변경되는 것에 대한 이유를 알아보시려면 실제로 저장된 브로커에 들어가셔서 해당 레코드를 까서 확인해보는 수 밖에 없습니다. 파일을 확인하기 위한 스크립트는 kafka-dump-log.sh이고, 상세 사용 방법은 https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-dump-log-sh 를 참고하세요.

 

2) 일반적으로 컨슈머가 새벽 이후 이미 처리된 데이터를 다시 처리하지 않습니다. 컨슈머의 상세 옵션과 관련된 로직을 살펴보시는 것이 중요하겠습니다. 기본적으로 실제 시간이 지남과 상관없이 컨슈머는 지속적으로 들어오는 레코드를 파티션별로 차례대로 처리합니다.

 

3) 이중화 구성이라 함은 어떤 부분에서 구축하셨는지에 따라 다릅니다. 만약 컨슈머 애플리케이션을 2개를 운영하고 각 컨슈머 애플리케이션에 스프링 카프카로 concurrency=2로 설정하셨다면 각 컨슈머 애플리케이션에 2개의 컨슈머 스레드로 실행되는 구조라고 이해하시면 좋습니다.

0

yeon

답변 감사드립니다.

 

kafka-dump-log.sh 를 통해 확인해보니 레코드 정보는 아래와 같습니다.

 

총 4건의 테스트 레코드를 생성하였는데 lastOffset은 7이고 확인결과

baseSequence: -1이 레코드 생성 시 쌍을 이루며 생기고 있는데 해당 부분은 옵션에 따른 문제일까요?

 

[정보]
Log starting offset: 0

baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 3 lastSequence: 3 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1707202069651 size: 405 magic: 2 compresscodec: snappy crc: 1417159843 isvalid: true

baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 405 CreateTime: 1707202069658 size: 78 magic: 2 compresscodec: none crc: 537964878 isvalid: true

baseOffset: 2 lastOffset: 2 count: 1 baseSequence: 4 lastSequence: 4 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 483 CreateTime: 1707202116492 size: 405 magic: 2 compresscodec: snappy crc: 1346530187 isvalid: true

baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 888 CreateTime: 1707202116497 size: 78 magic: 2 compresscodec: none crc: 3576694629 isvalid: true

baseOffset: 4 lastOffset: 4 count: 1 baseSequence: 5 lastSequence: 5 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 966 CreateTime: 1707208543183 size: 405 magic: 2 compresscodec: snappy crc: 655235739 isvalid: true

baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 1371 CreateTime: 1707208543200 size: 78 magic: 2 compresscodec: none crc: 3411664088 isvalid: true

baseOffset: 6 lastOffset: 6 count: 1 baseSequence: 6 lastSequence: 6 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 1449 CreateTime: 1707212758588 size: 405 magic: 2 compresscodec: snappy crc: 2032489199 isvalid: true

baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: 6243 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 1854 CreateTime: 1707212758595 size: 78 magic: 2 compresscodec: none crc: 3917563614 isvalid: true

1

데브원영 DVWY

보여주신 log dump를 보면, isTransactional: true 로 되어 있는 것이 보입니다. 해당 부분은 producer가 transaction producer로 동작했음을 뜻합니다. producer를 실행시킬 때 나오는 로그에 transaction이 켜져 있을 것입니다.

transaction producer는 transaction 처리를 위한 추가 레코드를 보내기 때문에 실제로 보낸 레코드 개수보다 오프셋의 번호가 더 높을 수도 있음을 참고해주세요~

consume 이후 lag가 줄어들지 않음

0

74

2

안녕하세요. 강의의 카프카 버전과 현재 시점의 카프카의 차이점 문의 드립니다.

0

110

2

멱등성 프로듀서 retries 관련 질문입니다.

0

97

2

채팅 서비스 개발 시 주의점이 있을까요?

0

97

2

충분히 큰 파티션 생성시 궁금증이 존재합니다.

0

86

2

KTable 키가 없는 레코드 처리

0

72

2

컨슈머 테스트 코드 작성

0

94

2

리밸런스 onPartitionRevoked이 필요한 상황

0

74

2

카프카 클러스터에서 감당 가능한 파티션(레플리카) 수 문의

0

119

2

reset offset 질문

0

76

2

KStreamJoinKTable 실행시 오류

0

77

2

auto.commit.interval.ms 옵션 관련 질문 드립니다.

0

141

2

파티션, 컨슈머 그룹, 컨슈머 관련 질문

0

117

1

java, kotlin

0

130

2

shutdownThread 에 대한 문의 입니다.

0

153

2

zookeeper실행시 오류가 발생합니다.

0

267

2

커스텀 소스 커넥터에서 Thread.sleep (1000) 은 왜 하는거에요?

0

152

2

처리량을 늘리기 위해서 파티션을 늘리고 컨슈머를 늘려야한다고 설명하셨는데요

0

180

3

파티션 개수와 컨슈머 개수의 처리량 관련 질문

0

163

2

동영상 및 이미지 처리 관련 문의 드립니다.

0

224

2

주키퍼 없이 사용 문의 드립니다.

0

302

1

kafka 를 띄우니 오류가 발생하고 종료합니다.

0

278

3

zookeeper 실행시 오류 .. 무엇을 해야 할까요?

0

220

2

파티션 추가로 해결할 수 있지만 늘어난 파티션은 줄일 수 없지 않나요?

0

192

2