묻고 답해요
169만명의 커뮤니티!! 함께 토론해봐요.
인프런 TOP Writers
-
미해결스프링 배치
flow 위에 jobparameters를 사용하기 위해서는 어떤 scope를 정의해줘야 하나요?
안녕하세요, 강사님!job > flow > step으로 구성되어 있는데 flow위에 jobparameters를 사용하기 위해서는 어떤 scope를 정의해줘야 하나요? step scope is not active라고 자꾸 오류가 나네요.강의를 봐도 잘 이해가 안 가서요. 아직 제가 개념이 덜 잡혔나봅니다. ㅠㅠ답변 감사드립니다! 코드입니다!@Slf4j @Getter @NoArgsConstructor public class CreateDateJobParameter { private String startDate; private String endDate; @Value("#{jobParameters[startDate]}") public void setStartDate(String startDate) { this.startDate = startDate; } @Value("#{jobParameters[endDate]}") public void setEndDate(String endDate) { this.endDate = endDate; } } @Slf4j @RequiredArgsConstructor @Configuration public class OrderProcessingJobConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final CreateDateJobParameter jobParameterCreator; @JobScope @Bean public CreateDateJobParameter jobParameterCreator() { return new CreateDateJobParameter(); } @Bean public Job orderProcessingJob() { return jobBuilderFactory.get("orderProcessingJob") .incrementer(new UniqueRunIdIncrementer()) .start(fetchOrderDataFlow()) .next(getOrderListFlow()) .split(taskExecutor()) .add() .end() .build(); } @Bean @StepScope public Flow fetchOrderDataFlow() { log.debug(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>fetchOrderDataFlow"); Flow flow1 = new FlowBuilder<Flow>("flow1") .start(fetchOrderDataStep1()) .build(); Flow flow2 = new FlowBuilder<Flow>("flow2") .start(fetchOrderDataStep2()) .build(); Flow flow3 = new FlowBuilder<Flow>("flow3") .start(fetchOrderDataStep3()) .build(); Flow flow4 = new FlowBuilder<Flow>("flow4") .start(fetchOrderDataStep4()) .build(); Flow flow5 = new FlowBuilder<Flow>("flow5") .start(fetchOrderDataStep5()) .build(); FlowBuilder<SimpleFlow> flowBuilder = new FlowBuilder<>("fetchOrderDataFlow"); flowBuilder.split(taskExecutor()) .add(flow1, flow2, flow3, flow4, flow5); return flowBuilder.build(); } @Bean public Step fetchOrderDataStep1() { return createFetchOrderDataStep("fetchOrderDataStep1", "store1", jobParameterCreator.getStartDate(), jobParameterCreator.getEndDate()); } ... }
-
미해결스프링 배치
springbatch partition vs pararell 질문드립니다.
안녕하세요. 정수원 강사님스프링 배치 강의 잘 보고 있습니다. 많은 도움 받고 있어요. 정말 감사드립니다.토이 프로젝트를 진행하고 있는데 막상 구현하려니 쉽지 않네요. 어떤식으로 설계하면 좋을지 조언 구합니다.구현하려는 부분은 아래와 같습니다.rest api로 가맹점 5곳에 각각 api를 호출하여 주문id의 총 개수와 주문id 목록( total number of orderId and orderIds)을 가져와서 DB에 저장합니다.가맹점 5곳에서 받아온 total number of orderId and orderIds 를 DB에서 조회해와서 해당 목록을 chunk 사이즈로 나누어서 각각 rest api로 가맹점 5곳에 호출하여 목록을 가져와서 DB에 저장합니다.partiton과 pararell step을 이용해서 구성하려고 하는데 설계부터가 만만치 않네요.1은 pararell step을 이용하여 병렬로 각각 수행하도록 구현하려고 하고, 2는 총 개수와 grid 개수를 가지고 pagination 혹은 partiton을 이용해서 구현하려고 하는데 어떻게 해야할지 감이 잘 안 잡힙니다. 혹은 혼합하여 사용할 수도 있을까요? 강사님께서 부연설명 덧붙여 주신다면 큰 도움이 될 것 같습니다. 감사드립니다.
-
미해결스프링 배치
@JobScope JobParmeter Binding 질문
안녕하세요. @JobScope에 대해서 궁금한게 생겨서요!@Jobscope 를 쓰면 갖는 이점에 대해서는 이해했습니다. 웹어플리케이션이 실행될 때, Job Bean 객체에는 Scope의 JDKProxy 또는 CGLibProxy가 주입되는 것을 이해했고 Step 메소드 실행 시, 실제 Step 객체가 주입되는 것을 이해했습니다.그러므로 여러 스레드 ( 병렬 처리 ) 가 요청에 대한 처리를 할 때, thread safe에 대한 이점도 있을 거고 동시성에도 이점이 있을 거라고 생각됩니다. 여기서 궁금한게, @Value JobParmeters를 바인딩 하는 부분이 궁금한데요.질문 1. 이 JobParmeters는 그럼 Job에서 설정한 여러 Jobpameters 중 꼭 한가지 key값이여야만 하는 걸까요? 가령, name, requestDate 라한다면 둘 중 하나를 선택해도 무방한거죠? 두개는 안될까요?2. 당연한 말이겠지만 여러 Job들에서 다른 Step을 공유하기 위해선 이 Jobparmeter를 정적인 변수보다는 requestDate와 같은 yyyyhhmm ss 같은 변수를 사용하는 것이 좋겠죠?감사합니다!
-
미해결스프링 배치
JdbcCursorItemReader ,JdbcBatchItemWriter한개의 db에서 데이터 전환
안녕하세요 JdbcCursorItemReader 와 JdbcBatchItemWriter를 이용해 기존 데이터를 변환해서 다른 테이블로 이관하는 테스트를 진행하고 있습니다. 근데 JdbcCursorItemReader 에서 select는 잘되는데 JdbcBatchItemWriter에서 insert쿼리를 날릴때 무한대기 현상이 발생하는거 같습니다.@Bean public Job transferDBJob(JobRepository jobRepository, Step transferBookStep){ return new JobBuilder("transferDBJob",jobRepository) .start(transferBookStep) .build(); } @Bean public Step transferBookStep(JobRepository jobRepository, PlatformTransactionManager tm, JdbcPagingItemReader<TransSpecies> pagingSpeciesReader, ItemProcessor<TransSpecies, Species> transItemProcessor, JdbcBatchItemWriter<Species> transItemWriter) { return new StepBuilder("transferBookStep",jobRepository) .<TransSpecies,Species>chunk(10,tm) .reader(pagingSpeciesReader) .processor(transItemProcessor) .writer(transItemWriter) .build(); } @Bean public JdbcPagingItemReader<TransSpecies> pagingSpeciesReader(DataSource dataSource,PagingQueryProvider createQueryProvider) { return new JdbcPagingItemReaderBuilder<TransSpecies>() .name("transSpeciesReader") .pageSize(10) .fetchSize(10) .dataSource(dataSource) .rowMapper(new BeanPropertyRowMapper<>(TransSpecies.class)) .queryProvider(createQueryProvider) .build(); } @Bean public PagingQueryProvider createQueryProvider(DataSource dataSource) throws Exception { SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean(); queryProvider.setDataSource(dataSource); queryProvider.setSelectClause("SPECIESKEY,AUTHOR,TITLE"); queryProvider.setFromClause("FROM TRANSSPECIES"); Map<String, Order> sortKeys = new HashMap<>(1); sortKeys.put("SPECIESKEY", Order.ASCENDING); queryProvider.setSortKeys(sortKeys); return queryProvider.getObject(); } @Bean public ItemProcessor<TransSpecies,Species> transItemProcessor(){ return transSpecies ->{ Species species = new Species(); species.setRecKey(transSpecies.getSpeciesKey()); species.setTitle(transSpecies.getTitle()); species.setAuthor(transSpecies.getAuthor()); return species; }; } @Bean public JdbcBatchItemWriter<Species> transItemWriter(DataSource dataSource){ return new JdbcBatchItemWriterBuilder<Species>() .sql("INSERT INTO SPECIES VALUES (:recKey, :author, :title)") .beanMapped() .dataSource(dataSource) .build(); }현재 작성한 간단하 코드이며로그에는 현재 이렇게 무한대기 상태에 놓이게 됩니다.확인 부탁드립니다 ㅠㅠ
-
미해결스프링 배치
스프링 배치 스케일 아웃 관련 문의
안녕하세요. 선생님 강의 들으면서 실무에 Spring Batch를 잘 녹여보고자 노력하고 있는 직장인입니다. 다름이 아니라 부서에서 Spring Batch를 활용해 배치의 성능을 높여야 하는 상황이라, 스케일 아웃을 고려하고 있는데요. 문제는 스케일 아웃을 하되 워커(혹은 슬레이브) 서버에서는 DB에 접근을 하지 못하는 상황이라, read-process-write의 구조에서 read/write는 마스터 서버에서 처리 해야 하는 상황입니다. 그래서 처음에는 AsyncItemProcessor/AsyncItemWriter를 활용하여, 마스터 프로세스에서 워커 서버들에 HTTP API를 호출하고 결과를 리턴 받으면 write를 하는 방식을 고려 했는데요.그런데, chunk 단위로 순차적으로 read-process-write를 하다보니 기대보다는 성능이 안나오고 있습니다 😭 사실 기대했던 바는 process를 하고 있을 때 read/write도 동시에 진행되면 어떨까였습니다. (이 부분이 혹시 잘못됐다면 지적해주세요.) 그래서 든 생각이 read-process-write가 chunk 단위로 순차적으로 되는 진행되는 것이 아니라, reader와 processor와 writer가 서로 publisher-subscriber 구조로 개별적으로 동작하는게 어떨까라는 생각이 들었습니다.조금 더 말씀드리면 reader가 파일을 읽어 큐에 넣고 processor는 큐의 데이터를 읽어 처리하고 writer가 보고 있는 queue에 넣으면 writer가 해당 큐를 참조하여 파일 write를 하는 방식입니다. 혹시 이 아이디어에서 제가 잘못 생각하고 있는게 있을까요? 아니면 기존의 chunk oriented tasklet을 손보면 말씀드린 것과 같은 구조를 만들 수가 있나요?
-
미해결스프링 배치
@Autowired 관련 질문입니다.
@Autowired private Job fileJob; @Autowired private Job apiJob;각각 다른 클래스에서 주입받아야하는 Job이 다른데 변수 명으로 스프링이 알아서 알맞은 Job을 주입해주는것인가요?
-
미해결스프링 배치
branch Part11 관련질문입니다.
안녕하세요 정수원 선생님 브랜치 Part11에 관해 동작문의가 2개 있습니다.1) SendJobConfiguration에서 apiJob이 제일 먼저 실행이 되는 job이고apiJob에서 apiStep1 (시작로그를 남기는) , apiStep2(끝냈다는 로그를 남기는) 사이에 주입받은 jobStep을 실행시키는데 이 jobStep은 SendChildJobConfiguration에 있는 jobStep 인가요? 2) 이후에 jobStep에서 apiMasterStep을 실행시키고 Partition으로 쪼개서 3개의 스레드에 할당할때는 apiSlaveStep에서 read부터 각 스레드가 담당하는것이 맞나요?
-
해결됨스프링 배치
이클립스에서 아무리 Application을 실행해도 잡이 실행이 안됩니다.
구글링도 많이해보고 질문 게시판도 많이 뒤져봤는데, 강사님이 작성해주신 그대로 Job을 작성했지만, Application 을 실행하면 그냥Started SpringBatchApplication in 1.373 seconds (process running for 2.321)이 실행되었다고만 뜨고 Job은 돌지 않습니다 ........ 따로 설정해주어야 하는 것이 있나요? Application 파일 위에 @EnableBatchProcessing 도 다 붙여놓은 것 확인하였습니다 제 코드는 여기 있습니다.https://github.com/SMJin/Spring-batch
-
해결됨스프링 배치
트랜잭션 경계 와 트랜잭션 begin에 대한 구분
안녕하세요 정수원 선생님 질문이 2개 있습니다.1) "Chunk Process 아키텍처" 첫번째 사진 설명부분 4:26쯤에 트랜잭션 경계와 실제 트랜잭션 begin을 구분하셨는데 이것이 무슨 차이인가요?아니면 어떻게 받아들여야하는것인가요? 2) 또한 코드를 디버깅 걸어서 따라가보니 TaskletStep#doInChunkContext 에서 new TransactionTemplate.execute() 에서 매 chunk마다 트랜잭션이 시작되는것은 확인하였는데 그림에 나온것 처럼 SImpleChunkProvider 다음과정에서 Transaction 시작하는곳을 찾지 못하겠습니다...
-
해결됨스프링 배치
JdbcBatchItemWriter Query
안녕하세요 선생님!JdbcBatchItemWriter 에서 발생하는 Query 가batch query 로 나가지 않는 것 같아서 질문 드립니다! 혹시 jdbc 에서 batch query 로 보내더라도 optimizer 에 의해단일쿼리로 변경되어서 나갈수도 있나요?! 감사합니다. 위 사진은 MySQL 의 Query Log 입니다. 제 설정은 아래와 같습니다@Configuration class HelloJobConfiguration( private val jobBuilderFactory: JobBuilderFactory, private val stepBuilderFactory: StepBuilderFactory, private val dataSource: DataSource, ) { @Bean fun job(): Job { return jobBuilderFactory.get("job") .incrementer(RunIdIncrementer()) .start(step1()) .build() } @Bean fun step1(): Step { return stepBuilderFactory.get("step1") .chunk<Customer, Customer>(5) .reader(customItemReader()) .writer(customItemWriter()) .build() } @Bean fun customItemReader(): ItemReader<Customer> { return JdbcPagingItemReaderBuilder<Customer>() .name("jdbcPagingItemReader") .dataSource(dataSource) .pageSize(5) .rowMapper(BeanPropertyRowMapper(Customer::class.java)) .queryProvider(queryProvider()) .build() } @Bean fun queryProvider(): PagingQueryProvider { return SqlPagingQueryProviderFactoryBean().apply { setDataSource(dataSource) setSelectClause("SELECT id, firstName, lastName, birthdate") setFromClause("FROM customer") setSortKeys(mapOf("id" to Order.ASCENDING)) }.`object` } @Bean fun customItemWriter(): ItemWriter<Customer> { return JdbcBatchItemWriterBuilder<Customer>() .dataSource(dataSource) .sql("INSERT INTO customer2 values(:id, :firstName, :lastName, :birthdate);") .beanMapped() .build() } }
-
해결됨스프링 배치
cursor 동작원리
안녕하세요 선생님.https://www.inflearn.com/questions/341918 좋은 강의 잘 듣고있습니다! 감사합니다. 안녕하세요 선생님.비슷한 고민을 하다가 이 질문을 찾아오게 되었는데요,예를들어 1000 건의 데이터가 있고, 100건씩 데이터를 처리하려고 하더라도실제로 SQL 자체는 단 한번 실행되고,데이터베이스 서버에서 해당 resultSet 을 가지고있으면서cursor 를 batch application 으로 반환하고,cursor 를 통해서 필요시 DB server 에 실제 데이터를 요청한다고 이해했습니다. (그리고 받은걸 메모리에 올려서 작업) 그러면 결국 DB 서버에서는 그 많은 데이터를 전부 메모리에 올려놓고 batch 작업이 종료될 때까지 유지해야한다고 이해했는데, 혹시 제가 이해한게 맞나요?!
-
미해결스프링 배치
ItemStream의 open 메서드 관련 궁금합니다
안녕하세요 강사님~먼저 질좋은 강의를 만들어주셔서 감사합니다 :)ItemStream 관련 궁금증이 생겨 질문드리게 되었는데요!public void open(ExecutionContext executionContext)위 메서드의 ExecutionContext 는 Step의 ExecutionContext 로 이해했습니다.그런데 Step 중간에 오류가 발생할 경우. 전체 Job이 실패하고.JobExecution & JobExecutionContext 가 새로 생성되며, 해당 Step의 StepExecution & StepExecutionContext 또한 새로 생성되어. 재시작전의 StepExecutionContext 에 저장해둔 index 는 꺼내어 사용 못하는게 아닌가요..?!재시작 후에 ExecutionContext 에서 index를 가져올 수 있는 부분이 헷갈립니다ㅜ추가로 index 예제는 ItemStream 통한 복구대비를 해두어. 재시작시 오류가 발생한 Chunk의 item부터 이어서 처리할 수 있었는데요.ItemStream 없이 ItemReader 만 사용할 경우는 트랜잭션 커밋되어 반영된 Chunk 가 있어도 무시하고 처음 Chunk 부터 다시 작업을 하게되는걸까요?
-
미해결[스프링 배치 입문] 예제로 배우는 핵심 Spring Batch
Chunk size에 대해
안녕하세요.코드를 보면 Step을 만들 때, .<Orders, Accounts>chunk(5) 와 같이 chunk size를 5로 지정했는데요.ItemReader에서도 PageSize를 5개로 지정했습니다.그리고 실제로 배치를 실행하면 정상적으로 동작합니다.하지만, 로그를 보면 select와 insert가 섞여있는데요.5개씩 실행되는게 어느 단위인지 궁금합니다.1. Reader에서 5개를 읽어온다.2. 5개를 Processcor에서 처리한다.3. 5개를 Writer에서 DB에 작성한다.저는 위와 같은 방식이라고 생각했는데, 어떤게 맞는건가요?좋은 강의 감사합니다!
-
해결됨스프링 배치
job execution에 stopped status가 안나타납니다.
안녕하세요 정수원 선생님 현재 Part.4.3.1.3.3 branch에서 mysql db로 실행시켰을때step execution 에서 step1,step2 의 batch status는 각각 completed 이고exit code는 각각 Failed , Pass인 상태입니다. 하지만 job execution에는 status , exitcode 둘다 모두 failed 인상태입니다. 왜 job execution이 stopped로 안나타는지 알 수 있을까요? addDanglingEndStates 에서 끝나는 시점에 transitions에 2개가 추가 되어서 그런것일까요? 4를 타야할 것같은데 뭔가 5를 타는것같습니다.5,6은 StepState:name=[batchJob.step1] 시점에서 !hasFail 분기를 타고 추가된것입니다.
-
미해결스프링 배치
optionalKeys, requiredKeys
안녕하세요 20:00 부분입니다 옵셔널 키는 없어도 된다고 앞에서 해주셨는데왜 체크를 하는건가요 ?!
-
해결됨스프링 배치
Child Job 2번실행되는것에 관한 질문입니다.
안녕하세요 정수원 선생님저도 2번 실행되서 디버깅 해보니 ChildJob까지 @Bean으로 등록하면 JobLauncherApplicationRunner#executeLocalJobs에 ParentJob, ChildJob 이렇게 2개가 등록이 됩니다.그래서 ParentJob 실행할때 child 한번또 다시 child 실행총 2번이 진행된것을 확인 할 수 있었습니다. ChildJob을 @Bean등록을 해제하면 되는데 보통 이렇게 사용하는것이 맞나요?
-
미해결스프링 배치
spring batch 트랜잭션 질문입니다.
아래 설명해주신것처럼 스프링배치에서 외부에서 @Transactional 을 쓰는것을 허용하지 않는다는것을 확인했습니다.https://brunch.co.kr/@anonymdevoo/50여기를보고 semaphore.acquire를 하고 외부 트랜잭션이 끝나지 않아서 afterCompletion에서 해당 semaphore.release를 호출하지 못한다는것을 확인하였습니다.하지만 아래처럼@RequiredArgsConstructor @Configuration public class TaskletStepConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Bean public Job batchJob() { return this.jobBuilderFactory.get("batchJob") .incrementer(new RunIdIncrementer()) .start(taskStep()) .next(chunkStep()) .build(); } @Bean @Transactional public Step taskStep() { return stepBuilderFactory.get("taskStep") .tasklet((contribution, chunkContext) -> { System.out.println("step1 has executed"); return RepeatStatus.FINISHED; }) .build(); } @Bean @Transactional public Step chunkStep() { return stepBuilderFactory.get("chunkStep") .<String, String>chunk(3) .reader(new ListItemReader(Arrays.asList("item1","item2","item3"))) .processor(new ItemProcessor<String, String>() { @Override public String process(String item) throws Exception { return item.toUpperCase(); } }) .writer(list -> { list.forEach(item -> System.out.println(item)); }) .build(); } } @Transactional을 걸어도 잘 동작하는데 제가 놓친 부분이있을까요?디버깅을 거며 확인했는데도 잘 모르겠습니다..브랜치는 4.2.4.1입니다.
-
미해결스프링 배치
스키마설정
안녕하세요 강의 초반이지만 대박강의네요 질문이 하나 있습니다배치 설정중에 initialize-schema: never네버라는 설정은 누가 실수로 스키마를 사용할까봐안전 장치용으로 만들어진건가요??(개인적인 생각입니다)
-
미해결스프링 배치
스프링배치와 쿼츠의 연동 질문
안녕하세요, 실무에서 스프링배치를 처음 접하게 되어 알아보던 중 강사님의 강의가 신뢰할 수 있다고 생각되어 신청하였어요. 너무 잘 듣고 있습니다! 다만 강의를 듣다 추가적으로 궁금한 사항이 생겨서 여기저기 찾아보았는데 해결 방법을 찾지 못해 질문을 남깁니다실전예제를 따라하는 과정에서 quartz와 함께 스케줄링을 하는 부분을 보았는데, quartz의 경우 spring batch처럼 메타데이터를 테이블에서 관리할 수 있도록 하는 기능이 있어 해당 설정을 추가하였습니다.spring: quartz: job-store-type: jdbc이렇게 했을 경우 quartz 테이블에 자동으로 JobDetail 및 Trigger 등의 정보가 insert 되어야 할 것 같은데 안 되더라구요, 또한 기존에 작동하던 spring batch의 job이 실행이 되지 않습니다.quartz와 더 관련있는 부분이라 강의 내용과 조금 다른 부분일 거라는 생각에 질문이 조심스러웠지만 해결하고 싶은 마음에 이곳에 글을 남깁니다!
-
미해결스프링 배치
adapter
외부 서비스 호출을 하려면adapter를 사용해야 하는지요?맞다면 청크와 태스크릿 모두 사용이 가능한지와예제 소스 브랜치명 부탁드립니다.