월 19,800원
5개월 할부 시다른 수강생들이 자주 물어보는 질문이 궁금하신가요?
- 미해결카프카 완벽 가이드 - 코어편
producer의 record accumulator의 토픽에 대한 정보는 어떻게 얻어오게 되나요?
어플리케이션 기동시 브로커에서 설정정보를 읽은 다음에 존재하고 있는 Topic만큼 Record Accumulator내에 토픽별 파티션 Batch를 생성하는건가요?만약 브로커에서 C라는 토픽이 생성하게 되면 리밸런싱이 되면서 producer서버에 새로운 토픽이 생성되었다는 것을 알리고 프로듀서의 Record Accumulator 내에도 C토픽에 대한 영역이 추가가 되는지 궁금합니다
- 해결됨카프카 완벽 가이드 - 코어편
[섹션2] 메세지 비동기 전송 부분에 기본적인 질문인데요
카프카 관련 질문이라기 보다는.. 자바에 익숙하지 않아서 자바에 관한 질문입니다. kafkaProducer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { logger.info("partiion: " + metadata.partition()); logger.info("offset: " + metadata.offset()); logger.info("timestamp: " + metadata.timestamp()); } else { logger.error("exception error from broker: " + exception.getMessage()); } } });여기서 sendThread에서 callback에 대한 부분을 호출할때, 이런식으로 동작하는것으로 생각했습니다. 그래서 sendThread에서 broker에서 response를받아 callback에 해당하는 부분을 채워넣을때 이와 같이 동작한다고 생각합니다. (java에 익숙하지 않아서... python코드로 그냥 이해한대로 적어보겠씁니다.. ) def responseCallback(record, callback) { callback.onCompletion(record, exception) }이런식으로 callback 객체의 onCompletion 메서드를 호출하고 받은 정보를 parameter로 넘기는것으로 이해했는데요. 근데, lambda형식으로 바꾸게 되면, kafkaProducer.send(producerRecord, (metadata, exception) -> { if (exception == null) { logger.info("partiion: " + metadata.partition()); logger.info("offset: " + metadata.offset()); logger.info("timestamp: " + metadata.timestamp()); } else { logger.error("exception error from broker: " + exception.getMessage()); } } });이렇게 코드를 작성되는데, 이렇게 되면 callback 함수를 호출할때, onCompletion 메서드를 호출을 안하게 되는건가요?callback(metadata, exception)이와같이 호출을 하는건가요?? lambda에서의 호출방법으로 호출하는건지, 기존의 callback 객체를 호출하는 방식이 맞는건지.. 어떠한 부분이 맞는건지 궁금합니다.
- 해결됨카프카 완벽 가이드 - 코어편
브로커가 추가될 때 파티션 재분배
안녕하세요 선생님! 완강 후에 정리하며 이것저것 테스트를 하는 와중에 궁금한게 생겨 질문드립니다. 이미 특정 토픽의 파티션이 브로커들에게 분배된 상태에서, 새로운 브로커가 추가됐을 때 새로운 브로커는 특정 토픽의 파티션을 가질 수 있는 대상으로 선정되지 않는 것 같습니다.새로운 브로커가 추가 됐을 때 새 브로커에도 기존의 토픽의 파티션 재분배를 하는 방법이 있나요?불가능 하다면, 이런 모델을 가지는 이유가 있을까요? 테스트 과정 공유드립니다.broker #1, #2 총 2개 띄운 상태에서 partition 3개, replication 2개의 토픽 생성 (topic-p3r2)Topic: topic-p3r2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline: Topic: topic-p3r2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline: Topic: topic-p3r2 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline:broker #3 추가 후 topic-p3r2 토픽 상태Topic: topic-p3r2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline: Topic: topic-p3r2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline: Topic: topic-p3r2 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline:브로커#3은 후보에도 오르지 않았습니다. 제가 예상했던 건 아래와 같이 브로커#3이 추가되었을 때 브로커#3도 토픽의 파티션을 갖는 것이었습니다.(아래 로그는 제가 상상한 것을 임의로 만든 것입니다)Topic: topic-p3r2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline: Topic: topic-p3r2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline: Topic: topic-p3r2 Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1 Offline: 감사합니다!!
- 해결됨카프카 완벽 가이드 - 코어편
토픽에 데이터가 없을 때 offset이 0이 되는 현상 문의
- 학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요! - 먼저 유사한 질문이 있었는지 검색해보세요. - 서로 예의를 지키며 존중하는 문화를 만들어가요. - 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요. 선생님 안녕하십니까!강의를 열심히 듣고 있는 수강생입니다. 다름이아니라, 토픽에 데이터가 없을 때 offset이 0이 된다는 로그는 어디서부터 기인하는 것인지 궁금하여 질문 드립니다. 제가 이해하기로 AUTO_COMMIT false가 되어있으면 명시적으로 commit을 호출하지 않으면 commit이 되지 않는 것으로 알고있습니다 (테스트도 해보았습니다.) 하지만, 강의 6:00경 시나리오.즉, 토픽에 아무런 데이터가 없는 상태에서 컨슈머만 켠 케이스에서 저는 커밋(commitSync)을 하지 않았는데 poll을 몇번 돌다보면 offset이 0이 되었다는 로그가 발생합니다.(실제로 __consumer_offsets-* 파일에는 offset이 기록되진 않고, 그래서 컨슈머를 껐다 켜면 offset 0부터 읽습니다) 저는 commit을 한적이 없으므로 실제로 __consumer_offsets에는 0으로 기록되지 않는데, 저 로그에 있는 offset이 0은 어디서부터 오는 것일까요? +) 코드상으로는 poll 시점에 updateAssignmentMetadataIfNeeded (maybeSeekUnvalidated) 에서 해당 로그가 찍히는데요, 이건 무슨 정책일까요? 관련 자료가 있으면 공유주셔도 감사합니다. // offset=0 설정이 되어버림 [main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-6-1, groupId=group-pizza-assign-seek-maintenance-6] Fetch position FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}} is out of range for partition pizza-topic-0, resetting offset [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group-pizza-assign-seek-maintenance-6-1, groupId=group-pizza-assign-seek-maintenance-6] Resetting offset for partition pizza-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. 테스트한 코드 일부 공유드립니다.(필요할 설정들을 했으며, 정말 커밋이 안되고 있는지 확인하기 위해 records.count()>100일 때만 명시적으로 커밋을 한 코드입니다)props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); TopicPartition topicPartition = new TopicPartition(topicName, 0); kafkaConsumer.assign(List.of(topicPartition)); kafkaConsumer.seek(topicPartition, 10L);while(true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1_000L)); for(ConsumerRecord<String, String> record: records) { log.info("record key: {}, partition: {}, recordOffset: {}", record.key(), record.partition(), record.offset()); } if (records.count() > 100 ){ kafkaConsumer.commitSync(); log.info("commit sync called"); } } 감사합니다 ^^
- 해결됨카프카 완벽 가이드 - 코어편
백그라운드 스레드 Sender에 대한 예외처리도 가능한가요?
KafkaProducer의 send 메소드를 try-catch 하면 아래와 같은 예외는 잡을 수 있는데요 Throws:AuthenticationException – if authentication fails. See the exception for more detailsAuthorizationException – fatal error indicating that the producer is not allowed to writeIllegalStateException – if a transactional.id has been configured and no transaction has been started, or when send is invoked after producer has been closed.InterruptException – If the thread is interrupted while blockedSerializationException – If the key or value are not valid objects given the configured serializersTimeoutException – If the record could not be appended to the send buffer due to memory unavailable or missing metadata within max.block.ms.KafkaException – If a Kafka related error occurs that does not belong to the public API exceptions.Batch 로 레코드를 묶어서 실제로 브로커에게 전송하는 Sender Thread 에서 네트워크 이슈 등으로 실패하면 어떻게 처리해야할지가 궁금합니다.(retries가 아닌 예외로 잡는 방법이 없을까요? ㅠㅠ)
- 미해결카프카 완벽 가이드 - 코어편
mtputty 설치했는데 connect error가 뜹니다..
- 학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요! - 먼저 유사한 질문이 있었는지 검색해보세요. - 서로 예의를 지키며 존중하는 문화를 만들어가요. - 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요.강의에서 하신대로 ip 할당도 하였고 192.168.56.101로 핑을 날려도 정상적으로 핑 되는거 확인했고 mtputty로 연결하면 connection error가 뜨네요 어떡하죠.?
- 미해결카프카 완벽 가이드 - 코어편
mac (m1) - utm 에서 고정 ip 할당하는법
mac os 환경에서 utm에 ubuntu 설치해서 사용중입니다.고정 IP할당하는 부분에서, 강의는 virtual box로 설명되어 세팅값 설정하는데 어려움이 있습니다구글링 해봐도 "utm ubuntu 수동 ip 설정" 관련해서는 따로 자료가 없어서요! ㅠㅠutm > edit selected vm > Devices > 네트워크 > 고급 설정 보기다음에 어떤걸 설정해야되는지 모르겠습니다!
- 미해결카프카 완벽 가이드 - 코어편
두 번 종료해야 꺼지는 이유?
wakeup()이 호출되지 않으면 종료되지 않고 다시 실행되는 이유가 무엇일까요..?
- 미해결카프카 완벽 가이드 - 코어편
콘솔에서 컨슈머 생성
토픽을 구독하는 컨슈머를 콘솔에서 생성하면, 생성하는 컨슈머마다 항상 새로운 그룹이 만들어지게 되는거고 해당 토픽에 내부 파티션인 __consumer_offset가 생성된 컨슈머의 수만큼 만들어지는게 맞나요?자바의 KafkaConsumer 를 사용할때 이미 존재하는 컨슈머 그룹에 참여한다면, 토픽에는 해당 컨슈머 그룹에 대한 정보인 __consumer_offset이 이미 존재하고 컨슈머에서 이 값을 읽어오니 earlist로 설정해도 0번 인덱스 레코드부터 읽는게 아닌 __consumer_offset에서 offset 값을 그대로 읽어와 사용하는게 맞나요?
- 미해결카프카 완벽 가이드 - 코어편
빅데이터 관리에 관해서
안녕하세요 강사님 강의 잘 듣고 있습니다.강의중간에 잠깐 언급하고 넘어가신 부분중에 궁금한게 있어서 질문드립니다.현업에서 실제로 하루에 10기가 혹은 20기가로 엄청난 양의 데이터가 나올때이 데이터를 실제로 어떻게 처리하는지 궁금합니다.감사합니다.
- 미해결카프카 완벽 가이드 - 코어편
레인지 방식
서로 다른 토픽에 같은 컨슈머로 받을 때, 토픽 A에 orderId가 들어오고, 토픽 B에 orderId를 키로 가진 orderItem이 들어왔을 때, 어그리게이트를 하려면 같은 시점에 들어와야하지 않나요? 이해가 잘 안가서 질문드립니다.
- 미해결카프카 완벽 가이드 - 코어편
auto.offset.reset은 첫구독일때만 의미가 있는건가요?
__consumer_offsets 토픽에 오프셋 데이터 적재 이후로는의미가 없는 설정값인게 맞을까요?
- 해결됨카프카 완벽 가이드 - 코어편
Kafka에 설정값이 엄청 많은데요
이직하는 회사가 Kafka 기반으로 MSA간 통신을 해서max.in.flight.~ 강의까지 열심히 듣는 중입니다 ㅎㅎ보다보니까 설정할 수 있는 영역이 엄청 많은데요~보통 실무에서 로드테스트 등을 통해서 서비스에 적합한 값을 찾아가나요?어느정도 경험이 없다면 최적의 설정값을 찾기가 어려울 것 같아서요 ㅎㅎ
- 미해결카프카 완벽 가이드 - 코어편
자바 클라이언트 메세지 전송 테스트 실패
- 학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요! - 먼저 유사한 질문이 있었는지 검색해보세요. - 서로 예의를 지키며 존중하는 문화를 만들어가요. - 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요. 안녕하세요.자바 클라이언트에서 메세지 전송시 아래와 같은 오류가 발생합니다.저는 m1 유저라 UTM으로 가상환경 구성하였고,터미널에서 테스트시에는 이전 수업내용은 잘 작동하였는데, 자바클라이언트에서 오류가 발생하네요어떻게 해결하면 좋을까요?
- 미해결카프카 완벽 가이드 - 코어편
addShutdownHook() 과 mainThread.join()
안녕하세요! 설명을 너무 잘 해주셔서 유익하게 잘 듣고 있습니다 ㅎㅎWakeup을 이용하여 Consumer를 효과적으로 종료하기 부분에서 질문이 있는데요,프로그램이 종료되는 시점에 새로운 쓰레드를 만들어 실행하는 함수인 addShutdownHook() 안에 mainThread.join() 부분이 잘 이해되지 않습니다. 새로운 스레드가 ConsumerNetworkClient의 wakeup을 부르고 메인 스레드가 종료될 때까지 기다려야 해서 mainThread.join()을 부른다고 하셨는데, 메인 스레드의 종료 절차(메모리 정리 등?)를 마칠 때까지 기다린다는 의미가 맞을까요? addShutdownHook() 함수가 프로그램이 종료될때 실행되는 함수인데 그 안에 종료를 기다리는 함수가 또 들어있어 헷갈렸습니다. 그리고 만약 mainThread.join()이 메인 스레드가 종료를 마칠 때까지 기다리는 게 맞다면, mainThead.join()을 먼저 부르고 kafkaConsumer.wakeup()을 이후에 호출해도 문제가 없는지도 궁금합니다~
- 해결됨카프카 완벽 가이드 - 코어편
addShutdownHook이 제대로 작동하지 않는것 같습니다
IntelliJ에서 ConsumerWakeup파일을 실행하고 종료 시,logger.info("main program starts to exit by calling wakeup");이 콘솔에 출력되지 않고 아래와 같은 오류가 출력됩니다.> Task :consumers:ConsumerWakeup.main() FAILED2 actionable tasks: 1 executed, 1 up-to-dateFAILURE: Build failed with an exception.* What went wrong:Execution failed for task ':consumers:ConsumerWakeup.main()'.> Build cancelled while executing task ':consumers:ConsumerWakeup.main()'* Try:> Run with --stacktrace option to get the stack trace.> Run with --info or --debug option to get more log output.> Run with --scan to get full insights.* Get more help at https://help.gradle.orgBUILD FAILED in 11s오후 4:02:44: 실행이 완료되었습니다 ':consumers:ConsumerWakeup.main()'. 만약, 커버리지 모드로 구동을 할 경우 아래와 같이 정상적으로 로그가 출력됩니다.[Thread-2] INFO com.example.kafka.ConsumerWakeup - main program starts to exit by calling wakeup[main] ERROR com.example.kafka.ConsumerWakeup - wakeup exception has been called[main] INFO com.example.kafka.ConsumerWakeup - finally consumer is closingClass transformation time: 0.1235649s for 1928 classes or 6.408967842323652E-5s per class VirtualBox 세팅 문제로 별개의 Bare-Metal 리눅스 서버를 구성하여 사용중입니다.Java 버전, Gradle버전, implementation환경 등은 강의와 같은 상태인데 혹시 어떤게 문제일까요?
- 해결됨카프카 완벽 가이드 - 코어편
Custom 객체 직렬화 kafka-console-consumer 조회 시 null
텍스트가 추가될 때마다 메시지를 전송하는 Producer의 Custom 객체 직렬화 구현 - 01~02강의에서 kafka서버에서 kafka-console-consumer --bootstrap-server localhost:9092 --topic order-serde-topic 로 데이터 조회 시 null이 조회되는 에러가 있습니다. 원인은 "텍스트가 추가될 때마다 메시지를 전송하는 Producer의 Custom 객체 직렬화 구현 - 01" 번 강의에서 OrderSerializer 객체에서 serialize 메소드의 serializedOrder 를 null 로 초기화 한 뒤에 값을 대입하지 않고 반환하도록 하고 지나갔네요 objectMapper.writeValueAsBytes(order); 부분을 아래와 같이 수정하면 됩니다.serializedOrder = objectMapper.writeValueAsBytes(order);
- 미해결카프카 완벽 가이드 - 코어편
컨슈머 배포 질문
안녕하세요.현재 k8s 환경에서 롤링 업데이트를 통해 팟을 배포하는 방식을 채택하여 사용중에 있습니다.메시지 컨슈밍 퍼포먼스를 줄여주기 위해 리밸런싱 시간을 줄여주는게 굉장히 중요하다는 생각이 드는데요.강의를 보다보니 Static membership과 Cooperative sticky 2가지 방식이 이에 적합한 것 같아 보입니다.실제 프로덕션 레벨에서는 어떠한 방식을 사용하는지 궁금합니다!
- 미해결카프카 완벽 가이드 - 코어편
kafka에서 elastic search에 저장
안녕하세요.강의 잘 수강중입니다. 다름 아니라, kafka에서 elastic search로 저장하는 일을 조만간 할 예정입니다.본 과정을 다 수강하고 나면 그런 응용력이 생길지요?아니면 connect 편 까지 다 수강해야 elastic search에 저장하는 응용력을 키울 수 있을지 문의드립니다. 감사합니다.
- 미해결카프카 완벽 가이드 - 코어편
특정 메시지까지만 커밋이 가능할까요?
안녕하세요~ 강의 항상 잘듣고있습니다ㅎㅎ배치사이즈를 작게하면 성능이 낮아지고배치사이즈를 크게하면 ack유실시 중복처리가 많아져서배치사이즈를 크게하고ack(commit)을 전체가 아닌 일부분만 할 수 없을까? 생각하게되었어요.예를들어서 1000개 배치로 가져오고10개까지 commit, 100개까지커밋, ....요런것도 가능할려나요?.?