offset 커밋 관련 질문
159
投稿した質問数 5
커넥터와 별개긴 한데, 카프카 관련하여 문의드립니다.
현재 프로젝트 디폴트 설정이 자동 커밋인데,
리스너를 내리거나 서버를 내릴때에 예외적으로 커밋을 치고 싶습니다.
이런 경우 수동커밋이 가능한지 문의드립니다.
回答 1
1
안녕하십니까,
정합성이 중요해서 시스템 재 기동 중에 아예 중복 데이터 처리를 없게 하려면 기본적으로는 Consumer를 수동 commit sync 모드로 적용하는게 좋습니다.
근데 속도 문제로 개선이 필요하다면 수동 commit인데 async로 하시면 좋습니다. 다만 sync 만큼 100% 보장이 안될 수도 있습니다.
default가 수동 async commit 이라면 아래와 같이 addShutdownHook을 사용하여 종료시 sync commit로 시도해 볼 수 있습니다(저도 정확히 작동하는가 돌려보지는 않았습니다 ^^;;) 그런데, 이걸 auto commit에서 수동 commit으로 바꿀 수 있는지는 모르겠습니다. consumer가 자동 commit인지 수동 commit인지는 Consumer 생성시에 properties를 할당해서 정해지는데 이게 변경이 가능할 것 같지 않습니다.
암튼 아래는 수동 async commit에서 수동 sync commit로 변경하는 코드 입니다.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "your.kafka.broker:9092");
props.put("group.id", "your-group-id");
props.put("enable.auto.commit", "false"); // Disable auto-commit
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("your-topic"));
Thread mainThread = Thread.currentThread();
# 아래와 wakeup으로 consumer 종료
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 평소엔 비동기 모두로 동작.
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
#동기 모두로 offset 저장
consumer.commitSync()
consumer.close();
}
}
}
계속 csv파일을 폴더로 인식하는 중입니다.
0
65
2
cdc failover
0
69
2
avro plugin에 안보이는거 같아요
0
68
2
zookeeper가 실행이 안되요
0
73
3
Debezium CDC Source Connector 이벤트 메시지
0
82
2
재 질문! 다른 connector가 동일 토픽 사용
0
63
2
메시지 발행이 안 됩니다.
0
75
2
디비지움 오프셋 리셋
0
94
2
mysql_jdbc_oc_sink_customers_02.json 에서 오류납니다.
0
69
3
io.debezium.connector.mysql.MySqlConnector 질문
0
67
2
Debezium 이벤트 메시지 발행 시 성공여부
0
80
2
connect 구동 오류
0
97
3
CDC Connect 통시에서의 암호화 관련 하여 문의 드립니다.
0
121
2
GCSSinkconnector
0
64
1
debezium source connector에서 `poll.interval.ms` 파라미터
0
142
2
자문자답: JDBC Sink Connector가 PostgreSQL의 Schema를 바라보지 않습니다.
0
212
1
table.name.format에 관하여..
0
196
2
binlog와 offset 관련 추가 질문드립니다
0
149
1
Debezium cdc source for MSSQL-Server [등록오류]
0
190
2
http http://localhost:8083/connector-plugins 시 플러그인이 보이지 않습니다.
0
187
2
안녕하세요 sink connector 생성 config 관련 질문입니다.
0
159
2
수강환경 VirtualBox -> Docker
0
223
2
일반적인 예시들이 궁금해서 질문 남깁니다.
1
145
2
debezium connector schema 찾지 못하는 오류
0
426
2

