inflearn logo
강의

강의

N
챌린지

챌린지

멘토링

멘토링

N
클립

클립

로드맵

로드맵

지식공유

[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지!

서버에 카프카 컨슈머를 계속올려놓을경우 종료를 해야하나요?

805

emlookstudy

작성한 질문수 4

0

안녕하세요!

 

제가 카프카 컨슈머 애플리케이션을 개발해서

aws 서버에 올려놓고 계속 컨슈밍을 할 예정입니다.

강의에서 컨슈머의 안전한 종료 파트를 듣고 질문이 생겨 글을 남깁니다.

이렇게 서버에 계속 올려놓고 컨슈밍을 받아야할때 컨슈머의 종료가 필요한가요?

만약 필요하다면 종료된 컨슈머는 어떻게 재실행이 되는 건지 여쭤볼 수 있을까요?

 

일단 저는 스프링기반 카프카 컨슈머를 개발하였고

Runnable 인터페이스로 run() 메서드로 서버가 실행되면 바로 컨슈머가 실행되게 개발해놓은 상태입니다.

 

 

 

저의 코드입니다.

@PostConstruct
	public void startConsuming() {
		Thread consumerThread = new Thread(new Consumer());
		consumerThread.start();
	}

	
	private class Consumer implements Runnable {
	private final Logger logger = LoggerFactory.getLogger(Consumer.class);
	private final static String TOPIC_NAME = "";
	private final static String BOOTSTRAP_SERVERS = "";
	private final static String GROUP_ID = "";

	public void run() {
	  try {
		Properties configs = new Properties();
		configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
		configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
		configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		// 구독 중인 주제 파티션에서 사용 가능한 가장 빠른 오프셋부터 읽기
	            configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
	            configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
	            configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
	            configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");

		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

		consumer.subscribe(Arrays.asList(TOPIC_NAME));

		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
			for (ConsumerRecord<String, String> record : records) {
			logger.info("record: {}", record);
						
			// 받은 메세지
			String value = record.value();
						
						
						
						
			// commit the offset
	                consumer.commitSync(Collections.singletonMap(
	                new TopicPartition(record.topic(), record.partition()),
	                new OffsetAndMetadata(record.offset() + 1)));
						
	                    
					}
				}   
			} catch (Exception e) {
				logger.error("Error occurred while consuming messages", e);
			}
		}           
	}

kafka 데이터-엔지니어링

답변 1

0

데브원영 DVWY

카프카 컨슈머 애플리케이션의 안전한 종료가 필요한 가장 대표적인 사례는 해당 애플리케이션을 배포할 때 일것 같습니다. 아무래도 배포를 위해서는 애플리케이션 종료가 필수적이기 때문인데요. 이 경우 애플리케이션을 안전하게 종료함으로써 리소스의 낭비를 막을 수 있습니다.

일반적으로 배포시에는 다음과 같은 순서로 진행됩니다.

  1. 애플리케이션의 안전한 종료(kill -term)

  2. 종료 확인

  3. 신규 애플리케이션 실행

consume 이후 lag가 줄어들지 않음

0

75

2

안녕하세요. 강의의 카프카 버전과 현재 시점의 카프카의 차이점 문의 드립니다.

0

118

2

멱등성 프로듀서 retries 관련 질문입니다.

0

99

2

채팅 서비스 개발 시 주의점이 있을까요?

0

100

2

충분히 큰 파티션 생성시 궁금증이 존재합니다.

0

88

2

KTable 키가 없는 레코드 처리

0

73

2

컨슈머 테스트 코드 작성

0

94

2

리밸런스 onPartitionRevoked이 필요한 상황

0

74

2

카프카 클러스터에서 감당 가능한 파티션(레플리카) 수 문의

0

120

2

reset offset 질문

0

77

2

KStreamJoinKTable 실행시 오류

0

78

2

auto.commit.interval.ms 옵션 관련 질문 드립니다.

0

142

2

파티션, 컨슈머 그룹, 컨슈머 관련 질문

0

117

1

java, kotlin

0

130

2

shutdownThread 에 대한 문의 입니다.

0

155

2

zookeeper실행시 오류가 발생합니다.

0

271

2

커스텀 소스 커넥터에서 Thread.sleep (1000) 은 왜 하는거에요?

0

153

2

처리량을 늘리기 위해서 파티션을 늘리고 컨슈머를 늘려야한다고 설명하셨는데요

0

181

3

파티션 개수와 컨슈머 개수의 처리량 관련 질문

0

164

2

동영상 및 이미지 처리 관련 문의 드립니다.

0

226

2

주키퍼 없이 사용 문의 드립니다.

0

303

1

kafka 를 띄우니 오류가 발생하고 종료합니다.

0

281

3

zookeeper 실행시 오류 .. 무엇을 해야 할까요?

0

220

2

파티션 추가로 해결할 수 있지만 늘어난 파티션은 줄일 수 없지 않나요?

0

192

2