강의

멘토링

커뮤니티

인프런 커뮤니티 질문&답변

ABBCD님의 프로필 이미지
ABBCD

작성한 질문수

죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.

6장. 작전2: Local Partitioning - 전장을 분할하여 동시 타격하라 ☠️

병렬 중첩

작성

·

20

·

수정됨

1

형 병렬 중첩 시키는 예제 코드있잖아.

Worker Step Multithreading Implementation

@Bean
public Step managerStep(Step workerStep) {
    return new StepBuilder("managerStep", jobRepository)
            .partitioner("workerStep", dailyTimeRangePartitioner)
            .step(workerStep)
            .taskExecutor(partitionTaskExecutor())
            .gridSize(4)
            .build();
}

@Bean
public Step workerStep(SynchronizedItemReader<TestLog> redisLogReader,
                       ItemProcessor<TestLog, TestLog> logProcessor, MongoItemWriter<TestLog> mongoLogWriter) {
    return new StepBuilder("workerStep", jobRepository)
            .<TestLog, TestLog>chunk(500, transactionManager)
            .reader(redisLogReader)
            .processor(logProcessor)
            .writer(mongoLogWriter)
            .taskExecutor(workerTaskExecutor())
            .build();
}

@Bean
@StepScope
public SynchronizedItemReader<TestLog> redisLogReader(@Value("#{stepExecutionContext['startDateTime']}") LocalDateTime startDateTime) {
    log.info("{} read {}", Thread.currentThread().getName(), startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")));
    RedisItemReader<String, TestLog> itemReader = new RedisItemReaderBuilder<String, TestLog>()
            .redisTemplate(redisTemplate())
            .scanOptions(ScanOptions.scanOptions()
                    .match("logs:" + startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")) + ":*")
                    .count(10000)
                    .build())
            .build();

    return new SynchronizedItemReader<>(itemReader);
}
@Bean
public TaskExecutor workerTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(5);
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setAwaitTerminationSeconds(10);
    executor.setThreadNamePrefix("Worker-Thread-");
    executor.setAllowCoreThreadTimeOut(true);
    executor.setKeepAliveSeconds(30);
    return executor;
}

이렇게 했는데

2025-12-17T13:34:07.885+09:00 ERROR 1568 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step workerStep in job testLogJob java.lang.NullPointerException: Cannot invoke "org.springframework.data.redis.core.Cursor.hasNext()" because "this.cursor" is null at org.springframework.batch.item.redis.RedisItemReader.read(RedisItemReader.java:62) ~[spring-batch-infrastructure-5.2.4.jar:5.2.4]

 

이런 오류가 나.

그리고 ItemReader쪽에

log.info("{} read {}", Thread.currentThread().getName(), startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")));

로그를 찍었는데

Worker-Thread-5 read 2025121712

Worker-Thread-1 read 2025121706

Worker-Thread-2 read 2025121700

Worker-Thread-3 read 2025121718

이런식으로 찍히고 있어.

하나의 파티션을 여러 스레드가 읽어야 하는데 workerTaskExecutor가 서로 다른 파티션을 읽고 있어.

내가 뭐 놓친게 있는걸까?

답변 3

1

KILL-9님의 프로필 이미지
KILL-9
지식공유자

open() 메서드가 호출안되었군 💀

SynchronizedItemReader 대신 SynchronizedItemStreamReader를 사용해야한다

0

KILL-9님의 프로필 이미지
KILL-9
지식공유자

아니?? 기존 강의내용이 잘못되었군 SynchronizedItemStreamReader로 변경하라 

0

안녕하세요, 인프런 AI 인턴입니다.
ABBCD님, 병렬 중첩 시 RedisItemReader를 사용하면서 NullPointerException이 발생하고 있는 상황인 것 같습니다. 이 오류는 주로 RedisItemReadercursor가 제대로 초기화되지 않았을 때 발생할 수 있습니다.

문제 분석

  • 오류 메시지에서 보듯이, RedisItemReader에서 cursor 객체에 대한 초기화가 제대로 이루어지지 않았기 때문에 hasNext() 메서드를 호출할 때 예외가 발생하는 것으로 보입니다.
  • 병렬 환경에서는 각 스레드가 데이터를 읽기 위해 RedisItemReader의 인스턴스를 독립적으로 제공받아야 합니다. cursor가 초기화되기 전 또는 실패하는 경우 같은 예외 상황이 반복될 수 있습니다.

해결 방법

  1. Reader의 새로운 인스턴스 제공
    • 각 워커 스레드가 고유의 RedisItemReader 인스턴스를 사용하도록 설정해야 합니다. Spring Batch의 경우 StepScope를 통해 각 스레드가 독립적인 Bean을 사용하도록 할 수 있습니다.
    • 예를 들어, 다음과 같이 RedisItemReader를 Bean으로 정의하면서 @StepScope를 사용하시기 바랍니다:
      
      @Bean
      @StepScope
      public RedisItemReader<YourDataType> redisItemReader(
           @Value("#{jobParameters['someParam']}") String param) {
       return new RedisItemReader<>(/* initialization parameters */);
      }
      

  2. Thread Safety 확인
    • 각 스레드가 사용하게 되는 모든 Reader, Processor, Writer가 스레드에 안전한(Thread-safe) 방식으로 설계되었는지 반드시 확인하세요.

기타 유사한 문제 해결 사례는 아래의 링크에서 확인하실 수 있습니다:
- Spring Batch에서 병렬 처리와 관련된 문제

위의 방법들을 통해 문제를 해결할 수 있을 것이라 기대합니다.
저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해 드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.

ABBCD님의 프로필 이미지
ABBCD

작성한 질문수

질문하기