inflearn logo
강의

Course

Instructor

Spring Batch of Death: The desperate terror of 3 AM is finally over.

Chapter 6. Operation 2: Local Partitioning - Divide the Battlefield and Strike Simultaneously ☠️

병렬 중첩

103

ABBCD

11 asked

1

형 병렬 중첩 시키는 예제 코드있잖아.

Worker Step Multithreading Implementation

@Bean
public Step managerStep(Step workerStep) {
    return new StepBuilder("managerStep", jobRepository)
            .partitioner("workerStep", dailyTimeRangePartitioner)
            .step(workerStep)
            .taskExecutor(partitionTaskExecutor())
            .gridSize(4)
            .build();
}

@Bean
public Step workerStep(SynchronizedItemReader<TestLog> redisLogReader,
                       ItemProcessor<TestLog, TestLog> logProcessor, MongoItemWriter<TestLog> mongoLogWriter) {
    return new StepBuilder("workerStep", jobRepository)
            .<TestLog, TestLog>chunk(500, transactionManager)
            .reader(redisLogReader)
            .processor(logProcessor)
            .writer(mongoLogWriter)
            .taskExecutor(workerTaskExecutor())
            .build();
}

@Bean
@StepScope
public SynchronizedItemReader<TestLog> redisLogReader(@Value("#{stepExecutionContext['startDateTime']}") LocalDateTime startDateTime) {
    log.info("{} read {}", Thread.currentThread().getName(), startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")));
    RedisItemReader<String, TestLog> itemReader = new RedisItemReaderBuilder<String, TestLog>()
            .redisTemplate(redisTemplate())
            .scanOptions(ScanOptions.scanOptions()
                    .match("logs:" + startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")) + ":*")
                    .count(10000)
                    .build())
            .build();

    return new SynchronizedItemReader<>(itemReader);
}
@Bean
public TaskExecutor workerTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(5);
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setAwaitTerminationSeconds(10);
    executor.setThreadNamePrefix("Worker-Thread-");
    executor.setAllowCoreThreadTimeOut(true);
    executor.setKeepAliveSeconds(30);
    return executor;
}

이렇게 했는데

2025-12-17T13:34:07.885+09:00 ERROR 1568 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step workerStep in job testLogJob java.lang.NullPointerException: Cannot invoke "org.springframework.data.redis.core.Cursor.hasNext()" because "this.cursor" is null at org.springframework.batch.item.redis.RedisItemReader.read(RedisItemReader.java:62) ~[spring-batch-infrastructure-5.2.4.jar:5.2.4]

 

이런 오류가 나.

그리고 ItemReader쪽에

log.info("{} read {}", Thread.currentThread().getName(), startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")));

로그를 찍었는데

Worker-Thread-5 read 2025121712

Worker-Thread-1 read 2025121706

Worker-Thread-2 read 2025121700

Worker-Thread-3 read 2025121718

이런식으로 찍히고 있어.

하나의 파티션을 여러 스레드가 읽어야 하는데 workerTaskExecutor가 서로 다른 파티션을 읽고 있어.

내가 뭐 놓친게 있는걸까?

java spring spring-boot spring-batch backend

Answer 3

1

KILL-9

open() 메서드가 호출안되었군 💀

SynchronizedItemReader 대신 SynchronizedItemStreamReader를 사용해야한다

0

KILL-9

아니?? 기존 강의내용이 잘못되었군 SynchronizedItemStreamReader로 변경하라 

0

communityai8509

안녕하세요, 인프런 AI 인턴입니다.
ABBCD님, 병렬 중첩 시 RedisItemReader를 사용하면서 NullPointerException이 발생하고 있는 상황인 것 같습니다. 이 오류는 주로 RedisItemReadercursor가 제대로 초기화되지 않았을 때 발생할 수 있습니다.

문제 분석

  • 오류 메시지에서 보듯이, RedisItemReader에서 cursor 객체에 대한 초기화가 제대로 이루어지지 않았기 때문에 hasNext() 메서드를 호출할 때 예외가 발생하는 것으로 보입니다.
  • 병렬 환경에서는 각 스레드가 데이터를 읽기 위해 RedisItemReader의 인스턴스를 독립적으로 제공받아야 합니다. cursor가 초기화되기 전 또는 실패하는 경우 같은 예외 상황이 반복될 수 있습니다.

해결 방법

  1. Reader의 새로운 인스턴스 제공
    • 각 워커 스레드가 고유의 RedisItemReader 인스턴스를 사용하도록 설정해야 합니다. Spring Batch의 경우 StepScope를 통해 각 스레드가 독립적인 Bean을 사용하도록 할 수 있습니다.
    • 예를 들어, 다음과 같이 RedisItemReader를 Bean으로 정의하면서 @StepScope를 사용하시기 바랍니다:
      
      @Bean
      @StepScope
      public RedisItemReader<YourDataType> redisItemReader(
           @Value("#{jobParameters['someParam']}") String param) {
       return new RedisItemReader<>(/* initialization parameters */);
      }
      

  2. Thread Safety 확인
    • 각 스레드가 사용하게 되는 모든 Reader, Processor, Writer가 스레드에 안전한(Thread-safe) 방식으로 설계되었는지 반드시 확인하세요.

기타 유사한 문제 해결 사례는 아래의 링크에서 확인하실 수 있습니다:
- Spring Batch에서 병렬 처리와 관련된 문제

위의 방법들을 통해 문제를 해결할 수 있을 것이라 기대합니다.
저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해 드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.

Json 요청 처리

1

33

2

[건의][6장][작전1] deprecated 메소드

1

48

2

[예제][3장][작전2] windows 에서 마지막 예제

0

39

1

[오타][3장][작전1] 형 이건 빨리 해줘

2

95

2

[오타] "리스너 활용: 시스템 완전 장악 매뉴얼" 부분

1

61

1

[오타] "JobScope와 StepScope 사용 시 주의사항" 부분

1

68

2

spring boot 4 + spring batch 6 설정 변경?

1

100

2

Batch 패키지 설계 — 실무에서 Job 단위 구조

1

97

1

Spring batch를 실행해줄 스케줄러와 batch 실행 환경

1

89

1

메타테이블에 데이터가 생기지 않는 이유

1

108

2

Spring Batch 4.3.10 ExecutionContext 한글 인코딩 이슈 관련 문의

1

106

2

@StepScope 또는 @JobScope와 JobOperator

1

148

5

Remote Partitioning

1

112

2

킬구형 실무에서는 JPA ItemReader / ItemWriter를 더 선호해? 아니면 JDBC itemReader/ItemWriter를 더 선호해?

1

137

2

형 코드에 public static class 이게 뭐야 ?

1

136

3

ExitStatus

1

67

2

Batch6: jobOperator.startNextInstance() throws UnexpectedJobExecutionException

1

126

5

jdbc 커서, 페이징에서 일대다 관계 데이터 뻥튀기 조회 처리 방법 질문

1

120

2

SkipPolicy는 여러번 불릴 수 있는가?

1

52

2

형 실무에서 배치 시스템은 어떤 식으로 HA를 구성해??

1

107

1

메타데이터 관리

1

64

2

2장. 작전2: 분산 서버 로그 처형 작전 Resource[]의 대체방안(읽어야할 내용이 매우 커지면?)

1

84

3

2장. 작전2: 분산 서버 로그 처형 작전 (시스템에 의존적인) SystemCommandTasklet 관련 질문

1

97

2

CommandLineJobRunner를 통한 실행

1

86

2