묻고 답해요
156만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
Kafka Consumer GroupID
안녕하세요!제가 강의를 다 듣고 난 뒤 추가로 공부를 해볼 생각으로catalog-service 에서 example-catalog-topic 를 받은 뒤상품의 개수를 파악 후 재고 유무의 따라 order-service 에주문 상태를 전달 해주는 example-order-topic 를 만들어 구독 중인 order-service 에서example-order-topic 에서 전달 받은 주문 상태를 업데이트 하는 로직을 구현하려고 했습니다.로컬에서 order-service 와 catalog-service 의 consumer groupId 를 consumerGroupId 로 통일해서 사용 했을 때는 괜찮았으나, docker 에 올리니 example-order-topic 이 생성은 됐으나 구독 및 전달이 되지 않더라구요.... 혹시나 해서 order-service 의 consumer groupId 를 consumerGroupId2 로 변경하여 docker 에 올리니 정상 작동이 됩니다. 혹시 각각의 service 는 groupId 를 다르게 해주어야 할까요??
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
서버에 카프카 컨슈머를 계속올려놓을경우 종료를 해야하나요?
안녕하세요! 제가 카프카 컨슈머 애플리케이션을 개발해서aws 서버에 올려놓고 계속 컨슈밍을 할 예정입니다.강의에서 컨슈머의 안전한 종료 파트를 듣고 질문이 생겨 글을 남깁니다.이렇게 서버에 계속 올려놓고 컨슈밍을 받아야할때 컨슈머의 종료가 필요한가요?만약 필요하다면 종료된 컨슈머는 어떻게 재실행이 되는 건지 여쭤볼 수 있을까요? 일단 저는 스프링기반 카프카 컨슈머를 개발하였고 Runnable 인터페이스로 run() 메서드로 서버가 실행되면 바로 컨슈머가 실행되게 개발해놓은 상태입니다. 저의 코드입니다.@PostConstruct public void startConsuming() { Thread consumerThread = new Thread(new Consumer()); consumerThread.start(); } private class Consumer implements Runnable { private final Logger logger = LoggerFactory.getLogger(Consumer.class); private final static String TOPIC_NAME = ""; private final static String BOOTSTRAP_SERVERS = ""; private final static String GROUP_ID = ""; public void run() { try { Properties configs = new Properties(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 구독 중인 주제 파티션에서 사용 가능한 가장 빠른 오프셋부터 읽기 configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000"); configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs); consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record : records) { logger.info("record: {}", record); // 받은 메세지 String value = record.value(); // commit the offset consumer.commitSync(Collections.singletonMap( new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1))); } } } catch (Exception e) { logger.error("Error occurred while consuming messages", e); } } }
-
해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
질문있어요!
mySQL과 카프카 연결로 강의해주셨는데요아직 일부만 수강한 상태입니다.debezium이 차후에는 싱크 커넥터들을 데이터베이스 상관없이 지원할거같기도해서mySQL말고도 강의를 참고해서오라클 SQL, MSSQL에도 카프카를 연결하고 싶습니다.이때 접근방향이나 차이점등이 궁금한데 혹시 경험해보신적있다면 조언받고 싶습니다. 감사합니다
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
카프카 이해
제가 카프카에 대한 구조와 사용법을 이해하지 못해서 학습 재미가 떨어지는데요.카프카를 이렇게 써야 하는지를 맞는지가 궁금합니다. 많은 영상을 봤지만서도 사용법이 잘 이해가 안되서 질문드립니다
-
미해결카프카 완벽 가이드 - 커넥트(Connect) 편
connector update 방법
안녕하세요 전에도 질문을 올렸었는데 답변 주신 대로 했더니 잘 해결 되었습니다.감사합니다.전에 올린 질문 중 답변해주신 내용이 oracle connector는 하나의 source connector만 생성하면 된다고 하셨는데요.하나의 source connector로 구성 후 테이블을 추가하려고 하는데 추가할 경우에는connector를 다시 생성해야 할까요?아래의 명령어로 update 후 재기동을 해봤으나 추가한 테이블에 대한 topic이 생성 되지 않습니다.http PUT http://localhost:8083/connectors/ora_source_connector/config @ora_source_connector_test.json http POST http://localhost:8083/connectors/ora_source_connector/restart json 내용{"connector.class" : "io.debezium.connector.oracle.OracleConnector","db_type":"oracle","tasks.max" : "1","database.server.name" : "source_connector_01","database.user" : "xx","database.password" : "xx","database.url": "jdbc:oracle:thin:@xx","database.dbname" : "xx","database.out.server.name":"ora_source_out","schema.history.internal.kafka.bootstrap.servers" : "xx:9092","schema.history.internal.kafka.topic": "ora_source_history","schema.include.list": "xx","include.schema.changes": "true","database.connection.adapter": "logminer","topic.prefix": "ORA_SOURCE","schema.include.list": "xx","table.include.list":"xx.AF_CLAIM , xx.AF_CS_MGMT , xx.AF_BRAND","include.schema.changes": "true","auto.evolve": "true","time.precision.mode": "connect","key.converter": "io.confluent.connect.avro.AvroConverter","value.converter": "io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url": "http://localhost:8081","value.converter.schema.registry.url": "http://localhost:8081","snapshot.mode" : "initial","tombstones.on.delete": "true","transforms": "rename_topic","transforms.rename_topic.type" : "org.apache.kafka.connect.transforms.RegexRouter","transforms.rename_topic.regex" : "ORA_SOURCE(.*)","transforms.rename_topic.replacement" : "source_$1","transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": "false"}
-
미해결15일간의 빅데이터 파일럿 프로젝트
가상서버 설치 및 시작 에러 문의
안녕하세요, 빅디님 ! 좋은강의 감사합니다.가상서버 설치 + 설정에서 문제가 있어 질문남깁니다.macOS에서 M1칩을 탑재한 모델을 사용중인데, 최신버전의 virtualbox만 설치가 됩니다.(이하 버전은 설치조차 되지 않습니다..ㅠ)설치는 되었으나 청천벽력같이.... 최신버전의 버츄얼박스에서는 server01/02가 시작이 되질 않습니다.네트워크 구성 에러로 보입니다. [에러 전문]===Host-only adapters are no longer supported!For your convenience a host-only network named 'Legacy Network' has been created with network mask '255.255.255.0' and IP address range '192.168.56.1' - '192.168.56.254'.===이러한 경우에 유일한 해결방법은 최신버전의 VM을 설치하고 네트워크 구성을 하지 않아야 시작이 되는 것으로 파악했습니다. 서버에 네트워크가 없다면 의미가 없어보입니다..이 수업에서는 네트워크 구성이 필수일까요?혹은 다른 방법(가상화 서버 등) 아신다면 말씀 부탁드리겠습니다!!감사합니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
bootstrap.yml 파일의 "encrypt.key-store.password" 암호화
spring config를 적용하면 설정 값들에 암호화를 수행하여 "{cipher}..." 구문으로 적용할 수 있는데, bootstrap.yml 파일의 "encrypt.key-store.password", "encrypt.key-store.secret" 항목의 값은 암호화를 어떻게 할 수 있는지 방법 부탁드립니다.
-
해결됨Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
404 페이지
안녕하세요 실습중 동작이 되지 않아서 문의드립니다.zuul server .yml 파일 및 gradle 파일server: port: 8000 spring: application: name: my-zuul-service zuul: routes: first-service: path: /first-service/** url: http://localhost:8081 second-service: path: /second-service/** url: http://localhost:8082 plugins { id 'java' id 'org.springframework.boot' version '2.3.9.RELEASE' id 'io.spring.dependency-management' version '1.0.11.RELEASE' } group = 'com.msa' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.cloud:spring-cloud-starter-netflix-zuul:2.2.3.RELEASE' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' } tasks.named('test') { useJUnitPlatform() } localhost:8081/welcome 요청시 정상적으로 오지만localhost:8000/first-service/welcome 요청시 404 에러떠러지고 있는데 이유를 모르겠어서 문의드립니다.
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
kafka streams 데이터 보관 주기 질문
안녕하세요카프카 스트림즈 학습중에 궁금한 점이 있어 질문드립니다.예를 들어 말씀드리겠습니다kstream 으로 변경되는 상품의 가격 정보를 받아온다고 가정하고, ktable 로 해당 상품의 마스터성 정보를 받는다고 해보겠습니다.카프카 스트림즈를 써도 결국엔 카프카 브로커에 로그로 쌓이는 것이고, 로그 압축이나 삭제 정책에 따라 A라는 상품의 정보가 있다가 사라질 수 있을 것 같은데요4월 1일에는 A상품의 정보(상품이름, 판매처 등)가 로그에 있어서 데이터를 읽어올수 있었지만 4월4일에는 기간이 지나 삭제되었다거나 하는 경우가 있을 것 같아서요카프카 스트림즈를 띄우는 서버의 메모리나 로컬 디스크에 모든 내용을 가지고 있는 건지가 궁금하구요만약 해당 서버에 데이터가 있다면 서버가 다운되거나 했을때 리밸런싱 혹은 서버가 재시작 되었을때 사라진 로컬데이터의 복구 기능이 구현되어있는 건지 궁금합니다또한 로컬에 저장된다면 그동안 스트림을 거쳐간 모든 데이터를 가지고 있는건지.. 데이터가 대용량이 된다면 로컬 머신의 저장공간을 고려해야하는지 궁금합니다 그리고.. 만약 카프카 스트림엔 데이터가 없고 rdb등의 저장소에 데이터가 있다면 이걸 가져와서 쓸수 있는지도 궁금합니다
-
미해결15일간의 빅데이터 파일럿 프로젝트
가상서버 이미지 다운 에러
가상서버 이미지 다운시 전체 용량을 다운로드 못받네요.확인 후 다시 링크 주시면 감사하겠습니다.
-
미해결카프카 완벽 가이드 - 코어편
kafka에서 elastic search에 저장
안녕하세요.강의 잘 수강중입니다. 다름 아니라, kafka에서 elastic search로 저장하는 일을 조만간 할 예정입니다.본 과정을 다 수강하고 나면 그런 응용력이 생길지요?아니면 connect 편 까지 다 수강해야 elastic search에 저장하는 응용력을 키울 수 있을지 문의드립니다. 감사합니다.
-
미해결15일간의 빅데이터 파일럿 프로젝트
서버 메모리 문제
안녕하세요~선생님,강의를 잘 해 주셔서 현재 RStudio를 진행하고 있는데 클라우라가 작동 되지 않습니다.강의 후반에 오면서 메모리 문제가 발생했는데 어떻게 조치해야 합니까?이전에 질문 조회를 해서 불필요한 파일 삭제하고,hadoop fs -setrep 1 /명령도 실행해 보았습니다.클라우데라 현상태는 아래와 같습니다,, 제 PC사양은 RAM 32GB이고 아래와 같습니다
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
sink로 등록된 topic의 값이 db에 반영되지 않습니다.
sink도 잘 등록되었고, orders topic에 json 값도 잘 전달되지만 전달된 값이 디비에는 반영되지 않아 데이터 삽입이 되지 않습니다. ㅠㅠ무엇이 문제일까요?
-
미해결카프카 완벽 가이드 - 커넥트(Connect) 편
mysql source connector 문의
안녕하세요. 강의를 들으며 테스트 해보던 도중에 질문이 있습니다.mysql -> elasticsearch 로 데이터 마이그레이션에 connector 를 활용하려고 테스트를 하고 있습니다. 그래서 DB 에 있는 여러 테이블 중 필요한 일부 테이블만 커넥터 생성 시 등록하려고 합니다. 하지만, 이후에 같은 DB 내에 다른 테이블도 필요해지는 상황이 올 경우가 생길텐데 이런 경우에 어떻게 대응하는 것이 좋은 방법인지 궁금합니다.이런 요구사항들이 생길 때마다 커넥터를 하나씩 더 등록-> 같은 DB 를 바라보는 커넥터가 여러개이면 비효율적이지 않을까 싶었습니다.이런 상황을 대비해서 커넥터 등록시 DB 내의 모든 테이블을 등록-> DB 내에 꽤 많은 테이블이 있어 카프카의 스토리지 및 비용 등의 문제가 있지 않을까 싶었습니다.위의 방법 정도로 생각이 드는데, 조언 부탁드립니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
filter에서 response body를 수정해서 client에 리턴하고 싶은데요.
response.getBody() 뭐 이런식의 메서드가 없어서요...getHeader()는 있는데.... body의 특정 문자열을 바꿔서 리턴하고자 해서 문의 해봅니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
docker 생성 후 gateway userservice 연결 시 에러
게이트 웨이를 도커에만 올리면 에러가 나옵니다 500 Server Error for HTTP POST "/user-service/login"io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:8889 유레카에서 유저 서비스 클릭 시 : /127.0.0.1:8889요렇게 나오는데 http://localhost:8888/user-service/default에서는{token.expireDate: 20002666,order_service.url: "http://order-service/order-service/%s/orders",order_service.exception.user_empty: "user's ord exist3333",user_service.url: "http://user-service/user",user_service.exception.user_empty: "user_Empty",token.secret: "1234"}gateway.host: "172.18.0.6",token.expireDate: 20002666,이렇게 나옵니다 http://localhost:8000/user-service/default이건 에러가 나구요도커의 네트워크는 추가 했구요"Containers": { "11a12dd45aaa3274226b1b462e1997b50e9b7ed61b405595c0c3b89393d6e036": { "Name": "rabbitmq", "EndpointID": "3425a23e5c021d2bcc307bed1d9b4ac03c17b92d4828f8b633469d8bbe8df8ac", "MacAddress": "02:42:ac:12:00:04", "IPv4Address": "172.18.0.4/16", "IPv6Address": "" }, "28da3474585596f6a3a435e48cfc9084e7d397fafaea751537ac7c82ac81bfd1": { "Name": "user-service1", "EndpointID": "543022aef70070d571f9d193151af692e59c5bc752feb15b5a0d75d23338ff34", "MacAddress": "02:42:ac:12:00:03", "IPv4Address": "172.18.0.3/16", }, "47dbfe92f425d933ca6e72018eb11b95f0cba48434be9b28f571570787db0d02": { "Name": "gateway-service", "EndpointID": "f3cfa36a9b73a2a925d62f0d8045d51dd8bdb4afe3b2926cae6a4997bac92114", "MacAddress": "02:42:ac:12:00:06", "IPv4Address": "172.18.0.6/16", "IPv6Address": "" }, "b4910d66fee36364691710a189c79f47c2a7fac7675166bbb102b6dfba83ef80": { "Name": "eureka", "EndpointID": "23d981be13ce1a1e7f1f4b85ea61553a13d73dfcf5af6039e6befa68d0cea497", "MacAddress": "02:42:ac:12:00:02", "IPv4Address": "172.18.0.2/16", "IPv6Address": "" }, "e5326aa441bdcf8133c66384251212b14d1d0a452291a462a667de21e3f23c6c": { "Name": "config", "EndpointID": "5f8de35288b1d51927f840e050203671ff0c880b0b8f39c2dd614055c49306bc", "MacAddress": "02:42:ac:12:00:05", "IPv4Address": "172.18.0.5/16", "IPv6Address": "" } 어디가 문제일까요?
-
해결됨Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
JWT 토큰 유효성 검사
안녕하세요! 좋은 강의 제공해주셔서 감사합니다. 다름이 아니라, 수업에서 JWT 토큰 유효성 로직을 api gateway service에 구현했는데, user-service가 아닌 gateway service에 구현한 이유가 있을까요? api gateway에 토큰 유효성 로직을 구현하게 되면, 만료된 토큰으로 api gateway를 거치지 않고 바로 user-service에 요청하는 경우, 토큰 유효성 검사를 하지 않아서 문제가 발생할 수 있지 않나요?
-
해결됨카프카 완벽 가이드 - 커넥트(Connect) 편
스프링 연결시 오류
안녕하세요 선생님 가상머신 우분투에 올려서 터미널로 프로듀싱한 레코드를스프링에서 컨슈밍해서 보려고하는데 터미널 컨슈머에서는 정상으로 레코드를 받아오는데스프링에서는 브로커에 닿지 않는 것 같은데 혹시 서버 프로퍼티 문제일까요? 어떻게 하면 브로커에 연결이 될까요 ? package com.example; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import java.time.Duration; import java.util.*; public class SimpleConsumer { private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class); private final static String TOPIC_NAME = "test"; private final static String BOOTSTRAP_SERVERS = "localhost:9092"; private final static String GROUP_ID = "test-group"; public static void main(String[] args) { Properties configs = new Properties(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs); consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record : records) { logger.info("record:{}", record); String message = record.value(); System.out.println(message); } } } 구동시 에러[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = default client.id = client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = test-group group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 66563e712b0b9f84 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1679530659001 [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-group-1, groupId=test-group] Subscribed to topic(s): test [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected server properties -----------------------------------------------------++ 로컬 터미널에서는 스프링에 정상적으로 연동이 되는데가상머신에서 띄운 터미널은 로컬터미널과 같은 토픽인데도 컨슈밍을 하지 않습니다(스프링에도 가상머신에 띄운 프로듀서 레코드 전송은 안되는데 로컬 터미널 프로듀서에서 보낸 것은 받음 ) 로컬 가상머신위와 같이 동시에 각각 터미널에 같은 토픽, 부트스트랩서버로 보냈는데 각각 터미널에서만 통신이 되는 것 같습니다 public class SimpleConsumer { private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class); private final static String TOPIC_NAME = "study"; private final static String BOOTSTRAP_SERVERS = "localhost:9092"; private final static String GROUP_ID = "test-group"; public static void main(String[] args) { Properties configs = new Properties(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs); consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record : records) { logger.info("record:{}", record); String message = record.value(); System.out.println("message : " + message); } } } }[main] INFO com.example.SimpleConsumer - record:ConsumerRecord(topic = study, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1679534086551, serialized key size = 1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = aaa) message : aaa [main] INFO com.example.SimpleConsumer - record:ConsumerRecord(topic = study, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1679534345840, serialized key size = 1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 5, value = 555) message : 555 위에 터미널 프로듀서로 보낸 값만 넘어옵니다 이상입니다감사합니다.
-
미해결15일간의 빅데이터 파일럿 프로젝트
너무 좋은 강의 완강하고 싶은데 연장가능할까요?
안녕하세요.빅디님의 책과 강의로 열심히 따라가고 있는데 여러 업무를 병행하다보니 강의의 반도 진행하지 못했습니다. 강의를 들으면 정말 제가 알지 못했던 분야를 너무 쉽게 알아가는 기쁨에 가슴이 뛰는데 업무에 치이면서 살다보니 어느덧 강의 만료일이 다가 왔습니다.혹시 기간 연장을 조금 더 해주실수 있다면 부탁드리겠습니다.이 강의는 꼭 완료하고 싶습니다.부탁드리겠습니다. 감사합니다.
-
미해결카프카 완벽 가이드 - 코어편
특정 메시지까지만 커밋이 가능할까요?
안녕하세요~ 강의 항상 잘듣고있습니다ㅎㅎ배치사이즈를 작게하면 성능이 낮아지고배치사이즈를 크게하면 ack유실시 중복처리가 많아져서배치사이즈를 크게하고ack(commit)을 전체가 아닌 일부분만 할 수 없을까? 생각하게되었어요.예를들어서 1000개 배치로 가져오고10개까지 commit, 100개까지커밋, ....요런것도 가능할려나요?.?