Batch 기본 Task 방식

Kevin·2024년 1월 23일
0

Spring Batch

목록 보기
3/3
post-thumbnail

전체 코드

@Getter
@Setter
@ToString
@Entity
@Table(name = "pass")
public class PassEntity extends BaseEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY) // 기본 키 생성을 DB에 위임합니다. (AUTO_INCREMENT)
    private Integer passSeq;
    private Integer packageSeq;
    private String userId;

    @Enumerated(EnumType.STRING)
    private PassStatus status;
    private Integer remainingCount;

    private LocalDateTime startedAt;
    private LocalDateTime endedAt;
    private LocalDateTime expiredAt;

}
@Slf4j
@Component
public class AddPassesTasklet implements Tasklet {
    private final PassRepository passRepository;
    private final BulkPassRepository bulkPassRepository;
    private final UserGroupMappingRepository userGroupMappingRepository;

    public AddPassesTasklet(PassRepository passRepository, BulkPassRepository bulkPassRepository, UserGroupMappingRepository userGroupMappingRepository) {
        this.passRepository = passRepository;
        this.bulkPassRepository = bulkPassRepository;
        this.userGroupMappingRepository = userGroupMappingRepository;
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // 이용권 시작 일시 1일 전 user group 내 각 사용자에게 이용권을 추가해줍니다.
        final LocalDateTime startedAt = LocalDateTime.now().minusDays(1);
        final List<BulkPassEntity> bulkPassEntities = bulkPassRepository.findByStatusAndStartedAtGreaterThan(BulkPassStatus.READY, startedAt);

        int count = 0;
        for (BulkPassEntity bulkPassEntity : bulkPassEntities) {
            // user group에 속한 userId들을 조회합니다.
            final List<String> userIds = userGroupMappingRepository.findByUserGroupId(bulkPassEntity.getUserGroupId())
                    .stream().map(UserGroupMappingEntity::getUserId).toList();

            // 각 userId로 이용권을 추가합니다.
            count += addPasses(bulkPassEntity, userIds);
            // pass 추가 이후 상태를 COMPLETED로 업데이트합니다.
            bulkPassEntity.setStatus(BulkPassStatus.COMPLETED);

        }
        log.info("AddPassesTasklet - execute: 이용권 {}건 추가 완료, startedAt={}", count, startedAt);
        return RepeatStatus.FINISHED;

    }

    // bulkPass의 정보로 pass 데이터를 생성합니다.
    private int addPasses(BulkPassEntity bulkPassEntity, List<String> userIds) {
        List<PassEntity> passEntities = new ArrayList<>();
        for (String userId : userIds) {
            PassEntity passEntity = PassModelMapper.INSTANCE.toPassEntity(bulkPassEntity, userId);
            passEntities.add(passEntity);

        }
        return passRepository.saveAll(passEntities).size();

    }

}

Tasklet은 데이터 처리과정이 tasklet안에서 한번에 이뤄진다.

배치 처리과정이 쉬운 경우 쉽게 사용되며, 대량처리 경우 더 복잡해질 수 있다.

그르니까 소량 데이터일 때 Tasklet을 사용하자.

현재 사용하고 있는 실무에서도 대용량은 Chunk를 적극 사용하고 있다.

Tasklet으로 처리하면 전체를 한번에 처리하거나, 수동으로 N개씩 분할할 수 있다.

public interface Tasklet {

	@Nullable
	RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;

}

tasklet 인터페이스를 사용해서 개발자는 execute 메서드가 repeat status finished를 반환할 때까지 트랜젝션 범위내에서 반복적으로 실행되게 할 수 있다.

배치 작업 순서

  1. Job을 생성한다.

    @Bean
        public Job addPassesJob() {
            return this.jobBuilderFactory.get("addPassesJob")
                    .start(addPassesStep())
                    .build();
        }

  1. Pass를 생성한다.

    @Bean
        public Step addPassesStep() {
            return this.stepBuilderFactory.get("addPassesStep")
                    .tasklet(addPassesTasklet)
                    .build();
        }

    이 때 Tasklet 구현체를 주입받은후 인자로 넘겨준다.


  1. Tasklet 인터페이스를 구현하고, execute 메서드를 오버라이딩

    @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
            // 이용권 시작 일시 1일 전 user group 내 각 사용자에게 이용권을 추가해줍니다.
            final LocalDateTime startedAt = LocalDateTime.now().minusDays(1);
            final List<BulkPassEntity> bulkPassEntities = bulkPassRepository.findByStatusAndStartedAtGreaterThan(BulkPassStatus.READY, startedAt);
    
            int count = 0;
            for (BulkPassEntity bulkPassEntity : bulkPassEntities) {
                // user group에 속한 userId들을 조회합니다.
                final List<String> userIds = userGroupMappingRepository.findByUserGroupId(bulkPassEntity.getUserGroupId())
                        .stream().map(UserGroupMappingEntity::getUserId).toList();
    
                // 각 userId로 이용권을 추가합니다.
                count += addPasses(bulkPassEntity, userIds);
                // pass 추가 이후 상태를 COMPLETED로 업데이트합니다.
                bulkPassEntity.setStatus(BulkPassStatus.COMPLETED);
    
            }
            log.info("AddPassesTasklet - execute: 이용권 {}건 추가 완료, startedAt={}", count, startedAt);
            return RepeatStatus.FINISHED;
    
        }
    
    		// bulkPass의 정보로 pass 데이터를 생성합니다.
        private int addPasses(BulkPassEntity bulkPassEntity, List<String> userIds) {
            List<PassEntity> passEntities = new ArrayList<>();
            for (String userId : userIds) {
                PassEntity passEntity = PassModelMapper.INSTANCE.toPassEntity(bulkPassEntity, userId);
                passEntities.add(passEntity);
    
            }
            return passRepository.saveAll(passEntities).size();
    
        }

    tasklet 인터페이스를 사용해서 개발자는 execute 메서드가 repeat status finished를 반환할 때까지 트랜젝션 범위내에서 반복적으로 실행되게 할 수 있다.

    ← 이 말인 즉슨 메서드 시작부터 return RepeatStatus.FINISHED; 사이의 로직들을 for문등을 통해서 반복적으로 실행되게 할 수 있다는 것이다.

    Test Code

    package com.fastcampus.pass.job.pass;
    
    import com.fastcampus.pass.repository.pass.*;
    import com.fastcampus.pass.repository.user.UserGroupMappingEntity;
    import com.fastcampus.pass.repository.user.UserGroupMappingRepository;
    import lombok.extern.slf4j.Slf4j;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.mockito.ArgumentCaptor;
    import org.mockito.InjectMocks;
    import org.mockito.Mock;
    import org.mockito.junit.jupiter.MockitoExtension;
    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.repeat.RepeatStatus;
    
    import java.time.LocalDateTime;
    import java.util.List;
    
    import static org.junit.jupiter.api.Assertions.assertEquals;
    import static org.mockito.ArgumentMatchers.*;
    import static org.mockito.Mockito.*;
    
    @Slf4j
    @ExtendWith(MockitoExtension.class) // JUnit5
    public class AddPassesTaskletTest {
        @Mock
        private StepContribution stepContribution;
    
        @Mock
        private ChunkContext chunkContext;
    
        @Mock
        private PassRepository passRepository;
    
        @Mock
        private BulkPassRepository bulkPassRepository;
    
        @Mock
        private UserGroupMappingRepository userGroupMappingRepository;
    
        // @InjectMocks 클래스의 인스턴스를 생성하고 @Mock으로 생성된 객체를 주입합니다.
        @InjectMocks
        private AddPassesTasklet addPassesTasklet;
    
        @Test
        public void test_execute() {
            // given
            final String userGroupId = "GROUP";
            final String userId = "A1000000";
            final Integer packageSeq = 1;
            final Integer count = 10;
    
            final LocalDateTime now = LocalDateTime.now();
    
            final BulkPassEntity bulkPassEntity = new BulkPassEntity();
            bulkPassEntity.setPackageSeq(packageSeq);
            bulkPassEntity.setUserGroupId(userGroupId);
            bulkPassEntity.setStatus(BulkPassStatus.READY);
            bulkPassEntity.setCount(count);
            bulkPassEntity.setStartedAt(now);
            bulkPassEntity.setEndedAt(now.plusDays(60));
    
            final UserGroupMappingEntity userGroupMappingEntity = new UserGroupMappingEntity();
            userGroupMappingEntity.setUserGroupId(userGroupId);
            userGroupMappingEntity.setUserId(userId);
    
            // when
            when(bulkPassRepository.findByStatusAndStartedAtGreaterThan(eq(BulkPassStatus.READY), any())).thenReturn(List.of(bulkPassEntity));
            when(userGroupMappingRepository.findByUserGroupId(eq("GROUP"))).thenReturn(List.of(userGroupMappingEntity));
    
            RepeatStatus repeatStatus = addPassesTasklet.execute(stepContribution, chunkContext);
    
            // then
            // execute의 return 값인 RepeatStatus 값을 확인합니다.
            assertEquals(RepeatStatus.FINISHED, repeatStatus);
    
            // 추가된 PassEntity 값을 확인합니다.
            ArgumentCaptor<List> passEntitiesCaptor = ArgumentCaptor.forClass(List.class);
            verify(passRepository, times(1)).saveAll(passEntitiesCaptor.capture());
            final List<PassEntity> passEntities = passEntitiesCaptor.getValue();
    
            assertEquals(1, passEntities.size());
            final PassEntity passEntity = passEntities.get(0);
            assertEquals(packageSeq, passEntity.getPackageSeq());
            assertEquals(userId, passEntity.getUserId());
            assertEquals(PassStatus.READY, passEntity.getStatus());
            assertEquals(count, passEntity.getRemainingCount());
    
        }
    
    }
profile
Hello, World! \n

0개의 댓글