인프런 커뮤니티 질문&답변
병렬 중첩
작성
·
20
·
수정됨
1
형 병렬 중첩 시키는 예제 코드있잖아.
Worker Step Multithreading Implementation
@Bean
public Step managerStep(Step workerStep) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", dailyTimeRangePartitioner)
.step(workerStep)
.taskExecutor(partitionTaskExecutor())
.gridSize(4)
.build();
}
@Bean
public Step workerStep(SynchronizedItemReader<TestLog> redisLogReader,
ItemProcessor<TestLog, TestLog> logProcessor, MongoItemWriter<TestLog> mongoLogWriter) {
return new StepBuilder("workerStep", jobRepository)
.<TestLog, TestLog>chunk(500, transactionManager)
.reader(redisLogReader)
.processor(logProcessor)
.writer(mongoLogWriter)
.taskExecutor(workerTaskExecutor())
.build();
}
@Bean
@StepScope
public SynchronizedItemReader<TestLog> redisLogReader(@Value("#{stepExecutionContext['startDateTime']}") LocalDateTime startDateTime) {
log.info("{} read {}", Thread.currentThread().getName(), startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")));
RedisItemReader<String, TestLog> itemReader = new RedisItemReaderBuilder<String, TestLog>()
.redisTemplate(redisTemplate())
.scanOptions(ScanOptions.scanOptions()
.match("logs:" + startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")) + ":*")
.count(10000)
.build())
.build();
return new SynchronizedItemReader<>(itemReader);
}@Bean
public TaskExecutor workerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
executor.setThreadNamePrefix("Worker-Thread-");
executor.setAllowCoreThreadTimeOut(true);
executor.setKeepAliveSeconds(30);
return executor;
}이렇게 했는데
2025-12-17T13:34:07.885+09:00 ERROR 1568 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step workerStep in job testLogJob java.lang.NullPointerException: Cannot invoke "org.springframework.data.redis.core.Cursor.hasNext()" because "this.cursor" is null at org.springframework.batch.item.redis.RedisItemReader.read(RedisItemReader.java:62) ~[spring-batch-infrastructure-5.2.4.jar:5.2.4]
이런 오류가 나.
그리고 ItemReader쪽에
log.info("{} read {}", Thread.currentThread().getName(), startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")));로그를 찍었는데
Worker-Thread-5 read 2025121712
Worker-Thread-1 read 2025121706
Worker-Thread-2 read 2025121700
Worker-Thread-3 read 2025121718
이런식으로 찍히고 있어.
하나의 파티션을 여러 스레드가 읽어야 하는데 workerTaskExecutor가 서로 다른 파티션을 읽고 있어.
내가 뭐 놓친게 있는걸까?
답변 3
1
0
0
안녕하세요, 인프런 AI 인턴입니다.
ABBCD님, 병렬 중첩 시 RedisItemReader를 사용하면서 NullPointerException이 발생하고 있는 상황인 것 같습니다. 이 오류는 주로 RedisItemReader의 cursor가 제대로 초기화되지 않았을 때 발생할 수 있습니다.
문제 분석
- 오류 메시지에서 보듯이,
RedisItemReader에서cursor객체에 대한 초기화가 제대로 이루어지지 않았기 때문에hasNext()메서드를 호출할 때 예외가 발생하는 것으로 보입니다.
- 병렬 환경에서는 각 스레드가 데이터를 읽기 위해
RedisItemReader의 인스턴스를 독립적으로 제공받아야 합니다.cursor가 초기화되기 전 또는 실패하는 경우 같은 예외 상황이 반복될 수 있습니다.
해결 방법
- Reader의 새로운 인스턴스 제공
- 각 워커 스레드가 고유의
RedisItemReader인스턴스를 사용하도록 설정해야 합니다. Spring Batch의 경우StepScope를 통해 각 스레드가 독립적인 Bean을 사용하도록 할 수 있습니다.
- 예를 들어, 다음과 같이
RedisItemReader를 Bean으로 정의하면서@StepScope를 사용하시기 바랍니다:
@Bean @StepScope public RedisItemReader<YourDataType> redisItemReader( @Value("#{jobParameters['someParam']}") String param) { return new RedisItemReader<>(/* initialization parameters */); }
- 각 워커 스레드가 고유의
- Thread Safety 확인
- 각 스레드가 사용하게 되는 모든 Reader, Processor, Writer가 스레드에 안전한(Thread-safe) 방식으로 설계되었는지 반드시 확인하세요.
- 각 스레드가 사용하게 되는 모든 Reader, Processor, Writer가 스레드에 안전한(Thread-safe) 방식으로 설계되었는지 반드시 확인하세요.
기타 유사한 문제 해결 사례는 아래의 링크에서 확인하실 수 있습니다:
- Spring Batch에서 병렬 처리와 관련된 문제
위의 방법들을 통해 문제를 해결할 수 있을 것이라 기대합니다.
저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해 드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.




