이번 글에서는 배치에서의 멀티 스레드 프로세싱에 대해 알아보겠습니다.
배치에서는 기본적으로 단일 스레드와 동기 방식으로 작업을 처리합니다. 하지만 멀티 스레드와 비동기를 지원하는데요, 보통 작업량이 많을 경우 이 방식을 사용합니다. 역시 동시성 이슈가 발생할 수 있기 때문에 잘 다루어 사용해야 합니다.
배치에는 AsyncItemProcessor/Writer, Multi-threaded Step, Parallel Steps, Partitioning이 등 여러 멀티 스레드 모델이 있습니다. AsyncItemProcessor/Writer는 ItemProcessor/Writer에게 별도의 스레드를 할당해 작업을 처리하는 방식입니다. Multi-threaded Step는 Step 내 Chunk 구조인 ItemReader/Processor/Writer마다 여러 스레드가 할당돼 작업을 처리합니다. Parallel Steps는 Step마다 스레드가 할당돼 여러 Step들을 병렬로 실행하는 방식입니다. Partitioning은 Master/Slave 방식으로 Master Step에서 데이터를 파티셔닝해 각 파티션에 스레드를 할당해 Slave Step이 독립적으로 작동하는 방식입니다.
AsyncItemProcessor/Writer, Multi-threaded Step 방식은 Step 내에서 별도의 스레드가 할당되고, Parallel Steps는 Step 별로 스레드가 할당되는 차이가 있습니다. 또한 Partitioning도 Step 별로 스레드가 할당되긴 하지만, 1개의 Master Step과 다수의 Slave Step으로 구성되어 있습니다. 하지만 TaskExecutor의 구현체를 사용해 멀티 스레드와 비동기를 구성한기 때문에 이 객체의 사용이 모두 필수적입니다.
먼저 Multi-threaded Step 방식에 대해 살펴보겠습니다. Step 내 Chunk 구조인 ItemReader/Processor/Writer마다 여러 스레드가 할당되는데요, 가장 중요한 점은 ItemReader가 데이터를 읽어올 때 Thread-Safe해야 한다는 점입니다. 예를 들어 3개의 멀티 스레드를 사용한다고 할 때, 각 스레드가 ItemReader를 통해 읽은 Chunk는 해당 스레드의 몫으로 ItemProcessor와 ItemWriter까지 고유하게 전해져 처리됩니다. 스레드끼리 Chunk를 공유하지 않기 때문에 이후에는 서로 간섭할 일이 생기지 않습니다.
코드를 볼텐데요, Customer 테이블에 100개의 데이터가 있습니다. ItemReader에서 이 테이블의 데이터를 가져오는데, chunk size는 10개입니다. 그리고 ItemProcessor에서는 이 데이터를 Customer2 타입으로 변환합니다. 마지막으로 ItemWriter에서는 받은 데이터를 Customer2 테이블에 저장합니다. 그리고 Multi-threaded Step 방식을 사용하기 위해서는 아래의 taskExecutor() API가 필수적입니다.
@Configuration
@RequiredArgsConstructor
public class JobConfiguration6 {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
private final String sql1 = "SELECT * FROM customer";
private final String sql5 = "INSERT INTO customer2 VALUES (:id, :firstName, :lastName, :birthDate)";
@Bean
public Job batchjob6() throws Exception {
return jobBuilderFactory.get("batchjob6")
.start(step6())
.listener(new StopWatchJobListener())
.build();
}
@Bean
public Step step6() throws Exception {
return stepBuilderFactory.get("step6")
.<Customer, Customer2>chunk(10)
.reader(jdbcPagingItemReader())
.listener(new CustomItemReadListener())
.processor(new CustomItemProcessor3())
.listener(new CustomItemProcessListener())
.writer(jdbcBatchItemWriter())
.listener(new CustomItemWriteListener())
.taskExecutor(taskExecutor()) // 비동기적인 멀티 스레드 적용
.allowStartIfComplete(true)
.build();
}
@Bean
public ItemReader<Customer> jdbcPagingItemReader() throws Exception {
return new JdbcPagingItemReaderBuilder<Customer>()
.name("jdbcPagingItemReader")
.pageSize(10) // 사이즈 설정
.dataSource(dataSource) // DB 설정
.rowMapper(new BeanPropertyRowMapper<>(Customer.class)) // 매핑할 클래스 설정
.queryProvider(pagingQueryProvider()) // PagingQueryProvider 설정
.build();
}
@Bean
public PagingQueryProvider pagingQueryProvider() throws Exception {
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource);
queryProvider.setSelectClause("*");
queryProvider.setFromClause("FROM customer");
queryProvider.setSortKeys(sortKeys); // 정렬 설정
return queryProvider.getObject();
}
@Bean
public JdbcBatchItemWriter<Customer2> jdbcBatchItemWriter() {
return new JdbcBatchItemWriterBuilder<Customer2>()
.dataSource(dataSource)
.sql(sql5)
.beanMapped() // 객체와 그 필드를 DB 테이블과 컬럼에 매핑
.build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(3);
taskExecutor.setMaxPoolSize(3);
taskExecutor.setThreadNamePrefix("async-thread");
return taskExecutor;
}
}
단일 스레드의 동기적 방식이라면, ItemReader에서 10건의 데이터를 가져오고, ItemProcessor에서 10건의 데이터는 Customer2 타입으로 변환되고, ItemWriter는 이걸 Customer 테이블에 저장할 것입니다. 그리고 이 과정이 10번 반복되고 아래의 결과가 나오게 됩니다.
Thread : main, read item : 1
Thread : main, read item : 2
Thread : main, read item : 3
Thread : main, read item : 4
Thread : main, read item : 5
Thread : main, read item : 6
Thread : main, read item : 7
Thread : main, read item : 8
Thread : main, read item : 9
Thread : main, read item : 10
Thread : main, process item : 1
Thread : main, process item : 2
Thread : main, process item : 3
Thread : main, process item : 4
Thread : main, process item : 5
Thread : main, process item : 6
Thread : main, process item : 7
Thread : main, process item : 8
Thread : main, process item : 9
Thread : main, process item : 10
Thread : main, write item size : 10
Thread : main, read item : 11
Thread : main, read item : 12
Thread : main, read item : 13
Thread : main, read item : 14
Thread : main, read item : 15
Thread : main, read item : 16
Thread : main, read item : 17
Thread : main, read item : 18
Thread : main, read item : 19
Thread : main, read item : 20
Thread : main, process item : 11
Thread : main, process item : 12
Thread : main, process item : 13
Thread : main, process item : 14
Thread : main, process item : 15
Thread : main, process item : 16
Thread : main, process item : 17
Thread : main, process item : 18
Thread : main, process item : 19
Thread : main, process item : 20
Thread : main, write item size : 10
...
하지만 멀티 스레드 프로세싱에서는 ItemReader에서 30건의 데이터를 가져오고, ItemProcessor에서 30건의 데이터는 Customer2 타입으로 변환되고, ItemWriter는 이걸 Customer 테이블에 저장합니다. 3개의 스레드로 실행되니까요. 결과가 뒤죽박죽처럼 보이는 것은 멀티 스레드의 특성상 실행 순서를 예상할 수 없음으로 인한 현상입니다.
Thread : async-thread3, read item : 3
Thread : async-thread1, read item : 2
Thread : async-thread2, read item : 1
Thread : async-thread3, read item : 4
Thread : async-thread1, read item : 5
Thread : async-thread3, read item : 7
Thread : async-thread2, read item : 6
Thread : async-thread1, read item : 9
Thread : async-thread2, read item : 10
Thread : async-thread3, read item : 8
Thread : async-thread1, read item : 11
Thread : async-thread3, read item : 12
Thread : async-thread2, read item : 13
Thread : async-thread3, read item : 15
Thread : async-thread1, read item : 14
Thread : async-thread2, read item : 17
Thread : async-thread3, read item : 16
Thread : async-thread2, read item : 19
Thread : async-thread1, read item : 18
Thread : async-thread3, read item : 20
Thread : async-thread2, read item : 21
Thread : async-thread3, read item : 22
Thread : async-thread2, read item : 24
Thread : async-thread1, read item : 23
Thread : async-thread3, read item : 25
Thread : async-thread2, read item : 26
Thread : async-thread1, read item : 27
Thread : async-thread2, read item : 28
Thread : async-thread1, read item : 29
Thread : async-thread1, read item : 30
Thread : async-thread1, process item : 2
Thread : async-thread3, process item : 3
Thread : async-thread2, process item : 1
Thread : async-thread1, process item : 5
Thread : async-thread2, process item : 6
Thread : async-thread3, process item : 4
Thread : async-thread2, process item : 10
Thread : async-thread1, process item : 9
Thread : async-thread2, process item : 13
Thread : async-thread3, process item : 7
Thread : async-thread2, process item : 17
Thread : async-thread3, process item : 8
Thread : async-thread1, process item : 11
Thread : async-thread2, process item : 19
Thread : async-thread1, process item : 14
Thread : async-thread3, process item : 12
Thread : async-thread1, process item : 18
Thread : async-thread3, process item : 15
Thread : async-thread2, process item : 21
Thread : async-thread3, process item : 16
Thread : async-thread1, process item : 23
Thread : async-thread3, process item : 20
Thread : async-thread2, process item : 24
Thread : async-thread3, process item : 22
Thread : async-thread1, process item : 27
Thread : async-thread2, process item : 26
Thread : async-thread1, process item : 29
Thread : async-thread3, process item : 25
Thread : async-thread1, process item : 30
Thread : async-thread2, process item : 28
Thread : async-thread2, write item size : 10
Thread : async-thread3, write item size : 10
Thread : async-thread1, write item size : 10
위 로그는 listener() API에 등록한 ItemListener의 구현체로 출력한 것인데요, 쉽게 설명하면 각 단계가 끝이나면 뒤이어 실행되는 작업입니다. Reader/Processor/Writer 모두에 사용할 수 있지만, 모두 다른 인터페이스를 상속받습니다. ItemListener에 대해서는 다음 글에서 다루도록 하겠습니다.
그리고 또 하나 중요한 점은 멀티 스레드 환경에서는 동시성 이슈가 발생할 수 있기 때문에 jdbc든 jpa든 CursorItemReader는 사용할 수 없다는 점입니다. 동일한 데이터를 DB로 부터 읽어와 에러를 발생시키기 때문에 꼭 PagingItemReader를 사용해주어야 합니다.
그리고 이제 Parallel Steps에 대해 알아보겠습니다. SplitState를 이용해 여러 개의 Flow를 병렬적으로 실행하는 구조로, 쉽게 말해 Flow 별로 스레드를 할당하는 방식입니다. 방식은 생각보다 간단합니다. Job을 구성하는 단계에서 split() API의 파라미터에 TaskExecutor 객체를 넣어 스레드풀에 대해 설정합니다.
아래의 코드를 보면 flow1과 flow2가 있는데, flow1은 step1을 실행하고 flow2는 step2와 3을 실행합니다. 현재 상태에서는 flow1과 flow2에 대해 스레드가 할당된 것입니다.
@Configuration
@RequiredArgsConstructor
public class JobConfiguration7 {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job batchjob7() {
return jobBuilderFactory.get("batchjob7")
.start(flow1())
.split(taskExecutor7()).add(flow2())
.end()
.listener(new StopWatchJobListener())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<Flow>("flow1")
.start(step1())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<Flow>("flow2")
.start(step2())
.next(step3())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(tasklet())
.allowStartIfComplete(true)
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet(tasklet())
.allowStartIfComplete(true)
.build();
}
@Bean
public Step step3() {
return stepBuilderFactory.get("step3")
.tasklet(tasklet())
.allowStartIfComplete(true)
.build();
}
@Bean
public Tasklet tasklet() {
return new CustomTasklet();
}
@Bean
public TaskExecutor taskExecutor7() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(2);
taskExecutor.setMaxPoolSize(4);
taskExecutor.setThreadNamePrefix("Paralell-Steps");
return taskExecutor;
}
}
아래의 tasklet은 10번의 반복을 실행하는데, 최종적으로 마지막 수를 출력하게 됩니다.
public class CustomTasklet implements Tasklet {
private long sum;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
for (int i = 0; i < 1000000000; i++) {
sum++;
}
System.out.println(String.format("%s has been executed on thread %s", chunkContext.getStepContext().getStepName(), Thread.currentThread().getName()));
System.out.println(String.format("sum = %d", sum));
return RepeatStatus.FINISHED;
}
}
위 코드를 실행시켜보면 총 3개의 Step에서 동일한 tasklet을 실행합니다. 그렇다면 첫 Step에서는 10억을, 두 번째 Step에서는 20억을, 세 번째 Step에서는 30억을 찍어야합니다. 하지만 실제로 실행시켜보면 그렇게 되지 않고 아래처럼 이상한 수가 나오게 됩니다. 그 이유는 바로 2개의 스레드가 실행되면서 멤버변수인 sum과 execute() 매서드를 공유하게 되는데요, 이 과정에서 동시성 이슈가 발생해 서로의 데이터에 간섭하게 된 것입니다.
step1 has been executed on thread Paralell-Steps1
sum = 1022785943
step2 has been executed on thread Paralell-Steps2
sum = 1029521744
step3 has been executed on thread Paralell-Steps1
sum = 2029521744
그래서 이 경우에는 synchronized 키워드를 사용해 하나의 스레드만 점유할 수 있도록 해주어야 합니다. 자바에서 synchronized를 사용할 때에는 디테일이 필요한데요, 그러니까 synchronized를 어디에 사용하는 지에 따라 영향도가 달라집니다. 매서드 자체에 붙이느냐, 실제로 필요한 곳에 블록으로 붙이느냐로 나뉩니다. 매서드에 설정되는 synchronized는 인스턴스 단위로 동기화가 되기 때문에 1개의 스레드만 허용됩니다. 반대로 블록 단위로 설정하면 그 블록 내에서만 동기화되기 때문에 더욱 효율적입니다.
public class CustomTasklet implements Tasklet {
private long sum;
private Object lock = new Object();
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
synchronized (lock) {
for (int i = 0; i < 1000000000; i++) {
sum++;
}
System.out.println(String.format("%s has been executed on thread %s", chunkContext.getStepContext().getStepName(), Thread.currentThread().getName()));
System.out.println(String.format("sum = %d", sum));
}
return RepeatStatus.FINISHED;
}
}
위 코드로 수정한 후 실행하면 순차적으로 결과가 출력되는 걸 볼 수 있습니다.
step1 has been executed on thread Paralell-Steps2
sum = 1000000000
step2 has been executed on thread Paralell-Steps1
sum = 2000000000
step3 has been executed on thread Paralell-Steps1
sum = 3000000000
그리고 SynchronizedItemStreamReader에 대해 알아볼텐데요, Thread-Safe하게 DB로 부터 Item을 읽어올 수 있도록 도와주는 역할을 합니다. 위에서도 계속 봐왔지만 멀티 스레드 프로세싱에서는 ItemReader에서 Thread-Safe하지 않으면 여러 스레드에서 같은 데이터를 읽어 오는 문제가 생깁니다. 아래의 코드는 JdbcCursorItemReader를 사용해 DB에서 값을 읽어온 코드입니다. Cursor는 동시성 이슈가 있기 때문에 멀티 스레드 환경에는 사용하기 어렵습니다.
Thread : Non-Thread-Safe3, item.getId() = 4
Thread : Non-Thread-Safe2, item.getId() = 4
Thread : Non-Thread-Safe1, item.getId() = 4
Thread : Non-Thread-Safe4, item.getId() = 4
Thread : Non-Thread-Safe4, item.getId() = 8
Thread : Non-Thread-Safe2, item.getId() = 8
Thread : Non-Thread-Safe3, item.getId() = 8
Thread : Non-Thread-Safe3, item.getId() = 11
Thread : Non-Thread-Safe3, item.getId() = 12
Thread : Non-Thread-Safe3, item.getId() = 13
Thread : Non-Thread-Safe3, item.getId() = 14
org.springframework.dao.InvalidDataAccessResourceUsageException: Unexpected cursor position change.
그러나 ItemReader를 SynchronizedItemStreamReader 내부에 등록하여 사용한다면 동기화된 ItemReader로써 사용할 수 있습니다.
@Bean
public Step step9() {
return stepBuilderFactory.get("step9")
.<Customer, Customer2>chunk(10)
.reader(jdbcCursorItemReader9())
.listener(new CustomItemReadListener())
.writer(jdbcBatchItemWriter9())
.allowStartIfComplete(true)
.taskExecutor(taskExecutor9())
.build();
}
@Bean
public SynchronizedItemStreamReader<Customer> jdbcCursorItemReader9() {
JdbcCursorItemReader<Customer> jdbcCursorItemReader = new JdbcCursorItemReaderBuilder<Customer>()
.name("jdbcCursorItemReader9")
.fetchSize(10) // 한 번에 가져올 데이터 수
.sql(sql2) // 실행할 sql문
.beanRowMapper(Customer.class) // 변환할 클래스
.dataSource(dataSource) // DB 설정
.build();
return new SynchronizedItemStreamReaderBuilder<Customer>()
.delegate(jdbcCursorItemReader)
.build();
}
실행결과를 보면 같은 데이터를 읽지 않는 걸 볼 수 있습니다.
Thread : Thread-Safe1, item.getId() = 3
Thread : Thread-Safe2, item.getId() = 4
Thread : Thread-Safe4, item.getId() = 2
Thread : Thread-Safe3, item.getId() = 1
Thread : Thread-Safe2, item.getId() = 5
Thread : Thread-Safe4, item.getId() = 6
Thread : Thread-Safe2, item.getId() = 7
Thread : Thread-Safe1, item.getId() = 8
Thread : Thread-Safe3, item.getId() = 9
Thread : Thread-Safe1, item.getId() = 10
Thread : Thread-Safe2, item.getId() = 11
Thread : Thread-Safe4, item.getId() = 12
Thread : Thread-Safe2, item.getId() = 13
이번 글에서 배치에서의 멀티 스레드 프로세싱에 대해 알아봤습니다. 꽤나 양이 많고 복잡한 내용이었습니다. 다음 글에서는 이벤트 리스너에 대해 알아보겠습니다.