Inflearn brand logo image

인프런 커뮤니티 질문&답변

장원용님의 프로필 이미지
장원용

작성한 질문수

Kafka & Spark 활용한 Realtime Datalake

Simple Producer 이해하기

confluent-kafka 의 produce 파라미터(on_devliery)

작성

·

36

·

수정됨

0

안녕하세요!

먼저 이 강의를 통해 많은 것을 배우고 있습니다. 감사합니다

강의를 수강중에 Simple Producer 코드에서 질문이 있습니다.

 

  1. 강의에서 비동기 방식으로 producer를 생성할 때 on_delivery 파라미터를 설명해주셨고, 아래 문서에서 예제 코드를 확인했을 때는 on_delivery가 아닌 callback 으로 파라미터를 받고 있는 것을 확인했습니다.

https://docs.confluent.io/kafka-clients/python/current/overview.html

 

직접 코드를 확인해보니 아래와 같이 alias로 사용하는 것까지 확인했습니다.
하지만, callback으로 파라미터를 넘겼을 때 어떻게 on_delivery로 값을 바인딩할 수 있는지에 대한 부분은 찾지 못하여 질문드립니다!

alias가 어떻게 바인딩 되는지 어느 코드에서 찾을 수 있을까요? 

This is an asynchronous operation, an application may use the ``callback`` (alias ``on_delivery``) argument to pass a function (or lambda) that will be called from :py:func:`poll()` when the message has been successfully delivered or permanently fails delivery.

 

  1. confluent-kafka (python) 코드에서 강의 중에 poll() 메소드는 반드시 필요한 것이라고 이해를 했습니다.
    자바 기반인 apache kafka를 구현한 예제들을 보면 producer에서는 poll() 메소드를 사용하지 않는 것 같아서 질문드립니다.
    자바 기반인 경우 동작 방식이 달라서 그런걸가요?

답변 1

1

김현진님의 프로필 이미지
김현진
지식공유자

안녕하세요 장원용님!

훌륭한 질문이에요. 이렇게 본질적이고 깊은 질문 좋습니다.

순서대로 답변드리면 말씀하신대로 Producer.produce() 함수의 callback 파라미터는 on_delivery 파라미터의 alias입니다. 그 구현을 어떻게 보냐면, 결론적으로 C 코드를 봐야합니다.

강의에서 설명드렸듯이 confluent_kafka는 librdkafka를 기반으로 만들어졌고 librdkafka는 C언어를 기반으로 만들어진 라이브러리입니다. confluent_kafka 의 Producer 클래스의 코드를 보면 대부분 pass처리 되어있을 겁니다. 단순히 C언어를 wrapping하고 있기 때문이고 실제 코드를 보려면 결국 C언어를 봐야합니다.

https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/src/Producer.c

여기에 보시면 librdkafka의 producer c코드를 볼 수 있습니다.

 

image.png

 

여기 관련된 함수가 있는데 어떻게 구현되었는지는 이 부분을 확인해보면 알 수 있지 않을까 합니다.

 

그리고 두 번째 질문 관련해서 답변드리자면 Java 기반의 Producer는 poll 과정이 백그라운드로 수행됩니다.

https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

여기 설명을 보시면 send() 함수 설명하는 부분 아래에

Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own Executor in the callback body to parallelize processing.

이렇게 설명돼 있습니다. 즉 callback은 I/O 쓰레드(백그라운드)에서 수행되고 빠르게 동작해야 한다는 내용입니다.

이런식으로 Java kafka와 librdkafka는 구현 방식이 조금씩 다르다는 걸 알 수 있어요.

답변이 되었을까요?

장원용님의 프로필 이미지
장원용
질문자

자세히 설명해주셔서 감사합니다.

너무 잘 이해가 되었습니다!

마지막으로 한가지만 더 질문 드리고 싶습니다.

 

enable.idempotence=true로 설정하게 되면 멱등성이 보장되어 데이터 순서가 보장된다고 이해했습니다.

강의에서 설명해주신 batch data 0, 1, 2, 3, 4 를 처리 중에 2번 batch가 실패하였고,

재처리를 시도하면서 다른 batch가 모두 브로커에 저장된 이후 시간이 지나 저장 되었다고 했을 때도 순서가 보장이 되는 걸까요?

consumer가 2 batch 저장이 되기 전에 0,1,3,4를 먼저 가져가게 되면 순서가 변경될 것 같아서 질문드립니다.

아니면, 2 batch가 저장되기 전까지는 consumer가 데이터를 가져가지 못하도록 하는 걸까요?

 

감사합니다!

 

 

 

 

김현진님의 프로필 이미지
김현진
지식공유자

안녕하세요 장원용님!

답변이 좀 늦었습니다.

결과적으로 말씀드리자면 컨슈머는 순서대로 읽을 수 있습니다. 제가 강의에서 다루지는 않았지만 브로커는 high watermark 라는걸 관리합니다. 이 때 sequence number를 순서대로 받도록 관리되며 만약 예상하지 못한 sequence number를 받게 되면 해당 배치를 모두 폐기처리합니다. 이때 브로커에 발생하는 exception이 OutOfOrderSequenceException입니다.

https://kafka.apache.org/10/javadoc/org/apache/kafka/common/errors/OutOfOrderSequenceException.html

여기 보시면 해당 익셉션의 내용을 볼 수 있는데

This exception indicates that the broker received an unexpected sequence number from the producer, which means that data may have been lost. If the producer is configured for idempotence only (i.e. if enable.idempotence is set and no transactional.id is configured), it is possible to continue sending with the same producer instance, but doing so risks reordering of sent records. For transactional producers, this is a fatal error and you should close the producer.

그래서 2번 배치가 누락된 경우 프로듀서는 2,3,4 를 모두 재전송합니다. (강의안에서는 2만 재전송하는걸로 표현했지만 실제로는 2,3,4를 모두 재전송합니다)

그래서 2,3,4가 모두 들어온 이후 broker는 high watermark를 4까지 commit 하게 되고 컨슈머는 2,3,4 순서대로 읽을 수 있게 됩니다.

 

이해 되셨을까요?

 

 

장원용님의 프로필 이미지
장원용

작성한 질문수

질문하기