[ 정수원 스프링 배치 #16 ] - Event Listener

정동욱·2023년 11월 14일
0
post-thumbnail

이번 글에서는 배치의 Event Listener에 대해 알아보겠습니다.

Event Listener는 배치에서 Job, Step, Chunk 단계의 실행 전/후에 발생하는 이벤트를 받아 용도에 맞게 활용할 수 있도록 제공하는 인터셉터 개념의 클래스입니다. 주로 각 단계별 로그를 남기거나 실행 상태 정보를 참조하기 위해 사용되며 모든 단계별로 지정할 수 있습니다. Event Listener는 Job, Step, Skip, Retry 모든 단계를 지원하는데요, 각 작업의 성공/실패와 관계없이 무조건 호출됩니다. 사실 Event Listener는 어느 단계든 거의 동일한 구조로 이루어져 있기 때문에 간단한 편입니다.

먼저 Job을 구성하고 그 아래 간단한 2개의 Step을 만들어 보겠습니다. Job과 Step 모두 Event Listener를 가집니다. JobExecutionListener는 시작을 알리는 BeforeJob과 총 소요시간을 출력하는 AfterJob으로 구성하고, StepExecutionListener는 Step의 정보를 출력합니다. Event Listener를 사용하는 방법은 인터페이스를 구현하는 방법과 어노테이션을 사용하는 방식 두 가지가 있는데, JobListener는 어노테이션 기반으로 생성하고 StepListener는 인터페이스를 구현하는 방식으로 생성하겠습니다.

@Configuration
@RequiredArgsConstructor
public class JobConfiguration10 {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job batchjob1() {
        return jobBuilderFactory.get("batchjob10")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .next(step2())
                .listener(new CustomAnnotationJobExecutionListener())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet(((contribution, chunkContext) -> RepeatStatus.FINISHED))
                .listener(new CustomStepExecutionListener())
                .allowStartIfComplete(true)
                .build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step10")
                .tasklet(((contribution, chunkContext) -> RepeatStatus.FINISHED))
                .listener(new CustomStepExecutionListener())
                .allowStartIfComplete(true)
                .build();
    }
}
public class CustomAnnotationJobExecutionListener {

    @BeforeJob
    public void beforeJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobInstance().getJobName() + " Started");
    }

    @AfterJob
    public void afterJob(JobExecution jobExecution) {
        long duration = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
        System.out.println(jobExecution.getJobInstance().getJobName() + " 총 소요시간 = " + duration);
    }
}
public class CustomStepExecutionListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println(stepExecution.getStepName() + " Started");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        ExitStatus exitStatus = stepExecution.getExitStatus();
        BatchStatus batchStatus = stepExecution.getStatus();

        System.out.println(stepExecution.getStepName() + " exitStatus = " + exitStatus);
        System.out.println(stepExecution.getStepName() + " batchStatus = " + batchStatus);

        return ExitStatus.COMPLETED;
    }
}

그리고 실행하면 아래처럼 간단한 로그가 찍히게 됩니다.

batchjob1 Started
step1 Started
step1 exitStatus = exitCode=COMPLETED;exitDescription=
step1 batchStatus = COMPLETED
step2 Started
step2 exitStatus = exitCode=COMPLETED;exitDescription=
step2 batchStatus = COMPLETED
batchjob1 총 소요시간 = 143

그리고 ChunkListener 와 ItemRead/Process/WriteListener에 대해 알아보겠습니다. 이 역시 매우 간단합니다. 각각에 맞는 Event Listener를 구현해 사용하면 됩니다. 경우는 모두 before, after, onError가 있는데요, 코드로 보겠습니다. 먼저 어노테이션 방식으로 구현한 ChunkListener입니다.

public class CustomChunkListener {

    @BeforeChunk
    public void beforeChunk(ChunkContext chunkContext) {
        System.out.println(">>>>> Before Chunk");
    }

    @AfterChunk
    public void afterChunk(ChunkContext chunkContext) {
        System.out.println(">>>>> After Chunk");
    }

    @AfterChunkError
    public void afterChunkError(ChunkContext chunkContext) {
        System.out.println(">>>>> After Chunk Error");
    }
}

그리고 ItemListener들 입니다. 인터페이스를 구현했고 간단히 로그만 찍었는데요, 실제 사용 시 원하는 데이터를 출력하거나 시작을 알리는 용도로 사용하면 될 것 같습니다. ItemListener들은 각 단계에서 chunk에 대해 작업할 때마다 동작합니다.

public class CustomItemReadListener implements ItemReadListener<Customer> {

    @Override
    public void beforeRead() {
        System.out.println(">>>>> Before Read");
    }

    @Override
    public void afterRead(Customer item) {
        System.out.println(">>>>> After Read");
    }

    @Override
    public void onReadError(Exception ex) {
        System.out.println(">>>>> After Read Error");
    }
}
  
public class CustomItemProcessListener implements ItemProcessListener<Customer, Customer2> {

    @Override
    public void beforeProcess(Customer item) {
        System.out.println(">>>>> Before Process");
    }

    @Override
    public void afterProcess(Customer item, Customer2 result) {
        System.out.println(">>>>> After Process");
    }

    @Override
    public void onProcessError(Customer item, Exception e) {
        System.out.println(">>>>> On Process Error");
    }
}
  
public class CustomItemWriteListener implements ItemWriteListener<Customer2> {

    @Override
    public void beforeWrite(List<? extends Customer2> items) {
        System.out.println(">>>>> Before Write");
    }

    @Override
    public void afterWrite(List<? extends Customer2> items) {
        System.out.println(">>>>> After Write");
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Customer2> items) {
        System.out.println(">>>>> After Write Error");
    }
}

마지막으로 SkipListener와 RetryListener를 살펴보겠습니다. 먼저 SkipListener는 Read/Process/Write 모든 단계에서 발생하는 Skip에 대해 Listen합니다.

public class CustomSkipListener implements SkipListener {

    @Override
    public void onSkipInRead(Throwable t) {
        System.out.println(">> onSkipInRead : "+ t.getMessage());
    }

    @Override
    public void onSkipInWrite(Object item, Throwable t) {
        System.out.println(">> onSkipInWrite : "+ item);
		System.out.println(">> onSkipInWrite : "+ t.getMessage());
    }

    @Override
    public void onSkipInProcess(Object item, Throwable t) {
        System.out.println(">> onSkipInProcess : "+ item);
        System.out.println(">> onSkipInProcess : "+ t.getMessage());
    }
}

아래 방식으로 사용하면 됩니다.

@Bean
public Step step12() throws Exception {
    return stepBuilderFactory.get("step12")
            .<Integer, String>chunk(10)
            .reader(linkedListItemReader())
            .processor(new CustomItemProcessor4())
            .writer(items -> {
                for (String item : items) {
                    if (item.equals("item5")) {
                        throw new CustomSkipException("write skipped");
                    }
                    System.out.println("write : " + item);
                }
            })
            .faultTolerant()
            .skip(CustomSkipException.class)
            .skipLimit(3)
            .listener(new CustomSkipListener())
            .build();
}

그리고 RetryListener를 볼텐데요, 아직 정확하게 이해하지 못해 코드만 남깁니다.

public class CustomRetryListener implements RetryListener {

    // 재시도 전 매번 호출
    @Override
    public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
        System.out.println("open");
        return true;
    }

    // 재시도 후 매번 호출
    @Override
    public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        System.out.println("close");
    }

    // 재시도 실패 시 매번 호출
    @Override
    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        System.out.println("Retry Count: " + context.getRetryCount());
    }
}
@Bean
public Step step12() throws Exception {
    return stepBuilderFactory.get("step12")
            .<Integer, String>chunk(10)
            .reader(linkedListItemReader2())
            .processor(new CustomItemProcessor5())
            .writer(new CustomItemWriter2())
            .faultTolerant()
            .retry(CustomRetryException.class)
            .retryLimit(2)
            .listener(new CustomRetryListener())
            .build();
}

이번 글에서는 배치에서의 Listener에 대해 알아봤습니다. 다음 글에는 배치 테스트 방법에 대해 알아보겠습니다.

profile
거인의 어깨 위에서 탭댄스를

0개의 댓글

관련 채용 정보