병렬 중첩
103
投稿した質問数 11
형 병렬 중첩 시키는 예제 코드있잖아.
Worker Step Multithreading Implementation
@Bean
public Step managerStep(Step workerStep) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", dailyTimeRangePartitioner)
.step(workerStep)
.taskExecutor(partitionTaskExecutor())
.gridSize(4)
.build();
}
@Bean
public Step workerStep(SynchronizedItemReader<TestLog> redisLogReader,
ItemProcessor<TestLog, TestLog> logProcessor, MongoItemWriter<TestLog> mongoLogWriter) {
return new StepBuilder("workerStep", jobRepository)
.<TestLog, TestLog>chunk(500, transactionManager)
.reader(redisLogReader)
.processor(logProcessor)
.writer(mongoLogWriter)
.taskExecutor(workerTaskExecutor())
.build();
}
@Bean
@StepScope
public SynchronizedItemReader<TestLog> redisLogReader(@Value("#{stepExecutionContext['startDateTime']}") LocalDateTime startDateTime) {
log.info("{} read {}", Thread.currentThread().getName(), startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")));
RedisItemReader<String, TestLog> itemReader = new RedisItemReaderBuilder<String, TestLog>()
.redisTemplate(redisTemplate())
.scanOptions(ScanOptions.scanOptions()
.match("logs:" + startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")) + ":*")
.count(10000)
.build())
.build();
return new SynchronizedItemReader<>(itemReader);
}@Bean
public TaskExecutor workerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
executor.setThreadNamePrefix("Worker-Thread-");
executor.setAllowCoreThreadTimeOut(true);
executor.setKeepAliveSeconds(30);
return executor;
}이렇게 했는데
2025-12-17T13:34:07.885+09:00 ERROR 1568 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step workerStep in job testLogJob java.lang.NullPointerException: Cannot invoke "org.springframework.data.redis.core.Cursor.hasNext()" because "this.cursor" is null at org.springframework.batch.item.redis.RedisItemReader.read(RedisItemReader.java:62) ~[spring-batch-infrastructure-5.2.4.jar:5.2.4]
이런 오류가 나.
그리고 ItemReader쪽에
log.info("{} read {}", Thread.currentThread().getName(), startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")));로그를 찍었는데
Worker-Thread-5 read 2025121712
Worker-Thread-1 read 2025121706
Worker-Thread-2 read 2025121700
Worker-Thread-3 read 2025121718
이런식으로 찍히고 있어.
하나의 파티션을 여러 스레드가 읽어야 하는데 workerTaskExecutor가 서로 다른 파티션을 읽고 있어.
내가 뭐 놓친게 있는걸까?
回答 3
0
안녕하세요, 인프런 AI 인턴입니다.
ABBCD님, 병렬 중첩 시 RedisItemReader를 사용하면서 NullPointerException이 발생하고 있는 상황인 것 같습니다. 이 오류는 주로 RedisItemReader의 cursor가 제대로 초기화되지 않았을 때 발생할 수 있습니다.
문제 분석
- 오류 메시지에서 보듯이,
RedisItemReader에서cursor객체에 대한 초기화가 제대로 이루어지지 않았기 때문에hasNext()메서드를 호출할 때 예외가 발생하는 것으로 보입니다.
- 병렬 환경에서는 각 스레드가 데이터를 읽기 위해
RedisItemReader의 인스턴스를 독립적으로 제공받아야 합니다.cursor가 초기화되기 전 또는 실패하는 경우 같은 예외 상황이 반복될 수 있습니다.
해결 방법
- Reader의 새로운 인스턴스 제공
- 각 워커 스레드가 고유의
RedisItemReader인스턴스를 사용하도록 설정해야 합니다. Spring Batch의 경우StepScope를 통해 각 스레드가 독립적인 Bean을 사용하도록 할 수 있습니다.
- 예를 들어, 다음과 같이
RedisItemReader를 Bean으로 정의하면서@StepScope를 사용하시기 바랍니다:
@Bean @StepScope public RedisItemReader<YourDataType> redisItemReader( @Value("#{jobParameters['someParam']}") String param) { return new RedisItemReader<>(/* initialization parameters */); }
- 각 워커 스레드가 고유의
- Thread Safety 확인
- 각 스레드가 사용하게 되는 모든 Reader, Processor, Writer가 스레드에 안전한(Thread-safe) 방식으로 설계되었는지 반드시 확인하세요.
- 각 스레드가 사용하게 되는 모든 Reader, Processor, Writer가 스레드에 안전한(Thread-safe) 방식으로 설계되었는지 반드시 확인하세요.
기타 유사한 문제 해결 사례는 아래의 링크에서 확인하실 수 있습니다:
- Spring Batch에서 병렬 처리와 관련된 문제
위의 방법들을 통해 문제를 해결할 수 있을 것이라 기대합니다.
저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해 드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏
추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.
Json 요청 처리
1
33
2
[건의][6장][작전1] deprecated 메소드
1
48
2
[예제][3장][작전2] windows 에서 마지막 예제
0
39
1
[오타][3장][작전1] 형 이건 빨리 해줘
2
95
2
[오타] "리스너 활용: 시스템 완전 장악 매뉴얼" 부분
1
61
1
[오타] "JobScope와 StepScope 사용 시 주의사항" 부분
1
68
2
spring boot 4 + spring batch 6 설정 변경?
1
100
2
Batch 패키지 설계 — 실무에서 Job 단위 구조
1
97
1
Spring batch를 실행해줄 스케줄러와 batch 실행 환경
1
89
1
메타테이블에 데이터가 생기지 않는 이유
1
108
2
Spring Batch 4.3.10 ExecutionContext 한글 인코딩 이슈 관련 문의
1
106
2
@StepScope 또는 @JobScope와 JobOperator
1
148
5
Remote Partitioning
1
112
2
킬구형 실무에서는 JPA ItemReader / ItemWriter를 더 선호해? 아니면 JDBC itemReader/ItemWriter를 더 선호해?
1
137
2
형 코드에 public static class 이게 뭐야 ?
1
136
3
ExitStatus
1
67
2
Batch6: jobOperator.startNextInstance() throws UnexpectedJobExecutionException
1
126
5
jdbc 커서, 페이징에서 일대다 관계 데이터 뻥튀기 조회 처리 방법 질문
1
120
2
SkipPolicy는 여러번 불릴 수 있는가?
1
52
2
형 실무에서 배치 시스템은 어떤 식으로 HA를 구성해??
1
107
1
메타데이터 관리
1
64
2
2장. 작전2: 분산 서버 로그 처형 작전 Resource[]의 대체방안(읽어야할 내용이 매우 커지면?)
1
84
3
2장. 작전2: 분산 서버 로그 처형 작전 (시스템에 의존적인) SystemCommandTasklet 관련 질문
1
97
2
CommandLineJobRunner를 통한 실행
1
86
2

