@Getter
@Setter
@ToString
@Entity
@Table(name = "pass")
public class PassEntity extends BaseEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY) // 기본 키 생성을 DB에 위임합니다. (AUTO_INCREMENT)
private Integer passSeq;
private Integer packageSeq;
private String userId;
@Enumerated(EnumType.STRING)
private PassStatus status;
private Integer remainingCount;
private LocalDateTime startedAt;
private LocalDateTime endedAt;
private LocalDateTime expiredAt;
}
@Slf4j
@Component
public class AddPassesTasklet implements Tasklet {
private final PassRepository passRepository;
private final BulkPassRepository bulkPassRepository;
private final UserGroupMappingRepository userGroupMappingRepository;
public AddPassesTasklet(PassRepository passRepository, BulkPassRepository bulkPassRepository, UserGroupMappingRepository userGroupMappingRepository) {
this.passRepository = passRepository;
this.bulkPassRepository = bulkPassRepository;
this.userGroupMappingRepository = userGroupMappingRepository;
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
// 이용권 시작 일시 1일 전 user group 내 각 사용자에게 이용권을 추가해줍니다.
final LocalDateTime startedAt = LocalDateTime.now().minusDays(1);
final List<BulkPassEntity> bulkPassEntities = bulkPassRepository.findByStatusAndStartedAtGreaterThan(BulkPassStatus.READY, startedAt);
int count = 0;
for (BulkPassEntity bulkPassEntity : bulkPassEntities) {
// user group에 속한 userId들을 조회합니다.
final List<String> userIds = userGroupMappingRepository.findByUserGroupId(bulkPassEntity.getUserGroupId())
.stream().map(UserGroupMappingEntity::getUserId).toList();
// 각 userId로 이용권을 추가합니다.
count += addPasses(bulkPassEntity, userIds);
// pass 추가 이후 상태를 COMPLETED로 업데이트합니다.
bulkPassEntity.setStatus(BulkPassStatus.COMPLETED);
}
log.info("AddPassesTasklet - execute: 이용권 {}건 추가 완료, startedAt={}", count, startedAt);
return RepeatStatus.FINISHED;
}
// bulkPass의 정보로 pass 데이터를 생성합니다.
private int addPasses(BulkPassEntity bulkPassEntity, List<String> userIds) {
List<PassEntity> passEntities = new ArrayList<>();
for (String userId : userIds) {
PassEntity passEntity = PassModelMapper.INSTANCE.toPassEntity(bulkPassEntity, userId);
passEntities.add(passEntity);
}
return passRepository.saveAll(passEntities).size();
}
}
배치 처리과정이 쉬운 경우 쉽게 사용되며, 대량처리 경우 더 복잡해질 수 있다.
그르니까 소량 데이터일 때 Tasklet을 사용하자.
현재 사용하고 있는 실무에서도 대용량은 Chunk를 적극 사용하고 있다.
Tasklet으로 처리하면 전체를 한번에 처리하거나, 수동으로 N개씩 분할할 수 있다.
public interface Tasklet {
@Nullable
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}
tasklet 인터페이스를 사용해서 개발자는 execute 메서드가 repeat status finished를 반환할 때까지 트랜젝션 범위내에서 반복적으로 실행되게 할 수 있다.
Job을 생성한다.
@Bean
public Job addPassesJob() {
return this.jobBuilderFactory.get("addPassesJob")
.start(addPassesStep())
.build();
}
Pass를 생성한다.
@Bean
public Step addPassesStep() {
return this.stepBuilderFactory.get("addPassesStep")
.tasklet(addPassesTasklet)
.build();
}
이 때 Tasklet 구현체를 주입받은후 인자로 넘겨준다.
Tasklet 인터페이스를 구현하고, execute 메서드를 오버라이딩
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
// 이용권 시작 일시 1일 전 user group 내 각 사용자에게 이용권을 추가해줍니다.
final LocalDateTime startedAt = LocalDateTime.now().minusDays(1);
final List<BulkPassEntity> bulkPassEntities = bulkPassRepository.findByStatusAndStartedAtGreaterThan(BulkPassStatus.READY, startedAt);
int count = 0;
for (BulkPassEntity bulkPassEntity : bulkPassEntities) {
// user group에 속한 userId들을 조회합니다.
final List<String> userIds = userGroupMappingRepository.findByUserGroupId(bulkPassEntity.getUserGroupId())
.stream().map(UserGroupMappingEntity::getUserId).toList();
// 각 userId로 이용권을 추가합니다.
count += addPasses(bulkPassEntity, userIds);
// pass 추가 이후 상태를 COMPLETED로 업데이트합니다.
bulkPassEntity.setStatus(BulkPassStatus.COMPLETED);
}
log.info("AddPassesTasklet - execute: 이용권 {}건 추가 완료, startedAt={}", count, startedAt);
return RepeatStatus.FINISHED;
}
// bulkPass의 정보로 pass 데이터를 생성합니다.
private int addPasses(BulkPassEntity bulkPassEntity, List<String> userIds) {
List<PassEntity> passEntities = new ArrayList<>();
for (String userId : userIds) {
PassEntity passEntity = PassModelMapper.INSTANCE.toPassEntity(bulkPassEntity, userId);
passEntities.add(passEntity);
}
return passRepository.saveAll(passEntities).size();
}
tasklet 인터페이스를 사용해서 개발자는 execute 메서드가 repeat status finished를 반환할 때까지 트랜젝션 범위내에서 반복적으로 실행되게 할 수 있다.
← 이 말인 즉슨 메서드 시작부터 return RepeatStatus.FINISHED;
사이의 로직들을 for문등을 통해서 반복적으로 실행되게 할 수 있다는 것이다.
package com.fastcampus.pass.job.pass;
import com.fastcampus.pass.repository.pass.*;
import com.fastcampus.pass.repository.user.UserGroupMappingEntity;
import com.fastcampus.pass.repository.user.UserGroupMappingRepository;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.repeat.RepeatStatus;
import java.time.LocalDateTime;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
@Slf4j
@ExtendWith(MockitoExtension.class) // JUnit5
public class AddPassesTaskletTest {
@Mock
private StepContribution stepContribution;
@Mock
private ChunkContext chunkContext;
@Mock
private PassRepository passRepository;
@Mock
private BulkPassRepository bulkPassRepository;
@Mock
private UserGroupMappingRepository userGroupMappingRepository;
// @InjectMocks 클래스의 인스턴스를 생성하고 @Mock으로 생성된 객체를 주입합니다.
@InjectMocks
private AddPassesTasklet addPassesTasklet;
@Test
public void test_execute() {
// given
final String userGroupId = "GROUP";
final String userId = "A1000000";
final Integer packageSeq = 1;
final Integer count = 10;
final LocalDateTime now = LocalDateTime.now();
final BulkPassEntity bulkPassEntity = new BulkPassEntity();
bulkPassEntity.setPackageSeq(packageSeq);
bulkPassEntity.setUserGroupId(userGroupId);
bulkPassEntity.setStatus(BulkPassStatus.READY);
bulkPassEntity.setCount(count);
bulkPassEntity.setStartedAt(now);
bulkPassEntity.setEndedAt(now.plusDays(60));
final UserGroupMappingEntity userGroupMappingEntity = new UserGroupMappingEntity();
userGroupMappingEntity.setUserGroupId(userGroupId);
userGroupMappingEntity.setUserId(userId);
// when
when(bulkPassRepository.findByStatusAndStartedAtGreaterThan(eq(BulkPassStatus.READY), any())).thenReturn(List.of(bulkPassEntity));
when(userGroupMappingRepository.findByUserGroupId(eq("GROUP"))).thenReturn(List.of(userGroupMappingEntity));
RepeatStatus repeatStatus = addPassesTasklet.execute(stepContribution, chunkContext);
// then
// execute의 return 값인 RepeatStatus 값을 확인합니다.
assertEquals(RepeatStatus.FINISHED, repeatStatus);
// 추가된 PassEntity 값을 확인합니다.
ArgumentCaptor<List> passEntitiesCaptor = ArgumentCaptor.forClass(List.class);
verify(passRepository, times(1)).saveAll(passEntitiesCaptor.capture());
final List<PassEntity> passEntities = passEntitiesCaptor.getValue();
assertEquals(1, passEntities.size());
final PassEntity passEntity = passEntities.get(0);
assertEquals(packageSeq, passEntity.getPackageSeq());
assertEquals(userId, passEntity.getUserId());
assertEquals(PassStatus.READY, passEntity.getStatus());
assertEquals(count, passEntity.getRemainingCount());
}
}