가장 먼저 중요한게 스프링 배치가 jdk 17 + spring boot 3.xx 버전부터 바뀐 부분이 있다. 해당 내용은 따로 정리해두었으니 코드 정리만 ~
@Slf4j
@RequiredArgsConstructor
//@Configuration
public class EmploymentJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager platformTransactionManager;
private final WebClientService webClientService;
private final EmploymentRepository repository;
@Bean
public Job employmentInfoJob(Step employmentInsertStep) {
log.info("start job [EmploymentInfoJob]");
return new JobBuilder("employmentInfoJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(employmentInsertStep)
.build();
}
@Bean
@JobScope
public Step employmentInsertStep(ItemReader<EmploymentJsonDto> employmentReader, ItemProcessor<EmploymentJsonDto, List<Employment>> employmentProcessor,
ItemWriter<List<Employment>> employmentWriter) {
return new StepBuilder("employmentInsertStep", jobRepository)
.<EmploymentJsonDto, List<Employment>>chunk(10, platformTransactionManager)
.reader(employmentReader)
.processor(employmentProcessor)
.writer(employmentWriter)
.build();
}
@Bean
@StepScope
public ItemReader<EmploymentJsonDto> employmentReader() {
return new ItemReader<EmploymentJsonDto>() {
private static int CHUNK_SIZE = 10;
private static int startIndex = 1;
private static int lastIndex = -1;
boolean IS_END = false;
@Override
public EmploymentJsonDto read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (IS_END)
return null;
EmploymentJsonDto employmentJsonDto = webClientService.returnEmploymentDto(startIndex, startIndex + CHUNK_SIZE - 1);
if (lastIndex == -1) {
lastIndex = employmentJsonDto.fetchListTotalCount();
log.info("lastIndex = {}", lastIndex);
}
startIndex += CHUNK_SIZE;
if (startIndex > lastIndex) {
IS_END = true;
}
return employmentJsonDto;
}
};
}
@Bean
@StepScope
public ItemProcessor<EmploymentJsonDto, List<Employment>> employmentProcessor() {
return new ItemProcessor<EmploymentJsonDto, List<Employment>>() {
@Override
public List<Employment> process(EmploymentJsonDto item) throws Exception {
Set<String> allDistinctSubject = repository.findAllDistinctSubject(); //나중에 또 돌릴때도 중복 제거를 위해
List<Employment> res = item.toDto().stream()
.filter(i -> !allDistinctSubject.contains(i.getSubject()))
.map(Employment::fromDto)
.collect(Collectors.toList());
return res;
}
};
}
@Bean
@StepScope
public ItemWriter<List<Employment>> employmentWriter() {
return new ItemWriter<List<Employment>>() {
@Override
public void write(Chunk<? extends List<Employment>> chunk) throws Exception {
// log.info("item write is good {}", chunk);
chunk.forEach(i -> repository.saveAllAndFlush(i));
log.info("save !!");
}
};
}
}
🎈 스프링부트 3.x 부터 JobRepository
, PlatformTransactionManager
이 두개를 활용해서 스프링 배치를 진행해야 한다. ( + main에 어노테이션 추가 안해야함 )
🎈 먼저 구조를 보면 따로 employmentInfoJob
이라는 이름의 Job을 구동시키면 employmentInsertStep
step이 시작된다. (해당 배치는 하나의 step만으로 구성)
🎈 일단 주어진 스프링 배치 코드는 취업 정보를 처리하는 작업을 수행한다.
employmentInfoJob
메서드: Job을 정의하는 메서드. JobBuilder
를 사용하여 Job을 생성하고, employmentInsertStep
를 시작 스텝으로 설정한다.
employmentInsertStep
메서드: 배치 작업의 스텝을 정의하는 메서드입니다. StepBuilder
를 사용하여 스텝을 생성하고, ItemReader, ItemProcessor, ItemWriter를 설정한다. 각각의 역할은 아래와 같다.
employmentReader
: 취업 정보를 읽어오는 ItemReader를 정의. webClientService
를 통해 데이터를 가져오며, startIndex와 lastIndex를 사용하여 데이터를 분할하여 읽어온다.
employmentProcessor
: 읽어온 취업 정보를 처리하는 ItemProcessor를 정의한다. 중복을 제거하고, DTO 객체를 Employment 객체로 변환하여 반환한다.
employmentWriter
: 처리된 취업 정보를 저장하는 ItemWriter를 정의한다. repository를 통해 데이터를 저장하고, Chunk를 순회하며 DB에 저장한다.
⚽ 이제 각 ItemReader, ItemProcessor, ItemWriter를 순서대로 분석해보자.
@Bean
@StepScope
public ItemReader<EmploymentJsonDto> employmentReader() {
return new ItemReader<EmploymentJsonDto>() {
private static int CHUNK_SIZE = 10;
private static int startIndex = 1;
private static int lastIndex = -1;
boolean IS_END = false;
@Override
public EmploymentJsonDto read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (IS_END)
return null;
EmploymentJsonDto employmentJsonDto = webClientService.returnEmploymentDto(startIndex, startIndex + CHUNK_SIZE - 1);
if (lastIndex == -1) {
lastIndex = employmentJsonDto.fetchListTotalCount();
log.info("lastIndex = {}", lastIndex);
}
startIndex += CHUNK_SIZE;
if (startIndex > lastIndex) {
IS_END = true;
}
return employmentJsonDto;
}
};
}
ItemReader<EmploymentJsonDto>
: ItemReader 인터페이스를 구현하고 있으며, EmploymentJsonDto 타입의 데이터를 읽어옵니다.
CHUNK_SIZE, startIndex, lastIndex: 데이터를 묶어서 읽어올 크기와 시작 인덱스, 마지막 인덱스를 관리하는 변수들입니다.
IS_END: 데이터 읽기가 완료되었는지를 나타내는 플래그 변수입니다.
read() 메서드: 이 메서드는 데이터를 한 개씩 읽어오는 로직을 구현하는 부분입니다.
IS_END가 true인 경우 데이터 읽기가 이미 종료된 상태이므로 null을 반환하여 읽기를 종료합니다.
webClientService.returnEmploymentDto() 메서드를 통해 취업 정보 데이터를 가져옵니다. startIndex부터 startIndex + CHUNK_SIZE - 1까지의 데이터를 가져옵니다.
최초 호출 시 lastIndex가 -1이라면, 데이터의 전체 개수를 가져와 lastIndex에 저장합니다.
startIndex을 업데이트하고, startIndex가 lastIndex를 넘어서면 IS_END를 true로 설정하여 데이터 읽기가 종료되었음을 표시합니다.
이 employmentReader는 데이터를 조각 단위로 읽어오며, CHUNK_SIZE만큼의 데이터를 가져와 한 번에 처리될 수 있도록 합니다. 데이터가 모두 읽혔으면 IS_END를 통해 읽기 작업이 종료되었음을 알 수 있습니다.
@Bean
@StepScope
public ItemProcessor<EmploymentJsonDto, List<Employment>> employmentProcessor() {
return new ItemProcessor<EmploymentJsonDto, List<Employment>>() {
@Override
public List<Employment> process(EmploymentJsonDto item) throws Exception {
Set<String> allDistinctSubject = repository.findAllDistinctSubject(); //나중에 또 돌릴때도 중복 제거를 위해
List<Employment> res = item.toDto().stream()
.filter(i -> !allDistinctSubject.contains(i.getSubject()))
.map(Employment::fromDto)
.collect(Collectors.toList());
return res;
}
};
}
ItemProcessor<EmploymentJsonDto, List<Employment>>
: ItemProcessor 인터페이스를 구현하고 있으며, 입력으로 EmploymentJsonDto를 받고 출력으로 List<Employment>
를 반환합니다.
process() 메서드
: ItemProcessor 인터페이스를 구현하는 메서드로, 입력 데이터인 item을 처리하고 처리된 결과를 반환합니다.
Set<String> allDistinctSubject = repository.findAllDistinctSubject();
: repository를 통해 중복되지 않는 취업 과목(subject)을 모두 가져오는 작업을 수행합니다. 이 작업은 중복 제거를 위해 필요합니다.
item.toDto().stream()
: item 객체를 DTO로 변환
하고, 스트림을 생성합니다.
.filter(i -> !allDistinctSubject.contains(i.getSubject()))
: 중복되지 않는 과목 목록에 포함되지 않는 과목만 필터링합니다.
.map(Employment::fromDto)
: DTO를 Employment 객체로 변환합니다.
.collect(Collectors.toList())
: 필터링 및 매핑된 결과를 리스트로 수집합니다.
즉, employmentProcessor는 입력으로 받은 EmploymentJsonDto를 처리하여 중복되지 않는 취업 정보(Employment 객체)들을 리스트로 반환하는 역할을 합니다. 이렇게 중복을 제거하고 처리된 데이터를 ItemWriter로 전달하게 됩니다.
@Bean
@StepScope
public ItemWriter<List<Employment>> employmentWriter() {
return new ItemWriter<List<Employment>>() {
@Override
public void write(Chunk<? extends List<Employment>> chunk) throws Exception {
// log.info("item write is good {}", chunk);
chunk.forEach(i -> repository.saveAllAndFlush(i));
log.info("save !!");
}
};
}
ItemWriter<List<Employment>>
: ItemWriter 인터페이스를 구현하고 있으며, 입력으로 List<Employment>
를 받습니다.
write() 메서드
: ItemWriter 인터페이스를 구현하는 메서드로, 입력 데이터를 처리하는 역할을 합니다.
chunk.forEach(i -> repository.saveAllAndFlush(i));
: Chunk 내의 모든 Employment 객체들을 repository를 통해 저장합니다. saveAllAndFlush 메서드는 데이터를 저장하고 즉시 플러시하여 데이터베이스에 쓰기 작업을 수행합니다.
log.info("save !!");: 저장이 완료되면 로그에 "save !!" 메시지를 출력합니다.