[ 정수원 스프링 배치 #13 ] - 반복과 예외 처리 : RepeatOperation, Skip

정동욱·2023년 10월 31일
0
post-thumbnail

이번 글에서는 스프링 배치에서 반복과 예외를 제어하는 방법에 대해 알아보겠습니다.

반복은 배치 프로그램에서 특정 조건이 충족될 때까지, Job이나 Step을 반복 실행하도록 프로그램을 구성할 수 있음을 의미합니다. 이때 Step의 반복과 Chunk의 반복을 RepeatOperation 인터페이스를 사용해 처리하는데요, 기본 구현체인 RepeatTemplate를 주로 이용합니다.

그리고 예외는 faultTolerant()라는 API를 이용하는데요, 예외 발생 시 건너뛰거나 재처리하는 기능을 지원합니다. 사실 예외를 처리한다는 것 자체도 복잡한 일인데, 단순히 핸들링이 아닌 재처리 등 더욱 많은 기능을 제공하기 때문에 알아야 할 내용이 꽤나 많습니다.

먼저 반복을 구현한 코드로 바로 볼텐데요, 이번 Job은 1개의 Step으로 구성되어 있고 1개의 Step은 익명 클래스를 사용해 간단히 구현했습니다. ItemReader에서는 단순히 문자열을 읽어 ItemProcessor로 보냅니다. ItemProcessor에서는 내부에서 RepeatTemplate를 이용해 ItemProcessor의 반복 횟수를 제어합니다. 그리고 ItemWriter에서는 받은 Item을 출력합니다.

@Configuration
@RequiredArgsConstructor
public class JobConfiguration2 {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;


    @Bean
    public Job batchjob2() {
        return jobBuilderFactory.get("batchjob2")
                .start(step2())
                .build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step22")
                .<String, String>chunk(5)
                .reader(new ItemReader<>() {
                    int i = 0;
                    @Override
                    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                        i++;
                        System.out.println("ItemReader(" + i + ")");
                        return i > 3 ? null : "item" + i;
                    }
                })
                .processor(new ItemProcessor<String, String>() {
                    RepeatTemplate repeatTemplate = new RepeatTemplate();
                    int i = 0;

                    @Override
                    public String process(String item) throws Exception {
                        i++;
                        System.out.println("ItemProcessor(" + i + ")");

                        repeatTemplate.setCompletionPolicy(new SimpleCompletionPolicy(3)); // 3번 동안 실행
                        repeatTemplate.iterate(new RepeatCallback() {
                            @Override
                            public RepeatStatus doInIteration(RepeatContext context) throws Exception {
                                System.out.println("repeatTemplate test");
                                
                                return RepeatStatus.CONTINUABLE;
                            }
                        });

                        return item;
                    }
                })
                .writer(items -> System.out.println("items = " + items))
                .allowStartIfComplete(true)
                .build();
    }
}

실행 결과 아래처럼 출력됩니다. ItemReader에서 3개의 Item이 생성되었고, ItemProcessor에서는 이를 Item 1개당 3번씩 "repeatTemplate test" 문자열을 반복 출력했습니다. 그리고 ItemWriter를 보면 3개의 Item이 잘 전달되어 출력된 걸 확인할 수 있습니다.

ItemReader(1)
ItemReader(2)
ItemReader(3)
ItemReader(4)
ItemProcessor(1)
repeatTemplate test
repeatTemplate test
repeatTemplate test
ItemProcessor(2)
repeatTemplate test
repeatTemplate test
repeatTemplate test
ItemProcessor(3)
repeatTemplate test
repeatTemplate test
repeatTemplate test
items = [item1, item2, item3]

반복 조건은 횟수 뿐만 아니라 시간으로도 제어할 수도 있습니다.

.processor(new ItemProcessor<String, String>() {
    RepeatTemplate repeatTemplate = new RepeatTemplate();
        int i = 0;

	@Override
    public String process(String item) throws Exception {
        i++;
        System.out.println("ItemProcessor(" + i + ")");

        repeatTemplate.setCompletionPolicy(new TimeoutTerminationPolicy(1)); // 1ms 동안 실행

        repeatTemplate.iterate(new RepeatCallback() {
            @Override
            public RepeatStatus doInIteration(RepeatContext context) throws Exception {
                System.out.println("repeatTemplate test");
                return RepeatStatus.CONTINUABLE;
            }
        });

        return item;
    }
})

Exceptionhandler를 설정함으로써 예외가 발생해도 프로그램을 바로 종료시키지 않고 정해진 횟수만큼 실행시킬 수 있습니다.

@Configuration
@RequiredArgsConstructor
public class JobConfiguration2 {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job batchjob2() {
        return jobBuilderFactory.get("batchjob2")
                .start(step2())
                .build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step22")
                .<String, String>chunk(5)
                .reader(new ItemReader<>() {
                    int i = 0;
                    @Override
                    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                        i++;
                        System.out.println("ItemReader(" + i + ")");
                        return i > 3 ? null : "item" + i;
                    }
                })
                .processor(new ItemProcessor<String, String>() {
                    RepeatTemplate repeatTemplate = new RepeatTemplate();
                    int i = 0;

                    @Override
                    public String process(String item) throws Exception {
                        i++;
                        System.out.println("ItemProcessor(" + i + ")");
					    repeatTemplate.setExceptionHandler(exceptionHandler()); // 예외 발생 시 제어

                        repeatTemplate.iterate(new RepeatCallback() {
                            @Override
                            public RepeatStatus doInIteration(RepeatContext context) throws Exception {
                                System.out.println("repeatTemplate test");
                                throw new RuntimeException("CustomException");
                            }
                        });

                        return item;
                    }
                })
                .writer(items -> System.out.println("items = " + items))
                .allowStartIfComplete(true)
                .build();
    }

    @Bean
    public ExceptionHandler exceptionHandler() {
        return new SimpleLimitExceptionHandler(3); // 예외가 발생해도 3번 실행
    }

}

실행해보면 아래와 같은 결과가 나옵니다.

ItemReader(1)
ItemReader(2)
ItemReader(3)
ItemReader(4)
ItemProcessor(1)
repeatTemplate test
repeatTemplate test
repeatTemplate test
repeatTemplate test
2023-10-30 21:19:48.871 ERROR 6348 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step step22 in job batchjob2

java.lang.RuntimeException: CustomException

이제 예외를 제어해볼텐데요, 위의 ExceptionHandler를 이용하지 않고 faultTolerant() API 외 별도의 API들을 이용합니다. FaultTolerant란 단어를 풀어보면 "결점"과 "관용"의 붙임말로, 자연스러운 단어로 바꿔 보면 "결함 허용"이 될 것입니다. 예외가 발생할 때 해당 작업을 건너뛸 것인지, 혹은 재처리를 할지 설정하는 등의 더욱 섬세한 작업을 지원합니다. faultTolerant() API가 제공하는 기능은 SkipRetry인데요, 먼저 Skip에 대해 알아보겠습니다.

SkipItemReader/Processor/Writer 모두에서 사용할 수 있는데요, 결과적으로는 모두 동일하지만 적용되는 방식은 다 다릅니다. ItemReader의 경우 4개의 ItemReader 가운데 3번째 ItemReader에서 예외가 발생했을 때, 해당 ItemReader를 건너뛰고 4번째 ItemReader가 실행됩니다. 즉, 1번과 2번과 4번 ItemReaderChunk<Input>이 되는 것이죠.

ItemProcessor에서는 또 다른 방식으로 처리됩니다. 똑같이 4개의 ItemProcessor 가운데 3번째 ItemProcessor에서 예외가 발생한다면, 다시 1번째 ItemProcessor로 돌아가 다시 시작합니다. 이 때, 예외가 발생한 3번째 ItemProcessor은 건너뛰게 됩니다. 즉, 결과적으로는 1번 2번 4번 ItemProcessor가 처리되게 됩니다. ItemReader와 결과는 동일하지만, 내부적으로 동작되는 방식은 다른 것이죠. 그리고 ItemReader를 재차 실행시키지 않아도 Chunk<Input>는 캐시에 저장되어 있어 이를 가져와 사용합니다. 또한 예외가 발생한 ItemProcessor에 대한 정보가 내부적으로 남아있기 때문에 해당 ItemProcessor를 건너 뛸 수 있는 것입니다.

ItemWriterItemProcessor로 돌아가 실행합니다. 4개의 ItemWriter 가운데 3번째ItemWriter에서 예외가 발생한다면, ItemProcessor로 돌아가 하나의 item씩 다시 시작해 예외가 발생한 ItemWriter를 건너뛰게 됩니다.

예외 발생 시 Skip을 이용한 코드를 바로 작성해볼텐데요, ItemReader는 1부터 20까지의 숫자를 문자열로 반환합니다. ChunkSize는 5입니다. 그리고 ItemProcessorItemReader로 부터 받은 문자열을 출력하고 해당 수를 음수로 바꿔 ItemWriter로 전달합니다. 그런데 이 과정에서 받은 문자열이 "6"이거나 "7"이라면 RuntimeException을 발생시킵니다. 그리고 ItemWriterItemProcessor로 부터 받은 문자열을 출력하는데, 받은 문자열이 "-15"라면 똑같이 RuntimeException을 발생시킵니다. 그리고 Step에 faultTolerant()와 skip() API를 사용해 예외 발생 시 Skip 처리를 합니다.

public class SkipItemProcessor implements ItemProcessor<String, String> {

    private int count = 0;

    @SneakyThrows
    @Override
    public String process(String item) throws Exception {
        count++;

        if (item.equals("6") || item.equals("7")) {
            throw new RuntimeException("ItemProcessor Failed, count = " + count);
        } else {
            System.out.println("ItemProcessor, item = " + item);
            return String.valueOf(Integer.parseInt(item) * -1);
        }
    }
}
public class SkipItemWriter implements ItemWriter<String> {

    private int count = 0;

    @SneakyThrows
    @Override
    public void write(List<? extends String> items) throws Exception {
        count++;

        for (String item : items) {
            if (item.equals("-15")) {
                throw new RuntimeException("ItemWriter Failed, count = " + count);
            } else {
                System.out.println("ItemWriter, item = " + item);
            }
        }
    }
}
@Bean
public Step step3() {
    return stepBuilderFactory.get("step33")
            .<String, String>chunk(5)
            .reader(new ItemReader<>() {
                int i = 0;
                @Override
                public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                    i++;
                    System.out.println("index = " + i);
                    return i > 20 ? null : String.valueOf(i);
                }
            })
            .processor(new SkipItemProcessor())
            .writer(new SkipItemWriter())
            .faultTolerant() // 장애 처리 설정
            .skip(RuntimeException.class) // 건너뛸 예외 지정
            .skipLimit(3) //건너뛸 횟수 지정
//            .skipPolicy(new LimitCheckingItemSkipPolicy()) // 스킵 정책 설정
//            .noSkip(IllegalAccessException.class) // 해당 예외는 건너뛰지 않음
            .allowStartIfComplete(true)
            .build();
}

실행시켜보면 아래와 같이 출력됩니다. 보면 1~5까지의 숫자들은 정상적으로 출력됩니다. 하지만 ItemProcessor에서 "6"과 "7"에서 예외가 발생하기 때문에 내부적으로 재실행되었고, 결과적으로 8~10까지만 출력하고 ItemWriter로 결과를 넘겨주었습니다. ItemProcessor도 역시 받은대로 -8~-10을 출력했습니다.

index = 1
index = 2
index = 3
index = 4
index = 5
ItemProcessor, item = 1
ItemProcessor, item = 2
ItemProcessor, item = 3
ItemProcessor, item = 4
ItemProcessor, item = 5
ItemWriter, item = -1
ItemWriter, item = -2
ItemWriter, item = -3
ItemWriter, item = -4
ItemWriter, item = -5
index = 6
index = 7
index = 8
index = 9
index = 10
ItemProcessor, item = 8
ItemProcessor, item = 9
ItemProcessor, item = 10
ItemWriter, item = -8
ItemWriter, item = -9
ItemWriter, item = -10

그리고 이제 ItemWriter에서 "-15"가 나오면 예외를 발생하게 되는데요, 보면 ItemProcessor에는 아무런 이상 없이 첫 회에는 출력이 되었습니다. 그런데 ItemWirter에서 문제가 생겼고, 다시 ItemProcessor로 돌아가 다시 실행되는 것을 볼 수 있습니다. 그리고 이 때에는 전체를 한번에 실행하는 게 아니라 한 Item씩 실행하게 되죠.

index = 11
index = 12
index = 13
index = 14
index = 15
ItemProcessor, item = 11
ItemProcessor, item = 12
ItemProcessor, item = 13
ItemProcessor, item = 14
ItemProcessor, item = 15
ItemWriter, item = -11
ItemWriter, item = -12
ItemWriter, item = -13
ItemWriter, item = -14
ItemProcessor, item = 11
ItemWriter, item = -11
ItemProcessor, item = 12
ItemWriter, item = -12
ItemProcessor, item = 13
ItemWriter, item = -13
ItemProcessor, item = 14
ItemWriter, item = -14
ItemProcessor, item = 15
index = 16
index = 17
index = 18
index = 19
index = 20
ItemProcessor, item = 16
ItemProcessor, item = 17
ItemProcessor, item = 18
ItemProcessor, item = 19
ItemProcessor, item = 20
ItemWriter, item = -16
ItemWriter, item = -17
ItemWriter, item = -18
ItemWriter, item = -19
ItemWriter, item = -20
index = 21

사실 이러한 Skip 설정은 매우 케이스가 다양하기 때문에 깊게 들어가기 보다는 대략적인 개념과 사용법을 알고 다음 필요한 경우에 다시 알아보는 게 좋은 방법인 것 같습니다.

이번 글에서는 배치에서 반복 처리와 예외 발생 시 건너뛰는 Skip에 대해 알아봤습니다. 다음 글에서는 예외 발생 시 재처리하는 Retry에 대해 알아보겠습니다.

profile
거인의 어깨 위에서 탭댄스를

0개의 댓글

관련 채용 정보