offset 커밋 관련 질문
161
5 câu hỏi đã được viết
커넥터와 별개긴 한데, 카프카 관련하여 문의드립니다.
현재 프로젝트 디폴트 설정이 자동 커밋인데,
리스너를 내리거나 서버를 내릴때에 예외적으로 커밋을 치고 싶습니다.
이런 경우 수동커밋이 가능한지 문의드립니다.
Câu trả lời 1
1
안녕하십니까,
정합성이 중요해서 시스템 재 기동 중에 아예 중복 데이터 처리를 없게 하려면 기본적으로는 Consumer를 수동 commit sync 모드로 적용하는게 좋습니다.
근데 속도 문제로 개선이 필요하다면 수동 commit인데 async로 하시면 좋습니다. 다만 sync 만큼 100% 보장이 안될 수도 있습니다.
default가 수동 async commit 이라면 아래와 같이 addShutdownHook을 사용하여 종료시 sync commit로 시도해 볼 수 있습니다(저도 정확히 작동하는가 돌려보지는 않았습니다 ^^;;) 그런데, 이걸 auto commit에서 수동 commit으로 바꿀 수 있는지는 모르겠습니다. consumer가 자동 commit인지 수동 commit인지는 Consumer 생성시에 properties를 할당해서 정해지는데 이게 변경이 가능할 것 같지 않습니다.
암튼 아래는 수동 async commit에서 수동 sync commit로 변경하는 코드 입니다.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "your.kafka.broker:9092");
props.put("group.id", "your-group-id");
props.put("enable.auto.commit", "false"); // Disable auto-commit
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("your-topic"));
Thread mainThread = Thread.currentThread();
# 아래와 wakeup으로 consumer 종료
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 평소엔 비동기 모두로 동작.
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
#동기 모두로 offset 저장
consumer.commitSync()
consumer.close();
}
}
}
계속 csv파일을 폴더로 인식하는 중입니다.
0
67
2
cdc failover
0
69
2
avro plugin에 안보이는거 같아요
0
68
2
zookeeper가 실행이 안되요
0
75
3
Debezium CDC Source Connector 이벤트 메시지
0
83
2
재 질문! 다른 connector가 동일 토픽 사용
0
65
2
메시지 발행이 안 됩니다.
0
76
2
디비지움 오프셋 리셋
0
94
2
mysql_jdbc_oc_sink_customers_02.json 에서 오류납니다.
0
69
3
io.debezium.connector.mysql.MySqlConnector 질문
0
67
2
Debezium 이벤트 메시지 발행 시 성공여부
0
81
2
connect 구동 오류
0
97
3
CDC Connect 통시에서의 암호화 관련 하여 문의 드립니다.
0
121
2
GCSSinkconnector
0
64
1
debezium source connector에서 `poll.interval.ms` 파라미터
0
143
2
자문자답: JDBC Sink Connector가 PostgreSQL의 Schema를 바라보지 않습니다.
0
212
1
table.name.format에 관하여..
0
196
2
binlog와 offset 관련 추가 질문드립니다
0
150
1
Debezium cdc source for MSSQL-Server [등록오류]
0
191
2
http http://localhost:8083/connector-plugins 시 플러그인이 보이지 않습니다.
0
190
2
안녕하세요 sink connector 생성 config 관련 질문입니다.
0
159
2
수강환경 VirtualBox -> Docker
0
224
2
일반적인 예시들이 궁금해서 질문 남깁니다.
1
145
2
debezium connector schema 찾지 못하는 오류
0
429
2

