월 19,800원
5개월 할부 시다른 수강생들이 자주 물어보는 질문이 궁금하신가요?
- 미해결카프카 완벽 가이드 - 코어편
OrderDBHandler 에서 데이터가 다 안들어가고 누락이 됩니다.
원인을 찾아보니insertOrders 메소드에서pstmt.executeUpdate();이 부분이 문제입니다.대신에pstmt.executeBatch();이렇게 하니누락된 데이터가 없이다 들어갑니다.
- 미해결카프카 완벽 가이드 - 코어편
Producer의 acks 설정 관련 실습 질문
안녕하세요 선생님 질문이 있습니다.acks = -1 이 All이라 하셨는데예제에서 erorr를 따로 던지지 않는 이유가 min_insync_replicas 가 기본값이 0 이라서 그냥 성공하는것인가요?
- 미해결카프카 완벽 가이드 - 코어편
카프카 브로커와 레플리카 수 관계
안녕하세요 카프카 브로커 수와 토픽의 레플리카 팩터 수 관계가 궁금합니다! 토픽 레플리카 팩터를 3으로 할꺼면브로커는 최소 3개를 가져가야하는지 궁금합니다. 토픽 레플리카 팩터와 브로커 수가 연관이 있는지요!
- 미해결카프카 완벽 가이드 - 코어편
여러 개의 파티션을 가지는 메시지 전송 실습 / 질문
안녕하세요 선생님실습에서 multipart-topic으로 key 없이 전송을 했는데 모조리 partition2로 가는데 어떤 설정을 살펴보면 될까요?kafka는 confluent 7.6으로 실습하고 있습니다.key값을 준 경우에는 강의 실습과 동일하게 진행되었습니다.
- 미해결카프카 완벽 가이드 - 코어편
VM화면에서 마우스 클릭 불가
안녕하세요. 강의 잘 듣고 있습니다.VM 환경에 들어와서 마우스 클릭을 하면 팝업창이 뜨면서 마우스 잡기 버튼이 생성되어 잡기를 클릭하면 마우스 커서도 사라지며 아무 클릭이 안돼요.구글링해보니 설정 -> 시스템-> 마더보드 -> 포인트 장치 -> USB 태블릿 or USB 멀티터치 태블릿으로 변경위와 방법을 사용해도 변경이 안됩니다.혹시 어떻게 해결해야 할까요?
- 미해결카프카 완벽 가이드 - 코어편
Kafka Consumer 데이터 유실 관련
안녕하세요 강사님!Consumer와 auto.offset.reset의 내부 동작 메커니즘 상세 이해듣던 중 궁금한 부분이 생겨서 질문 남깁니다 🙂 Kafka Consumer는 __consumer_offsets 기준으로데이터를 읽어드린다고 이해를 했는데 동일 컨슈머 그룹에 이미 메세지가 있는 경우프로듀서가 메세지를 전송한 이후에 컨슈머를 기동을 한다고 하면 earlist, lastest 모두 전송한 메세지를 받을 수 없어 데이터 유실이 발생할 가능성이 있을 것 같은데 이러한 데이터 유실을 방지하는 방법이 있을까요?프로듀서 같은경우 ACK 설정으로 이를 방지할 수 있을 것 같은데 컨슈머에도 동일한 설정이 있는지 궁금합니다.
- 미해결카프카 완벽 가이드 - 코어편
프로듀서 직렬화 관련 질문있습니다.
안녕하세요, 실무에서 발생할 법한 사항에 대한 질문이 있습니다. 외부에서 데이터를 전송할 수 있는 API 엔드포인트가 프로젝트 내에 열려있고, 이 곳을 향해 들어오는 데이터를 적절히 파싱해서 Object에 담은 상태라고 했을 때이 Object를 토픽에 저장하고 꺼내오는 과정에 강의의 예를 적용하면 동일한 커스텀 직렬화 및 역직렬화가 적용된 프로듀서, 컨슈머 클라이언트를 통해 가능할 것 같은데요 만약 프로듀서는 그대로 자바 (Spring 환경) 클라이언트로 구현된 것을 사용한다고 하고, 토픽에 저장된 데이터를 ksqldb를 사용한 스트림처리나 connect를 붙여 타겟에 보내는 용도로 쓴다고 했을 때는 어떻게 해야할 까요?이런 상황에서는 커스텀 역직렬화를 ksqldb나 connect에서 사용하기 어려울 것 같은데, 프로듀서에서 Avro 타입등으로 전송해야 하는지 궁금합니다. 특정 상황에서는 스키마를 굳이 관리하지 않고 그냥 Json타입을 사용하고 싶을 수도 있을 것 같은데 커스텀 직렬화를 통해 바이트 코드로 토픽에 전송시킨 후 ksqldb 등에서 JSON 포맷으로 읽으면 읽혀질까요?
- 미해결카프카 완벽 가이드 - 코어편
kafka paraller-consumer 처리방법을 알고 싶습니다.
하나의 토픽, 하나의 파티션으로 순서보장을 하는 Consumer 처리를 알고싶습니다.kafka paraller-consumer 처리를 하고 싶은데 마땅한 예제가 나와있지 않아서요.해당 예제를 찾아보면 보통 multi thread 방식이라 순서보장이 되지않은것 같아 사용을 못할것 같습니다.최종적으로 순서보장을 하는 consumer 병렬처리 방법을 알고 싶어 질문드렸습니다.
- 미해결카프카 완벽 가이드 - 코어편
consumer 병렬처리를 알고 싶습니다.
안녕하세요.해당 수업을 잘듣고 있습니다.혹시 consumer에 대한 병렬처리방법에 대한 강의가 있을까요?참고할만한 강의가 있을까요?감사합니다.
- 해결됨카프카 완벽 가이드 - 코어편
컨슈머 스태틱 그룹 멤버십
안녕하세요, 스태틱 그룹 멤버십 관련 질문있습니다!만약 group instance id를 동일 counsumer group에 속한 counsumer 중에 일부에만 할당하는 경우는 어떻게 되나요?이 경우에도 스태틱 그룹 멤버십이 적용되는 건가요?
- 미해결카프카 완벽 가이드 - 코어편
UnknownHostException : kafka
똑같이 아이피 고정해서 했는데 kafka < 라는걸 계속 찾네요 ㅠㅠ192.xx 대역으로 하면 못찾습니다.. 그래서일단은 windows hosts 설정에 kafka 도메인으로 고정IP를 넣어줘서 해결은 했는데..도커로 띄워서 그런걸까요?
- 미해결카프카 완벽 가이드 - 코어편
mtputty 액세스가 거부됩니다
- 학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요! - 먼저 유사한 질문이 있었는지 검색해보세요. - 서로 예의를 지키며 존중하는 문화를 만들어가요. - 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요. 쁘띠, mt쁘띠도 삭제해봐도 자꾸 리커넥 메세지가 나옵니다.. 일단 쁘띠는 정상동작합니다 ㅜㅜ
- 미해결카프카 완벽 가이드 - 코어편
vi 에서 wq!명령어가 안 먹혀요
- 학습 관련 질문을 남겨주세요. 상세히 작성하면 더 좋아요! - 먼저 유사한 질문이 있었는지 검색해보세요. - 서로 예의를 지키며 존중하는 문화를 만들어가요. - 잠깐! 인프런 서비스 운영 관련 문의는 1:1 문의하기를 이용해주세요.이렇게 :wq!했는데도 불구하고 저장하고 나가기 기능이 안됩니다 ㅜㅜ
- 미해결카프카 완벽 가이드 - 코어편
강사님 인텔리제이 출력메시지 줄이는 방법 한번만 다시 부탁드립니다.
지난번 강의중에 intellj 메시지 줄이는 방법을 알려주셨는데제가 깜빡 잊고 설정을 하지 않았습니다.다시 그 강의를 찾아보려니 엄두가 나지 않아서 번거로우겠지만 다시 부탁드려도 되는지요?
- 미해결카프카 완벽 가이드 - 코어편
멀티 노드구성 관련 질문입니다
안녕하세요!좋은 강의 잘 듣고 있는 수강생입니다.강의에 나온 예제는 하나의 VM에 멀티 노드를 구성하는 예제인데! 예를 들어 VM3대에 각각 한 개의 브로커를 띄우고 VM3대를 하나의 클러스터로 구성하려면 강의에서 보여주신 설정이외에 추가적인 설정이 필요한건지 궁금해 질문을 남깁니다. (전제: VM3대가 네트워크로 연결되어있다) (추측이지만, 실제로 멀티 브로커를 구성하는 상황이라면 여러대의 VM에 브로커를 구성할 거 같다고 생각했기 때문입니다)
- 미해결카프카 완벽 가이드 - 코어편
클러스터 관련 질문입니다!
안녕하세요! 카프카를 멀티 브로커로 구성하는 기준이 있을까요?예를 들면 어느정도 이상부터 멀티 브로커로 구성을 하는지에 대한 best practice가 있을까요?아니면, 단일 노드로 구성하고 scale out하는 방식으로 해야 하는 걸까요?
- 미해결카프카 완벽 가이드 - 코어편
nocommit 관련 질문
package com.example; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Properties; public class ConsumerPartitionAssignSeek { public static final Logger logger = LoggerFactory.getLogger(ConsumerPartitionAssignSeek.class.getName()); public static void main(String[] args) { String topicName = "pizza-topic"; Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_pizza_assign_seek_v001"); //props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "6000"); props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props); TopicPartition topicPartition = new TopicPartition(topicName, 0); //kafkaConsumer.subscribe(List.of(topicName)); kafkaConsumer.assign(Arrays.asList(topicPartition)); kafkaConsumer.seek(topicPartition, 5L); //main thread Thread mainThread = Thread.currentThread(); //main thread 종료시 별도의 thread로 KafkaConsumer wakeup()메소드를 호출하게 함. Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { logger.info(" main program starts to exit by calling wakeup"); kafkaConsumer.wakeup(); try { mainThread.join(); } catch(InterruptedException e) { e.printStackTrace();} } }); //kafkaConsumer.close(); //pollAutoCommit(kafkaConsumer); //pollCommitSync(kafkaConsumer); //pollCommitAsync(kafkaConsumer); pollNoCommit(kafkaConsumer); } private static void pollNoCommit(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }catch(Exception e) { logger.error(e.getMessage()); }finally { logger.info("finally consumer is closing"); kafkaConsumer.close(); } } private static void pollCommitAsync(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } kafkaConsumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if(exception != null) { logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage()); } } }); } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }catch(Exception e) { logger.error(e.getMessage()); }finally { logger.info("##### commit sync before closing"); kafkaConsumer.commitSync(); logger.info("finally consumer is closing"); kafkaConsumer.close(); } } private static void pollCommitSync(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } try { if(consumerRecords.count() > 0 ) { kafkaConsumer.commitSync(); logger.info("commit sync has been called"); } } catch(CommitFailedException e) { logger.error(e.getMessage()); } } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }catch(Exception e) { logger.error(e.getMessage()); }finally { logger.info("finally consumer is closing"); kafkaConsumer.close(); } } public static void pollAutoCommit(KafkaConsumer<String, String> kafkaConsumer) { int loopCnt = 0; try { while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000)); logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { logger.info("record key:{}, partition:{}, record offset:{} record value:{}", record.key(), record.partition(), record.offset(), record.value()); } try { logger.info("main thread is sleeping {} ms during while loop", 10000); Thread.sleep(10000); }catch(InterruptedException e) { e.printStackTrace(); } } }catch(WakeupException e) { logger.error("wakeup exception has been called"); }finally { logger.info("finally consumer is closing"); kafkaConsumer.close(); } } } 해당 코드에 문제가 없는 것으로 보입니다. java.lang.IllegalStateException: This consumer has already been closed. at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2456) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) at com.example.ConsumerPartitionAssignSeek.pollNoCommit(ConsumerPartitionAssignSeek.java:63) at com.example.ConsumerPartitionAssignSeek.main(ConsumerPartitionAssignSeek.java:53) 해당하는 에러가 띄는데 이유를 알 수 있을까요? git 코드는 잘돌아가는 것을 확인했습니다. 차이점이 알 수가 없어서 질문드립니다,
- 미해결카프카 완벽 가이드 - 코어편
Confluent Kafka 라이센스
Confluent Kafka의 커뮤니티 버전은아파치 카프카처럼 회사에서도 사용해도 문제없나요?
- 해결됨카프카 완벽 가이드 - 코어편
Kafka 클라이언트 To VM kafka 연결 질문
안녕하세요.강의 잘 보고 있습니다. 제가 회사에서 강의를 보고 있어서 그런데 강의 세팅과 조금 다르게 진행해서 연결에서 막힙니다. 일단 저는, 개인 PC로 IP - 192.168.100.170 인 서버 컴퓨터로 원격 연결을 하고그 안에서 VB로 ubuntu VM을 생성했습니다.VM의 고정 IP는 192.168.88.111로 설정했습니다.이후 편한 환경을 위해 putty같은 프로그램으로 ssh 연결을 했습니다.VM의 Port Forwarding으로ssh는 192.168.100.170:27722 -> 192.168.88.111:22192.168.100.170:29092 -> 192.168.88.111:9092 으로 진행했고 성공했습니다.이후 개인 PC에서 Intelij로 SimpleProducer 실습을 진행하는데, props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.170:29092");로 나름 머리를 써서 작성했습니다. 물론, VM의 server.properties에서 외부 연결을 허용하도록 했습니다만, 정확한지 확신은 없습니다. 이후, 코드를 실행했더니, socket timeout 에러가 나오고 카프카 컨슈머에 들어오지 않았습니다. Log를 자세히 보니 분명히 kafka topicId를 인지하는 걸 보니 연결은 된 것 같은데 뭐가 문제인지 모르겠습니다.Starting Gradle Daemon... Gradle Daemon started in 1 s 324 ms > Task :producers:compileJava UP-TO-DATE > Task :producers:processResources NO-SOURCE > Task :producers:classes UP-TO-DATE > Task :producers:SimpleProducer.main() [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: acks = -1 batch.size = 16384 bootstrap.servers = [192.168.100.170:39092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = producer-1 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = true interceptor.classes = [] key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.connect.timeout.ms = null sasl.login.read.timeout.ms = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.login.retry.backoff.max.ms = 10000 sasl.login.retry.backoff.ms = 100 sasl.mechanism = GSSAPI sasl.oauthbearer.clock.skew.seconds = 30 sasl.oauthbearer.expected.audience = null sasl.oauthbearer.expected.issuer = null sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000 sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000 sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100 sasl.oauthbearer.jwks.endpoint.url = null sasl.oauthbearer.scope.claim.name = scope sasl.oauthbearer.sub.claim.name = sub sasl.oauthbearer.token.endpoint.url = null security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1706742127571 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition test-topic-0 to 0 since the associated topicId changed from null to jRkpHnfwT8mfWJ3PB9HHmg [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: ysNHdh2DQTKvR3X0yruxdg [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Disconnecting from node 0 due to socket connection setup timeout. The timeout value is 9728 ms. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Disconnecting from node 0 due to socket connection setup timeout. The timeout value is 18153 ms. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 0 disconnected. [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/192.168.88.111:9092) could not be established. Broker may not be available. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 0 disconnected. [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/192.168.88.111:9092) could not be established. Broker may not be available. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 0 disconnected. [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/192.168.88.111:9092) could not be established. Broker may not be available. [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 0 disconnected. [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/192.168.88.111:9092) could not be established. Broker may not be available. [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed [main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed [main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0. You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins. For more on this, please refer to https://docs.gradle.org/8.4/userguide/command_line_interface.html#sec:command_line_warnings in the Gradle documentation. BUILD SUCCESSFUL in 2m 5s 2 actionable tasks: 1 executed, 1 up-to-date 오전 8:04:07: Execution finished ':producers:SimpleProducer.main()'. [Producer clientId=producer-1] Resetting the last seen epoch of partition test-topic-0 to 0 since the associated topicId changed from null to jRkpHnfwT8mfWJ3PB9HHmg이 부분을 보아하니 토픽은 인지하는 것 같은데 말이죠..감사합니다.
- 미해결카프카 완벽 가이드 - 코어편
kafka 연결 질문 드립니다.
안녕하세요. mac m1에 utm으로 고정IP(192.168.56.101) 설정후 ssh 접속 및 kafka-console 명령어 잘 됩니다.하지만, java 코드로 실행시 아래 이미지와 같이 접속 이슈가 있어서 문의 드립니다.ubuntu 설정에 이름이 "ubuntu"로 설정하게 문제일까요?