인프런 커뮤니티 질문&답변
[예제][3장][작전2] windows 에서 마지막 예제
작성
·
21
0
...
@Slf4j
@Configuration
@RequiredArgsConstructor
public class LogProcessingJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final String BASE_PATH = "C:\\Users\\user\\Desktop\\batch\\section3\\flatfileitemwriter";
@Bean
public Job logProcessingJob(
Step createDirectoryStep,
Step logCollectionStep,
Step logProcessingStep
) {
return new JobBuilder("logProcessingJob", jobRepository)
.start(createDirectoryStep)
.next(logCollectionStep)
.next(logProcessingStep)
.build();
}
@Bean
public Step createDirectoryStep(SystemCommandTasklet mkdirTasklet) {
return new StepBuilder("createDirectoryStep", jobRepository)
.tasklet(mkdirTasklet, transactionManager)
.build();
}
@Bean
@StepScope
public SystemCommandTasklet mkdirTasklet(
@Value("#{jobParameters['date']}") String date
) {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setWorkingDirectory(BASE_PATH);
String collectedLogsPath = "collected_ecommerce_logs\\" + date;
String processedLogsPath = "processed_logs\\" + date;
try {
tasklet.setCommand("cmd.exe", "/c", "mkdir", collectedLogsPath, processedLogsPath);
tasklet.setTimeout(3000);
} catch (Exception e) {
log.error("mkdirTasklet error: ", e);
}
return tasklet;
}
@Bean
public Step logCollectionStep(SystemCommandTasklet scpTasklet) {
return new StepBuilder("logCollectionStep", jobRepository)
.tasklet(scpTasklet, transactionManager)
.build();
}
@Bean
@StepScope
public SystemCommandTasklet scpTasklet(
@Value("#{jobParameters['date']}") String date
) {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setWorkingDirectory(BASE_PATH);
String processedLogsPath = "collected_ecommerce_logs\\" + date;
StringJoiner commandBuilder = new StringJoiner(" & ");
for (String host : List.of("localhost")) {
// Windows 환경에서 해당 코드를 적용하려면 ssh 관련 설정들을 찾아야 해서 주석 처리
// String command = String.format("scp %s:~/ecommerce_logs/%s.log %s\\%s.log", host, date, processedLogsPath, host);
String sourcePath = BASE_PATH + "\\ecommerce_logs\\" + date + ".log";
String command = String.format("copy %s %s\\%s.log", sourcePath, processedLogsPath, host);
commandBuilder.add(command);
}
tasklet.setCommand("cmd.exe", "/c", commandBuilder.toString());
tasklet.setTimeout(10000);
return tasklet;
}
@Bean
public Step logProcessingStep(
MultiResourceItemReader<LogEntry> multiResourceItemReader,
LogEntryProcessor logEntryProcessor,
FlatFileItemWriter<ProcessedLogEntry> processedLogEntryJsonWriter
) {
return new StepBuilder("logProcessingStep", jobRepository)
.<LogEntry, ProcessedLogEntry>chunk(10, transactionManager)
.reader(multiResourceItemReader)
.processor(logEntryProcessor)
.writer(processedLogEntryJsonWriter)
.build();
}
@Bean
@StepScope
public MultiResourceItemReader multiResourceItemReader(
@Value("#{jobParameters['date']}") String date
) {
MultiResourceItemReader<LogEntry> resourceItemReader = new MultiResourceItemReader<>();
resourceItemReader.setName("multiResourceItemReader");
resourceItemReader.setResources(getResources(date));
resourceItemReader.setDelegate(logFileReader());
return resourceItemReader;
}
private Resource[] getResources(String date) {
try {
String formattedBasePath = BASE_PATH.replace("\\", "/");
String location = "file:///" + formattedBasePath + "/collected_ecommerce_logs/" + date + "/*.log";
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
return resolver.getResources(location);
} catch (IOException e) {
throw new RuntimeException("Failed to resolve log files", e);
}
}
@Bean
public FlatFileItemReader<LogEntry> logFileReader() {
return new FlatFileItemReaderBuilder<LogEntry>()
.name("logFileReader")
.delimited()
.delimiter(",")
.names("dateTime", "level", "message")
.targetType(LogEntry.class)
.build();
}
@Bean
public LogEntryProcessor logEntryProcessor() {
return new LogEntryProcessor();
}
@Bean
@StepScope
public FlatFileItemWriter<ProcessedLogEntry> processedLogEntryFlatFileItemWriter(
@Value("#{jobParameters['date']}") String date
) {
String outputPath = Paths.get(BASE_PATH, "processed_logs", date, "processed_logs.jsonl").toString();
ObjectMapper objectMapper = new ObjectMapper();
JavaTimeModule javaTimeModule = new JavaTimeModule();
javaTimeModule.addSerializer(LocalDateTime.class,
new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")));
objectMapper.registerModule(javaTimeModule);
return new FlatFileItemWriterBuilder<ProcessedLogEntry>()
.name("processedLogEntryJsonWriter")
.resource(new FileSystemResource(outputPath))
.lineAggregator(item -> {
try {
return objectMapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
throw new RuntimeException("Error Converting item to JSON", e);
}
})
.build();
}
@Data
public static class LogEntry {
private String dateTime;
private String level;
private String message;
}
@Data
public static class ProcessedLogEntry {
private LocalDateTime dateTime;
private LogLevel level;
private String message;
private String errorCode;
}
public enum LogLevel {
INFO, WARN, ERROR, DEBUG, UNKNOWN;
public static LogLevel fromString(String level) {
if (level == null || level.trim().isEmpty()) {
return UNKNOWN;
}
try {
return valueOf(level.toUpperCase());
} catch (IllegalArgumentException e) {
return UNKNOWN;
}
}
}
public static class LogEntryProcessor implements ItemProcessor<LogEntry, ProcessedLogEntry> {
private static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_DATE_TIME;
private static final Pattern ERROR_CODE_PATTERN = Pattern.compile("ERROR_CODE\\[(\\w+)]");
@Override
public ProcessedLogEntry process(LogEntry item) throws Exception {
ProcessedLogEntry processedLogEntry = new ProcessedLogEntry();
processedLogEntry.setDateTime(parseDateTime(item.getDateTime()));
processedLogEntry.setLevel(parseLevel(item.getLevel()));
processedLogEntry.setMessage(item.getMessage());
processedLogEntry.setErrorCode(extractErrorCode(item.getMessage()));
return processedLogEntry;
}
private LocalDateTime parseDateTime(String dateTime) {
return LocalDateTime.parse(dateTime, ISO_FORMATTER);
}
private LogLevel parseLevel(String level) {
return LogLevel.fromString(level);
}
private String extractErrorCode(String message) {
if (message == null) {
return null;
}
Matcher matcher = ERROR_CODE_PATTERN.matcher(message);
if (matcher.find()) {
return matcher.group(1);
}
// ERROR 문자열이 포함되어 있지만 패턴이 일치하지 않는 경우
if (message.contains("ERROR")) {
return "UNKNOWN_ERROR";
}
return null;
}
}
}
윈도우즈에서는 scp 명령어 관련해서
windows 서비스에서 open-ssh-server 실행
현재 사용하고 있는 사용자(windows 사용자) 가 관리자 권한을 보유 할 경우 programdata/sshd_config 파일 수정 필요
기타 파일 권한 정리 필요
로 인하여 scp 프로그램을 windows 환경의 intellij idea 에서 id, 비번치게끔 ide 콘솔에서 지원하지 않는거 같아.
그래서 제미니가 추천해준대로 실습 환경에서 scp 접속 단을 copy 로 그냥 퉁치는 것으로 바꿔놨어. 나처럼 windows 환경인 사람들이 있으면 혹시나 참고했으면 좋겠네.
ObjectMapper, JavaTimeModule 같은 경우는 org.springframework.boot:spring-boot-starter-json 의존성 추가 필요
퀴즈
Spring Batch의 청크 지향 처리(Chunk-Oriented Processing) 모델의 기본적인 3단계는 무엇일까요?
입력(Input) - 처리(Process) - 출력(Output)
읽기(Read) - 가공(Process) - 쓰기(Write)
수집(Collect) - 변환(Transform) - 저장(Save)
검증(Validate) - 실행(Execute) - 기록(Log)
답변 1
0
안녕하세요, 인프런 AI 인턴입니다.
qlghwp123 님께서 Windows 환경에서 SCP 명령어 대신 파일을 복사(Copy) 방식으로 대체한 부분이 잘 설명되어 있습니다. Windows에서는 SCP 사용이 복잡할 수 있으므로 copy 명령어로 처리하는 것은 매우 실용적인 접근법입니다. 추가적으로, ObjectMapper와 JavaTimeModule 사용을 위해 spring-boot-starter-json 의존성 추가가 필요하다는 점도 잘 명시해 주셨습니다.
Windows에서 SCP 관련 작업을 할 때의 주의사항과 대안을 제공해 주셔서 감사드리고, 다른 분들에게도 유익한 정보가 될 것이니 매우 좋은 설명이라고 생각합니다.
구성한 SystemCommandTasklet에서 Windows 명령어를 사용해 파일 작업을 잘 처리하시기 바랍니다. 도움이 필요하시거나 질문이 더 있다면 언제든지 남겨주세요!
결과 코드 실행에 도움이 되는 링크는 다음과 같습니다: Spring Boot Reference Documentation




