강의

멘토링

로드맵

인프런 커뮤니티 질문&답변

qlghwp123님의 프로필 이미지
qlghwp123

작성한 질문수

죽음의 Spring Batch: 새벽 3시의 처절한 공포는 이제 끝이다.

2장. 작전2: FlatFileItemWriter (죽음을 파일에 새기다 ☠️)

[예제][3장][작전2] windows 에서 마지막 예제

작성

·

21

0

...

@Slf4j
@Configuration
@RequiredArgsConstructor
public class LogProcessingJobConfig {
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final String BASE_PATH = "C:\\Users\\user\\Desktop\\batch\\section3\\flatfileitemwriter";

    @Bean
    public Job logProcessingJob(
            Step createDirectoryStep,
            Step logCollectionStep,
            Step logProcessingStep
    ) {
        return new JobBuilder("logProcessingJob", jobRepository)
                .start(createDirectoryStep)
                .next(logCollectionStep)
                .next(logProcessingStep)
                .build();
    }

    @Bean
    public Step createDirectoryStep(SystemCommandTasklet mkdirTasklet) {
        return new StepBuilder("createDirectoryStep", jobRepository)
                .tasklet(mkdirTasklet, transactionManager)
                .build();
    }

    @Bean
    @StepScope
    public SystemCommandTasklet mkdirTasklet(
            @Value("#{jobParameters['date']}") String date
    ) {
        SystemCommandTasklet tasklet = new SystemCommandTasklet();
        tasklet.setWorkingDirectory(BASE_PATH);

        String collectedLogsPath = "collected_ecommerce_logs\\" + date;
        String processedLogsPath = "processed_logs\\" + date;

        try {
            tasklet.setCommand("cmd.exe", "/c", "mkdir", collectedLogsPath, processedLogsPath);
            tasklet.setTimeout(3000);
        } catch (Exception e) {
            log.error("mkdirTasklet error: ", e);
        }

        return tasklet;
    }

    @Bean
    public Step logCollectionStep(SystemCommandTasklet scpTasklet) {
        return new StepBuilder("logCollectionStep", jobRepository)
                .tasklet(scpTasklet, transactionManager)
                .build();
    }

    @Bean
    @StepScope
    public SystemCommandTasklet scpTasklet(
            @Value("#{jobParameters['date']}") String date
    ) {
        SystemCommandTasklet tasklet = new SystemCommandTasklet();
        tasklet.setWorkingDirectory(BASE_PATH);
        String processedLogsPath = "collected_ecommerce_logs\\" + date;

        StringJoiner commandBuilder = new StringJoiner(" & ");
        for (String host : List.of("localhost")) {
            // Windows 환경에서 해당 코드를 적용하려면 ssh 관련 설정들을 찾아야 해서 주석 처리
//            String command = String.format("scp %s:~/ecommerce_logs/%s.log %s\\%s.log", host, date, processedLogsPath, host);

            String sourcePath = BASE_PATH + "\\ecommerce_logs\\" + date + ".log";
            String command = String.format("copy %s %s\\%s.log", sourcePath, processedLogsPath, host);
            commandBuilder.add(command);
        }

        tasklet.setCommand("cmd.exe", "/c", commandBuilder.toString());
        tasklet.setTimeout(10000);

        return tasklet;
    }

    @Bean
    public Step logProcessingStep(
            MultiResourceItemReader<LogEntry> multiResourceItemReader,
            LogEntryProcessor logEntryProcessor,
            FlatFileItemWriter<ProcessedLogEntry> processedLogEntryJsonWriter
    ) {
        return new StepBuilder("logProcessingStep", jobRepository)
                .<LogEntry, ProcessedLogEntry>chunk(10, transactionManager)
                .reader(multiResourceItemReader)
                .processor(logEntryProcessor)
                .writer(processedLogEntryJsonWriter)
                .build();
    }

    @Bean
    @StepScope
    public MultiResourceItemReader multiResourceItemReader(
            @Value("#{jobParameters['date']}") String date
    ) {
        MultiResourceItemReader<LogEntry> resourceItemReader = new MultiResourceItemReader<>();
        resourceItemReader.setName("multiResourceItemReader");
        resourceItemReader.setResources(getResources(date));
        resourceItemReader.setDelegate(logFileReader());

        return resourceItemReader;
    }

    private Resource[] getResources(String date) {
        try {
            String formattedBasePath = BASE_PATH.replace("\\", "/");
            String location = "file:///" + formattedBasePath + "/collected_ecommerce_logs/" + date + "/*.log";

            PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();

            return resolver.getResources(location);
        } catch (IOException e) {
            throw new RuntimeException("Failed to resolve log files", e);
        }
    }

    @Bean
    public FlatFileItemReader<LogEntry> logFileReader() {
        return new FlatFileItemReaderBuilder<LogEntry>()
                .name("logFileReader")
                .delimited()
                .delimiter(",")
                .names("dateTime", "level", "message")
                .targetType(LogEntry.class)
                .build();
    }

    @Bean
    public LogEntryProcessor logEntryProcessor() {
        return new LogEntryProcessor();
    }

    @Bean
    @StepScope
    public FlatFileItemWriter<ProcessedLogEntry> processedLogEntryFlatFileItemWriter(
            @Value("#{jobParameters['date']}") String date
    ) {
        String outputPath = Paths.get(BASE_PATH, "processed_logs", date, "processed_logs.jsonl").toString();

        ObjectMapper objectMapper = new ObjectMapper();
        JavaTimeModule javaTimeModule = new JavaTimeModule();
        javaTimeModule.addSerializer(LocalDateTime.class,
                new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")));
        objectMapper.registerModule(javaTimeModule);

        return new FlatFileItemWriterBuilder<ProcessedLogEntry>()
                .name("processedLogEntryJsonWriter")
                .resource(new FileSystemResource(outputPath))
                .lineAggregator(item -> {
                    try {
                        return objectMapper.writeValueAsString(item);
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException("Error Converting item to JSON", e);
                    }
                })
                .build();
    }

    @Data
    public static class LogEntry {
        private String dateTime;
        private String level;
        private String message;
    }

    @Data
    public static class ProcessedLogEntry {
        private LocalDateTime dateTime;
        private LogLevel level;
        private String message;
        private String errorCode;
    }

    public enum LogLevel {
        INFO, WARN, ERROR, DEBUG, UNKNOWN;

        public static LogLevel fromString(String level) {
            if (level == null || level.trim().isEmpty()) {
                return UNKNOWN;
            }
            try {
                return valueOf(level.toUpperCase());
            } catch (IllegalArgumentException e) {
                return UNKNOWN;
            }
        }
    }

    public static class LogEntryProcessor implements ItemProcessor<LogEntry, ProcessedLogEntry> {
        private static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_DATE_TIME;
        private static final Pattern ERROR_CODE_PATTERN = Pattern.compile("ERROR_CODE\\[(\\w+)]");

        @Override
        public ProcessedLogEntry process(LogEntry item) throws Exception {
            ProcessedLogEntry processedLogEntry = new ProcessedLogEntry();
            processedLogEntry.setDateTime(parseDateTime(item.getDateTime()));
            processedLogEntry.setLevel(parseLevel(item.getLevel()));
            processedLogEntry.setMessage(item.getMessage());
            processedLogEntry.setErrorCode(extractErrorCode(item.getMessage()));
            return processedLogEntry;
        }

        private LocalDateTime parseDateTime(String dateTime) {
            return LocalDateTime.parse(dateTime, ISO_FORMATTER);
        }

        private LogLevel parseLevel(String level) {
            return LogLevel.fromString(level);
        }

        private String extractErrorCode(String message) {
            if (message == null) {
                return null;
            }

            Matcher matcher = ERROR_CODE_PATTERN.matcher(message);
            if (matcher.find()) {
                return matcher.group(1);
            }
            // ERROR 문자열이 포함되어 있지만 패턴이 일치하지 않는 경우
            if (message.contains("ERROR")) {
                return "UNKNOWN_ERROR";
            }
            return null;
        }
    }
}

 

윈도우즈에서는 scp 명령어 관련해서

  1. windows 서비스에서 open-ssh-server 실행

  2. 현재 사용하고 있는 사용자(windows 사용자) 가 관리자 권한을 보유 할 경우 programdata/sshd_config 파일 수정 필요

  3. 기타 파일 권한 정리 필요

로 인하여 scp 프로그램을 windows 환경의 intellij idea 에서 id, 비번치게끔 ide 콘솔에서 지원하지 않는거 같아.

 

그래서 제미니가 추천해준대로 실습 환경에서 scp 접속 단을 copy 로 그냥 퉁치는 것으로 바꿔놨어. 나처럼 windows 환경인 사람들이 있으면 혹시나 참고했으면 좋겠네.

 

ObjectMapper, JavaTimeModule 같은 경우는 org.springframework.boot:spring-boot-starter-json 의존성 추가 필요

퀴즈

Spring Batch의 청크 지향 처리(Chunk-Oriented Processing) 모델의 기본적인 3단계는 무엇일까요?

입력(Input) - 처리(Process) - 출력(Output)

읽기(Read) - 가공(Process) - 쓰기(Write)

수집(Collect) - 변환(Transform) - 저장(Save)

검증(Validate) - 실행(Execute) - 기록(Log)

답변 1

0

안녕하세요, 인프런 AI 인턴입니다.

qlghwp123 님께서 Windows 환경에서 SCP 명령어 대신 파일을 복사(Copy) 방식으로 대체한 부분이 잘 설명되어 있습니다. Windows에서는 SCP 사용이 복잡할 수 있으므로 copy 명령어로 처리하는 것은 매우 실용적인 접근법입니다. 추가적으로, ObjectMapperJavaTimeModule 사용을 위해 spring-boot-starter-json 의존성 추가가 필요하다는 점도 잘 명시해 주셨습니다.

Windows에서 SCP 관련 작업을 할 때의 주의사항과 대안을 제공해 주셔서 감사드리고, 다른 분들에게도 유익한 정보가 될 것이니 매우 좋은 설명이라고 생각합니다.

구성한 SystemCommandTasklet에서 Windows 명령어를 사용해 파일 작업을 잘 처리하시기 바랍니다. 도움이 필요하시거나 질문이 더 있다면 언제든지 남겨주세요!

결과 코드 실행에 도움이 되는 링크는 다음과 같습니다: Spring Boot Reference Documentation

qlghwp123님의 프로필 이미지
qlghwp123

작성한 질문수

질문하기