이번 글에서는 배치의 Event Listener에 대해 알아보겠습니다.
Event Listener는 배치에서 Job, Step, Chunk 단계의 실행 전/후에 발생하는 이벤트를 받아 용도에 맞게 활용할 수 있도록 제공하는 인터셉터 개념의 클래스입니다. 주로 각 단계별 로그를 남기거나 실행 상태 정보를 참조하기 위해 사용되며 모든 단계별로 지정할 수 있습니다. Event Listener는 Job, Step, Skip, Retry 모든 단계를 지원하는데요, 각 작업의 성공/실패와 관계없이 무조건 호출됩니다. 사실 Event Listener는 어느 단계든 거의 동일한 구조로 이루어져 있기 때문에 간단한 편입니다.
먼저 Job을 구성하고 그 아래 간단한 2개의 Step을 만들어 보겠습니다. Job과 Step 모두 Event Listener를 가집니다. JobExecutionListener는 시작을 알리는 BeforeJob과 총 소요시간을 출력하는 AfterJob으로 구성하고, StepExecutionListener는 Step의 정보를 출력합니다. Event Listener를 사용하는 방법은 인터페이스를 구현하는 방법과 어노테이션을 사용하는 방식 두 가지가 있는데, JobListener는 어노테이션 기반으로 생성하고 StepListener는 인터페이스를 구현하는 방식으로 생성하겠습니다.
@Configuration
@RequiredArgsConstructor
public class JobConfiguration10 {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job batchjob1() {
return jobBuilderFactory.get("batchjob10")
.incrementer(new RunIdIncrementer())
.start(step1())
.next(step2())
.listener(new CustomAnnotationJobExecutionListener())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(((contribution, chunkContext) -> RepeatStatus.FINISHED))
.listener(new CustomStepExecutionListener())
.allowStartIfComplete(true)
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step10")
.tasklet(((contribution, chunkContext) -> RepeatStatus.FINISHED))
.listener(new CustomStepExecutionListener())
.allowStartIfComplete(true)
.build();
}
}
public class CustomAnnotationJobExecutionListener {
@BeforeJob
public void beforeJob(JobExecution jobExecution) {
System.out.println(jobExecution.getJobInstance().getJobName() + " Started");
}
@AfterJob
public void afterJob(JobExecution jobExecution) {
long duration = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();
System.out.println(jobExecution.getJobInstance().getJobName() + " 총 소요시간 = " + duration);
}
}
public class CustomStepExecutionListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println(stepExecution.getStepName() + " Started");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
ExitStatus exitStatus = stepExecution.getExitStatus();
BatchStatus batchStatus = stepExecution.getStatus();
System.out.println(stepExecution.getStepName() + " exitStatus = " + exitStatus);
System.out.println(stepExecution.getStepName() + " batchStatus = " + batchStatus);
return ExitStatus.COMPLETED;
}
}
그리고 실행하면 아래처럼 간단한 로그가 찍히게 됩니다.
batchjob1 Started
step1 Started
step1 exitStatus = exitCode=COMPLETED;exitDescription=
step1 batchStatus = COMPLETED
step2 Started
step2 exitStatus = exitCode=COMPLETED;exitDescription=
step2 batchStatus = COMPLETED
batchjob1 총 소요시간 = 143
그리고 ChunkListener 와 ItemRead/Process/WriteListener에 대해 알아보겠습니다. 이 역시 매우 간단합니다. 각각에 맞는 Event Listener를 구현해 사용하면 됩니다. 경우는 모두 before, after, onError가 있는데요, 코드로 보겠습니다. 먼저 어노테이션 방식으로 구현한 ChunkListener입니다.
public class CustomChunkListener {
@BeforeChunk
public void beforeChunk(ChunkContext chunkContext) {
System.out.println(">>>>> Before Chunk");
}
@AfterChunk
public void afterChunk(ChunkContext chunkContext) {
System.out.println(">>>>> After Chunk");
}
@AfterChunkError
public void afterChunkError(ChunkContext chunkContext) {
System.out.println(">>>>> After Chunk Error");
}
}
그리고 ItemListener들 입니다. 인터페이스를 구현했고 간단히 로그만 찍었는데요, 실제 사용 시 원하는 데이터를 출력하거나 시작을 알리는 용도로 사용하면 될 것 같습니다. ItemListener들은 각 단계에서 chunk에 대해 작업할 때마다 동작합니다.
public class CustomItemReadListener implements ItemReadListener<Customer> {
@Override
public void beforeRead() {
System.out.println(">>>>> Before Read");
}
@Override
public void afterRead(Customer item) {
System.out.println(">>>>> After Read");
}
@Override
public void onReadError(Exception ex) {
System.out.println(">>>>> After Read Error");
}
}
public class CustomItemProcessListener implements ItemProcessListener<Customer, Customer2> {
@Override
public void beforeProcess(Customer item) {
System.out.println(">>>>> Before Process");
}
@Override
public void afterProcess(Customer item, Customer2 result) {
System.out.println(">>>>> After Process");
}
@Override
public void onProcessError(Customer item, Exception e) {
System.out.println(">>>>> On Process Error");
}
}
public class CustomItemWriteListener implements ItemWriteListener<Customer2> {
@Override
public void beforeWrite(List<? extends Customer2> items) {
System.out.println(">>>>> Before Write");
}
@Override
public void afterWrite(List<? extends Customer2> items) {
System.out.println(">>>>> After Write");
}
@Override
public void onWriteError(Exception exception, List<? extends Customer2> items) {
System.out.println(">>>>> After Write Error");
}
}
마지막으로 SkipListener와 RetryListener를 살펴보겠습니다. 먼저 SkipListener는 Read/Process/Write 모든 단계에서 발생하는 Skip에 대해 Listen합니다.
public class CustomSkipListener implements SkipListener {
@Override
public void onSkipInRead(Throwable t) {
System.out.println(">> onSkipInRead : "+ t.getMessage());
}
@Override
public void onSkipInWrite(Object item, Throwable t) {
System.out.println(">> onSkipInWrite : "+ item);
System.out.println(">> onSkipInWrite : "+ t.getMessage());
}
@Override
public void onSkipInProcess(Object item, Throwable t) {
System.out.println(">> onSkipInProcess : "+ item);
System.out.println(">> onSkipInProcess : "+ t.getMessage());
}
}
아래 방식으로 사용하면 됩니다.
@Bean
public Step step12() throws Exception {
return stepBuilderFactory.get("step12")
.<Integer, String>chunk(10)
.reader(linkedListItemReader())
.processor(new CustomItemProcessor4())
.writer(items -> {
for (String item : items) {
if (item.equals("item5")) {
throw new CustomSkipException("write skipped");
}
System.out.println("write : " + item);
}
})
.faultTolerant()
.skip(CustomSkipException.class)
.skipLimit(3)
.listener(new CustomSkipListener())
.build();
}
그리고 RetryListener를 볼텐데요, 아직 정확하게 이해하지 못해 코드만 남깁니다.
public class CustomRetryListener implements RetryListener {
// 재시도 전 매번 호출
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
System.out.println("open");
return true;
}
// 재시도 후 매번 호출
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
System.out.println("close");
}
// 재시도 실패 시 매번 호출
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
System.out.println("Retry Count: " + context.getRetryCount());
}
}
@Bean
public Step step12() throws Exception {
return stepBuilderFactory.get("step12")
.<Integer, String>chunk(10)
.reader(linkedListItemReader2())
.processor(new CustomItemProcessor5())
.writer(new CustomItemWriter2())
.faultTolerant()
.retry(CustomRetryException.class)
.retryLimit(2)
.listener(new CustomRetryListener())
.build();
}
이번 글에서는 배치에서의 Listener에 대해 알아봤습니다. 다음 글에는 배치 테스트 방법에 대해 알아보겠습니다.