작성
·
456
0
안녕하세요! 항상 좋은 강의 감사합니다
Offset의 경우 어떻게 No Commit을 인식하고 값을 가져오는지 궁금합니다.
Commit이라는 행위 자체가 Broker의 __consumer_offsets의 그룹별로 offsets 번호를 갱신하는 것으로 이해하고 있습니다.
이 때, No Commit을 하게 되면 Broker는 어떻게 Consumer에게 적절한 Offset을 전달하는 것일까요? 생각한 내용은 아래와 같은데 이게 맞을까요?
Broker는 데이터를 전달하기 전에__consumer_offsets에 그 값을 읽고 보낸다.
보낸 후에 __consumer_offsets에 그 값을 저장한다.
만약, consumer가 offsets commit 메세지를 보내온다면 이 값을 __consumer_offsets에 업데이트한다.
No Commit이라면 Commit 메세지가 없기 때문에 온전히 Broker의 논리대로만 __consumer_offsets가 정해진다.
위와 같이 동작하는게 맞을까요? 만약 맞다면, Consumer async에 대해서도 조금 이상한 것 같습니다. 아래와 같은 방식으로 이상하게 동작할 것 같은데.. 제가 생각하는 것이 맞을까요?
브로커는 100까지 전송한 후 __consumer_offsets에 101을 기록한다.
consumer는 100의 메세지를 전송 받고, 그 다음 메세지를 전송 요청함과 동시에 offsets = 101을 커밋 요청한다.
이 때, 브로커는 300까지 전송한 후 __consumer_offsets에 301을 기록한다.
이 때, consumer가 보낸 offsets = 101 커밋이 전달되어 __consumer_offset은 301 → 101로 변경된다.
브로커는 offsets가 101이기 때문에 101부터 다시 메세지를 보낸다.
바꿔서 이야기하면, Consumer나 Broker에 에러가 발생하지 않아도 중복이 발생할 가능성이 매우 많은 것처럼 보입니다.
또한, sync() 메서드를 이용 + No Commit으로 할 경우에도 동작이 잘 이해가 되지 않습니다.
동일 그룹 ID로 접근할 경우, 이미 __consumer_offsets은 1000 이상의 값이 설정되어 있을 수 있습니다.
이 때, sync()로 10을 설정 + No Commit을 할 경우... 어떻게 첫번째부터 꾸준히 값을 불러오게 되는걸까요?
예를 들어 Broker에서 전달해준 offsets 값을 꾸준히 업데이트 해주는 것이라면, 초기에 10을 전달해준다고 하더라도 Broker는 __consumer_offsets의 값을 1000 이상을 가지고 있기 때문에 1000 이상의 값을 계속 전달해줘야 할 것 같습니다.
그런데 그렇지 않고 10번부터 차곡차곡 전달해주고 있습니다.
이 경우에는 Broker가 전달한 값을 매번 __consumer_offsets에 저장하지 않는 것처럼 보입니다.
이것저것 다 따져보면...결론은 다음과 같을텐데 맞을까요...?
Consumer Client는 자신이 읽고 있는 토픽 + 파티션별로 Offset 정보를 가지고 있다.
poll()을 할 때, Consumer Client는 자신이 가지고 있는 정보를 바탕으로 Broker에게 메세지를 달라고 한다.
Broker는 Consumer로부터 Commit이 온 것만 __consumer_offsets에 저장한다.
제가 위에 생각한 경우들이 맞을까요??
감사합니다!
답변 1
1
안녕하십니까,
질문 내용이 No Commit, __consumer_offsets 그리고 seek가 섞여 있어서 제가 차례로 답변 드리겠습니다.
먼저 No commit 으로 consumer가 message를 읽으면 __consumer_offsets에 offset 정보가 기록되지 않습니다.
broker가 __consumer_offfsets에 offset 정보를 기록할 때는 consumer client로부터 동기 또는 비동기 commit 요청을 받았을 때만 진행합니다. 따라서 client code에서 kafkaConsumer.commitAsync() 또는 kafkaConsumer.commitSync()를 호출하지 않으면 __consumer_offsets에 offset 정보가 기록되지 않으며 Consumer가 다시 해당 topic partition을 consume할 경우 이미 읽어온 데이터를 다시 읽게 되는 중복 처리를 하게 됩니다.
브로커의 토픽 파티션 메시지는 Consumer Group내에서 중복 처리되지 않게 유지하는게 대부분의 업무처리에서 필요한 사항입니다(1개의 주문을 이미 처리했으면 다시 처리할 필요 없음). 때문에 commit의 정합성을 유지하는것 역시 매우 필요한 사항입니다. 만약 해당 토픽 파티션 메시지를 동일하게 반복해서 consume하려면 별도의 Consumer Group에서 각각 처리해야 합니다.
__consumer_offsets 내부 토픽은 개별 Consumer Group 별로 특정 토픽의 특정 파티션을 어느 offset 까지 consume했는지 offset 정보를 가지고 있습니다. 즉 consumer group + 토픽 + 파티션 레벨로 어디까지 offset을 읽었는지에 대한 정보를 가집니다.
consumer는 poll() 수행 시 자신이 가지고 있는 정보를 바탕으로 메시지를 요구하지 않습니다. consumer는 자신이 소속된 Consumer Group내에서 자신이 읽어야 할 토픽 파티션의 offset 번호가 무엇인지를 __consumer_offsets에서 확인한 후 그 지점부터 지속적으로 메시지를 읽어 들입니다. 그리고 자신이 메시지를 읽었으면 commit을 호출해서 읽은 메시지의 offset을 __consumer_offsets 에 저장합니다.
하지만 유지보수 업무의 차원에서 토픽 파티션의 특정 offset 번호부터 다시 읽어서 처리해야 할 필요가 발생 할 수 있습니다. 그런 경우에는 consumer에서 seek( )를 호출해서 __consumer_offsets에 기재된 offset이 아니라 seek( )로 지정된 특정 offset 이후로 메시지를 읽어 들일 수 있습니다. 다만 이렇게 처리할 경우 seek( )로 읽은 다음에 commit을 해버리게 되면 기존의 commit offset 정보가 update 되어 버릴 수 있음에 유의 해야 합니다.
결론적으로 말씀드리자면
Consumer Client는 자신이 읽고 있는 토픽 + 파티션별로 Offset 정보를 가지고 있다.
poll()을 할 때, Consumer Client는 자신이 가지고 있는 정보를 바탕으로 Broker에게 메세지를 달라고 한다.
=> Consumer Client는 poll( ) 수행시 자신이 가지고 있는 정보가 아니라 __consumer_offsets 의 offset 정보 이후의 메시지를 Broker로 부터 가져 갑니다.
Broker는 Consumer로부터 Commit이 온 것만 __consumer_offsets에 저장한다.
=> 네 맞습니다.
감사합니다.
seek() 메소드를 사용하겠다는 것은 consumer가 __consumer_offsets에서 offset을 참조하지 않겠다는 의미 입니다. seek() 메소드의 인자로 지정된 offset 번호를 이용해서 메시지를 읽겠다는 의미 입니다.
즉 아래 코드는 consumer가 읽을 토픽 파티션을 할당하고 seek()를 통해 직접 5번 offset 부터 브로커의 메시지를 읽겠다는 것입니다.
kafkaConsumer.assign(Arrays.asList(topicPartition));
kafkaConsumer.seek(topicPartition, 5L);
그리고 poll()을 통해 5번 offset부터 메시지를 10개 읽으면 다음 poll()에서 15번 offset부터 읽어오면 됩니다. poll() 을 수행할 때 마다 다음에 읽어 들일 offset은 consumer client에서 당연히 가지고 있습니다.
__consumer_offsets은 consumer가 처음 메시지를 읽어들일 때 참조합니다. 마찬가지로 seek()로 지정된 offset도 consumer가 처음 메시지를 읽어 들일때 참조합니다. 이후의 동작은 poll()을 수행할 때 마다 자기가 읽어들일 offset을 가지고 있게 됩니다. 보통은 poll()을 할때마다 commit을 합니다. 그래야 해당 메시지들이 poll()로 consume이 되었다는 것을 명시적으로 기록하게 됩니다.
No commit을 하게 되면 __consumer_offsets에 기록하지 않습니다. 하지만 poll()로 계속해서 읽어들일 offset번호는 consumer client가 계속 유지하고 있습니다.
철민님 답변 너무 감사합니다! 이해가 되었습니다! 아래와 같이 정리하면 될까요?
Consumer Client는 기본적으로 Offset 정보를 가지고 있다. 그리고 이 Offset 정보를 poll() 할 때마다 Broker에게 전달한다.
첫번째 Poll()을 수행할 때는 Offsets 정보가 없기 때문에 __Consumer_offset에서 Offset 정보를 가져온다.
이후의 Poll() 에서는 브로커로 Request를 할 때, Consumer의 offset 정보를 request에 포함해서 요청한다. 브로커는 메세지를 전달할 때, Consumer가 보내온 requeste의 offset을 읽고, offset부터 Batch 단위로 메세지를 만들어서 Consumer에게 보낸다.
비슷한 측면이지만, consumer client가 기본적으로 offset정보를 가지고 있다는 걸로 이해하는 것보다는 consumer는 자기가 읽어들인 offset 정보 이후의 메시지를 poll()을 통해 계속 요구하는 걸로 이해하시면 좋을 것 같습니다. broker는 consumer가 읽어 들인 offset 이후에 메시지가 있다면 consumer에게 전달하는 것이고 없으면 전달하지 않습니다.
나머지 사항은 적어주신 사항이 맞습니다.
답변 달아주셔서 감사합니다! 이해에 많은 도움이 되었습니다. 하지만 한 가지 풀리지 않는 부분이 있어서 이걸 더 질문드리고 싶습니다.
컨슈머 B가 해당 토픽 + 파티션에서 이미 #1000의 Offset을 읽고 있는 상황을 가정해보겠습니다.
컨슈머A가 컨슈머B의 토픽 + 파티션에서 No Commit + Seek()로 동작할 경우, 컨슈머A는 Seek()에 기록된 인덱스만으로 브로커에서 값을 불러오는 것으로 알고 있습니다.
컨슈머 A는 while (true){poll()}에 의해서 계속 메세지를 가져오는데, 이 때 컨슈머 A는 어떤 인덱스 정보를 가지고 값을 불러오게 되나요?
위에서 말씀해주신 것처럼 컨슈머 A는 No Commit으로 하기 때문에 현재 컨슈머 A가 어디까지 읽었는지에 대한 정보는 __commit_offsets에는 기록되지 않습니다. 컨슈머 B가 읽었던 부분만 저장이 되어있을텐데요. 이 때 컨슈머 A는 어떻게 seek()로 시작하는 인덱스부터 꾸준히 값을 읽어올 수 있을까요?
첫번째 poll()에서 Seek()를 이용해서 인덱스를 읽어온다고 하더라도, 그 이후의 poll()에서 어떻게 다음 읽어야 할 offsets을 읽어오는지 이해가 안되네요...ㅠㅠ.. poll() 할 때의 값을 살펴봐도 request 하는 쪽에서 offset을 포함해서 브로커에 보내는 거 같진 않던데... 동일 Group ID에서 Seek()를 하고, No Commit을 하게 되면 Seek한 Consumer에 대한 임시 내부 토픽이 생성되는 것일까요...?
긴 답변 너무 감사드립니다!
바쁘시겠지만 한번 더 알려주시면 큰 도움이 될 것 같습니다!