묻고 답해요
161만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
해결됨Backend 멀티쓰레드 이해하고 통찰력 키우기
코틀린으로 해당 C# 예제를 비슷하게 만들어봤는데, 제가 잘못 작성한 걸까요?
<상황>지식 공유자님께서 작성해주신 C# 코드 예제를 코틀린으로 비슷하게 작성해서 시도해보았지만 같은 상황이 재현되지 않습니다. <질문 의도>제가 지식 공유자님의 코드를 잘못 이해하고 작성한 것인지, 아니면 JVM의 의도치 않은 최적화 때문에 의도와 다르게 동작하는 것인지 궁금합니다. <작성한 코드>fun main (args: Array<String>) { Example().startUp() } class Example() { private var shouldStop = false fun startUp() { println("process start") val thread = Thread(Runnable { doWork() }) thread.start() Thread.sleep(1000) shouldStop = true thread.join() println("process end") } // shouldStop에 @Volatile을 붙이지 않으면 무한 루프를 돌 것이라고 생각했으나 // graceful shutdown이 잘 되어버림 private fun doWork() { while (!shouldStop) { println("doWork..") Thread.sleep(1000) } } } 좋은 강의 만들어주셔서 감사합니다!
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
shutdown hooking 이 되지 않는것 같습니다.
강의 내용과 같이 로그가 남지 않는데 왜 그런걸까요?
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
simple consumer 실행 시 무한 로그 찍힘
안녕하세요!강의를 열심히 따라가는 중입니다.simple consumer 개발 시 실행을 하면15:46:20.636 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-group-1, groupId=test-group] Built incremental fetch (sessionId=349253680, epoch=15) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s) 15:46:20.636 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-group-1, groupId=test-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(test-0)) to broker localhost:9092 (id: 0 rack: null) 15:46:21.150 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-group-1, groupId=test-group] Node 0 sent an incremental fetch response with throttleTimeMs = 0 for session 349253680 with 0 response partition(s), 1 implied partition(s) 15:46:21.150 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-group-1, groupId=test-group] Added READ_UNCOMMITTED fetch request for partition test-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}} to node localhost:9092 (id: 0 rack: null) 15:46:21.150 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-group-1, groupId=test-group] Built incremental fetch (sessionId=349253680, epoch=16) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s) 15:46:21.150 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-group-1, groupId=test-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(test-0)) to broker localhost:9092 (id: 0 rack: null)로그가 이런식으로 무한대로 찍히고스프링부트의 카프카 라이브러리를 통해 확인을 하면이렇게 상태가 empty라고 뜨며디버깅을 해보면 레코드에 데이터가 들어오지 않는 것을 확인 할 수 있습니다. 혹시 어느부분을 설정을 해주지않아서 그럴까요..? kafka-console-producer 스크립트를 통해서 메시지를 생성하면 읽을수 있긴 합니다..16:00:43.464 [main] INFO wendy.consumer.simpleconsumer.SimpleConsumer - record: ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1703660442453, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = is it work??)
-
미해결카프카 완벽 가이드 - 코어편
consumer group 과 topic의 관계가 궁급합니다.
안녕하세요, 강의를 듣다 궁금증이 생겨 질문드립니다. Topic T1, T2 가 존재하고, Consumer group G1이 존재한다 할 때..A Consumer가 G1 group에 속하면서 T1 topic을 subscribe하고, B Consumer가 G1 group에 속하면서 T2 topic을 subscribe하도록 코드를 짰는데도 잘 동작하더라구요. Consumer group은 topic과 1 : N 관계가 될 수 있는 것인가요?
-
해결됨Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
로드 밸런싱 관련 질문
안녕하세요! 강사님 로드 밸런싱 & 서버 관련 질문드립니다.Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)현재 저는 섹션 2 API Gateway Service 까지 강의를 진행한 상태입니다. 현업에서는 해당 강의에서 나와있는 것처럼 어플리케이션 단에서의 로드 밸런싱 구현을 많이 하나요?아니라면 현업에서는 어떠한 방법으로 자주 사용하여 구현을 하나요? 어떤 상황에서 어플리케이션 단에서의 로드 밸런싱 구현을 하나요?트래픽 분산을 목적으로 로드 밸런싱을 사용하기 위해 여러 개의 어플리케이션 인스턴스들을 서버에 올린다고 하였을 때 현업에서는 어떻게 관리하나요? AWS EC2 기준으로 설명해주시면 좋을 것 같습니다! 읽어주셔서 감사합니다(__)
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
docker network ip 대역 질문
docker network 지정 시 해당 network내에서 할당되는 ip 대역에 서비스 ip가 순차적으로 할당되는 것 같은데요, kafka같은 경우는 그 대역에서 직접 ip하나를 지정해서 사용하고 서비스에서도 그 kafka ip를 하드코딩으로 직접 지정해두는데, 환경에 따라 network ip 대역이 변경될 일은 없는건가요?docker container 네트워크 스펙인지 궁금합니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
kafka docker 사용 질문
질문이 있습니다. Local에 kafka를 직접 설치한 경우 kafka connect를 위해 jdbc connector 경로 지정 등 작업이 필요했던 것 같은데 docker compose로 구동시킬 때는 이러한 작업에 대한 지정이 필요 없는건가요??
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
컨슈머 랙 - 처리량 이슈 부분에서 질문이 있습니다.
안녕하세요, 먼저 좋은 강의 잘 듣고 있습니다 (- -) (_ _) 컨슈머 랙 모니터링 - 처리량 이슈 파트에서 궁금한 점이 생겨서 질문드립니다. 파티션 1개 - 컨슈머 1개에서 파티션 2개 - 컨슈머 2개가 되면 linear하게 처리량이 늘어난다고 하셨는데, 어떻게 처리량이 늘어나는 건지 조금 이해가 안가서요. 상단과 같은 구조는 하나의 컨슈머가 0,1,2,3... 이렇게 하나씩 처리를 한다고 하면, 하단과 같은 구조는 다음과 같이 레코드를 동시에 소비하게 되어 2배로 늘어난다는 말씀일까요?컨슈머 1 - 0 : 2 : 4 : 6 : ...컨슈머 2 - 1 : 3 : 5 : 7 : ...( : 는 레코드가 소비되는 시점을 구분한 것 입니다!) 만약 그렇다면, offset은 어떻게 관리가 되는건지도 궁금하네요..
-
해결됨Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
혹시 이 강의도 업데이트 예정에 있을까요?
로드맵 따라서 공부하고 있는데, 첫번째 강의처럼 이번 강의도 스프링 부트 3.x대로 업데이트 예정이 있는지 궁금합니다! 언제쯤 업데이트가 될지 알고싶어요..!
-
미해결카프카 완벽 가이드 - 코어편
key를 갖는 메시지 전송 시 궁금한 사항이 있습니다.
강사님, 안녕하세요! 강의 잘 듣고 있습니다.강의 들으면서 궁금한 사항이 생겨서 질문드려요.key를 지정해서 메시지를 전송하면 같은 key에 한해서는 같은 파티션으로 메시지가 전송되는 것을 보장 받을 수 있고, 해당 key로 전송된 메세지는 순서를 보장받을 수 있다. 까지는 이해했는데요~운영하면서 파티션의 개수가 늘어날 경우, 늘어난 직후에도 같은 key로 전송되는 메세지는 순서를 보장받을 수 있는 것일까요?(즉, 파티션 개수가 늘어나기 이전에 key:a, partition:0으로 전송되었다면 추후, 파티션 개수가 늘어나 리밸런싱이 일어나더라도 key:a 메세지는 partition:0으로 전송되는 것일까요?)
-
해결됨Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
consumer가 작동하지 않습니다
consumer를 실행시키면WARN [Consumer clientId=console-consumer, groupId=console-consumer-83590] Error while fetching metadata with correlation id 2 : {my_topic_users=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)이게 뜨고 이후에 데이터베이스에 데이터를 넣어도 아무것도 뜨지 않습니다. http://localhost:8083/connectors/my-source-connect/status 실행시키면 이렇게 나옵니다.{ "name": "my-source-connect", "connector": { "state": "FAILED", "worker_id": "", "trace": "org.apache.kafka.connect.errors.ConnectException: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=localhost)(port=3307)(type=master) : GSS-API authentication exception\r\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:59)\r\n\tat io.confluent.connect.jdbc.JdbcSourceConnector.start(JdbcSourceConnector.java:94)\r\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:193)\r\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:218)\r\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:363)\r\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:346)\r\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:146)\r\n\tat org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:123)\r\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\r\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\r\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\r\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\r\n\tat java.base/java.lang.Thread.run(Thread.java:842)\r\nCaused by: java.sql.SQLInvalidAuthorizationSpecException: Could not connect to address=(host=localhost)(port=3307)(type=master) : GSS-API authentication exception\r\n\tat org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.createException(ExceptionFactory.java:66)\r\n\tat org.mariadb.jdbc.internal.util.exceptions.ExceptionFactory.create(ExceptionFactory.java:192)\r\n\tat org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1392)\r\n\tat org.mariadb.jdbc.internal.util.Utils.retrieveProxy(Utils.java:635)\r\n\tat org.mariadb.jdbc.MariaDbConnection.newConnection(MariaDbConnection.java:150)\r\n\tat org.mariadb.jdbc.Driver.connect(Driver.java:89)\r\n\tat java.sql/java.sql.DriverManager.getConnection(DriverManager.java:681)\r\n\tat java.sql/java.sql.DriverManager.getConnection(DriverManager.java:190)\r\n\tat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:250)\r\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80)\r\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52)\r\n\t... 12 more\r\nCaused by: java.sql.SQLException: GSS-API authentication exception\r\n\tat org.mariadb.jdbc.internal.com.send.authentication.gssapi.StandardGssapiAuthentication.authenticate(StandardGssapiAuthentication.java:169)\r\n\tat org.mariadb.jdbc.internal.com.send.authentication.SendGssApiAuthPacket.process(SendGssApiAuthPacket.java:133)\r\n\tat org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.authenticationHandler(AbstractConnectProtocol.java:752)\r\n\tat org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.createConnection(AbstractConnectProtocol.java:553)\r\n\tat org.mariadb.jdbc.internal.protocol.AbstractConnectProtocol.connectWithoutProxy(AbstractConnectProtocol.java:1387)\r\n\t... 20 more\r\nCaused by: javax.security.auth.login.LoginException: No LoginModules configured for Krb5ConnectorContext\r\n\tat java.base/javax.security.auth.login.LoginContext.init(LoginContext.java:269)\r\n\tat java.base/javax.security.auth.login.LoginContext.<init>(LoginContext.java:357)\r\n\tat org.mariadb.jdbc.internal.com.send.authentication.gssapi.StandardGssapiAuthentication.authenticate(StandardGssapiAuthentication.java:117)\r\n\t... 24 more\r\n" }, "tasks": [], "type": "source" } 강사님이 올려주신 카프카 파일을 사용했어요. 당연하겠지만 이후 sink나 DB연동 강의 따라해도 전혀 작동하지 않습니다..
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
카프카 스트림즈에 대해서 질문이 있습니다.
KTable이 로컬 기반의 rocksDB에 기록된 데이터를 가져다 쓴다는 설명을 듣고 궁금증이 생깁니다.카프카 스트림을 docker기반 컨테이너로 배포한다 치면 그때마다 rocksDB는 초기화 될 것으로 생각되는데요..그렇다면 조인시에 데이터가 없어서 의도한 동작이 이루어지지 않을 수 있겠다는 생각이 드는데 맞는지요?마찬가지로 windowing 프로세스도 집계중에 컨테이너가 갈아쳐지면 집계중이던 데이터는 소실될 수 있어보이는데 맞는지 모르겠습니다.실무에서는 보통 카프카 스트림즈를 어떻게 쓰는지가 궁금합니다. rocksDB를 따로 두어서 모든 스트림이 공유하도록 설정하거나 컨테이너 배포는 사용하지 않는지.. 아니면 EC2같은곳에 배포해서 로컬 DB데이터를 훼손시키지 않도록 하는지 같은것이요.감사합니다.
-
해결됨Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
섹션 2.API Gateway Service 중 Spring Cloud Gateway-프로젝트 생성 부분 중 수행이 안됩니다.
강의와 똑같이 수행했다고 생각은 하는데, http://localhost:8081/first-service/welcome과 localhost:8082/second-service/welcome 를 개별적으로 호출했을 떄는 문제 없이 호출됩니다.그런데 http://localhost:8000/first-service/welcome을 호출하면 404로 문구가 나오네요.강의를 계속 봤는데 다른 부분 찾기가 어렵네요. 어느 부분에서 오류가 났는지 확인이 가능할까요? firstController.java import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/first-service") public class FirstServiceController { /* RequestMapping과 GetMapping이 조합이 되어서 호출됨 */ @GetMapping("/welcome") public String welcome(){ return "Welcome to the First service"; } } secondController.java@RestController @RequestMapping("/second-service") public class SecondServiceController { /* RequestMapping과 GetMapping이 조합이 되어서 호출됨 */ @GetMapping("/welcome") public String welcome(){ return "Welcome to the Second service"; } } apigateway-service 프로젝트의 pom.xml<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.2</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>apigateway-service</artifactId> <version>0.0.1-SNAPSHOT</version> <name>apigateway-service</name> <description>Demo project for Spring Boot</description> <properties> <java.version>17</java.version> <spring-cloud.version>2023.0.0</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway-mvc</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project> apigateway-service 프로젝트의 application.ymlserver: port: 8000 eureka: client: register-with-eureka: false fetch-registry: false service-url: defaultZone : http://localhost:8761/eureka Spring: application: name: apigateway-service cloud: gateway: routes: - id: first-service url: http://localhost:8081/ predicates: - Path=/first-service/** - id: second-service url: http://localhost:8082/ predicates: - Path=/second-service/**
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
KStream, KTable 코파티셔닝 질문이 있습니다.
만약 KStream, KTable 파티션 개수가 2개이고, 파티셔너 전략도 동일합니다. 근데 데이터 발생양이 증가하여, 파티션 개수를 둘다 5개를 늘려야 하는 상황이 생겼습니다.이때는 어떻게 해야하나요? 하나 씩 파티션을 증가할 때, 파티션 개수가 다르면 TopologyException이 발생할텐데요.또, 파티션이 추가되면 파티션 1번으로 가던 메시지 키가 다시 1번으로 간다는 보장도 없고요..2개의 토픽을 리파티셔닝 작업을 해야하는걸까요?리파티셔닝 작업을 하는 동안은 스트림즈의 다운 타임이 발생할 수 있는거고요..?
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
### 강사님 꼭 좀 답변 부탁드립니다. ###
몇주전에 MSA 후속 강의에 대해 문의드렸습니다. 이벤트 소싱, 보상 트랜잭션 등 후속 강의 계획이 있으신건가요?계획이 있고, 로드맵을 알려주시면 기다릴텐데.. 아무 소식이 없이 계속 미루어지고 있습니다. 저와 같이 해당 강의를 이미 들으셨던 분, 새로 들으시는 분들 모두가 궁금해하고 듣고싶은 강의라고 생각됩니다.후속 강의가 없다면, 해당 강의의 내용만으로는 사실 실무에서는 사용하기 어렵다고 생각됩니다.axon framework 를 별도로 공부해서 어떻게 할지 감은 오지만, 그래도 강사님이 실무에서 사용했던 방법을 공유받고 싶은 마음입니다. 꼭 좀 계획을 공유부탁드립니다.혹시라도 후속 강의에 대한 계획이 없는거여도 말씀 좀 부탁드립니다.
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
프로듀서, 컨슈머 애플리케이션, 그 외 몇가지 궁금한게 있습니다.
안녕하세요.강사님의 강의를 듣고, 이제는 강사님 책을 보고 있습니다. 실제 상용에서 애플리케이션을 개발할 때 궁금한 점이 있습니다. 첫번째로 프로듀서 애플리케이션 입니다.강사님의 책에서 봤듯이, 스프링을 사용합니다.예를 들어 저는 사용자 서비스에서 디비 트랜잭션(스프링에서 @Transactionl)을 사용하여 사용자 디비에 저장을 성공 후 프로듀서를 사용하여 레코드를 브로커에 보내야 한다고 생각합니다.밑에 코드를 간략하게 작성하였는데, 저렇게 되면 디비는 롤백됬지만 토픽에 레코드가 저장된 상태가 될 수 있다고 생각합니다.혹시 실무에서 커밋이 성공 후에 프로듀서 레코드를 전송하는 방법을 어떻게 하는지 간략한 코드가 궁금합니다.개인적인 생각은 UserService 클래스 상위 클래스로 카프카 프로듀서 처리하는 클래스를 만들어서 처리를 하는건지.. 궁금합니다.(이렇게 되면 카프카를 위한 래퍼 클래스가 항상 생기게 되는 불편함이 있는 것 같기도 하고요.. // 제가 생각하기에 잘못된 방법? // 만약 레코드 전송 후 어떤 이유로 에러가 발생하면, 디비에 저장된 데이터를 롤백되지만 // 프로듀서 레코드는 브로커 전송이 되버림 public class UserService { @Transactionl public void save(){ ... 프로듀서 레코드 전송 코드 ... 예외 발생 } } ----------------------- // 개인적인 생각 public class KafkaServce { public void save(){ userService.save(); 프로듀서 레코드 전송 코드 } } public class UserService { @Transactionl public void save(){ ... } } 두번째로 컨슈머 애플리케이션 개발 시 궁금한 점이 있습니다.컨슈머에서 데이터 처리를하다가 어떤 이유로 에러가 발생하여 해당 레코드 처리를 계속 실패했다고 가정합니다.그럴 경우 이 레코드의 대한 커밋 처리를 어떻게 해야할지가 궁금합니다.커밋을 처리하지 않으면, 다음 레코드 처리를 하지 못하는거라 생각되는데.. 어떤 방법으로 풀어내는지가 궁금합니다. 세번째로 컨슈머 애플리케이션에서 데이터베이스의 데이터를 저장해야하는 상황이다.스프링을 사용하는 경우 기본적으로 히카리 커넥션 풀에 커넥션 10개를 생성합니다.만약 파티션이 10개여서, 컨슈머를 10개 실행해야 한다면, 스프링 커넥션 풀을 사용하면 100개의 커넥션이 연결됩니다.50개면 500개의 풀 계속 증가할 듯 싶네요.이 경우 어떻게 해야할까요?컨슈머에서 레코드들을 for 문으로 돌리기 때문에 커넥션 풀을 1개를 사용해서 개발하는게 맞는건지?아니면 스프링에서 제공하는 히카리 커넥션 풀을 사용하지 않고, 직접 커넥션 풀을 구현하든가, 그것도 아니면 커넥션 풀을 사용하지 않고 1개의 트랜잭션당 1개의 커넥션을 생성 후 해제를 해야할까요?강사님의 생각이 궁금합니다. 마지막으로 궁금한게 있습니다.혹시 카프카를 활용하여 MSA에서 보상 트랜잭션(사가 패턴 - Orchestration) 처리를 할 수 있는건지 궁금합니다.
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
kafka sink 설정 시 테이블 생성이 안됩니다
{ "name": "my-sink-connect", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mariadb://localhost:3306/mydb", "connection.user": "root", "connection.password": "test1234", "mode": "incrementing", "incrementing.column.name": "id", "auto.create": "true", "auto.evolve": "true", "delete.enabled": "false", "tasks.max": "1", "topic": "my_topic_users", "table.whitelist": "mydb.users" } }confluent-community-connect-7.5.0-zOS confluentinc-kafka-connect-jdbc-10.7.4이렇게 사용중이고 모드를 빼면 에러가 발생하네요커넨터 로그엔 에러가 없어요 [2023-12-14 00:57:25,395] INFO SourceConnectorConfig values: config.action.reload = restart connector.class = io.confluent.connect.jdbc.JdbcSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none exactly.once.support = requested header.converter = null key.converter = null name = my-sink-connect offsets.storage.topic = null predicates = [] tasks.max = 1 topic.creation.groups = [] transaction.boundary = poll transaction.boundary.interval.ms = null transforms = [] value.converter = null (org.apache.kafka.connect.runtime.SourceConnectorConfig:369)[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171)[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174)[2023-12-14 00:57:25,396] INFO [my-sink-connect|task-0] Using JDBC dialect MySql (io.confluent.connect.jdbc.source.JdbcSourceTask:138)[2023-12-14 00:57:25,396] INFO EnrichedConnectorConfig values: config.action.reload = restart connector.class = io.confluent.connect.jdbc.JdbcSourceConnector errors.log.enable = false errors.log.include.messages = false errors.retry.delay.max.ms = 60000 errors.retry.timeout = 0 errors.tolerance = none exactly.once.support = requested header.converter = null key.converter = null name = my-sink-connect offsets.storage.topic = null predicates = [] tasks.max = 1 topic.creation.groups = [] transaction.boundary = poll transaction.boundary.interval.ms = null transforms = [] value.converter = null (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:369)[2023-12-14 00:57:25,397] INFO [my-sink-connect|task-0] [Producer clientId=connector-producer-my-sink-connect-0] Cluster ID: 61ETmEcJQASp3yeJGdTmPw (org.apache.kafka.clients.Metadata:287)[2023-12-14 00:57:25,413] INFO [my-sink-connect|task-0] Found offset {{table=users}=null, {protocol=1, table=mydb.users}={incrementing=17}} for partition {protocol=1, table=mydb.users} (io.confluent.connect.jdbc.source.JdbcSourceTask:234)[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:307)[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] WorkerSourceTask{id=my-sink-connect-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:275)[2023-12-14 00:57:25,414] INFO [my-sink-connect|task-0] Begin using SQL query: SELECT * FROM mydb.`users` WHERE mydb.`users`.`id` > ? ORDER BY mydb.`users`.`id` ASC (io.confluent.connect.jdbc.source.TableQuerier:182)
-
미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
한 서버에서 producer와 consumer를 같이 구축해도 될까요?
안녕하세요. 한 서버에서 producer와 consumer를 같이 구축하게 된다면, 생길 수 있는 문제가 있을까요?혹시 현업에서 producer와 consumer를 같이 구축하는 경우가 많은지 아니면 보통 분리해서 사용하는지 궁금합니다.만약 consumer 서버를 스케일아웃해서 3대로 운영한다면, consumer 서버에서 구독하는 모든 토픽들의 컨슈머 수가 3배가 되는게 맞을까요?예를 들어 A, B, C 토픽을 "가consumer"서버에서 구독하고 있을때 가consumer"서버가 3대가 된다면 토픽 A <- 컨슈머 3개 / 토픽 B <- 컨슈머 3개 / 토픽 C <- 컨슈머 3개가 되는것인지, 따로 설정해서 토픽마다 컨슈머의 수를 다르게 가져갈 수 있는 것인지 궁금합니다. 강의 잘 듣고 있습니다!감사합니다 :)
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
source-connect 오라클 적용 오류
mariadb가 아닌 oracle로 진행을 해보고 있는데 mariadb의 auto_increment 대신 oracle로 seq를 만들어 자동 증가하게 했습니다. 이렇게 적용을 하니 connector 에서 이런 오류를 주는데 오라클로 할 때는 설정이 많이 다른지 질문드립니다!!! ERROR [my-oracle-connect|task-0] WorkerSourceTask{id=my-oracle-connect-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:196) org.apache.kafka.connect.errors.ConnectException: Scale of Decimal value for incrementing column must be 0 at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractDecimalId(TimestampIncrementingCriteria.java:283) at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractOffsetIncrementedId(TimestampIncrementingCriteria.java:268) at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractValues(TimestampIncrementingCriteria.java:208) at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:230) at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:418) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) [2023-12-06 19:56:27,342] INFO [my-oracle-connect|task-0] Stopping JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:354) [2023-12-06 19:56:27,342] INFO [my-oracle-connect|task-0] Closing resources for JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:366) [2023-12-06 19:56:27,342] INFO [my-oracle-connect|task-0] [Producer clientId=connector-producer-my-oracle-connect-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1297) [2023-12-06 19:56:27,344] INFO [my-oracle-connect|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:693) [2023-12-06 19:56:27,344] INFO [my-oracle-connect|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:697) [2023-12-06 19:56:27,344] INFO [my-oracle-connect|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:703) [2023-12-06 19:56:27,344] INFO [my-oracle-connect|task-0] App info kafka.producer for connector-producer-my-oracle-connect-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83) { "name": "my-oracle-connect", "config" : { "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url":"jdbc:oracle:thin:@localhost:1521:orcl", "connection.user":"test", "connection.password":"test", "mode": "incrementing", "incrementing.column.name":"ID", "table.whitelist":"USERS", "topic.prefix" : "my_oracle_", "tasks.max" : "1" } }
-
미해결Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)
강의록 파일 열기 오류
안녕하세요? 수강신청한 후에 강의록을 다운로드했는데 PDF 파일이 열리지 않습니다.혹시 DRM이 걸려 있거나 다른 이슈가 있는 건 아닐지요?아니면 유사한 문의가 있었는지 확인해 주시면 감사하겠습니다.