@Slf4j
//@Configuration
@RequiredArgsConstructor
public class PublicServiceReservationInfoInsertJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager platformTransactionManager;
private final WebClientService webClientService;
private final PublicServiceReservationRepository repository;
@Bean
public Job publicServiceReservationInfoInsertJob(Step fetchLastUpdated, Step insertPublicServiceReservationStep) {
log.info("[PublicServiceReservationInfoInsertJobConfig] Job Launched");
return new JobBuilder("publicServiceReservationInfoInsertJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(fetchLastUpdated).on("CONTINUABLE")
.to(insertPublicServiceReservationStep)
.next(fetchLastUpdated)
.from(fetchLastUpdated).on("COMPLETED")
.end()
.end()
.build();
}
@Bean
@JobScope
public Step fetchLastUpdated(Tasklet fetchLastUpdatedTasklet) {
return new StepBuilder("fetchLastUpdated", jobRepository)
.tasklet(fetchLastUpdatedTasklet, platformTransactionManager)
.build();
}
@Bean
@JobScope
public Step insertPublicServiceReservationStep(ItemReader<WebClientDTO> publicServiceReservationReader,
ItemProcessor<WebClientDTO, List<PublicServiceReservation>> publicServiceReservationProcessor,
ItemWriter<List<PublicServiceReservation>> publicServiceReservationWriter) {
log.info("[PublicServiceReservationInfoInsertJobConfig] Insert Info Step Launched");
return new StepBuilder("insertPublicServiceReservationStep", jobRepository)
.<WebClientDTO, List<PublicServiceReservation>>chunk(10, platformTransactionManager)
.reader(publicServiceReservationReader)
.processor(publicServiceReservationProcessor)
.writer(publicServiceReservationWriter)
.build();
}
@Bean
@StepScope
public ItemReader<WebClientDTO> publicServiceReservationReader() {
return new PublicServiceReservationReader(webClientService);
}
@Bean
@StepScope
public ItemProcessor<WebClientDTO, List<PublicServiceReservation>> publicServiceReservationProcessor() {
Set<String> serviceIds = repository.findAllDistinctServiceId();
return item -> item.toDto().stream()
.filter(i -> !serviceIds.contains(i.getServiceId()))
.map(PublicServiceReservation::fromDto)
.collect(Collectors.toList());
}
@Bean
@StepScope
public ItemWriter<List<PublicServiceReservation>> publicServiceReservationWriter() {
return items -> items.forEach(i -> repository.saveAllAndFlush(i));
}
}
@Slf4j
@RequiredArgsConstructor
public class PublicServiceReservationReader implements ItemReader<WebClientDTO> {
private final WebClientService webClientService;
private static int size = 10;
private static int startIndex = 1;
private static int lastIndex = -1;
private static boolean IS_END = false;
@Override
public WebClientDTO read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (IS_END) {
return null;
}
WebClientDTO response = webClientService.getPublicServiceReservation(startIndex, startIndex + size - 1);
if (lastIndex == -1) {
lastIndex = response.fetchListTotalCount();
}
startIndex += size;
if (startIndex > lastIndex) {
IS_END = true;
}
return response;
}
}
ItemReader<WebClientDTO>
인터페이스를 구현한다. 이 ItemReader는 WebClientService를 사용하여 공공 서비스 예약 정보를 읽어온다.
⚽ size, startIndex, lastIndex, IS_END는 읽어올 데이터의 크기와 현재 읽는 위치, 마지막 인덱스, 작업이 끝났는지 여부를 관리하기 위한 변수!
⚽ read() 메서드
는 실제로 데이터를 읽어오는 메서드.
IS_END가 true인 경우, 작업이 이미 끝났으므로 null을 반환하여 읽기를 종료한다.
webClientService.getPublicServiceReservation()
를 사용하여 공공 서비스 예약 정보를 가져온다. startIndex부터 startIndex + size - 1까지의 데이터를 가져오며, 처음 호출 시 lastIndex가 -1이라면 전체 데이터의 개수를 가져와 lastIndex에 저장
startIndex을 업데이트하고, startIndex이 lastIndex를 넘어서면 IS_END를 true로 설정하여 데이터 읽기가 종료되었음을 표시한다.
@Slf4j
@Component
@RequiredArgsConstructor
public class FetchLastUpdatedTasklet implements Tasklet {
private final PublicReservationLastUpdatedRepository repository;
private boolean updatedToday = false;
private LocalDate today = LocalDate.now();
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
PublicReservationLastUpdated publicReservationLastUpdated = repository.findFirstByPublicReservationTypeOrderByLastUpdatedDesc(PublicReservationType.GENERAL)
.orElseGet(() -> PublicReservationLastUpdated.of(0L, PublicReservationType.GENERAL, LocalDate.MIN));
LocalDate lastUpdated = publicReservationLastUpdated.getLastUpdated();
if(lastUpdated.equals(today)){
log.info("[FetchLastUpdatedTasklet] [lastUpdated == today] Already updated for today.");
contribution.setExitStatus(ExitStatus.COMPLETED);
return RepeatStatus.FINISHED;
}
if(updatedToday){
log.info("[FetchLastUpdatedTasklet] [updatedToday == true] Already updated for today.");
repository.saveAndFlush(PublicReservationLastUpdated.of(PublicReservationType.GENERAL, today));
contribution.setExitStatus(ExitStatus.COMPLETED);
return RepeatStatus.FINISHED;
}
updatedToday = true;
contribution.setExitStatus(new ExitStatus("CONTINUABLE"));
return RepeatStatus.FINISHED;
}
}
⚽ Tasklet 인터페이스를 구현. 이 Tasklet은 공공 서비스 예약 정보의 최근 업데이트를 확인하고 처리한다.
👻 execute() 메서드
는 실제 작업을 수행하는 메서드.
repository.findFirstByPublicReservationTypeOrderByLastUpdatedDesc(PublicReservationType.GENERAL)
를 사용하여 최근 업데이트된 공공 서비스 예약 정보를 가져온다.
lastUpdated와 today를 비교하여 이미 오늘 업데이트가 된 경우 작업을 완료.
updatedToday가 true인 경우에도 작업을 완료합니다.
그 외의 경우 CONTINUABLE 상태로 표시하여 다음 작업을 진행할 수 있도록 한다.
⚽ 이렇게 코드는 배치 작업에서 필요한 데이터를 읽어오거나 업데이트하고, 작업의 상태를 관리하는 역할
⚽ publicServiceReservationInfoInsertJob 메서드
: Job을 정의하는 메서드이다. JobBuilder를 사용하여 Job을 생성하고, 두 개의 Step(fetchLastUpdated와 insertPublicServiceReservationStep)를 순차적으로 실행하도록 설정. 또한 incrementer를 사용하여 Job 파라미터를 증가시키고, 실행이 가능한지 여부에 따라 다음 Step을 지정한다.
⚽ fetchLastUpdated 메서드
: 첫 번째 Step인 fetchLastUpdated를 정의하는 메서드입니다. tasklet을 사용하여 작업을 수행하며, platformTransactionManager를 통해 트랜잭션 관리를 합니다.
⚽ insertPublicServiceReservationStep 메서드
: 두 번째 Step인 insertPublicServiceReservationStep을 정의하는 메서드이다. 이 Step은 공공 서비스 예약 정보를 읽고 처리하는 역할을 합니다. chunk 크기는 10으로 설정되어 한 번에 10개의 아이템을 처리하며, ItemReader, ItemProcessor, ItemWriter를 설정합니다.
⚽ publicServiceReservationReader 메서드
: 공공 서비스 예약 정보를 읽어오는 ItemReader를 정의하는 메서드입니다. PublicServiceReservationReader 클래스를 사용하여 데이터를 읽어옵니다.
⚽ publicServiceReservationProcessor 메서드
: 읽어온 데이터를 처리하는 ItemProcessor를 정의하는 메서드입니다. 중복을 제거하고 DTO 객체를 PublicServiceReservation 객체로 변환하여 반환합니다.
⚽ publicServiceReservationWriter 메서드
: 처리된 공공 서비스 예약 정보를 저장하는 ItemWriter를 정의하는 메서드입니다. repository를 통해 데이터를 저장하고 즉시 플러시합니다.