Spring Batch Multi Thread

์ตœ์ค€ํ˜ธยท2022๋…„ 5์›” 4์ผ
0

Spring Batch

๋ชฉ๋ก ๋ณด๊ธฐ
10/10
post-thumbnail

์ถœ์ฒ˜๊ธ€

querydsl paging reader thread-safe

multi thread๋ฅผ ์‚ฌ์šฉํ•˜๋ ค๋ฉด thread safe๊ฐ€ ๋˜์–ด ์žˆ๋Š” Reader์™€ Writer๋ฅผ ์‚ฌ์šฉํ•ด์•ผํ•˜๋Š”๋ฐ. ๋‚ด๊ฐ€ ์‚ฌ์šฉํ•œ querydslReader์™€ Writer๋Š” safe๊ฐ€ ๋งž๋‹ค๋Š” ์ธ์ฆํ•œ ๊ธ€์„ ์ฐธ๊ณ ํ•˜์˜€๋‹ค.

๐Ÿ”จbatch์˜ multi thread ์„ค์ •

batch์—์„œ์˜ multi thread ํ™˜๊ฒฝ ์„ค์ •์€ excutor๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค.

    //๋กœ์ง ์‹คํ–‰์‹œ ํ•œ๋ฒˆ์— ์ฝ์–ด์˜ฌ ๋ฐ์ดํ„ฐ (=chunk์˜ ํฌ๊ธฐ == transaction ํฌ๊ธฐ)
    private final int CHUNK_SIZE = 1;
    private final int POOL_SIZE = 2;   //multi thread
    private long totalBeforeTime = 0;
    
    @Bean
    public TaskExecutor orderDivisionExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(POOL_SIZE);    //Thread Pool ๊ธฐ๋ณธ ์‚ฌ์ด์ฆˆ, ์ตœ์ดˆ ์„ธํŒ… ์‚ฌ์ด์ฆˆ
        executor.setMaxPoolSize(POOL_SIZE);     //Thread Pool ์ตœ๋Œ€ ์‚ฌ์ด์ฆˆ
        executor.setThreadNamePrefix("order-division-thread-");
        executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE); //์ž‘์—…์ด ์ข…๋ฃŒ๋˜๋ฉด Thread๋„ ์ข…๋ฃŒ
        executor.initialize();
        return executor;
    }

job์„ ์ง€์ •ํ•˜๋Š” ๊ณณ์— ๋‹ค์Œ๊ณผ ๊ฐ™์ด Bean์œผ๋กœ TaskExecutor๋ฅผ ๋งŒ๋“ค๊ณ  POOL_SIZE๋Š” ์ƒ์ˆ˜๋กœ ์›ํ•˜๋Š” ํฌ๊ธฐ๋งŒํผ์œผ๋กœ ์„ค์ •ํ•ด์ค€๋‹ค.

    @Bean
    @JobScope
    public Step orderProcessorStep() {
        return stepBuilderFactory.get("orderProcessorStep")
                .transactionManager(pt)
                .<Long, Long>chunk(CHUNK_SIZE)
                .reader(orderProcessorReader())
                .writer(orderProcessorWriter())
                .taskExecutor(orderDivisionExecutor())	//task ์„ค์ •
                .throttleLimit(POOL_SIZE)   //์ƒ์„ฑ๋œ Thread ์ค‘ ์‚ฌ์šฉํ•  Thread์˜ ๊ฐœ์ˆ˜
                .build();
    }

๊ทธ๋ฆฌ๊ณ  job์„ ๋งŒ๋“ค ๋•Œ writer ๋’ค์— ํ•ด๋‹น ์„ค์ •์„ ํ•ด์ค€๋‹ค.

๊ทธ๋ฆฌ๊ณ  ์ค‘์š”ํ•œ๊ฒƒ์€ mulit thread๋กœ batch๋ฅผ ์ž‘๋™์‹œํ‚ค๋ฉด batch์— ์žฅ์  ์ค‘ ํ•˜๋‚˜์ธ ์‹คํ–‰์‹œ ์‹คํŒจํ–ˆ์„ ๊ฒฝ์šฐ ์‹คํŒจ ๋ถ€๋ถ„๋ถ€ํ„ฐ ์žฌ์‹œ์ž‘ํ•˜๋Š” ๊ฒƒ์„ ํ• ์ˆ˜ ์—†๊ฒŒ๋œ๋‹ค.
๊ทธ ์ด์œ ๋Š” ๋น„๋™๊ธฐ๋กœ ์ž‘์—…์ด ์ง„ํ–‰๋˜๋‹ค ๋ณด๋‹ˆ ์‹คํŒจ ์‹œ์ ์„ ์•Œ์•„๋„ ์ดํ›„์˜ ์ž‘์—…๋“ค์ด ์ •์ƒ ์ฒ˜๋ฆฌ๋˜์—ˆ์„์ง€ ์•ˆ๋˜์—ˆ์„์ง€ ๋ช…ํ™•ํ•˜๊ฒŒ ์•Œ ์ˆ˜ ์—†๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.
๊ทธ๋ž˜์„œ ์šฐ๋ฆฌ๋Š” Reader์—์„œ saveState๋ฅผ false๋กœ ์„ค์ •ํ•ด์ฃผ์–ด์•ผํ•œ๋‹ค.
๋”ฐ๋กœ ์„ค์ •์„ ์•ˆํ•ด๋„ ๋˜๋Š”์ง€๋Š” ๋ชจ๋ฅด๊ฒ ์ง€๋งŒ ํ•ด๋ณด์ž ํ•œ๋ฒˆ

QuerydslPagingItemReader

QuerydslPagingItemReader์˜ ๋ฉ”์„œ๋“œ ์ค‘์— save option์„ ์ œ๊ฑฐํ•˜๊ณ  ์ƒ์„ฑํ•˜๋Š” ์ƒ์„ฑ์ž๊ฐ€ ์—†๋”๋ผ ๊ทธ๋ž˜์„œ ์ƒ์† ๋ฐ›๊ณ  ์žˆ๋Š” AbstractPagingItemReader ๋‚ด๋ถ€๋ฅผ ์‚ดํŽด๋ดค๋‹ค.

๊ทธ๋Ÿฐ๋ฐ AbstractPagingItemReader ๋‚ด๋ถ€์—๋„ ๋”ฐ๋กœ ์„ค์ •ํ•˜๋Š” ์˜ต์…˜์ด ์—†๋”๋ผ ๊ทธ๋ž˜์„œ ๋˜ AbstractItemCountingItemStreamItemReader๋ฅผ ์‚ดํŽด๋ณด๋‹ˆ ๊ทธ์ œ์„œ์•ผ saveState๊ฐ€ ๋ณด์˜€๋‹ค.

๊ธฐ๋ณธ ๊ฐ’์ด true์ธ๊ฑธ๋กœ ๋ณด์•„ ์šฐ๋ฆฌ๊ฐ€ false๋กœ ๋ฐ”๊ฟ”์ค˜์•ผํ• ๊ฑฐ ๊ฐ™๋‹ค.

	public void setSaveState(boolean saveState) {
		this.saveState = saveState;
	}

	/**
	 * The flag that determines whether to save internal state for restarts.
	 * @return true if the flag was set
	 */
	public boolean isSaveState() {
		return saveState;
	}

๊ตฌํ˜„๋˜์–ด์žˆ๋Š” ๋ฉ”์„œ๋“œ ์ค‘ setSaveState()๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๊ฐ’์„ false๋กœ ๋ฐ”๊ฟ”์„œ ์ƒ์„ฑํ•˜๋Š” ์ƒ์„ฑ์ž๋ฅผ ๊ตฌํ˜„ํ•ด๋ณด์•˜๋‹ค.

    //multi thread ์‚ฌ์šฉ์‹œ save option ์ œ๊ฑฐ
    public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory,
                                    int pageSize,
                                    int state,
                                    Function<JPAQueryFactory, JPAQuery<T>> queryFunction) {
        this();
        this.entityManagerFactory = entityManagerFactory;
        this.queryFunction = queryFunction;
        setPageSize(pageSize);
        setSaveState(state == 1 ? true : false);
    }

๊ธฐ์กด์˜ transacted ์„ค์ •ํ•˜๋Š” ์ƒ์„ฑ์ž๊ฐ€ ์กด์žฌํ•ด์„œ boolean ๊ฐ’์„ ๊ทธ๋Œ€๋กœ ์‚ฌ์šฉํ•˜์ง„ ๋ชปํ•˜๊ณ  ๊ทธ๋ ‡๋‹ค๊ณ  ๋งค๊ฐœ๋ณ€์ˆ˜์˜ ์ˆœ์„œ๋ฅผ ๋ฐ”๊พธ์ž๋‹ˆ ๋‚˜์ค‘์— ๋” ํ—ท๊ฐˆ๋ฆด๊ฑฐ ๊ฐ™๊ณ  ๊ทธ๋ ‡๋‹ค๊ณ  ๋˜ enum์œผ๋กœ ์ •์˜ํ•ด๋†“์ž๋‹ˆ true false๋งŒ ๊ตฌ๋ถ„ํ•˜๋ฉด ๋˜๊ธฐ ๋•Œ๋ฌธ์— Integer์˜ 0๊ณผ 1๋กœ ๊ตฌ๋ถ„ํ•˜๋„๋ก ํ–ˆ๋‹ค.

    @Bean
    @StepScope
    public QuerydslPagingItemReader<? extends Long> orderProcessorReader() {
        return new QuerydslPagingItemReader<>(emf, CHUNK_SIZE, 0, queryFactory ->
                queryFactory
                        .select(order.idx)
                        .from(order)
                        .where(order.payWay.eq(8).and(order.ordStep.lt("200")))
        );
    }

๊ทธ๋ฆฌ๊ณ  ์ƒ์„ฑํ•  ๋•Œ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ 0์„ ์ „๋‹ฌํ•˜๋ฉด false๋กœ ์„ธํŒ…๋œ Reader๋ฅผ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ๊ฒŒ๋˜์—ˆ๋‹ค.

QuerydslNoOffsetPagingItemReader

QuerydslNoOffsetPagingItemReader์€ ์œ„ QuerydslPagingItemReader๋ฅผ ์ƒ์†ํ•˜์—ฌ ์ƒ์„ฑํ•˜๊ณ  ์žˆ๊ธฐ ๋•Œ๋ฌธ์— ์œ„์—์„œ ๋งŒ๋“  ์ƒ์„ฑ์ž๋กœ ๋งŒ๋“œ๋Š” ์ƒ์„ฑ์ž๋ฅผ ๋˜ ์ถ”๊ฐ€ํ•ด์ฃผ๋ฉด ๋œ๋‹ค.

    //multi thread ํ™˜๊ฒฝ์ผ ๊ฒฝ์šฐ saveState(false)
    public QuerydslNoOffsetPagingItemReader(EntityManagerFactory entityManagerFactory,
                                            int pageSize,
                                            QuerydslNoOffsetOptions<T> options,
                                            int state,
                                            Function<JPAQueryFactory, JPAQuery<T>> queryFunction) {
        super(entityManagerFactory, pageSize, state, queryFunction);
        setName(ClassUtils.getShortName(QuerydslNoOffsetPagingItemReader.class));
        this.options = options;
    }

๋‹ค์Œ๊ณผ ๊ฐ™์ด ๋™์ผํ•˜๊ฒŒ ์ž‘์„ฑํ•ด์ฃผ๊ณ 

    @Bean
    @StepScope
    public QuerydslNoOffsetPagingItemReader<OrderDivision> orderDivisionSimpleReader() {

        QuerydslNoOffsetNumberOptions<OrderDivision, Long> options = new QuerydslNoOffsetNumberOptions<>(orderDivision.idx, Expression.ASC);

        return new QuerydslNoOffsetPagingItemReader<>(emf, CHUNK_SIZE, options, 0, queryFactory ->
               ...
        );
    }

๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ž‘์„ฑํ–ˆ๋‹ค.

Multi Thread ํ™˜๊ฒฝ์œผ๋กœ batch๋ฅผ ๋Œ๋ฆฌ๋Š”๊ฒƒ์„ ๊ถŒ์žฅํ•˜๋Š” ๊ฒƒ ๊ฐ™์ง€๋Š” ์•Š์œผ๋‚˜ ํšŒ์‚ฌ ์—…๋ฌด ์ฐจ์›์—์„œ ๋Œ€์šฉ๋Ÿ‰์ด์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ •ํ•ด์ง„ ์‹œ๊ฐ„ ๋‚ด์— ์ตœ๋Œ€ํ•œ ๋น ๋ฅด๊ฒŒ ๋งŽ์€ ์ž‘์—…์„ ํ•ด์•ผํ•  ์ผ์ด ์žˆ์–ด์„œ ์‚ฌ์šฉํ•˜๊ฒŒ ๋˜์—ˆ๋‹ค. ์ด๋™์šฑ๋‹˜๋„ ํ•ด๋‹น ๊ธฐ๋Šฅ์„ ๋„์ž…ํ•  ๋• ์ตœ๋Œ€ํ•œ ๊ณ ๋ฏผํ•ด๋ณด๋ผ๊ณ  ํ•˜์‹œ๋Š”๊ฑธ ๋ณด๋ฉด ์ •๋ง ๋น ๋ฅด๊ฒŒ ์ฒ˜๋ฆฌํ•ด์•ผํ•  ๊ฒฝ์šฐ์—๋งŒ ๋„์ž…ํ•˜๋Š”๊ฒŒ ์ข‹์„๊ฑฐ ๊ฐ™๋‹ค!

0๊ฐœ์˜ ๋Œ“๊ธ€