묻고 답해요
169만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
해결됨Spring Batch 입문: 3시간 만에 끝내는 대용량 처리의 기초
[참고] Tasklet 인터페이스로 단일 배치 작업 처리 수업에 나온 코드 돌리는 방법
package com.system.batch.sy_batch_system.ch03; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.job.Job; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.Step; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; @Configuration @RequiredArgsConstructor public class CafeJobConfig2 { private final JobRepository jobRepository; private final PlatformTransactionManager transactionManager; @Bean public Job cafeJob2() { return new JobBuilder("cafeJob2", jobRepository) .start(cafeStep2()) .build(); } @Bean public Step cafeStep2() { return new StepBuilder("cafeStep2", jobRepository) .tasklet(cafeJobTasklet2(), transactionManager) .build(); } @Bean public CafeJobTasklet cafeJobTasklet2() { return new CafeJobTasklet(); } }ch03 에 위에 코드 작성하시고 powerShell(윈도우일 때).\gradlew bootRun --args="--spring.batch.job.name=cafeJob2" 로 실행하시면 됩니다감사합니다
-
해결됨Spring Batch 입문: 3시간 만에 끝내는 대용량 처리의 기초
batch_db 생성 관련 문의 드립니다.
안녕하세요우선 강의 잘 들었습니다~batch 를 실무에 적용해보고자 하는데요spring batch의 경우 batch_db에batch_job 관련 테이블이 자동 실행되는 걸로 보이는데요실무에서는 dba를 통해 테이블을 생성해야되는데요batch_job 관련 스키마를 모두 반드시 생성을 해야 되는 건가요??
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
Batch 패키지 설계 — 실무에서 Job 단위 구조
킬구형 ㅎㅇ 요즘에 배치 다시 학습하면서 좀 고민인 부분이 있음. 현재 회사에서 배치 Job이 좀 많이 늘어나면서 코드 관리가 너무 어렵더라고. Spring MVC에서는 Controller-Service-Repository 같은 레이어드 아키텍처나 클린 아키텍처처럼 널리 쓰이는 구조가 있는데, 배치 쪽은 이런 패키지 구조나 설계 방식에 대한 레퍼런스가 상대적으로 적은 것 같아서. 최근에 클린 아키텍처를 배우면서 배치에도 이런 구조를 적용할 수 있는지 궁금해졌거든.지금 나는 대략 이런 식으로 구성하고 있어:batch/ ├── order/ │ ├── confirm/ │ │ ├── OrderConfirmJobConfig.java │ │ ├── OrderConfirmReader.java │ │ ├── OrderConfirmProcessor.java │ │ ├── OrderConfirmWriter.java │ │ └── OrderConfirmJobListener.java ├── payment/ │ ├── settle/ │ └── cancel/ ├── support/ ├── domain/ ├── application/ └── infrastructure/ 하나의 Job을 하나의 큰 애그리게이트처럼 보고 있어서, 관련 클래스를 패키지별로 분리하기보다 한 폴더 안에서 로우 레벨로 관리하는 게 더 편하더라고.근데 Job이 계속 많아질수록 이게 맞는 건지, 더 나은 방법이 있는 건지 잘 모르겠어서. 혹시 형은 실무에서 Job이 많아졌을 때 패키지 구조나 코드 구성을 어떻게 가져가? 객체 간 위계나 import 의존성 방향 같은 것도 신경 써서 설계하는지도 궁금해. 물론 사람마다 차이가 있겠지만 시간 괜찮으면 경험 공유해주면 도움이 많이 될 것 같아!
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
Spring batch를 실행해줄 스케줄러와 batch 실행 환경
킬9형 안녕실무에서 Spring batch를 실행해줄 스케줄러와 batch 실행 환경은 어떤게 있는지 궁금해.현재 인프라는 AWS에서 EKS, ElastiCache(Redis), Aurora MySQL을 사용하고 있어.
-
미해결[스프링 배치 입문] 예제로 배우는 핵심 Spring Batch
배치 실행시 파라미터 (파일 이름)받기 및 (csv) 검증 5.0버전 공유
6분대 까지의 기록 입니다.5.0 버전에 맞게 ValidatedPramJobConig 구현 내용입니다 참고하세용package com.example.SpringBatchTutorial.job.ValidatedParam; import lombok.RequiredArgsConstructor; import org.jspecify.annotations.Nullable; import org.springframework.batch.core.configuration.annotation.JobScope; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.job.Job; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.job.parameters.CompositeJobParametersValidator; import org.springframework.batch.core.job.parameters.RunIdIncrementer; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.Step; import org.springframework.batch.core.step.StepContribution; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.infrastructure.repeat.RepeatStatus; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; import java.util.Arrays; /** * desc: 파일 이름 파라미터 전달 그리고 검증 * run: --spring.batch.job.name=validateParamJob -fileName=test.csv */ @Configuration @RequiredArgsConstructor public class ValidatedParamJobConfig { /* * [Spring Batch 5.0 변경점] * JobBuilderFactory, StepBuilderFactory가 Deprecated(삭제)되었습니다. * 대신 JobRepository와 PlatformTransactionManager를 직접 주입받아 사용합니다. */ private final JobRepository jobRepository; private final PlatformTransactionManager platformTransactionManager; @Bean public Job validateParamJob() { /* * [변경] new JobBuilder("Job이름", jobRepository) 형태로 생성합니다. */ return new JobBuilder("validateParamJob", jobRepository) .incrementer(new RunIdIncrementer()) // 실행할 때마다 JobParameter에 run.id를 증가시켜 재실행 가능하게 함 .validator(new FileParamValidator()) // 검증 로직이 필요하면 여기에 validator를 추가합니다. .start(validateParamJobStep()) // 첫 번째 Step 시작 .build(); } private CompositeJobParametersValidator multipleValidator(){ CompositeJobParametersValidator validator = new CompositeJobParametersValidator(); validator.setValidators(Arrays.asList(new FileParamValidator())); // 여러개의 검증 클래스를 변수로 할당가능 return validator; } @Bean @JobScope // Job 실행 시점에 Bean이 생성되도록 설정 public Step validateParamJobStep() { /* * [변경] new StepBuilder("Step이름", jobRepository) 형태로 생성합니다. */ return new StepBuilder("validateParamStep", jobRepository) /* * [변경] tasklet이나 chunk를 설정할 때 TransactionManager를 반드시 인자로 넘겨야 합니다. */ .tasklet(validateParamTasklet(), platformTransactionManager) .build(); } @Bean @StepScope // Step 실행 시점에 Bean이 생성되도록 설정 public Tasklet validateParamTasklet() { return (contribution, chunkContext) -> { // 비즈니스 로직 영역 System.out.println("validateParamTasklet 호출됐나요~"); // Tasklet 종료 상태 반환 return RepeatStatus.FINISHED; }; } } /* [Spring Boot 2.x ~ Batch 4.x 버전 기록용] 당시 특징: JobBuilderFactory와 StepBuilderFactory를 사용하여 간편하게 생성. 현재(5.0+)는 삭제된 방식임. @Configuration @RequiredArgsConstructor public class ValidatedParamJobConfig { [과거 방식의 핵심] 이 당시에는 @EnableBatchProcessing이 선언되어 있으면 스프링이 자동으로 JobBuilderFactory, StepBuilderFactory를 빈으로 등록해줬음. -> 개발자가 JobRepository나 TransactionManager를 신경 쓸 필요 없이 Factory만 주입받으면 됐음. [코드가 왜 방식을 바꾼이유에 대한 고찰] 과거에는 Factory 방식이 유행했다면 요즘은 Builder 방식으로 전환하려는것 같음. 추가로 PlatformTransactionManager 을 과거에는 써서 DB에 접근을 하였는데 이 방식이 DB를 하나를 바라보게끔 되어있었다고함 그래서 Step 마다 트랜잭션을 사용하려면 설정이 까다로웠다고(Bean을 재정의 하거나 Factory를 커스텀) 현재는 Step을 만들때 트랜잭션을 인자로 직접 넘겨주기에 Step A는 metaTransactionManager Step B 는 serviceTransactionManager 로 설정하는것으로 쉬워졌다고함 https://github.com/spring-projects/spring-batch/wiki/Spring-Batch-5.0-Migration-Guide 공식 깃헛 사이트에서 Transaction manager bean exposure/configuration 을 컨트롤 F 해서 보면 상세히 나와있음 아래 슬로건을 미는 느낌인거같음 개발자가 코드를 짤 때 귀찮더라도, '누가(Repository)', '어떻게(TransactionManager)' 일하는지 명시적으로 적어라. 그래야 나중에 사고가 덜 난다 5.0있는데 왜 2.x ~ 4.x 를 기록함 이라할수있는데 프로젝트를 뛰다보면 3.0버전대가 너무 많음 5.0으로 신규로 구현할 수도 있으니 과거와 현재 방식을 기록하는거에 초점 두는게 좋다생각함 private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Bean public Job validateParamJob() { [Job 생성] factory.get("이름")을 호출하면 내부적으로 JobRepository가 자동 주입된 빌더가 반환됨. -> 5.0부터는 new JobBuilder("이름", jobRepository)로 직접 주입해야 함. return jobBuilderFactory.get("validateParamJob") .incrementer(new RunIdIncrementer()) //.validator(new FileParamValidator()) // 여기서 정의해줘도 되지만 재활용성을 위해 별도의 클래스로 제작 job.ValidatedParam.FileParamValidator .validator(multipleValidator) .start(validateParamJobStep()) .build(); } private CompositeJobParametersValidator multipleValidator(){ CompositeJobParametersValidator validator = new CompositeJobParametersValidator(); validator.setValidators(Arrays.asList(new FileParamValidator())); return validator; } @JobScope @Bean public Step validateParamJobStep() { [Step 생성] factory.get("이름") 사용. 특징: .tasklet() 설정 시 TransactionManager를 넣지 않아도 Factory가 내부적으로 가지고 있는 기본 TransactionManager를 사용함. -> 이 '암시적(Implicit) 구성'이 편하긴 했으나, 명확성이 떨어져 5.0에서 제거됨. return stepBuilderFactory.get("validateParamStep") .tasklet(validateParamTasklet()) 트랜잭션 매니저 생략 가능했음 .build(); } @StepScope @Bean public Tasklet validateParamTasklet(@Value("#{jobParameters['fileName']} String fileName) { //System.out.println(fileName); // 여기서 파일명을 받아 작업할 수 있지만 // Tasklet 까지 오기 전 Job이 실행할 때 검증할 수 있도록 Validator 를 제공을 함 위쪽의 validateParamJob 메서드로 이동 [Tasklet 구현] 익명 클래스 방식으로 구현. 단순 출력 후 FINISHED를 반환하여 스텝을 종료함. return (contribution, chunkContext) -> { System.out.println("validateParamTasklet"); return RepeatStatus.FINISHED; }; } } */ job.ValidatedParam.Validtor 시작package com.example.SpringBatchTutorial.job.ValidatedParam.Validator; import org.springframework.batch.core.job.parameters.InvalidJobParametersException; import org.springframework.batch.core.job.parameters.JobParameters; import org.springframework.batch.core.job.parameters.JobParametersValidator; import org.springframework.util.StringUtils; public class FileParamValidator implements JobParametersValidator { // thorws 저거밖에 못찾겠어서 저걸로 했는데 문제없이 됐습니다. @Override public void validate(JobParameters parameters) throws InvalidJobParametersException { String fileName = parameters.getString("fileName"); if (!StringUtils.endsWithIgnoreCase(fileName,"csv")){ throw new InvalidJobParametersException("This is CSV"); } } }job.ValidatedParam.Validtor 끝
-
미해결[스프링 배치 입문] 예제로 배우는 핵심 Spring Batch
질문x 1강 일단 실행 코드부분 5.0에 맞춰 수정
yaml 수정 시작spring: batch: job: name: ${job.name:NONE} # 'names' (복수형)가 삭제되고 'name' (단수형)으로 변경됨 enabled: true # 자동 실행을 원할 경우 (기본값 true) jdbc: initialize-schema: ALWAYS # Spring Boot 3.x 이상에서는 이 속성을 사용 (기존 spring.batch.initialize-schema 삭제됨) datasource: url: jdbc:mysql://127.0.0.1:3306/spring_batch driver-class-name: com.mysql.cj.jdbc.Driver username: root password: 1234 sql: init: mode: always yaml 수정 끝 job 패키지 하위 HelloWoldJobConf 수정 시작package com.example.SpringBatchTutorial.job; import lombok.RequiredArgsConstructor; import org.springframework.batch.core.job.Job; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.Step; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.infrastructure.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; @Configuration @RequiredArgsConstructor public class HelloWorldJobConfig { /** * [Spring Batch 5.0 변경점 1] Factory 삭제 * - 과거: JobBuilderFactory, StepBuilderFactory를 사용 (Deprecated/삭제됨) * - 현재: JobRepository와 TransactionManager를 직접 주입받아서 Builder에 전달해야 함 */ private final JobRepository jobRepository; // Job과 Step의 상태(실행 기록 등)를 DB에 저장/관리하는 저장소 private final PlatformTransactionManager transactionManager; // 트랜잭션 관리자 (Commit/Rollback 담당) /** * Job 생성 설정 * - Job은 배치의 가장 큰 실행 단위입니다. */ @Bean public Job helloWorldJob() { // [변경점 2] new JobBuilder("이름", jobRepository) 사용 // Factory.get() 대신 Builder를 직접 생성하며, 두 번째 인자로 jobRepository가 필수입니다. return new JobBuilder("helloWorldJob", jobRepository) .start(helloWorldStep()) // 첫 번째로 실행할 Step 지정 .build(); // Job 생성 } /** * Step 생성 설정 * - Step은 Job 내부에서 실제 비즈니스 로직(읽기/처리/쓰기)을 담당하는 단계입니다. */ @Bean public Step helloWorldStep() { // [변경점 3] new StepBuilder("이름", jobRepository) 사용 return new StepBuilder("helloWorldStep", jobRepository) /* * Tasklet 정의 (단순 작업용) * - 람다(Lambda) 식을 사용하여 코드를 간결하게 작성했습니다. * - (contribution, chunkContext) -> { ... } 구조입니다. */ .tasklet((contribution, chunkContext) -> { System.out.println("Hello, World! Spring Batch 5.0"); // Step이 정상적으로 끝났음을 반환 (FINISHED) return RepeatStatus.FINISHED; }, transactionManager) // [변경점 4] TransactionManager 필수 전달 // Spring Batch 5부터는 Step을 만들 때 어떤 트랜잭션 매니저를 쓸지 명시해야 합니다. .build(); // Step 생성 } } /* 스프링 3.0.x 대의 버전 @Configuration // 이 클래스가 Spring의 설정(Configuration) 클래스임을 명시 @RequiredArgsConstructor // final이 선언된 필드에 대해 생성자를 자동으로 생성 (의존성 주입) public class HelloWorldJobConfig { // Job과 Step을 쉽게 생성할 수 있도록 도와주는 빌더 팩토리 (Spring Batch 5.0 이전 방식) private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; //Job 생성 설정 //- Job은 배치의 가장 큰 실행 단위입니다. @Bean public Job helloWorldJob() { return jobBuilderFactory.get("helloWorldJob") // "helloWorldJob"이라는 이름으로 Job 생성 // RunIdIncrementer: Job 실행 시마다 파라미터 ID를 증가시켜, 같은 Job을 여러 번 재실행할 수 있게 함 .incrementer(new RunIdIncrementer()) .start(helloWorldStep()) // Job 시작 시 실행할 첫 번째 Step 지정 .build(); // Job 빌드 및 반환 } //Step 생성 설정 // - Step은 Job 내부에서 실질적인 처리를 담당하는 단계입니다. // - @JobScope: Job이 실행될 때 이 Bean이 생성되도록 설정 (Late Binding) @JobScope @Bean public Step helloWorldStep() { return stepBuilderFactory.get("helloWorldStep") // "helloWorldStep"이라는 이름으로 Step 생성 .tasklet(helloWorldTasklet()) // 이 Step에서 수행할 기능(Tasklet)을 지정 .build(); // Step 빌드 및 반환 } // // Tasklet 생성 설정 // - Tasklet은 Step 안에서 단일 작업을 수행하는 로직입니다. (단순 작업용) // - @StepScope: Step이 실행될 때 이 Bean이 생성되도록 설정 /// @StepScope @Bean public Tasklet helloWorldTasklet() { // 익명 클래스로 Tasklet 구현 return new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { // 비즈니스 로직 작성 구간 System.out.println("Hello World Spring Batch"); // 이 Tasklet의 처리가 끝났음을 반환 (FINISHED: 종료, CONTINUABLE: 다시 실행) return RepeatStatus.FINISHED; } }; } } * */job 패키지 하위 HelloWoldJobConf 수정 끝 한국어 패치하신분들은 실행/디버그구성 > 빌드 및 실행 > 옵션 및 수정 > 프로그램 인수에 --spring.batch.job.name=helloWorldJob넣고 값 조회
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
메타테이블에 데이터가 생기지 않는 이유
킬구형 내가 그동안 보면서 정리한내용으로, 실습을 진행해보려고 혼자 스스로 파일을 만들었어. 실습 환경은 h2 데이터 베이스로 테스트 했고,메타테이블은 자동으로 만들어졌어. 간단한 스텝과 잡을 만들어서 콘솔에서 잘 실행되는것 까지는 확인을 했는데 실제 데이터 베이스 테이블인 batch_job_instance와 같은 테이블에는 데이터가 들어가지 않아. 혹시 몰라서 db url도 확인해봤는데 정확하게 일치하고.. 이런경우는 어디를 확인해보면 제일 좋을까..? 콘솔에서 에러도 나지 않고 정상적으로 배치가 실행돼서 답답해 미치겠어 ㅠㅠㅠ콘솔에서 insert 쿼리는 안나가고 있는데, 쿼리가 왜 안나가는지를 도통 모르겠어 ㅠㅠ 다 잘되는데 메타테이블에만 데이터가 들어가지 않네.. 참고로 tasklet 방식으로 테스트 해봤어.부트 4.0.1 버전 사용하고 있어 킬구형 강의자료 에서도 중간에 postgresql로 전환하던데 혹시 h2는 뭔가 지원을 하지 않는걸까?
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
Spring Batch 4.3.10 ExecutionContext 한글 인코딩 이슈 관련 문의
안녕하세요, 강사님.강의와 직접적인 내용은 아니지만, 사내에서 발생한 Spring Batch 오류와 관련해 조언을 구하고자 문의드립니다.혹시 시간이 괜찮으시다면 의견을 주시면 정말 감사하겠습니다. 1. 사용 환경Java 1.8Spring Boot 2.7.18Spring Batch 4.3.10 2. 문제 상황Spring Batch의 ExecutionContext에 한글 문자열을 저장한 이후,다음 배치 실행 시 아래와 같은 오류가 발생하며 Job이 지속적으로 실패하는 현상이 있었습니다.java.lang.IllegalArgumentException: Unable to deserialize the execution context at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao$ExecutionContextRowMapper.mapRow ~ Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid UTF-8 start byte 0xb2 해당 Job은 chunk 기반 배치이며, 마지막 close() 시점에다음 Step에서 사용할 정보를 ExecutionContext에 저장하고 있었습니다.이때 ResultMsg: "정상 처리" 와 같이 한글 문자열을 저장한 이후부터 문제가 발생했습니다. 3. 원인 분석IDE(Eclipse) 인코딩: UTF-8DB 인코딩: NLS_CHARACTERSET = KO16MSWIN949BATCH_JOB_EXECUTION_CONTEXT 테이블에 데이터가 저장되는 구조로 확인했습니다.SHORT_CONTEXT (VARCHAR2(2500))SERIALIZED_CONTEXT (CLOB) 2500byte 이하의 데이터는 SHORT_CONTEXT에 저장되는데,해당 컬럼이 MS949 기반 VARCHAR2이다 보니UTF-8 기반으로 직렬화된 ExecutionContext 내 한글 데이터가 깨지면서다음 실행 시 역직렬화 실패가 발생한 것으로 판단했습니다. 4. 고민 중인 해결 방안현재 아래와 같은 방안들을 검토 중입니다.ExecutionContext에 한글 자체를 사용하지 않도록 제한로깅/메시지 성 데이터는 저장하지 않거나, 필요 시 영문만 사용ExecutionContext에 putString 시 인코딩 설정ExecutionContext 직렬화 방식 변경ExecutionContextSerializer를 DefaultExecutionContextSerializer 명시정으로 설정(Spring Batch 4는 기본적으로 JacksonExecutionContextStringSerializer) 5. 질문강사님께서 보시기에 위 상황에서 가장 권장되는 해결 방향 또는 실무적으로 안전한 접근 방식은 무엇이라고 생각하시는지 궁금합니다.긴 글 읽어주셔서 감사드리며, 가능하실 때 조언 주시면 많은 도움이 될 것 같습니다.감사합니다.
-
해결됨죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
@StepScope 또는 @JobScope와 JobOperator
킬구형아래는 step에서 ItemWriter의 jobParameter자리에 null을 넣는 방식으로 처리한거야. @Scheduled(cron = "0 0 19,22 * * *") fun runSampleJob() { jobOperator.start(sampleJob(),jobParameters) } @Bean fun sampleJob(): Job = JobBuilder("sampleJob",jobRepository) .start(sampleStep()) .build() @Bean fun sampleStep(): Step = StepBuilder("sampleStep", jobRepository) .chunk<String, String>(CHUNK_SIZE) .transactionManager(transactionManager) .reader(sampleReader()) .writer(sampleWriter(null, null)) .build() @Bean @StepScope fun sampleReader(): JdbcPagingItemReader<String> = JdbcPagingItemReaderBuilder<String>() ... .build() @Bean @StepScope fun sampleWriter( @Value("#{jobParameters['title']}") title: String?, @Value("#{jobParameters['content']}") content: String?, ): ItemWriter<String> = ItemWriter { chunk -> ...doSomeWrite }위 코드를 빈 주입방식으로 변경하는 방법을 모르겠어.빈 주입 방식으로 변경하면 아래처럼 되잖아?이때 jobOpterator로 잡을 호출하는 부분까지 파라미터가 올라와버리는데 이걸 어떻게 해야할지 모르겠단 말이야~~!@Scheduled(cron = "0 0 19,22 * * *") fun runSampleJob() { jobOperator.start(sampleJob(**여기를 어떻게 처리하지?**),jobParameters) } @Bean fun sampleJob( sampleStep: Step ): Job = JobBuilder("sampleJob",jobRepository) .start(sampleStep) .build() @Bean fun sampleStep( sampleReader: ItemReader<String>, sampleWriter: ItemWriter<String> ): Step = StepBuilder("sampleStep", jobRepository) .chunk<String, String>(CHUNK_SIZE) .transactionManager(transactionManager) .reader(sampleReader) .writer(sampleWriter) .build() @Bean @StepScope fun sampleReader(): JdbcPagingItemReader<String> = JdbcPagingItemReaderBuilder<String>() ... .build() @Bean @StepScope fun sampleWriter( @Value("#{jobParameters['title']}") title: String?, @Value("#{jobParameters['content']}") content: String?, ): ItemWriter<String> = ItemWriter { chunk -> ...doSomeWrite }새해 복 많이 받아 형~
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
Remote Partitioning
킬구형 6장 작전3에 첫번째 예제 Manager 노드 실행 명령까지 전체 코드를 볼 방법이 있어?
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
킬구형 실무에서는 JPA ItemReader / ItemWriter를 더 선호해? 아니면 JDBC itemReader/ItemWriter를 더 선호해?
킬구형, 히사시 부리데쓰요야 JPA ItemReader와 ItemWriter를 배워보니 생각보다 복잡하고 여러 고려사항들이 많이 보이는 것 같아, 물론 JPA와 통합이 되서 좋지만 JPA를 쓰고 있어도 Jdbc ItemReader/ItemWriter가 훨씬 더 간단해보이고 별다른 추상화도 많이 없어서 성능도 괜찮아 보이는데, 실무에서는 어때? JPA ItemReader / ItemWriter를 더 많이 써 Jdbc ItemReader/ItemWriter를 더 많이써? PS: 킬구형 어떻게 이렇게 자세한 내용들을 알 수 있는거야? spring batch 공식 docs를 봐도 내용이 없던데 ㅋㅋㅋ 킬구형 없었으면 큰일 날뻔했자너~
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
병렬 중첩
형 병렬 중첩 시키는 예제 코드있잖아.Worker Step Multithreading Implementation@Bean public Step managerStep(Step workerStep) { return new StepBuilder("managerStep", jobRepository) .partitioner("workerStep", dailyTimeRangePartitioner) .step(workerStep) .taskExecutor(partitionTaskExecutor()) .gridSize(4) .build(); } @Bean public Step workerStep(SynchronizedItemReader<TestLog> redisLogReader, ItemProcessor<TestLog, TestLog> logProcessor, MongoItemWriter<TestLog> mongoLogWriter) { return new StepBuilder("workerStep", jobRepository) .<TestLog, TestLog>chunk(500, transactionManager) .reader(redisLogReader) .processor(logProcessor) .writer(mongoLogWriter) .taskExecutor(workerTaskExecutor()) .build(); } @Bean @StepScope public SynchronizedItemReader<TestLog> redisLogReader(@Value("#{stepExecutionContext['startDateTime']}") LocalDateTime startDateTime) { log.info("{} read {}", Thread.currentThread().getName(), startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH"))); RedisItemReader<String, TestLog> itemReader = new RedisItemReaderBuilder<String, TestLog>() .redisTemplate(redisTemplate()) .scanOptions(ScanOptions.scanOptions() .match("logs:" + startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")) + ":*") .count(10000) .build()) .build(); return new SynchronizedItemReader<>(itemReader); }@Bean public TaskExecutor workerTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(5); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(10); executor.setThreadNamePrefix("Worker-Thread-"); executor.setAllowCoreThreadTimeOut(true); executor.setKeepAliveSeconds(30); return executor; }이렇게 했는데2025-12-17T13:34:07.885+09:00 ERROR 1568 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step workerStep in job testLogJob java.lang.NullPointerException: Cannot invoke "org.springframework.data.redis.core.Cursor.hasNext()" because "this.cursor" is null at org.springframework.batch.item.redis.RedisItemReader.read(RedisItemReader.java:62) ~[spring-batch-infrastructure-5.2.4.jar:5.2.4] 이런 오류가 나.그리고 ItemReader쪽에log.info("{} read {}", Thread.currentThread().getName(), startDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")));로그를 찍었는데Worker-Thread-5 read 2025121712Worker-Thread-1 read 2025121706Worker-Thread-2 read 2025121700Worker-Thread-3 read 2025121718이런식으로 찍히고 있어.하나의 파티션을 여러 스레드가 읽어야 하는데 workerTaskExecutor가 서로 다른 파티션을 읽고 있어.내가 뭐 놓친게 있는걸까?
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
형 코드에 public static class 이게 뭐야 ?
형 코드 보고 있는데 아래와 같이 public static class로 되어 있어. 이게 무슨 의미야 ? 따로 static 메소드는 없는거 같은데
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
ExitStatus
킬구형사용자 정의 ExitStatus를 애플리케이션 종료 코드로 활용하기. 이거 커스텀 ExitCodeGenerator까지 만들었으면 jar를 실행하고 나서 $LASTEXITCODE로 조회했을 때 코드 값이 바뀌어 있어야 하는거지?
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
Batch6: jobOperator.startNextInstance() throws UnexpectedJobExecutionException
KILL-9형 도와줘,,!!!spring boot 4.0.0, spring batch6, java24 사용중이야 아래 코드를 스케줄러를 통해 "deleteSuspendedJob"을 1분마다 동작하게 하고 싶었어. 그리고 실제로 동작하긴 해. 딱 1번만.... @Configuration class DeleteSuspendedScheduler( private val jdbcTemplate: JdbcTemplate, private val jobOperator: JobOperator, private val jobRepository: JobRepository, private val transactionManager: PlatformTransactionManager, ) { @Scheduled(cron = "0 */1 * * * *") //1분마다 실행되길 기대함 fun runDeleteSuspendedJob() { jobOperator.startNextInstance(deleteSuspendedJob()) } @Bean fun deleteSuspendedJob(): Job = JobBuilder("deleteSuspendedJob", jobRepository) .incrementer(RunIdIncrementer()) .start( deleteSuspendedStep()) .build() @Bean fun deleteSuspendedStep(): Step = StepBuilder("deleteSuspendedStep", jobRepository) .tasklet(DeleteSuspendedTasklet(jdbcTemplate), transactionManager) .build() }아래는 에러 로그야. 2025-12-11T19:37:31.332+09:00 INFO 39640 --- [ main] com.clip.BatchApplicationKt : Started BatchApplicationKt in 6.378 seconds (process running for 6.894) 2025-12-11T19:38:00.017+09:00 INFO 39640 --- [ scheduling-1] o.s.b.c.l.s.TaskExecutorJobOperator : Launching next instance of job: [deleteSuspendedJob] with parameters: [{JobParameter{name='run.id', value=1, type=class java.lang.Long, identifying=true}}] 2025-12-11T19:38:00.019+09:00 INFO 39640 --- [ scheduling-1] o.s.b.c.l.s.TaskExecutorJobLauncher : Job: [SimpleJob: [name=deleteSuspendedJob]] launched with the following parameters: [{JobParameter{name='run.id', value=1, type=class java.lang.Long, identifying=true}}] 2025-12-11T19:38:00.052+09:00 INFO 39640 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [deleteSuspendedStep] 2025-12-11T19:38:00.065+09:00 INFO 39640 --- [ scheduling-1] c.c.b.b.t.DeleteExpiredBlacklistTasklet : 0개의 기간 만료된 탈퇴 이력(재가입 방지용) 레코드가 삭제되었습니다. 2025-12-11T19:38:00.067+09:00 INFO 39640 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [deleteSuspendedStep] executed in 14ms 2025-12-11T19:38:00.067+09:00 INFO 39640 --- [ scheduling-1] o.s.b.c.l.s.TaskExecutorJobLauncher : Job: [SimpleJob: [name=deleteSuspendedJob]] completed with the following parameters: [{JobParameter{name='run.id', value=1, type=class java.lang.Long, identifying=true}}] and the following status: [COMPLETED] in 15ms 2025-12-11T19:39:00.007+09:00 INFO 39640 --- [ scheduling-1] o.s.b.c.l.s.TaskExecutorJobOperator : Launching next instance of job: [deleteSuspendedJob] with parameters: [{JobParameter{name='run.id', value=2, type=class java.lang.Long, identifying=true}}] 2025-12-11T19:39:00.009+09:00 ERROR 39640 --- [ scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task org.springframework.batch.core.job.UnexpectedJobExecutionException: Illegal state (only happens on a race condition): job instance already complete with name=deleteSuspendedJob and parameters={JobParameter{name='run.id', value=2, type=class java.lang.Long, identifying=true}} at org.springframework.batch.core.launch.support.SimpleJobOperator.startNextInstance(SimpleJobOperator.java:314) ~[spring-batch-core-6.0.0.jar:6.0.0] at org.springframework.batch.core.launch.support.TaskExecutorJobOperator.startNextInstance(TaskExecutorJobOperator.java:133) ~[spring-batch-core-6.0.0.jar:6.0.0] at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) ~[na:na] at java.base/java.lang.reflect.Method.invoke(Method.java:565) ~[na:na] at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:359) ~[spring-aop-7.0.1.jar:7.0.1] at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-7.0.1.jar:7.0.1] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:158) ~[spring-aop-7.0.1.jar:7.0.1] at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:370) ~[spring-tx-7.0.1.jar:7.0.1] Caused by: org.springframework.batch.core.launch.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for identifying parameters={JobParameter{name='run.id', value=1, type=class java.lang.Long, identifying=true}}. If you want to run this job again, change the parameters. at org.springframework.batch.core.launch.support.TaskExecutorJobLauncher.createJobExecution(TaskExecutorJobLauncher.java:149) ~[spring-batch-core-6.0.0.jar:6.0.0] at org.springframework.batch.core.launch.support.TaskExecutorJobLauncher.run(TaskExecutorJobLauncher.java:108) ~[spring-batch-core-6.0.0.jar:6.0.0] at org.springframework.batch.core.launch.support.SimpleJobOperator.startNextInstance(SimpleJobOperator.java:294) ~[spring-batch-core-6.0.0.jar:6.0.0]원래 Kill9형 강의 보고 잡 런쳐로 정상동작 하도록 만들었던 걸, 이번에 배치6로 올리면서 JobLauncher가 JobOperator로 옮겨졌다는 문서를 보고 바꾼뒤로 퇴근을 못하고 있어,,역시 공식 문서보단 kill9 형 문서를 보고 했어야 했던걸까??오퍼레이터와 스케줄러를 통해 잡을 특정 주기마다 동작하는 방법(위 내 코드)이 뭐가 잘못된건지 알려주면 고맙겠어!!형 제발 도와줘!!!cf.https://github.com/spring-projects/spring-batch/issues/5115
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
jdbc 커서, 페이징에서 일대다 관계 데이터 뻥튀기 조회 처리 방법 질문
강의 섹션4. 데이터베이스를 지배하라. 챕터에서 아래는 JPA 방식으로 일대다 관계를 가진 데이터를 페치 조인으로 가져오는 예제 코드야.returnnewJpaCursorItemReaderBuilder<Post>().name("postBlockReader").entityManagerFactory(entityManagerFactory).queryString(""" SELECT p FROM Post p JOIN FETCH p.reports r WHERE r.reportedAt >= :startDateTime AND r.reportedAt < :endDateTime """).parameterValues(Map.of( "startDateTime", startDateTime, "endDateTime", endDateTime )) .build();근데 jdbc 커서 예제에서는 킬구형이 아래처럼 단일 테이블만 조회하는 예제를 사용했어.@Bean publicJdbcCursorItemReader<Victim> terminatedVictimReader() {returnnew JdbcCursorItemReaderBuilder<Victim>().name("terminatedVictimReader").dataSource(dataSource).sql("SELECT * FROM victims WHERE status = ? AND terminated_at <= ?") .queryArguments(List.of("TERMINATED", LocalDateTime.now())) .beanRowMapper(Victim.class) .build();}근데 내가 지금 하려고 하는 건 jdbc 커서 방식에서 일대다 관계 테이블 데이터를 조회해서 일일정산 데이터 만드는 기능으로 복습해보려고 하는데, 데이터가 뻥튀기 되서 강의 예제에서 해당 케이스를 찾아보려고 하는데, 못 찾아서 질문글 작성했어. jdbc 방식으로 일대다 관계 데이터를 Reader로 읽어와서 위에 jpa 구조로 매핑하려면 어떻게 해야하는지 알려줄 수 있어?@Bean @StepScope public JdbcCursorItemReader<OrderBatchJoinDto> jdbcCursorReader( @Value("#{jobParameters['orderDate']}") LocalDate orderDate) { LocalDateTime startOrderDate = orderDate.atStartOfDay(); LocalDateTime endOrderDate = orderDate.atTime(23, 59, 59); return new JdbcCursorItemReaderBuilder<OrderBatchJoinDto>() .name("jdbcCursorReader") .dataSource(dataSource) .sql(""" SELECT ob.ORDERS_BATCH_ID, ob.USER_ID, ob.STATUS, ob.ORDER_DATE_TIME, oib.ORDERS_ITEM_BATCH_ID, oib.ORDERS_BATCH_ID, oib.PRODUCT_BATCH_ID, oib.PRODUCT_NAME, oib.PRICE, oib.QUANTITY FROM orders_batch ob LEFT JOIN orders_item_batch oib ON ob.ORDERS_BATCH_ID = oib.ORDERS_BATCH_ID WHERE ob.ORDER_DATE_TIME BETWEEN ? AND ? ORDER BY ob.ORDERS_BATCH_ID, oib.ORDERS_ITEM_BATCH_ID """) .queryArguments(List.of(startOrderDate, endOrderDate)) .beanRowMapper(OrderBatchJoinDto.class) }public class OrderItemBatchDto { private Long id; private Long ordersBatchId; private Long productBatchId; private String productName; private int price; private int quantity; }public class OrderItemBatchDto { private Long id; private Long ordersBatchId; private Long productBatchId; private String productName; private int price; private int quantity; }
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
SkipPolicy는 여러번 불릴 수 있는가?
skip policy 에 대한 질문Firebase message를 writer 쪽에서 사용하고,override fun shouldSkip(throwable: Throwable, skipCount: Long): Boolean { if (throwable !is BatchUnregisteredException) return true if (throwable.errorCode == FCM_UNREGISTERED_TOKEN || throwable.errorCode == FCM_MULTIPLE_TOKEN_ERROR) { throwable.tokens.forEach { fcmToken -> checkUnregisterToken(fcmToken) } } return true }skipPolicy에서 위와 같이 unregister token들을 제거해주려고 했어. 그리고, 테스트코드에서 제거 로직이 한번만 불렸는지 체크했는데, 총 3번이 불렸다고 테스트가 실패하더라구(실제 데이터는 1개라는 가정하에)GPT는 여러번 불릴 수 있다고, SkipListener 에서 onSkipWrite 에서 unregister 된 토큰을 제거하라고 하는데1. 실제로 skipPolicy는 여러번 불리는게 맞는지1-1. Skip 여부 체크1-2 Skip 처리 중에서도 체크1-3 Chunk 완료 처리 시에도 확인 이라는데 맞아 ,,?2. 보통 이러한 토큰 제거 작업이 있다면 어디서 수행하는게 맞는지 알려줘 ~
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
형 실무에서 배치 시스템은 어떤 식으로 HA를 구성해??
형! 퇴근도 못하고 일하다가 이제야 형 강의보면서 주말을 맞이하고 있어! 스프링 배치를 써서 분 단위, 하루 단위 KPI를 산출하는 배치 프로그램을 만드려고 하는데, 실무에서는 어떻게 HA를 구성하는 지 궁금해졌어. 가령, k8s에서 같은 배치를 돌리는 pod가 여러 개이면 배치가 동시에 돌 것 같고, pod가 한 개이면 하나의 배치 시스템이라서 위험할 것 같은데, 어떤 식으로 실무에서 하는 지 궁금해!
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
메타데이터 관리
킬구형메타데이터쪽 update를 읽다가 이해가 안가는게 있는데 형이update() 메서드는 매 트랜잭션의 커밋 직전에 호출된다. 단, 처리 도중 예외가 발생하여 트랜잭션이 롤백되는 경우에는 호출되지 않는다. 이는 실패한 처리 내용이 실행 정보에 반영되는 것을 방지한다.라고 했는데 그러면 문제가 생겨서 update를 호출하지 않고 롤백이 됬다고 하면 open입장에서는 실패했는지 안했는지도 모르는거아니야? 실패를 해도 마지막으로 저장된 곳부터 다시 시작하니까 실패를 아예 저장을 안한다는거야?재시작할때 메타테이블에서 execution값을 받아와서 괜찮은건가?
-
미해결죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.
2장. 작전2: 분산 서버 로그 처형 작전 Resource[]의 대체방안(읽어야할 내용이 매우 커지면?)
형님 안녕하십니까!2장 작전2 내용 중에, collected_logs에 모아진 모든 로그파일 내용을 Resource[]에 담고, 이를 flatFileItemReader로 읽기를 위임하는 부분이 있습니다. 아래는 그 중 Resource[] 배열에 해당 로그 내용을 담는 부분입니다. private Resource[] getResources(String date) { try { //String userHome = System.getProperty("user.home"); String userHome = "C:/Users/gyrbs/OneDrive/Desktop"; String location = "file:" + userHome + "/collected_logs/" + date + "/*.log"; //경로 직접 주입이 아닌 Resolver를 통한 url구성은 백슬래쉬가 아닌 /로 해야 인식 PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); return resolver.getResources(location); } catch (IOException e) { throw new RuntimeException("Failed to resolve log files", e); } } 이에 대해서 gpt한테 몇가지를 물어보았는데, 먼저 배열에 읽어야할 내용을 담고 이를 filterReader로 읽는 과정이 Resource[] 배열에 읽어야할 모든 내용을 모두 저장하고, 이 저장한 내용을 reader가 받아서 청크지향처리라면 청크사이즈만큼 읽고 넘기고 아니면 지금과 같이 일괄적인 읽기를 진행하는 것으로 보였습니다. 그래서 만약에 읽어야할 내용이 매우 커지면 Resource[]라는 배열에 이 모든 내용을 넣기가 어려울 것이고, 강의에서 항상 강조해주시는 JVM무덤, 배치무덤이 되지않을까 하는 생각이 들었습니다. 그래서 이에 대한 방안을 gpt한테 한번 물어보았는데, public class StreamingMultiFileReader<T> extends AbstractItemStreamItemReader<T> { private Iterator<Path> fileIterator; private FlatFileItemReader<T> currentReader; private final Function<Path, FlatFileItemReader<T>> readerFactory; public StreamingMultiFileReader(Path dir, Function<Path, FlatFileItemReader<T>> readerFactory) throws IOException { this.fileIterator = Files.list(dir).iterator(); this.readerFactory = readerFactory; } @Override public T read() throws Exception { if (currentReader == null) { if (!fileIterator.hasNext()) { return null; // 모든 파일 처리 완료 } Path nextFile = fileIterator.next(); currentReader = readerFactory.apply(nextFile); currentReader.open(new ExecutionContext()); } T item = currentReader.read(); if (item == null) { // 파일 끝 currentReader.close(); currentReader = null; return read(); // 다음 파일로 넘어감 } return item; } } 이런 방법이 있다고 하는데(StreamingMultiFileReader로 deletegate 처리없이, 파일 하나하나씩 바로바로 처리해나가는 과정), 파일 하나하나씩 읽어들이면서 더이상의 파일이 없을때까지 순회하여 메모리부담은 없다고는 하였습니다. 혹시, 이 방법 말고도 청크지향처리를 이용해서 청크사이즈 만큼 파일, 혹은 파일 내부의 record를 읽고 넘기는, 그리고 이 과정을 모든 파일에 대해 순회하는 방식은 없을지 질문드려보고자 합니다! 실무에서 사용하는 방법이 궁금해서 질문드리게 되었습니다! 감사합니다.