작성
·
316
0
안녕하세요 철민님, 항상 좋은 강의 감사드립니다
질문 드리고 싶은 것이 있어 글을 적습니다.
철민님께서 강의해주신 Consumer의 동기 방식 중에는 이런 내용이 있습니다.
브로커에 Commit 적용이 완료된 후에 다시 메세지를 읽어옴.
그런데 위의 부분이 제가 생각하기에는 조금 다른 것 같아서, 질문드리고 싶습니다.
먼저 제가 코드 단위로 살펴보니 다음과 같습니다.
ConsumerNetworkClient는 내부적으로 unsent에 보내지 못한 API 요청을 가지고 있음.
HeartBeatThread / MainThread는 주기적으로 ConsumerNetworkClient의 trySend()를 호출해서 unsent에 있는 모든 요청을 보냄.
Unsent에는 HeartBeat API, Fetch API, Commit API 등이 존재함.
이 말은 Main 쓰레드가 CommitSync()에 blocking 처리가 되어 있어도 HeartBeat Thread에 의해서 Fetch 요청을 계속 보낼 수 있다는 것으로 저는 이해하고 있습니다.
철민님께서 강의에서 설명하신 내용은 다음과 같이 이해하면 될까요?
CommitSync()의 실패 유무와 상관없이 ConsumerNetworkClient는 Broker에서 메세지를 Fetch해서 Fetcher에 저장은 해둔다.
다만 CommitSync()에 Blocking 되면 Main 쓰레드가 consumer.poll()을 호출하지 못하기 때문에 Fetcher에서 데이터를 가져올 수 없다.
라고 이해를 하면 괜찮을까요?
항상 좋은 강의 감사합니다 :)
답변 1
0
안녕하십니까,
오, 무척 예리한 질문이군요.
강의에서 말씀드린대로 동기 방식의 경우 commit을 통해 __consumer_offsets 내부 토픽에 기록을 하기 전까지는 메시지를 fetcher를 통해서 추가로 읽어들이지 않습니다.
다만 말씀하신대로 동기 작업이 완료 될때까지 fetcher는 consumer의 linked queue에서 데이터를 가져오는 작업을 하지는 않지만 ConsumerNetworkClient는 비동기로 동작할 수 있기 때문에 말씀하신대로 동기작업을 기다리지 않고도 linked queue에 메시지를 쌓을 수는 있습니다.
하지만 이 부분은 확인이 필요합니다. 저도 거기까지는 소스코드를 파보지 않았습니다만, ConsumerNetworkClient가 만약에 데이터를 계속 가져온다면 Linked queue가 full이 되어서 더이상 넣지 못하고 오류를 발생하지 않을까라는 생각을 해봅니다.
때문에 동기작업을 기다린 다음에 ConsumerNetworkClient는 데이터를 가져올 것 같습니다.
성능을 생각한다면 말씀하신 방법이 맞겠지만, 안정성 측면에서는 위에 언급된 방식이 더 나을 것도 같지만 확인이 필요할 것 같습니다.
위에 적어 주신 소스코드의 URL과 해당 파일을 여기에 적어주시겠습니까, 제가 다시 확인해 보겠습니다.
감사합니다.
와우, 이렇게까지 내부 소스코드를 분석하시다니 대단하시군요.
제 생각엔,
말씀하신대로 unsent는 ConsumerClientNetwork이 broker에 보내야 할 Request들을 가지고 있습니다. 가령 broker의 Consumer Group Coordinator와의 통신을 위한 Request라든지...
근데 이들 Request는 NetworkClient 비동기적으로 broker에 요청을 보내지만, 아직 보내지지 않은 Request도 있을 수 있습니다. 이때 ConsumerClientNetwork()의 poll()을 호출하면 trysend()로 기존에 unsent에 있는 Request를 먼저 broker로 다 보낸 뒤에 NetworkClient의 poll()이 다시 호출되어 broker에서 메시지를 받아오는 처리를 하게 됩니다.
unsent에 들어가 있는 Request가 fetch request, heartbeat request도 들어갈 수는 있겠지만, unsent는 브로커로 보내지 못한 request를 가지고 있을 뿐이고 fetch의 경우는 poll()을 호출했을 때만 수행이 됩니다.
그러니까, 외부 Consumer Client가 poll( )을 호출하면 ConsumerNetworkClient가 trysend()로 기존에 쌓여있는 Request를 Broker로 일단 먼저 보내고, NetworkClient가 poll( )을 호출해서 데이터를 브로커로 부터 가져오게 됩니다.
NetworkClient가 비동기 I/O Selector로 구성되었다 하더라도, 이건 Request와 RequestCompletion, 그리고 poll()을 비동기적으로 작업을 한다는 의미지 NetworkClient 혼자 poll() 수행해서 스스로 브로커로 부터 가져올 수는 없습니다. 외부 Consumer Client에서 fetch를 위한 poll()또는 Heartbeat Thread에서 poll()이 수행되어야 NetworkClient가 poll( )을 수행 할 수 있습니다.
그런데 commit sync 방식일 경우에는 poll ( ) 수행 후 commit 시 까지는 blocking이 걸리기 때문에 commit 작업을 수행하는 도중에 NetworkClient가 스스로 브로커로 부터 메시지를 poll( )하지는 않습니다. 물론 이 부분은 100% 확실하지는 않습니다만 소스 코드 분석과 Consumer의 내부 메커니즘으로 판단컨데 제 생각으로 (확실히) 그럴거라 생각합니다.
감사합니다.
답변 감사드립니다! 제가 질문 드렸던 부분은 Confluent Kafka의 오픈소스 버전이라 따로 url을 링크드리긴 어려울 것 같습니다.
대신에 철민님께서 IDE에서 아래 클래스 및 메서드를 확인해봐주시면 좋을 것 같습니다.
ConsumerNetworkClient.poll() 메서드
위 메서드를 보면 client.poll()을 하기 전/후로 trysend()라는 메서드를 호출하고 있습니다.
여기서 client는 ConsumerNetworkClient 내부에 있는 NetworkClient이고, 이녀석은 내부적으로 nioSelector를 가지고 있어서, nioSelector를 통해 실제로 Broker에 통신을 하는 것처럼 보입니다.
trysend()에서 뭔가 메세지를 보내는 역할을 합니다.
ConsumerNetworkClient.send()
ConsumerNetworkClient는 unsent라는 리스트 형태의 자료구조를 가지고 있습니다. 이 자료구조는 ConsumerNetworkClient가 보내야 하지만 아직 보내지 못한 요청을 가지고 있습니다. 이 요청에는 Find Cooridnator / Fetch / Commit / Heartbeat 등의 API가 담겨져 있습니다.
client.ready() 메서드를 이용해서 현재 client / node의 상황을 판단하고 보낼 수 있는 경우에는 unsent에 포함되어 있는 모든 요청을 보내는 것처럼 보입니다. 예를 들어 unsent에 [HeartBeat API - HeartBeat API - Fetch API - Fetch API - HeartBeat API]가 존재한다면 trysend() 메서드를 한번 실행하면 unsent = []가 될 것입니다.
제가 살펴봤을 때는 Kafka-Heartbeat-Thread와 Main Thread는 모두 ConsumerNetworkClient.send()를 호출하고 있습니다. 위의 로직을 살펴봤을 때, HeartBeat-Thread가 어떤 요청을 보내야는지 구별할 수 없기 때문에 Heartbeat Thread는 자신이 HeartBeat Thread라고 할지라도 unsent에 Fetch API가 있으면 그 요청을 바로 보내주는 것으로 예상됩니다!
혹시 더 필요하신 부분이 있으시다면 언제든지 알려주세요!
항상 좋은 강의와 답변 감사합니다 :)