월 33,000원
5개월 할부 시다른 수강생들이 자주 물어보는 질문이 궁금하신가요?
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
simple consumer 실행 시 무한 로그 찍힘
안녕하세요!강의를 열심히 따라가는 중입니다.simple consumer 개발 시 실행을 하면15:46:20.636 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-group-1, groupId=test-group] Built incremental fetch (sessionId=349253680, epoch=15) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s) 15:46:20.636 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-group-1, groupId=test-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(test-0)) to broker localhost:9092 (id: 0 rack: null) 15:46:21.150 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-group-1, groupId=test-group] Node 0 sent an incremental fetch response with throttleTimeMs = 0 for session 349253680 with 0 response partition(s), 1 implied partition(s) 15:46:21.150 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-group-1, groupId=test-group] Added READ_UNCOMMITTED fetch request for partition test-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}} to node localhost:9092 (id: 0 rack: null) 15:46:21.150 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-group-1, groupId=test-group] Built incremental fetch (sessionId=349253680, epoch=16) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s) 15:46:21.150 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-group-1, groupId=test-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(test-0)) to broker localhost:9092 (id: 0 rack: null)로그가 이런식으로 무한대로 찍히고스프링부트의 카프카 라이브러리를 통해 확인을 하면이렇게 상태가 empty라고 뜨며디버깅을 해보면 레코드에 데이터가 들어오지 않는 것을 확인 할 수 있습니다. 혹시 어느부분을 설정을 해주지않아서 그럴까요..? kafka-console-producer 스크립트를 통해서 메시지를 생성하면 읽을수 있긴 합니다..16:00:43.464 [main] INFO wendy.consumer.simpleconsumer.SimpleConsumer - record: ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1703660442453, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = is it work??)
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
컨슈머 랙 - 처리량 이슈 부분에서 질문이 있습니다.
안녕하세요, 먼저 좋은 강의 잘 듣고 있습니다 (- -) (_ _) 컨슈머 랙 모니터링 - 처리량 이슈 파트에서 궁금한 점이 생겨서 질문드립니다. 파티션 1개 - 컨슈머 1개에서 파티션 2개 - 컨슈머 2개가 되면 linear하게 처리량이 늘어난다고 하셨는데, 어떻게 처리량이 늘어나는 건지 조금 이해가 안가서요. 상단과 같은 구조는 하나의 컨슈머가 0,1,2,3... 이렇게 하나씩 처리를 한다고 하면, 하단과 같은 구조는 다음과 같이 레코드를 동시에 소비하게 되어 2배로 늘어난다는 말씀일까요?컨슈머 1 - 0 : 2 : 4 : 6 : ...컨슈머 2 - 1 : 3 : 5 : 7 : ...( : 는 레코드가 소비되는 시점을 구분한 것 입니다!) 만약 그렇다면, offset은 어떻게 관리가 되는건지도 궁금하네요..
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
카프카 스트림즈에 대해서 질문이 있습니다.
KTable이 로컬 기반의 rocksDB에 기록된 데이터를 가져다 쓴다는 설명을 듣고 궁금증이 생깁니다.카프카 스트림을 docker기반 컨테이너로 배포한다 치면 그때마다 rocksDB는 초기화 될 것으로 생각되는데요..그렇다면 조인시에 데이터가 없어서 의도한 동작이 이루어지지 않을 수 있겠다는 생각이 드는데 맞는지요?마찬가지로 windowing 프로세스도 집계중에 컨테이너가 갈아쳐지면 집계중이던 데이터는 소실될 수 있어보이는데 맞는지 모르겠습니다.실무에서는 보통 카프카 스트림즈를 어떻게 쓰는지가 궁금합니다. rocksDB를 따로 두어서 모든 스트림이 공유하도록 설정하거나 컨테이너 배포는 사용하지 않는지.. 아니면 EC2같은곳에 배포해서 로컬 DB데이터를 훼손시키지 않도록 하는지 같은것이요.감사합니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
KStream, KTable 코파티셔닝 질문이 있습니다.
만약 KStream, KTable 파티션 개수가 2개이고, 파티셔너 전략도 동일합니다. 근데 데이터 발생양이 증가하여, 파티션 개수를 둘다 5개를 늘려야 하는 상황이 생겼습니다.이때는 어떻게 해야하나요? 하나 씩 파티션을 증가할 때, 파티션 개수가 다르면 TopologyException이 발생할텐데요.또, 파티션이 추가되면 파티션 1번으로 가던 메시지 키가 다시 1번으로 간다는 보장도 없고요..2개의 토픽을 리파티셔닝 작업을 해야하는걸까요?리파티셔닝 작업을 하는 동안은 스트림즈의 다운 타임이 발생할 수 있는거고요..?
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
프로듀서, 컨슈머 애플리케이션, 그 외 몇가지 궁금한게 있습니다.
안녕하세요.강사님의 강의를 듣고, 이제는 강사님 책을 보고 있습니다. 실제 상용에서 애플리케이션을 개발할 때 궁금한 점이 있습니다. 첫번째로 프로듀서 애플리케이션 입니다.강사님의 책에서 봤듯이, 스프링을 사용합니다.예를 들어 저는 사용자 서비스에서 디비 트랜잭션(스프링에서 @Transactionl)을 사용하여 사용자 디비에 저장을 성공 후 프로듀서를 사용하여 레코드를 브로커에 보내야 한다고 생각합니다.밑에 코드를 간략하게 작성하였는데, 저렇게 되면 디비는 롤백됬지만 토픽에 레코드가 저장된 상태가 될 수 있다고 생각합니다.혹시 실무에서 커밋이 성공 후에 프로듀서 레코드를 전송하는 방법을 어떻게 하는지 간략한 코드가 궁금합니다.개인적인 생각은 UserService 클래스 상위 클래스로 카프카 프로듀서 처리하는 클래스를 만들어서 처리를 하는건지.. 궁금합니다.(이렇게 되면 카프카를 위한 래퍼 클래스가 항상 생기게 되는 불편함이 있는 것 같기도 하고요.. // 제가 생각하기에 잘못된 방법? // 만약 레코드 전송 후 어떤 이유로 에러가 발생하면, 디비에 저장된 데이터를 롤백되지만 // 프로듀서 레코드는 브로커 전송이 되버림 public class UserService { @Transactionl public void save(){ ... 프로듀서 레코드 전송 코드 ... 예외 발생 } } ----------------------- // 개인적인 생각 public class KafkaServce { public void save(){ userService.save(); 프로듀서 레코드 전송 코드 } } public class UserService { @Transactionl public void save(){ ... } } 두번째로 컨슈머 애플리케이션 개발 시 궁금한 점이 있습니다.컨슈머에서 데이터 처리를하다가 어떤 이유로 에러가 발생하여 해당 레코드 처리를 계속 실패했다고 가정합니다.그럴 경우 이 레코드의 대한 커밋 처리를 어떻게 해야할지가 궁금합니다.커밋을 처리하지 않으면, 다음 레코드 처리를 하지 못하는거라 생각되는데.. 어떤 방법으로 풀어내는지가 궁금합니다. 세번째로 컨슈머 애플리케이션에서 데이터베이스의 데이터를 저장해야하는 상황이다.스프링을 사용하는 경우 기본적으로 히카리 커넥션 풀에 커넥션 10개를 생성합니다.만약 파티션이 10개여서, 컨슈머를 10개 실행해야 한다면, 스프링 커넥션 풀을 사용하면 100개의 커넥션이 연결됩니다.50개면 500개의 풀 계속 증가할 듯 싶네요.이 경우 어떻게 해야할까요?컨슈머에서 레코드들을 for 문으로 돌리기 때문에 커넥션 풀을 1개를 사용해서 개발하는게 맞는건지?아니면 스프링에서 제공하는 히카리 커넥션 풀을 사용하지 않고, 직접 커넥션 풀을 구현하든가, 그것도 아니면 커넥션 풀을 사용하지 않고 1개의 트랜잭션당 1개의 커넥션을 생성 후 해제를 해야할까요?강사님의 생각이 궁금합니다. 마지막으로 궁금한게 있습니다.혹시 카프카를 활용하여 MSA에서 보상 트랜잭션(사가 패턴 - Orchestration) 처리를 할 수 있는건지 궁금합니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
한 서버에서 producer와 consumer를 같이 구축해도 될까요?
안녕하세요. 한 서버에서 producer와 consumer를 같이 구축하게 된다면, 생길 수 있는 문제가 있을까요?혹시 현업에서 producer와 consumer를 같이 구축하는 경우가 많은지 아니면 보통 분리해서 사용하는지 궁금합니다.만약 consumer 서버를 스케일아웃해서 3대로 운영한다면, consumer 서버에서 구독하는 모든 토픽들의 컨슈머 수가 3배가 되는게 맞을까요?예를 들어 A, B, C 토픽을 "가consumer"서버에서 구독하고 있을때 가consumer"서버가 3대가 된다면 토픽 A <- 컨슈머 3개 / 토픽 B <- 컨슈머 3개 / 토픽 C <- 컨슈머 3개가 되는것인지, 따로 설정해서 토픽마다 컨슈머의 수를 다르게 가져갈 수 있는 것인지 궁금합니다. 강의 잘 듣고 있습니다!감사합니다 :)
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
tumbling window 가 close 되는 시점에 로직이 수행되도록 하고 싶습니다.
final KStream<String, MonitoringClass> ks0 = streamsBuilder.stream(INPUT_TOPIC, Consumed.with(STRING_SERDE, MONITORING_CLASS_SERDE)); Duration windowDuration = Duration.ofMinutes(2); TimeWindows tumblingWindow = TimeWindows.of(windowDuration.toMillis()).grace(Duration.ZERO); ks0 .groupByKey() .windowedBy(tumblingWindow) .count() .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) .toStream() 안녕하세요, 다음과 같이 suppress() 함수를 사용해서 윈도우가 종료되는 시점에만 특정 로직을 실행하고 싶은데 suppress() 함수를 추가한 후부터는 테스트 실행시 아예 컨슈밍이 안되는 것 처럼 보여서 도움을 얻고자 글 남겼습니다.해당 부분도 커밋 주기 때문에 예상대로 동작하지 않는 걸까요?
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
Consumer Lag은 어느정도 수치면 정상적인 상황일까요?
안녕하세요, 강의 재밌게 듣고 있습니다 컨슈머 그룹편을 보다가 궁금해졌는데.. 실무하실때 Lag이 어느정도 쌓이면 장애상황이라고 생각하시나요?
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
실시간으로 데이터를 전송해야할때
안녕하세요. 프로듀서의 accumulator 관련 질문이 있습니다.Accumulator 내부에서 레코드를 배치로 모아 보낸다고 하셨는데, 데이터를 실시간으로 처리해야하는 경우에는 배치 작업을 스킵할 수도 있는 건가요?.? 감사합니다 :)
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
Streams테스트중 질문드립니다.
StreamsFilter를 실행시키는 중 강사님 화면과 다른 게 있어서 질문드립니다.강사님께서는 실행시켜도 콘솔에 로그가 출력되지 않는데 저는 실행시키면 로그가 계속 출력됩니다. 혹시 잘못한 건지 궁금해서 질문드립니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
kafka-connect-web관련 문의드립니다.
안녕하세요강의중 Kafka-connect-web에 대해 말씀하셔서 실습차 아래와 같은 구성으로 세팅후 확인중 에러가 있어 문의드립니다. 환경설정 zookeeper, Kafka, Kafka Connect 서버 : 10.0.3.16, 10.0.3.17, 10.0.3.18Kafka-connect-web 서버 : 10.0.1.9 Kafka-connect-web은 위의 서버에 도커 컨테이너로 생성하였고생성명령어는 아래와 같이 했습니다. docker run -d -p 8080:8080 --name kafka-connect-web -e "VUE_APP_CONNECT_SERVERS=http://10.0.3.16:8083,http://10.0.3.17:8083,http://10.0.3.18:8083" officialkakao/kafka-connect-web:latest 문제 내용container 안에서 curl 10.0.3.16:8083/connector-plugins로 확인시 값을 받아와서 네트워크 문제는 없는것 같은데 Kafka-connect-web에서 Connector class 값을 받을려고 실행하는 axios가 timeout이 나고,container 안에서 tcpdump를 떠도 패킷이 보이지 않습니다.Kafka-connect-web에 추가적인 설정이 필요한지 문의드립니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
컨슈머 랙 모니터링 질문
안녕하세요. 강의 잘 듣고 있습니다! 컨슈머 랙 모니터링 부분 강의를 듣고 있는데 의문이 생겨서 질문 드립니다. 컨슈머 랙을 모니터링하다가 사용량이 많아지게 되면 데이터 처리를 위해 컨슈머 개수와 파티션 개수를 늘려 병렬 처리량을 늘리는 방법이 좋다라고 말씀 하셨는데요. 이전 강의에서는 파티션 수를 늘리면 다시 줄일 수 없기 때문에 처음에 잘 선정해야 한다라고 들어서 이 부분이 약간 헷갈리는 거 같습니다. 많은 데이터량을 처리하려면 파티션 수를 늘리는 방법말고는 없을 거 같은데 설날같이 일시적인 트래픽이 증가하는 날을 위해 평소에도 과도한 파티션 수로 운영하는게 더 좋을까라는 의문도 같이 생기네요. 혹시 현업에서는 어떠한 방식으로 처리하고 있으신가요? 좋은 강의 감사합니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
여러 컨슈머 그룹에서 데이터 소비
안녕하세요. 수업 잘 듣고 있습니다!컨슈머 관련해서 궁금한 점이 생겨 질문드립니다.제가 지금까지 이해한 내용으로는 하나의 토픽에 대해서 이벤트를 발생하면 key 값을 통해 파티션으로 분배되는 걸로 이해하고 있습니다. 그리고 하나의 토픽에 대해서 여러 컨슈머 그룹이 운영될 수 있고 별도 offset으로 관리되어 각 컨슈머 그룹에서는 이벤트에 대해서 원하는 용도로 처리될 수 있다고 하셨습니다.이부분에서 궁금한 점이 생겼는데요. 하나의 이벤트에 대해서는 한개의 파티션에 하나의 레코드만 생성되는데, 여러 컨슈머 그룹에서 해당 이벤트를 다 가져가는 경우는 발생할 수 없는 거 같아서 헷갈리는 거 같습니다.이벤트 -> 토픽에서 특정 파티션 선정 -> 파티션에 레코드 등록 -> 컨슈머이런 형태로 진행되면 한개의 이벤트는 어떻게 여러 컨슈머 그룹에서 독립적으로 처리할 수 있지라는 의문이 생기게 되었습니다.혹시 하나의 이벤트에 대해서 레코드가 여러개 생성될 수 있는 걸까요??아니면 하나의 컨슈머 그룹에서는 하나의 파티션만 할당될 수 잇지만 여러 컨슈머 그룹인 경우에는 하나의 파티션에 대해서 그룹 당 하나의 컨슈머가 할당이 가능한것일까요??
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
커스텀 elasticsearch sink connector를 만들었는데 class를 찾지 못하고 있어요
{ "error_code": 500, "message": "Error trying to forward REST request: Failed to find any class that implements Connector and which name matches com.example.connector.kafka.ElasticsearchSinkConnector, available connectors are: PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'}, PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'}, PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'}, PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'}, PluginDesc{klass=class com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector, name='com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector', version='0.0.0.0', encodedVersion=0.0.0.0, type=source, typeName='source', location='file:/data01/connectors/spooldir_source/'}, PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='11.0.0', encodedVersion=11.0.0, type=sink, typeName='sink', location='file:/data01/connectors/elasticsearch_sink/'}, PluginDesc{klass=class io.debezium.connector.sqlserver.SqlServerConnector, name='io.debezium.connector.sqlserver.SqlServerConnector', version='1.7.1.Final', encodedVersion=1.7.1.Final, type=source, typeName='source', location='file:/data01/connectors/mssql_source/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.8.2', encodedVersion=2.8.2, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.8.2', encodedVersion=2.8.2, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.8.2', encodedVersion=2.8.2, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.8.2', encodedVersion=2.8.2, type=source, typeName='source', location='classpath'}" }connector 등록 api를 날리면 이런 에러가 뜹니다.그런데 /connector-plugins request를 날리면 plugin으로 인식하고 있어요.SinkConnector를 상속한 클래스의 path도 맞는데 왜 동작하지 않는 건지 모르겠어요[ { "class": "com.example.connector.kafka.ElasticsearchSinkConnector", "type": "sink", "version": "1.0" }, { "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector", "type": "source", "version": "0.0.0.0" }, { "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector", "type": "source", "version": "0.0.0.0" }, { "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector", "type": "source", "version": "0.0.0.0" }, { "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector", "type": "source", "version": "0.0.0.0" }, { "class": "com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector", "type": "source", "version": "0.0.0.0" }, { "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "type": "sink", "version": "11.0.0" }, { "class": "io.debezium.connector.sqlserver.SqlServerConnector", "type": "source", "version": "1.8.1.Final" }, { "class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "type": "sink", "version": "2.8.2" }, { "class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "type": "source", "version": "2.8.2" }, { "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "type": "source", "version": "1" }, { "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "type": "source", "version": "1" }, { "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "type": "source", "version": "1" } ]로그를 봐도 커넥트 로그만 나오고 등록한 커넥터의 로그는 나오지 않는 것 같은데 혹시 카프카와 연결해서 커넥터를 ide에서 디버깅할 수 있는 방법이 있을까요?
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
스트림즈dsl의 state.dir에 대해
state.dir을 설명하시다가 /tmp의 생명주기가 다르다고 하셨는데 os 마다 /tmp의 데이터가 삭제되는 조건들이 다르다는 말씀인가요?
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
MSA에서 카프카 사용
학습 목적으로 카프카를 사용 중인데, MSA 구조에서의 카프카 프로듀서, 컨슈머 개념이 잘 이해가 가지 않습니다 ㅠspring boot로 MSA 구조를 구축한 상태입니다. 각 서비스별로 스프링 부트 서버가 존재합니다. 각 서비스가 하나의 데이터베이스 (MySQL 혹은 MongoDB)를 공유하여 사용하려고 합니다. 이 때 스프링 부트가 카프카 토픽에 데이터를 저장하고, 토픽에 있는 데이터를 DB에 저장하여 MSA 환경에서 DB의 일관성을 유지하고자 하는데 이 경우에 카프카를 사용하는 것이 적합할까요?또한 스프링 부트 서버에서 카프카 토픽에 데이터를 주고받을 프로듀서와 컨슈머, MySQL에 토픽의 데이터를 넣고 빼올 프로듀서와 컨슈머 이런식으로 한 서버 당 최소 4개씩을 각각 모두 설정해야하나요?
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
rebalancing 관리 관련 질문이 있습니다
안녕하세요 강사님rebalancing에 질문이 있습니다 commit이 Fail하여 rebalanced나 assigned partitions 같은 에러가 나올떄는보통 어떻게 관리를 하나요?rebalance가 안날순 없다고 알고 있습니다.보통 어떻게 이런 오류를 관리하고 처리하는지 알고싶습니다.따로 consumer를 restart하는 방법도 있나요? 그러면 문제가 될 게 있는지도 궁금합니다.
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
조인관련해서 실행하려면 오류가 뜹니다
jnilib가 없다는 에러가 뜨는데 뭔가 설정을 해야하는 걸까요?Exception in thread "global-table-join-application-76f56ff6-212f-4940-b4fb-fd8379e83d55-GlobalStreamThread" java.lang.UnsatisfiedLinkError: Can't load library: /var/folders/zy/b19yps9j095601_vkghc25wh0000gn/T/librocksdbjni17187958455810980136.jnilib
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
강사님 오류 관련하여 질문이 있습니다.
저는 지금 AIOkafka를 사용하고 있는데 commit()을 해주면 종종commit cannot be completed since the group has already rebalanced이 에러가 나오더라구요찾아보니 aio는 자동으로 리밸런싱 해서 그렇다는데 그렇다면 commit을 어떻게 써야 중복도 안되고 자동 리밸런싱으로 오류도 안생길까요?
- 미해결[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!
heartbeat에 관하여 질문이 있습니다.
강사님 heartbeat관련 문제를poll 사이즈를 늘려도 해결이 안되는데 보통 어떤 상황에 맞게 어떻게 설정하여 해결을 하나요?지금 전 aiokafka를 사용중입니다