inflearn logo
강의

강의

N
챌린지

챌린지

멘토링

멘토링

N
클립

클립

로드맵

로드맵

지식공유

카프카 완벽 가이드 - 코어편

Consumer에서 토픽 특정 파티션의 특정 offset 부터 읽어오기 구현 실습

nocommit 관련 질문

164

JUNI

작성한 질문수 22

0

package com.example;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

public class ConsumerPartitionAssignSeek {

    public static final Logger logger = LoggerFactory.getLogger(ConsumerPartitionAssignSeek.class.getName());

    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_pizza_assign_seek_v001");
        //props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "6000");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");


        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        TopicPartition topicPartition = new TopicPartition(topicName, 0);
        //kafkaConsumer.subscribe(List.of(topicName));
        kafkaConsumer.assign(Arrays.asList(topicPartition));
        kafkaConsumer.seek(topicPartition, 5L);

        //main thread
        Thread mainThread = Thread.currentThread();

        //main thread 종료시 별도의 thread로 KafkaConsumer wakeup()메소드를 호출하게 함.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                logger.info(" main program starts to exit by calling wakeup");
                kafkaConsumer.wakeup();

                try {
                    mainThread.join();
                } catch(InterruptedException e) { e.printStackTrace();}
            }
        });

        //kafkaConsumer.close();
        //pollAutoCommit(kafkaConsumer);
        //pollCommitSync(kafkaConsumer);
        //pollCommitAsync(kafkaConsumer);
        pollNoCommit(kafkaConsumer);

    }

    private static void pollNoCommit(KafkaConsumer<String, String> kafkaConsumer) {
        int loopCnt = 0;

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
                for (ConsumerRecord record : consumerRecords) {
                    logger.info("record key:{},  partition:{}, record offset:{} record value:{}",
                            record.key(), record.partition(), record.offset(), record.value());
                }


            }
        }catch(WakeupException e) {
            logger.error("wakeup exception has been called");
        }catch(Exception e) {
            logger.error(e.getMessage());
        }finally {
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }

    }

    private static void pollCommitAsync(KafkaConsumer<String, String> kafkaConsumer) {
        int loopCnt = 0;

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
                for (ConsumerRecord record : consumerRecords) {
                    logger.info("record key:{},  partition:{}, record offset:{} record value:{}",
                            record.key(), record.partition(), record.offset(), record.value());
                }
                kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if(exception != null) {
                            logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage());
                        }
                    }
                });

            }
        }catch(WakeupException e) {
            logger.error("wakeup exception has been called");
        }catch(Exception e) {
            logger.error(e.getMessage());
        }finally {
            logger.info("##### commit sync before closing");
            kafkaConsumer.commitSync();
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }

    }

    private static void pollCommitSync(KafkaConsumer<String, String> kafkaConsumer) {
        int loopCnt = 0;

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
                for (ConsumerRecord record : consumerRecords) {
                    logger.info("record key:{},  partition:{}, record offset:{} record value:{}",
                            record.key(), record.partition(), record.offset(), record.value());
                }
                try {
                    if(consumerRecords.count() > 0 ) {
                        kafkaConsumer.commitSync();
                        logger.info("commit sync has been called");
                    }
                } catch(CommitFailedException e) {
                    logger.error(e.getMessage());
                }

            }
        }catch(WakeupException e) {
            logger.error("wakeup exception has been called");
        }catch(Exception e) {
            logger.error(e.getMessage());
        }finally {
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }

    }


    public static void pollAutoCommit(KafkaConsumer<String, String> kafkaConsumer) {
        int loopCnt = 0;

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
                for (ConsumerRecord record : consumerRecords) {
                    logger.info("record key:{},  partition:{}, record offset:{} record value:{}",
                            record.key(), record.partition(), record.offset(), record.value());
                }
                try {
                    logger.info("main thread is sleeping {} ms during while loop", 10000);
                    Thread.sleep(10000);
                }catch(InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }catch(WakeupException e) {            logger.error("wakeup exception has been called");
        }finally {
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }

    }
}

 

해당 코드에 문제가 없는 것으로 보입니다.

 

 

java.lang.IllegalStateException: This consumer has already been closed.

at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2456)

at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)

at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)

at com.example.ConsumerPartitionAssignSeek.pollNoCommit(ConsumerPartitionAssignSeek.java:63)

at com.example.ConsumerPartitionAssignSeek.main(ConsumerPartitionAssignSeek.java:53)

 

해당하는 에러가 띄는데 이유를 알 수 있을까요?

 

git 코드는 잘돌아가는 것을 확인했습니다. 차이점이 알 수가 없어서 질문드립니다,

kafka 데이터-엔지니어링

답변 1

0

권 철민

안녕하십니까,

먼저 강의 실습 코드와 동일한 코드인가요? 아님 뭔가 변경을 하신 코드인지 확인 부탁드립니다.

 

감사합니다.

virtual box 설치 문제

0

62

2

카프카 서버 구축 관련 문의

0

67

2

카프카 토픽 Key 타입 변경에 관한 질문

0

70

2

Zookeeper에서 KRaft

0

78

1

카프카 학습과 관련하여 질문 드립니다

0

98

2

파티션 증가시 비용 증가 고려

0

72

2

Kafka 초기 Partition 개수 설정 관련 질문

0

83

2

VM 과 도커의 차이

0

139

2

vm 어댑터설정 문의

0

78

2

Cooperative Sticky Rebalancing 질문

0

58

2

consumer 설정 질문

0

68

2

consumer.poll 질문입니다.

1

69

2

mainThread.join() 관련 질문

0

57

2

문의

0

153

2

멀티 브로커 설정 중 포트 충돌 발생

0

120

2

Consumer Group 강의 Lag 질문있습니다!

0

107

2

강의 설명 및 코드 정리

0

174

2

실습 코드는 어디서 받나요.. 아무리 찾아도 엄네요..

0

151

1

java.nio.BufferUnderflowException 에러 발생합니다..

0

158

3

KafkaTimeoutError:

0

157

2

acks 1 이면 비동기가 아니지 않나요?!

0

185

2

Producer의 메시지 비동기화 전송 구현 강좌 내용 중 질문

0

109

2

자문자답: 데이터 누락된다고 하시는 분 참고하세요.

0

216

2

자문자답: kafka Error connecting to node utuntu-20.myguest.virtualbox.org:9092

0

203

2