서버에 카프카 컨슈머를 계속올려놓을경우 종료를 해야하나요?
805
작성한 질문수 4
안녕하세요!
제가 카프카 컨슈머 애플리케이션을 개발해서
aws 서버에 올려놓고 계속 컨슈밍을 할 예정입니다.
강의에서 컨슈머의 안전한 종료 파트를 듣고 질문이 생겨 글을 남깁니다.
이렇게 서버에 계속 올려놓고 컨슈밍을 받아야할때 컨슈머의 종료가 필요한가요?
만약 필요하다면 종료된 컨슈머는 어떻게 재실행이 되는 건지 여쭤볼 수 있을까요?
일단 저는 스프링기반 카프카 컨슈머를 개발하였고
Runnable 인터페이스로 run() 메서드로 서버가 실행되면 바로 컨슈머가 실행되게 개발해놓은 상태입니다.
저의 코드입니다.
@PostConstruct
public void startConsuming() {
Thread consumerThread = new Thread(new Consumer());
consumerThread.start();
}
private class Consumer implements Runnable {
private final Logger logger = LoggerFactory.getLogger(Consumer.class);
private final static String TOPIC_NAME = "";
private final static String BOOTSTRAP_SERVERS = "";
private final static String GROUP_ID = "";
public void run() {
try {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 구독 중인 주제 파티션에서 사용 가능한 가장 빠른 오프셋부터 읽기
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("record: {}", record);
// 받은 메세지
String value = record.value();
// commit the offset
consumer.commitSync(Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)));
}
}
} catch (Exception e) {
logger.error("Error occurred while consuming messages", e);
}
}
}
답변 1
0
카프카 컨슈머 애플리케이션의 안전한 종료가 필요한 가장 대표적인 사례는 해당 애플리케이션을 배포할 때 일것 같습니다. 아무래도 배포를 위해서는 애플리케이션 종료가 필수적이기 때문인데요. 이 경우 애플리케이션을 안전하게 종료함으로써 리소스의 낭비를 막을 수 있습니다.
일반적으로 배포시에는 다음과 같은 순서로 진행됩니다.
애플리케이션의 안전한 종료(kill -term)
종료 확인
신규 애플리케이션 실행
consume 이후 lag가 줄어들지 않음
0
75
2
안녕하세요. 강의의 카프카 버전과 현재 시점의 카프카의 차이점 문의 드립니다.
0
118
2
멱등성 프로듀서 retries 관련 질문입니다.
0
99
2
채팅 서비스 개발 시 주의점이 있을까요?
0
100
2
충분히 큰 파티션 생성시 궁금증이 존재합니다.
0
88
2
KTable 키가 없는 레코드 처리
0
73
2
컨슈머 테스트 코드 작성
0
94
2
리밸런스 onPartitionRevoked이 필요한 상황
0
74
2
카프카 클러스터에서 감당 가능한 파티션(레플리카) 수 문의
0
120
2
reset offset 질문
0
77
2
KStreamJoinKTable 실행시 오류
0
78
2
auto.commit.interval.ms 옵션 관련 질문 드립니다.
0
142
2
파티션, 컨슈머 그룹, 컨슈머 관련 질문
0
117
1
java, kotlin
0
130
2
shutdownThread 에 대한 문의 입니다.
0
155
2
zookeeper실행시 오류가 발생합니다.
0
271
2
커스텀 소스 커넥터에서 Thread.sleep (1000) 은 왜 하는거에요?
0
153
2
처리량을 늘리기 위해서 파티션을 늘리고 컨슈머를 늘려야한다고 설명하셨는데요
0
181
3
파티션 개수와 컨슈머 개수의 처리량 관련 질문
0
164
2
동영상 및 이미지 처리 관련 문의 드립니다.
0
226
2
주키퍼 없이 사용 문의 드립니다.
0
303
1
kafka 를 띄우니 오류가 발생하고 종료합니다.
0
281
3
zookeeper 실행시 오류 .. 무엇을 해야 할까요?
0
220
2
파티션 추가로 해결할 수 있지만 늘어난 파티션은 줄일 수 없지 않나요?
0
192
2





