inflearn logo
강의

講義

知識共有

カフカパーフェクトガイド - コネクト編

offset 커밋 관련 질문

159

apfhd

投稿した質問数 5

0

커넥터와 별개긴 한데, 카프카 관련하여 문의드립니다.

현재 프로젝트 디폴트 설정이 자동 커밋인데,

리스너를 내리거나 서버를 내릴때에 예외적으로 커밋을 치고 싶습니다.

이런 경우 수동커밋이 가능한지 문의드립니다.

kafka 데이터-엔지니어링

回答 1

1

dooleyz3525

안녕하십니까,

정합성이 중요해서 시스템 재 기동 중에 아예 중복 데이터 처리를 없게 하려면 기본적으로는 Consumer를 수동 commit sync 모드로 적용하는게 좋습니다.

근데 속도 문제로 개선이 필요하다면 수동 commit인데 async로 하시면 좋습니다. 다만 sync 만큼 100% 보장이 안될 수도 있습니다.

default가 수동 async commit 이라면 아래와 같이 addShutdownHook을 사용하여 종료시 sync commit로 시도해 볼 수 있습니다(저도 정확히 작동하는가 돌려보지는 않았습니다 ^^;;) 그런데, 이걸 auto commit에서 수동 commit으로 바꿀 수 있는지는 모르겠습니다. consumer가 자동 commit인지 수동 commit인지는 Consumer 생성시에 properties를 할당해서 정해지는데 이게 변경이 가능할 것 같지 않습니다.

암튼 아래는 수동 async commit에서 수동 sync commit로 변경하는 코드 입니다.

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;

import java.util.Arrays;

import java.util.Properties;

public class KafkaConsumerExample {

public static void main(String[] args) {

Properties props = new Properties();

props.put("bootstrap.servers", "your.kafka.broker:9092");

props.put("group.id", "your-group-id");

props.put("enable.auto.commit", "false"); // Disable auto-commit

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList("your-topic"));

Thread mainThread = Thread.currentThread();

 

# 아래와 wakeup으로 consumer 종료

Runtime.getRuntime().addShutdownHook(new Thread() {

public void run() {

consumer.wakeup();

try {

mainThread.join();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

try {

while (true) {

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {

System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

}

// 평소엔 비동기 모두로 동작.

consumer.commitAsync();

}

} catch (Exception e) {

e.printStackTrace();

} finally {

#동기 모두로 offset 저장

consumer.commitSync()

consumer.close();

}

}

}

 

 

0

apfhd

답변 감사합니다!

계속 csv파일을 폴더로 인식하는 중입니다.

0

65

2

cdc failover

0

69

2

avro plugin에 안보이는거 같아요

0

68

2

zookeeper가 실행이 안되요

0

73

3

Debezium CDC Source Connector 이벤트 메시지

0

82

2

재 질문! 다른 connector가 동일 토픽 사용

0

63

2

메시지 발행이 안 됩니다.

0

75

2

디비지움 오프셋 리셋

0

94

2

mysql_jdbc_oc_sink_customers_02.json 에서 오류납니다.

0

69

3

io.debezium.connector.mysql.MySqlConnector 질문

0

67

2

Debezium 이벤트 메시지 발행 시 성공여부

0

80

2

connect 구동 오류

0

97

3

CDC Connect 통시에서의 암호화 관련 하여 문의 드립니다.

0

121

2

GCSSinkconnector

0

64

1

debezium source connector에서 `poll.interval.ms` 파라미터

0

142

2

자문자답: JDBC Sink Connector가 PostgreSQL의 Schema를 바라보지 않습니다.

0

212

1

table.name.format에 관하여..

0

196

2

binlog와 offset 관련 추가 질문드립니다

0

149

1

Debezium cdc source for MSSQL-Server [등록오류]

0

190

2

http http://localhost:8083/connector-plugins 시 플러그인이 보이지 않습니다.

0

187

2

안녕하세요 sink connector 생성 config 관련 질문입니다.

0

159

2

수강환경 VirtualBox -> Docker

0

223

2

일반적인 예시들이 궁금해서 질문 남깁니다.

1

145

2

debezium connector schema 찾지 못하는 오류

0

426

2