월 19,800원
5개월 할부 시다른 수강생들이 자주 물어보는 질문이 궁금하신가요?
- 미해결카프카 완벽 가이드 - 코어편
nocommit 관련 질문
package com.example; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Properties; public class ConsumerPartitionAssignSeek { public static final Logger logger = LoggerFactory.getLogger(ConsumerPartitionAssignSeek.class.getName()); public static void main(String[] args) { String topicName = "pizza-topic"; Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_pizza_assign_seek_v001"); //props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "6000"); props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props); TopicPartition topicPartition = new TopicPartition(topicName, 0); //kafkaConsumer.subscribe(List.of(topicName)); kafkaConsumer.assign(Arrays.asList(topicPartition)); kafkaConsumer.seek(topicPartition, 5L); //main thread Thread mainThread = Thread.currentThread(); //main thread 종료시 별도의 thread로 KafkaConsumer wakeup()메소드를 호출하게 함. Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { logger.info(" main program starts to exit by calling wakeup"); kafkaConsumer.wakeup(); try { mainThread.join(); } catch(InterruptedException e) { e.printStackTrace();} } }); //kafkaConsumer.close(); //pollAutoCommit(kafkaConsumer); //pollCommitSync(kafkaConsumer); //pollCommitAsync(kafkaConsumer); pollNoCommit(kafkaConsumer); } private static void pollNoCommit(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }catch(Exception e) { logger.error(e.getMessage()); }finally { logger.info("finally consumer is closing"); kafkaConsumer.close(); } } private static void pollCommitAsync(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } kafkaConsumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if(exception != null) { logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage()); } } }); } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }catch(Exception e) { logger.error(e.getMessage()); }finally { logger.info("##### commit sync before closing"); kafkaConsumer.commitSync(); logger.info("finally consumer is closing"); kafkaConsumer.close(); } } private static void pollCommitSync(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } try { if(consumerRecords.count() > 0 ) { kafkaConsumer.commitSync(); logger.info("commit sync has been called"); } } catch(CommitFailedException e) { logger.error(e.getMessage()); } } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }catch(Exception e) { logger.error(e.getMessage()); }finally { logger.info("finally consumer is closing"); kafkaConsumer.close(); } } public static void pollAutoCommit(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } try { logger.info("main thread is sleeping {} ms during while loop", 10000); Thread.sleep(10000); }catch(InterruptedException e) { e.printStackTrace(); } } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }finally { logger.info("finally consumer is closing"); kafkaConsumer.close(); } } } 해당 코드에 문제가 없는 것으로 보입니다. java.lang.IllegalStateException: This consumer has already been closed. at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2456) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) at com.example.ConsumerPartitionAssignSeek.pollNoCommit(ConsumerPartitionAssignSeek.java:63) at com.example.ConsumerPartitionAssignSeek.main(ConsumerPartitionAssignSeek.java:53) 해당하는 에러가 띄는데 이유를 알 수 있을까요? git 코드는 잘돌아가는 것을 확인했습니다. 차이점이 알 수가 없어서 질문드립니다,
- 미해결카프카 완벽 가이드 - 코어편
Confluent Kafka 라이센스
Confluent Kafka의 커뮤니티 버전은아파치 카프카처럼 회사에서도 사용해도 문제없나요?
- 해결됨카프카 완벽 가이드 - 코어편
Kafka 클라이언트 To VM kafka 연결 질문
안녕하세요.강의 잘 보고 있습니다. 제가 회사에서 강의를 보고 있어서 그런데 강의 세팅과 조금 다르게 진행해서 연결에서 막힙니다. 일단 저는, 개인 PC로 IP - 192.168.100.170 인 서버 컴퓨터로 원격 연결을 하고그 안에서 VB로 ubuntu VM을 생성했습니다.VM의 고정 IP는 192.168.88.111로 설정했습니다.이후 편한 환경을 위해 putty같은 프로그램으로 ssh 연결을 했습니다.VM의 Port Forwarding으로ssh는 192.168.100.170:27722 -> 192.168.88.111:22192.168.100.170:29092 -> 192.168.88.111:9092 으로 진행했고 성공했습니다.이후 개인 PC에서 Intelij로 SimpleProducer 실습을 진행하는데, props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.170:29092");로 나름 머리를 써서 작성했습니다. 물론, VM의 server.properties에서 외부 연결을 허용하도록 했습니다만, 정확한지 확신은 없습니다. 이후, 코드를 실행했더니, socket timeout 에러가 나오고 카프카 컨슈머에 들어오지 않았습니다. Log를 자세히 보니 분명히 kafka topicId를 인지하는 걸 보니 연결은 된 것 같은데 뭐가 문제인지 모르겠습니다.Starting Gradle Daemon... Gradle Daemon started in 1 s 324 ms > Task :producers:compileJava UP-TO-DATE > Task :producers:processResources NO-SOURCE > Task :producers:classes UP-TO-DATE > Task :producers:SimpleProducer.main() [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: acks = -1 batch.size = 16384 bootstrap.servers = [192.168.100.170:39092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = producer-1 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = true interceptor.classes = [] key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.connect.timeout.ms = null sasl.login.read.timeout.ms = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.login.retry.backoff.max.ms = 10000 sasl.login.retry.backoff.ms = 100 sasl.mechanism = GSSAPI sasl.oauthbearer.clock.skew.seconds = 30 sasl.oauthbearer.expected.audience = null sasl.oauthbearer.expected.issuer = null sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000 sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000 sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100 sasl.oauthbearer.jwks.endpoint.url = null sasl.oauthbearer.scope.claim.name = scope sasl.oauthbearer.sub.claim.name = sub sasl.oauthbearer.token.endpoint.url = null security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1706742127571 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition test-topic-0 to 0 since the associated topicId changed from null to jRkpHnfwT8mfWJ3PB9HHmg [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: ysNHdh2DQTKvR3X0yruxdg [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Disconnecting from node 0 due to socket connection setup timeout. The timeout value is 9728 ms. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Disconnecting from node 0 due to socket connection setup timeout. The timeout value is 18153 ms. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 0 disconnected. [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/192.168.88.111:9092) could not be established. Broker may not be available. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 0 disconnected. [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/192.168.88.111:9092) could not be established. Broker may not be available. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 0 disconnected. [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/192.168.88.111:9092) could not be established. Broker may not be available. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 0 disconnected. [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/192.168.88.111:9092) could not be established. Broker may not be available. [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed [main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed [main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0. You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins. For more on this, please refer to https://docs.gradle.org/8.4/userguide/command_line_interface.html#sec:command_line_warnings in the Gradle documentation. BUILD SUCCESSFUL in 2m 5s 2 actionable tasks: 1 executed, 1 up-to-date 오전 8:04:07: Execution finished ':producers:SimpleProducer.main()'. [Producer clientId=producer-1] Resetting the last seen epoch of partition test-topic-0 to 0 since the associated topicId changed from null to jRkpHnfwT8mfWJ3PB9HHmg이 부분을 보아하니 토픽은 인지하는 것 같은데 말이죠..감사합니다.
- 미해결카프카 완벽 가이드 - 코어편
kafka 연결 질문 드립니다.
안녕하세요. mac m1에 utm으로 고정IP(192.168.56.101) 설정후 ssh 접속 및 kafka-console 명령어 잘 됩니다.하지만, java 코드로 실행시 아래 이미지와 같이 접속 이슈가 있어서 문의 드립니다.ubuntu 설정에 이름이 "ubuntu"로 설정하게 문제일까요?
- 해결됨카프카 완벽 가이드 - 코어편
confluent local 질문있습니다.
안녕하세요, confluent local도 bin 스크립트를 보니 kafka_server_start가 있고 이를 통해 커뮤니티처럼 멀티 브로커 클러스터를 생성할 수 있는 것이 아닌지 궁금합니다. 또한 실제로 confluent_local을 통해 모니터링 하는 사례가 현업에서 있는지 궁금합니다.
- 해결됨카프카 완벽 가이드 - 코어편
log dir 관련 질문있습니다!
안녕하세요, log dir에 대해 질문이 있습니다.제가 실습을 하다보니 kafka-logs-0? 디렉토리에 많은 하위 디렉토리들이 생겨서 새로운 실습을 위해 비웠습니다. 기동 중인 모든 브로커에 대한 log dir를 초기화하였는데, 브로커에서 log dir관련 에러로그가 발생한 후 shutdown이 되길래 다시 구동을 시켰습니다. 그리고 각 브로커의 log dir를 다시 확인하니 topic 파티션 dir를 비롯해서 기존 모든 dir가 복원되어있었습니다. 제가 궁금한 점은 카프카 클러스터가 동작하면서 브로커만 있는 것이 아니니 복원 지점이 어디엔가 있을 수 있겠다고 생각은 하였는데, topic-partition의 log들은 replication을 배울 때 혹시 특정 브로커(노드)에 문제가 생길 때를 위한 복제라고 배운 것 같은데, 다른 모든 브로커도 모두 이 정보가 지워진 상태에서 어떻게 복원이 가능했던 걸까요?
- 미해결카프카 완벽 가이드 - 코어편
동일 groupId에서 하나의 컨슈머에 특정 파티션을 지정하는 경우
안녕하세요, 동일 groupId를 가진 두 개의 컨슈머가 하나의 토픽에 대해 poll을 하는데, 컨슈머 중 하나는 0번 파티션을 읽도록 assign하였고 나머지 컨슈머는 그냥 토픽에 대해 subscribe를 하였습니다. 그런데 0번 파티션에 assign한 컨슈머는 데이터를 읽어오지 못하고 아래와 같은 에러 로그가 발생하였는데요 Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member 이 경우는 어떻게 해석하는게 좋을까요?
- 미해결카프카 완벽 가이드 - 코어편
질문] SimpleProducer.java 실행 시 SLF4J(W): No SLF4J providers were found.
안녕하세요. confluent kafka 에서 consumer 뛰운 상태 입니다.SimpleProducer.java 작성 후, Run 하면 아래 로그 발생 합니다.SLF4J(W): No SLF4J providers were found. consumer에서 메시지 출력 하지 않습니다. 검색해 보니, This warning, i.e. not an error, message is reported when no SLF4J providers could be found on the class path. Placing one (and only one) of the many available providers such as slf4j-nop.jar slf4j-simple.jar, slf4j-reload4j.jar, slf4j-jdk14.jar or logback-classic.jar on the class path should solve the problem. 어디를 수정 해서, 적용 해야 할지 잘 모르겠습니다. 강의 내용 따라서, Gradle 설정 하고, jdk를 확인 해보니, 17 oracle jdk 가 보이지 않아, jbrskd-17로 설정해서 빌드 했습니다. (제 intellij에서 add > oracle 17jdk가 안 보입니다???) 검토 하시고 답변 부탁 드립니다.
- 해결됨카프카 완벽 가이드 - 코어편
프로듀서 전송과 ack 관계에 대해 질문있습니다.
안녕하세요, 프로듀서 전송 부분에 대해 강의를 듣던 중 궁금한 점이 있습니다.최대 한 번 전송 외에는 ack를 받고 다음 메시지를 전송한다고 설명하셨는데, 비동기 전송은 ack와 상관 없이 지속적으로 메시지를 전송할 수 있는거 아닌가요? 메시지 a,b,c 3개를 보낸다고 했을 때ack = 0의 경우는a,b,c를 보내고 오는 ack에 대해 신경 쓰지 않는것이고,ack != 0의 경우 비동기라면 마찬가지로 전송은 ack 여부와 상관 없이 a,b,c를 보내고 대신 ack 여부에 따라 특정 메시지에 대한 재전송이 이루어 지는 것이 아닌지 궁금합니다.즉, ack 여부에 따라 a이후 b를 전송하는 것이 결정되는게 맞는지 궁금합니다.
- 미해결카프카 완벽 가이드 - 코어편
브로커 네트워크 주소 질문
안녕하세요, 인텔리제이에서 클라이언트 프로듀서 생성 후 데이터 전송 시 브로커를 찾을 수 없다는 에러 로그가 자꾸 발생해서, server.properties의 advertised.listeners에서 저희가 사용하는 고정 IP를 직접 작성해주니 해결되었는데요, 이렇게 하니 외부 클라이언트에서 192.168.56.101 주소로 전송한 데이터는 VM에서 콘솔 컨슈머를 통해 read 할 때 localhost로 접근 시 조회가 되지 않습니다. 혹시 Host PC에서 VM의 브로커에 send 할 때 advertised.listeners 값을 고정 IP로 등록해주어야 하는 이유가 있는지, 그리고 이렇게 등록 시 VM 내부에서도 localhost 참조가 불가능해지는게 맞는지 알 수 있을까요?
- 미해결카프카 완벽 가이드 - 코어편
파티셔너에서 파티션 선정 방법에 대한 질문
안녕하세요, producer의 send 로직을 보면 Serialization을 먼저 수행하고 Partitioner에서 파티션을 결정 짓는것으로 보이는데요,key 값이 존재하는 경우 key 값에 대한 hash 알고리즘을 통해 파티셔너에서 파티션을 결정한다고 하였는데 직렬화 되어버리면 hash는 bytes 값에 대한 hash가 수행되는 건가요?? 만약 맞다면, 파티셔너도 특정 키를 가진 데이터에 대해서 원하는 파티션을 지정하도록 커스터마이징이 가능한 것으로 알고 있는데, 파티셔너를 커스터마이징한다면 key 값을 직렬화 하고 그것에 대한 해시알고리즘을 적용해서 파티션 넘버를 반환하도록 짜야하는 것인지 궁금합니다.
- 해결됨카프카 완벽 가이드 - 코어편
VM 기반 테스트 환경 질문있습니다.
VM에 카프카 테스트 환경 구축하고 IP 할당, SSH 연결과 같은 현재 구조가 도커로 로컬 호스트에 구축하는 것과 비교해서 테스트 할 때 더 좋은 점이 있을까요??
- 미해결카프카 완벽 가이드 - 코어편
CLI에서 메시지를 컨슈머로 읽을 때 배치시간에 영향을 받나요?
Key가 없는 메시지의 파티션 분배전략 - 라운드로빈과 스티키 파티셔닝 해당 파트에서 프로듀서가 보낸 메시지를 우선 배치에 넣고 토픽에 삽입한다고 적혀있습니다. 그 전 강의에서 컨슈머가 해당 파티션을 읽고 있고 프로듀서로 값을 보내면 바로바로 읽혔습니다. 이것도 프로듀서에서 보낸 메시지를 배치에서 대기하다가 대기시간이 만료돼서 토픽에 값이 삽입된건가요? 그 시간이 너무 짧아서 실시간으로 읽히는 것처럼 보이는 것으로 이해하면 될까요?
- 미해결카프카 완벽 가이드 - 코어편
Idempotence 관련 질문 드립니다.
안녕하세요,좋은 강의 만들어주셔서 감사합니다.Idempotence 관련해서 보다가 궁금한게 있어서 문의 드립니다.Idempotence를 사용하면 Producer에서 전송된 Broker로 전송 시 메시지 전송 순서가 유지가 된다고 하셨는데,파티션된 토픽에 대해서는 어떻게 되는지 궁금합니다.파티션 내에서만 순서가 유지되는 걸까요? 파티션이 3개로 된 토픽의 경우는 10개의 메시지가 전달 될 때 10개의 메시지가 쪼개져서 파티션별로 데이터가 들어갈텐데 그렇게 되면 전체 토픽에 대해서 순서 유지가 안되지 않을까해서 질문 드렸습니다. 감사합니다.
- 미해결카프카 완벽 가이드 - 코어편
Partition에 할당된 Batch Buffer
안녕하세요강의 잘 듣고 있습니다."Kafka Producer의 send() 메소드 호출 프로세스" 장표에 묘사된 Batch 관련해서 의견 남깁니다. 강의 중반부터 나오듯이 각 Batch 버퍼는 Partition과 1대1 대응하도록 되어있는데, 해당 장표에서는 Partition#1 안에 여러개의 배치가 존재할 수 있는 것처럼 되어 있어 혼동이 생길 수 있어 보입니다. Partition#1에 갈 Message는 Batch#1에만 적재되어야하므로 그림 상 P#1 안에는 B#1만 있는 것이 맞지 않을까요? 만약 제가 의도와는 다르게 이해한 것이라면, 간략한 설명 덧붙여주시면 감사하겠습니다.
- 미해결카프카 완벽 가이드 - 코어편
consumer group 과 topic의 관계가 궁급합니다.
안녕하세요, 강의를 듣다 궁금증이 생겨 질문드립니다. Topic T1, T2 가 존재하고, Consumer group G1이 존재한다 할 때..A Consumer가 G1 group에 속하면서 T1 topic을 subscribe하고, B Consumer가 G1 group에 속하면서 T2 topic을 subscribe하도록 코드를 짰는데도 잘 동작하더라구요. Consumer group은 topic과 1 : N 관계가 될 수 있는 것인가요?
- 미해결카프카 완벽 가이드 - 코어편
key를 갖는 메시지 전송 시 궁금한 사항이 있습니다.
강사님, 안녕하세요! 강의 잘 듣고 있습니다.강의 들으면서 궁금한 사항이 생겨서 질문드려요.key를 지정해서 메시지를 전송하면 같은 key에 한해서는 같은 파티션으로 메시지가 전송되는 것을 보장 받을 수 있고, 해당 key로 전송된 메세지는 순서를 보장받을 수 있다. 까지는 이해했는데요~운영하면서 파티션의 개수가 늘어날 경우, 늘어난 직후에도 같은 key로 전송되는 메세지는 순서를 보장받을 수 있는 것일까요?(즉, 파티션 개수가 늘어나기 이전에 key:a, partition:0으로 전송되었다면 추후, 파티션 개수가 늘어나 리밸런싱이 일어나더라도 key:a 메세지는 partition:0으로 전송되는 것일까요?)
- 미해결카프카 완벽 가이드 - 코어편
메시지 배치 내의 데이터 저장 관련 질문
안녕하세요.producer에서 메시지가 배치 단위로 전송이 되고, retry도 해당 배치 단위로 이뤄지는 것 같은데요.예를 들어 1~10번의 메시지가 하나의 배치를 이루고 있을 때 그 안에서 3,5번 메시지만 전송이 실패한다거나 하는 경우가 생기면 해당 메시지 배치 전체가 실패로 처리되어 retry하게 되는 걸까요?그리고 해당 배치를 이루고 있는 내부 메시지들의 전송순서 또한 seq 기반으로 구현되어 있는 건지도 궁금합니다.
- 미해결카프카 완벽 가이드 - 코어편
파티션별로 topic 이 관리되는거 같은데...
파티션과 topic 의 관계를 알고 싶습니다.
- 미해결카프카 완벽 가이드 - 코어편
토픽 삭제와 관련한 질문
고생 많으십니다. 강사님 만약 topic 에 producer 와 consumer 가 있는 상태에서 topic의 삭제가 가능한지요? 아울러 topic 도 자동삭제 기능이 있는지요?강사님 또는 비전러닝머신관련 새로운 추가 강의 부탁드립니다.