querydsl paging reader thread-safe
multi thread๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด thread safe๊ฐ ๋์ด ์๋ Reader์ Writer๋ฅผ ์ฌ์ฉํด์ผํ๋๋ฐ. ๋ด๊ฐ ์ฌ์ฉํ querydslReader์ Writer๋ safe๊ฐ ๋ง๋ค๋ ์ธ์ฆํ ๊ธ์ ์ฐธ๊ณ ํ์๋ค.
batch์์์ multi thread ํ๊ฒฝ ์ค์ ์ excutor๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค.
//๋ก์ง ์คํ์ ํ๋ฒ์ ์ฝ์ด์ฌ ๋ฐ์ดํฐ (=chunk์ ํฌ๊ธฐ == transaction ํฌ๊ธฐ)
private final int CHUNK_SIZE = 1;
private final int POOL_SIZE = 2; //multi thread
private long totalBeforeTime = 0;
@Bean
public TaskExecutor orderDivisionExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(POOL_SIZE); //Thread Pool ๊ธฐ๋ณธ ์ฌ์ด์ฆ, ์ต์ด ์ธํ
์ฌ์ด์ฆ
executor.setMaxPoolSize(POOL_SIZE); //Thread Pool ์ต๋ ์ฌ์ด์ฆ
executor.setThreadNamePrefix("order-division-thread-");
executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE); //์์
์ด ์ข
๋ฃ๋๋ฉด Thread๋ ์ข
๋ฃ
executor.initialize();
return executor;
}
job์ ์ง์ ํ๋ ๊ณณ์ ๋ค์๊ณผ ๊ฐ์ด Bean์ผ๋ก TaskExecutor๋ฅผ ๋ง๋ค๊ณ POOL_SIZE๋ ์์๋ก ์ํ๋ ํฌ๊ธฐ๋งํผ์ผ๋ก ์ค์ ํด์ค๋ค.
@Bean
@JobScope
public Step orderProcessorStep() {
return stepBuilderFactory.get("orderProcessorStep")
.transactionManager(pt)
.<Long, Long>chunk(CHUNK_SIZE)
.reader(orderProcessorReader())
.writer(orderProcessorWriter())
.taskExecutor(orderDivisionExecutor()) //task ์ค์
.throttleLimit(POOL_SIZE) //์์ฑ๋ Thread ์ค ์ฌ์ฉํ Thread์ ๊ฐ์
.build();
}
๊ทธ๋ฆฌ๊ณ job์ ๋ง๋ค ๋ writer ๋ค์ ํด๋น ์ค์ ์ ํด์ค๋ค.
๊ทธ๋ฆฌ๊ณ ์ค์ํ๊ฒ์ mulit thread๋ก batch๋ฅผ ์๋์ํค๋ฉด batch์ ์ฅ์ ์ค ํ๋์ธ ์คํ์ ์คํจํ์ ๊ฒฝ์ฐ ์คํจ ๋ถ๋ถ๋ถํฐ ์ฌ์์ํ๋ ๊ฒ์ ํ ์ ์๊ฒ๋๋ค.
๊ทธ ์ด์ ๋ ๋น๋๊ธฐ๋ก ์์ ์ด ์งํ๋๋ค ๋ณด๋ ์คํจ ์์ ์ ์์๋ ์ดํ์ ์์ ๋ค์ด ์ ์ ์ฒ๋ฆฌ๋์์์ง ์๋์์์ง ๋ช ํํ๊ฒ ์ ์ ์๊ธฐ ๋๋ฌธ์ด๋ค.
๊ทธ๋์ ์ฐ๋ฆฌ๋ Reader์์ saveState๋ฅผ false๋ก ์ค์ ํด์ฃผ์ด์ผํ๋ค.
๋ฐ๋ก ์ค์ ์ ์ํด๋ ๋๋์ง๋ ๋ชจ๋ฅด๊ฒ ์ง๋ง ํด๋ณด์ ํ๋ฒ
QuerydslPagingItemReader์ ๋ฉ์๋ ์ค์ save option์ ์ ๊ฑฐํ๊ณ ์์ฑํ๋ ์์ฑ์๊ฐ ์๋๋ผ ๊ทธ๋์ ์์ ๋ฐ๊ณ ์๋ AbstractPagingItemReader ๋ด๋ถ๋ฅผ ์ดํด๋ดค๋ค.
๊ทธ๋ฐ๋ฐ AbstractPagingItemReader ๋ด๋ถ์๋ ๋ฐ๋ก ์ค์ ํ๋ ์ต์ ์ด ์๋๋ผ ๊ทธ๋์ ๋ AbstractItemCountingItemStreamItemReader๋ฅผ ์ดํด๋ณด๋ ๊ทธ์ ์์ผ saveState๊ฐ ๋ณด์๋ค.
๊ธฐ๋ณธ ๊ฐ์ด true์ธ๊ฑธ๋ก ๋ณด์ ์ฐ๋ฆฌ๊ฐ false๋ก ๋ฐ๊ฟ์ค์ผํ ๊ฑฐ ๊ฐ๋ค.
public void setSaveState(boolean saveState) {
this.saveState = saveState;
}
/**
* The flag that determines whether to save internal state for restarts.
* @return true if the flag was set
*/
public boolean isSaveState() {
return saveState;
}
๊ตฌํ๋์ด์๋ ๋ฉ์๋ ์ค setSaveState()
๋ฅผ ์ฌ์ฉํ์ฌ ๊ฐ์ false๋ก ๋ฐ๊ฟ์ ์์ฑํ๋ ์์ฑ์๋ฅผ ๊ตฌํํด๋ณด์๋ค.
//multi thread ์ฌ์ฉ์ save option ์ ๊ฑฐ
public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory,
int pageSize,
int state,
Function<JPAQueryFactory, JPAQuery<T>> queryFunction) {
this();
this.entityManagerFactory = entityManagerFactory;
this.queryFunction = queryFunction;
setPageSize(pageSize);
setSaveState(state == 1 ? true : false);
}
๊ธฐ์กด์ transacted ์ค์ ํ๋ ์์ฑ์๊ฐ ์กด์ฌํด์ boolean ๊ฐ์ ๊ทธ๋๋ก ์ฌ์ฉํ์ง ๋ชปํ๊ณ ๊ทธ๋ ๋ค๊ณ ๋งค๊ฐ๋ณ์์ ์์๋ฅผ ๋ฐ๊พธ์๋ ๋์ค์ ๋ ํท๊ฐ๋ฆด๊ฑฐ ๊ฐ๊ณ ๊ทธ๋ ๋ค๊ณ ๋ enum์ผ๋ก ์ ์ํด๋์๋ true false๋ง ๊ตฌ๋ถํ๋ฉด ๋๊ธฐ ๋๋ฌธ์ Integer์ 0๊ณผ 1๋ก ๊ตฌ๋ถํ๋๋ก ํ๋ค.
@Bean
@StepScope
public QuerydslPagingItemReader<? extends Long> orderProcessorReader() {
return new QuerydslPagingItemReader<>(emf, CHUNK_SIZE, 0, queryFactory ->
queryFactory
.select(order.idx)
.from(order)
.where(order.payWay.eq(8).and(order.ordStep.lt("200")))
);
}
๊ทธ๋ฆฌ๊ณ ์์ฑํ ๋ ๋งค๊ฐ๋ณ์๋ก 0์ ์ ๋ฌํ๋ฉด false๋ก ์ธํ ๋ Reader๋ฅผ ์์ฑํ ์ ์๊ฒ๋์๋ค.
QuerydslNoOffsetPagingItemReader์ ์ QuerydslPagingItemReader๋ฅผ ์์ํ์ฌ ์์ฑํ๊ณ ์๊ธฐ ๋๋ฌธ์ ์์์ ๋ง๋ ์์ฑ์๋ก ๋ง๋๋ ์์ฑ์๋ฅผ ๋ ์ถ๊ฐํด์ฃผ๋ฉด ๋๋ค.
//multi thread ํ๊ฒฝ์ผ ๊ฒฝ์ฐ saveState(false)
public QuerydslNoOffsetPagingItemReader(EntityManagerFactory entityManagerFactory,
int pageSize,
QuerydslNoOffsetOptions<T> options,
int state,
Function<JPAQueryFactory, JPAQuery<T>> queryFunction) {
super(entityManagerFactory, pageSize, state, queryFunction);
setName(ClassUtils.getShortName(QuerydslNoOffsetPagingItemReader.class));
this.options = options;
}
๋ค์๊ณผ ๊ฐ์ด ๋์ผํ๊ฒ ์์ฑํด์ฃผ๊ณ
@Bean
@StepScope
public QuerydslNoOffsetPagingItemReader<OrderDivision> orderDivisionSimpleReader() {
QuerydslNoOffsetNumberOptions<OrderDivision, Long> options = new QuerydslNoOffsetNumberOptions<>(orderDivision.idx, Expression.ASC);
return new QuerydslNoOffsetPagingItemReader<>(emf, CHUNK_SIZE, options, 0, queryFactory ->
...
);
}
๋ค์๊ณผ ๊ฐ์ด ์์ฑํ๋ค.
Multi Thread ํ๊ฒฝ์ผ๋ก batch๋ฅผ ๋๋ฆฌ๋๊ฒ์ ๊ถ์ฅํ๋ ๊ฒ ๊ฐ์ง๋ ์์ผ๋ ํ์ฌ ์ ๋ฌด ์ฐจ์์์ ๋์ฉ๋์ด์ ๋ฐ์ดํฐ๋ฅผ ์ ํด์ง ์๊ฐ ๋ด์ ์ต๋ํ ๋น ๋ฅด๊ฒ ๋ง์ ์์ ์ ํด์ผํ ์ผ์ด ์์ด์ ์ฌ์ฉํ๊ฒ ๋์๋ค. ์ด๋์ฑ๋๋ ํด๋น ๊ธฐ๋ฅ์ ๋์ ํ ๋ ์ต๋ํ ๊ณ ๋ฏผํด๋ณด๋ผ๊ณ ํ์๋๊ฑธ ๋ณด๋ฉด ์ ๋ง ๋น ๋ฅด๊ฒ ์ฒ๋ฆฌํด์ผํ ๊ฒฝ์ฐ์๋ง ๋์ ํ๋๊ฒ ์ข์๊ฑฐ ๊ฐ๋ค!