해커톤 프로젝트 - Spring batch 관련 : employmentJobConfig

Chooooo·2023년 9월 1일
0

TIL

목록 보기
7/28
post-thumbnail

가장 먼저 중요한게 스프링 배치가 jdk 17 + spring boot 3.xx 버전부터 바뀐 부분이 있다. 해당 내용은 따로 정리해두었으니 코드 정리만 ~

😎 EmploymentJobConfig

@Slf4j
@RequiredArgsConstructor
//@Configuration
public class EmploymentJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager platformTransactionManager;
    private final WebClientService webClientService;
    private final EmploymentRepository repository;



    @Bean
    public Job employmentInfoJob(Step employmentInsertStep) {
        log.info("start job [EmploymentInfoJob]");

        return new JobBuilder("employmentInfoJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(employmentInsertStep)
                .build();

    }

    @Bean
    @JobScope
    public Step employmentInsertStep(ItemReader<EmploymentJsonDto> employmentReader, ItemProcessor<EmploymentJsonDto, List<Employment>> employmentProcessor,
                                    ItemWriter<List<Employment>> employmentWriter) {
        return new StepBuilder("employmentInsertStep", jobRepository)
                .<EmploymentJsonDto, List<Employment>>chunk(10, platformTransactionManager)
                .reader(employmentReader)
                .processor(employmentProcessor)
                .writer(employmentWriter)
                .build();
    }

    @Bean
    @StepScope
    public ItemReader<EmploymentJsonDto> employmentReader() {
        return new ItemReader<EmploymentJsonDto>() {
            private static int CHUNK_SIZE = 10;
            private static int startIndex = 1;
            private static int lastIndex = -1;
            boolean IS_END = false;
            @Override
            public EmploymentJsonDto read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                if (IS_END)
                    return null;

                EmploymentJsonDto employmentJsonDto = webClientService.returnEmploymentDto(startIndex, startIndex + CHUNK_SIZE - 1);
                if (lastIndex == -1) {
                    lastIndex = employmentJsonDto.fetchListTotalCount();
                    log.info("lastIndex = {}", lastIndex);
                }

                startIndex += CHUNK_SIZE;
                if (startIndex > lastIndex) {
                    IS_END = true;
                }
                return employmentJsonDto;
            }
        };
    }

    @Bean
    @StepScope
    public ItemProcessor<EmploymentJsonDto, List<Employment>> employmentProcessor() {
        return new ItemProcessor<EmploymentJsonDto, List<Employment>>() {
            @Override
            public List<Employment> process(EmploymentJsonDto item) throws Exception {
                Set<String> allDistinctSubject = repository.findAllDistinctSubject();  //나중에 또 돌릴때도 중복 제거를 위해
                List<Employment> res = item.toDto().stream()
                        .filter(i -> !allDistinctSubject.contains(i.getSubject()))
                        .map(Employment::fromDto)
                        .collect(Collectors.toList());
                return res;
            }
        };
    }

    @Bean
    @StepScope
    public ItemWriter<List<Employment>> employmentWriter() {
        return new ItemWriter<List<Employment>>() {
            @Override
            public void write(Chunk<? extends List<Employment>> chunk) throws Exception {
//                log.info("item write is good {}", chunk);
                chunk.forEach(i -> repository.saveAllAndFlush(i));
                log.info("save !!");
            }
        };
    }
}

😎 전체 코드 개요

🎈 스프링부트 3.x 부터 JobRepository, PlatformTransactionManager 이 두개를 활용해서 스프링 배치를 진행해야 한다. ( + main에 어노테이션 추가 안해야함 )

🎈 먼저 구조를 보면 따로 employmentInfoJob 이라는 이름의 Job을 구동시키면 employmentInsertStep step이 시작된다. (해당 배치는 하나의 step만으로 구성)

🎈 일단 주어진 스프링 배치 코드는 취업 정보를 처리하는 작업을 수행한다.

  1. employmentInfoJob 메서드: Job을 정의하는 메서드. JobBuilder를 사용하여 Job을 생성하고, employmentInsertStep를 시작 스텝으로 설정한다.

  2. employmentInsertStep 메서드: 배치 작업의 스텝을 정의하는 메서드입니다. StepBuilder를 사용하여 스텝을 생성하고, ItemReader, ItemProcessor, ItemWriter를 설정한다. 각각의 역할은 아래와 같다.

  3. employmentReader: 취업 정보를 읽어오는 ItemReader를 정의. webClientService를 통해 데이터를 가져오며, startIndex와 lastIndex를 사용하여 데이터를 분할하여 읽어온다.

  4. employmentProcessor: 읽어온 취업 정보를 처리하는 ItemProcessor를 정의한다. 중복을 제거하고, DTO 객체를 Employment 객체로 변환하여 반환한다.

  5. employmentWriter: 처리된 취업 정보를 저장하는 ItemWriter를 정의한다. repository를 통해 데이터를 저장하고, Chunk를 순회하며 DB에 저장한다.

⚽ 이제 각 ItemReader, ItemProcessor, ItemWriter를 순서대로 분석해보자.

👻 ItemReader

@Bean
    @StepScope
    public ItemReader<EmploymentJsonDto> employmentReader() {
        return new ItemReader<EmploymentJsonDto>() {
            private static int CHUNK_SIZE = 10;
            private static int startIndex = 1;
            private static int lastIndex = -1;
            boolean IS_END = false;
            @Override
            public EmploymentJsonDto read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                if (IS_END)
                    return null;

                EmploymentJsonDto employmentJsonDto = webClientService.returnEmploymentDto(startIndex, startIndex + CHUNK_SIZE - 1);
                if (lastIndex == -1) {
                    lastIndex = employmentJsonDto.fetchListTotalCount();
                    log.info("lastIndex = {}", lastIndex);
                }

                startIndex += CHUNK_SIZE;
                if (startIndex > lastIndex) {
                    IS_END = true;
                }
                return employmentJsonDto;
            }
        };
    }
  • 해당 코드는 ItemReader 인터페이스를 구현하여 취업 정보를 읽어오는 역할을 한다.

ItemReader<EmploymentJsonDto>: ItemReader 인터페이스를 구현하고 있으며, EmploymentJsonDto 타입의 데이터를 읽어옵니다.

CHUNK_SIZE, startIndex, lastIndex: 데이터를 묶어서 읽어올 크기와 시작 인덱스, 마지막 인덱스를 관리하는 변수들입니다.

IS_END: 데이터 읽기가 완료되었는지를 나타내는 플래그 변수입니다.

read() 메서드: 이 메서드는 데이터를 한 개씩 읽어오는 로직을 구현하는 부분입니다.

IS_END가 true인 경우 데이터 읽기가 이미 종료된 상태이므로 null을 반환하여 읽기를 종료합니다.
webClientService.returnEmploymentDto() 메서드를 통해 취업 정보 데이터를 가져옵니다. startIndex부터 startIndex + CHUNK_SIZE - 1까지의 데이터를 가져옵니다.
최초 호출 시 lastIndex가 -1이라면, 데이터의 전체 개수를 가져와 lastIndex에 저장합니다.
startIndex을 업데이트하고, startIndex가 lastIndex를 넘어서면 IS_END를 true로 설정하여 데이터 읽기가 종료되었음을 표시합니다.

이 employmentReader는 데이터를 조각 단위로 읽어오며, CHUNK_SIZE만큼의 데이터를 가져와 한 번에 처리될 수 있도록 합니다. 데이터가 모두 읽혔으면 IS_END를 통해 읽기 작업이 종료되었음을 알 수 있습니다.

👻 ItemProcessor

@Bean
    @StepScope
    public ItemProcessor<EmploymentJsonDto, List<Employment>> employmentProcessor() {
        return new ItemProcessor<EmploymentJsonDto, List<Employment>>() {
            @Override
            public List<Employment> process(EmploymentJsonDto item) throws Exception {
                Set<String> allDistinctSubject = repository.findAllDistinctSubject();  //나중에 또 돌릴때도 중복 제거를 위해
                List<Employment> res = item.toDto().stream()
                        .filter(i -> !allDistinctSubject.contains(i.getSubject()))
                        .map(Employment::fromDto)
                        .collect(Collectors.toList());
                return res;
            }
        };
    }

ItemProcessor<EmploymentJsonDto, List<Employment>>: ItemProcessor 인터페이스를 구현하고 있으며, 입력으로 EmploymentJsonDto를 받고 출력으로 List<Employment>를 반환합니다.

process() 메서드: ItemProcessor 인터페이스를 구현하는 메서드로, 입력 데이터인 item을 처리하고 처리된 결과를 반환합니다.

Set<String> allDistinctSubject = repository.findAllDistinctSubject();: repository를 통해 중복되지 않는 취업 과목(subject)을 모두 가져오는 작업을 수행합니다. 이 작업은 중복 제거를 위해 필요합니다.
item.toDto().stream(): item 객체를 DTO로 변환하고, 스트림을 생성합니다.
.filter(i -> !allDistinctSubject.contains(i.getSubject())): 중복되지 않는 과목 목록에 포함되지 않는 과목만 필터링합니다.
.map(Employment::fromDto): DTO를 Employment 객체로 변환합니다.
.collect(Collectors.toList()): 필터링 및 매핑된 결과를 리스트로 수집합니다.

즉, employmentProcessor는 입력으로 받은 EmploymentJsonDto를 처리하여 중복되지 않는 취업 정보(Employment 객체)들을 리스트로 반환하는 역할을 합니다. 이렇게 중복을 제거하고 처리된 데이터를 ItemWriter로 전달하게 됩니다.

👻 ItemWriter

@Bean
    @StepScope
    public ItemWriter<List<Employment>> employmentWriter() {
        return new ItemWriter<List<Employment>>() {
            @Override
            public void write(Chunk<? extends List<Employment>> chunk) throws Exception {
//                log.info("item write is good {}", chunk);
                chunk.forEach(i -> repository.saveAllAndFlush(i));
                log.info("save !!");
            }
        };
    }

ItemWriter<List<Employment>>: ItemWriter 인터페이스를 구현하고 있으며, 입력으로 List<Employment>를 받습니다.

write() 메서드: ItemWriter 인터페이스를 구현하는 메서드로, 입력 데이터를 처리하는 역할을 합니다.

chunk.forEach(i -> repository.saveAllAndFlush(i));: Chunk 내의 모든 Employment 객체들을 repository를 통해 저장합니다. saveAllAndFlush 메서드는 데이터를 저장하고 즉시 플러시하여 데이터베이스에 쓰기 작업을 수행합니다.
log.info("save !!");: 저장이 완료되면 로그에 "save !!" 메시지를 출력합니다.

profile
back-end, 지속 성장 가능한 개발자를 향하여

0개의 댓글