위에서 살펴본 스프링 부트 배치 컴포넌트들을 이용하여 커뮤니티 사이트에 가입한 회원 중 1년이 지나도록 상태 변화가 없는 회원을 휴먼회원으로 전환하는 배치 예제코드를 작성하였습니다.
의존성 라이브러리들은 Spring Data JPA, H2, Lombok, Spring Batch starter 시리지들로 선택하였습니다.
- H2 DB에 저장된 데이터 중 1년간 업데이트 되지 않은 사용자를 찾는 로직을 ItemReader로 구현합니다.
- 대상 사용자 데이터의 상태 값을 휴먼회원으로 전환하는 프로세스를 ItemProcessor에 구현합니다.
- 상태 값이 변한 휴먼회원을 실제로 DB에 저장하는 ItemWriter를 구현합니다.
buildscript {
ext {
springBootVersion = '2.3.0.RELEASE'
gradle_node_version='2.2.4'
}
repositories {
mavenCentral()
maven {
url "https://plugins.gradle.org/m2/"
}
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
subprojects {
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
group = 'com.junyoung'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
repositories {
mavenCentral()
}
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-batch')
runtime('com.h2database:h2')
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile('org.springframework.batch:spring-batch-test')
}
test {
useJUnitPlatform()
}
}
먼저 휴먼회원 배치 처리에 사용될 도메인을 작성합니다. 객체 명은 User이고, 휴먼 여부를 판별하는 UserStatus Enum을 추가하였습니다. ACTIVE는 활성회원, INACTIVE는 휴먼회원입니다.
public enum UserStatus {
ACTIVE, INACTIVE;
}
public enum Grade {
VIP, GOLD, FAMILY;
}
@NoArgsConstructor
@Entity
@Table
public class User implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long idx;
@Column
private String name;
@Column
private String password;
@Column
private String email;
@Column
private String principle;
@Column
@Enumerated(EnumType.STRING)
private SocialType socialType;
@Column
@Enumerated(EnumType.STRING)
private UserStatus status;
@Column
@Enumerated(EnumType.STRING)
private Grade grade;
@Column
private LocalDateTime createdDate;
@Column
private LocalDateTime updatedDate;
@Builder
public User(String name, String password, String email, String principle,
SocialType socialType, UserStatus status, Grade grade, LocalDateTime createdDate,
LocalDateTime updatedDate) {
this.name = name;
this.password = password;
this.email = email;
this.principle = principle;
this.socialType = socialType;
this.status = status;
this.grade = grade;
this.createdDate = createdDate;
this.updatedDate = updatedDate;
}
public User setInactive() {
status = UserStatus.INACTIVE;
return this;
}
}
public interface UserRepository extends JpaRepository<User, Long> {
List<User> findByUpdatedDateBeforeAndStatusEquals(LocalDateTime localDateTime, UserStatus status);
}
findByUpdatedDateBeforeAndStatusEquals()
메서드는 인자 값으로 LocalDateTime, 즉 현재 기준 날짜 값보다 1년 전의 날짜 값을 받고 두 번째 인자값으로 UserStatus 타입을 받아 쿼리를 실행하는 메서드 입니다.
먼저, 스프링 부트를 실행하는 Entry 포인트인 BatchApplication 파일에서 아래와 같이 @EnableBatchProcessing
을 활성화 시켜야 합니다.
@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
}
EnableBatchProcessing
를 적용해야 배치 작업에 필요한 빈을 미리 등록하여 사용할 수 있습니다.
배치 정보는 아래 @Configuration 어노테이션을 사용하는 설정 클래스에서 빈으로 등록합니다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {
private final UserRepository userRepository;
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job inactiveUserJob() {
// (1) JobBuilderFactory 주입
return jobBuilderFactory.get("inactiveUserJob3")
// (2) Job의 재실행 방지
.preventRestart()
.start(inactiveJobStep(null))
.build();
}
}
- (1)는 Job 생성을 직관적이고 편리하게 도와주는 빌더인 JobBuilderFactory를 주입하였습니다.
- (2)는 preventRestart()는 Job의 재실행을 막습니다.
위에 Job 설정을 완료하였고, 이제 Step을 설정하겠습니다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {
@Bean
public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
log.info("requestDate: {}", requestDate);
// (1) StepBuilderFactory 주입
return stepBuilderFactory.get("inactiveUserStep")
// (2) chunk 사이즈 입력
.<User, User>chunk(10)
// (3) reader, processor, writer를 각각 설정
.reader(inactiveUserReader())
.processor(inactiveUserProcessor())
.writer(inactiveUserWriter())
.build();
}
}
- (1)은 StepBuilderFactory의 get("inactiveUserStep")은
inactiveUserStep
이라는 이름의 StepBuilder를 생성합니다.- (2)는 제네릭을 사용해 chunk의 입력 타입과 출력 타입을 User 타입으로 설정하였습니다. 인자 값은 10으로 설정했는데 쓰기 시에 청크 단위로 묶어서 writer() 메서드를 실행시킬 단위를 지정하였습니다. 즉, 커밋의 다누이가 10개입니다.
- (3)은 reader, processor, writer를 각각 설정하였습니다.
여기서... Chunk가 무엇일까요?
Chunk는 덩어리라는 뜻으로 Spring Batch에서 각 커밋 사이에 처리되는 row 수를 애기합니다. 만약에 Chunk로 처리하지 않을 경우에 DB에서 데이터가 1000개인 로우를 읽어와서 배치처리를 하는 경우를 생각할 수 있습니다.
배치처리를 하는 중에 1개의 데이터를 저장하는데 문제가 생기면 나머지 999개의 데이터도 rollback
처리를 해야됩니다. 이러한 문제를 방지하기 위해서 Chunk 지향 프로세스
방식으로 스프링 부트에서 배치 실행을 지원하고 있습니다.
즉, Chunk 단위로 트랜잭션을 수행하기 때문에 실패하는 경우 해당 Chunk 만큼 롤백이 되고, 이전에 커밋된 트랜잭션 범위까지는 반영이 된다는 것입니다.
Chunk는 스프링 부트 배치를 잘 사용하기 위해서 반드시 알아둬야 하는 개념이라고 생각됩니다.
아래 예시코드는 jojoldu님
이 블로그에서 가져온 Chunk 지향 처리를 Java 코드로 표현하는 것입니다.
for (int i = 0; i < totalSize; i += chunkSize) {
List<Item> items = new ArrayList<> ();
for (int j = 0; j < chunkSize; j++) {
Object item = itemReader.read();
Object processedItem = item.Processor.process(item);
items.add(processedItem);
}
itemWriter.write(items);
}
Chunk 단위로 reader, process, writer로 처리하기 때문에 만약 chunkSize가 10일 경우에 process나 write에서 예외가 발생한다면 전부 rollback 되고, 그 다음 chunkSize만큼 배치가 처리 됩니다. 참조 블로그
인터페이스인 ItemReader(데이터를 읽어오는 역할)
를 설정한 부분입니다. 여기서는 ItemReader 인터페이스를 구현한 QueueItemReader 구현체를 리턴하고 있습니다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {
private final UserRepository userRepository;
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
@StepScope // (1) Step의 주기에 따라 새로운 빈 생성
public QueueItemReader<User> inactiveUserReader() {
List<User> oldUsers =
userRepository.findByUpdatedDateBeforeAndStatusEquals(
LocalDateTime.now().minusYears(1), UserStatus.ACTIVE);
return new QueueItemReader<>(oldUsers);
}
}
- 기본 빈 생성은 싱글턴 방식이지만, (1)에서
@StepScope
를 사용하면 해당 메서드는 Step 주기에 따라 새로운 빈을 생성합니다. 즉, 각 Step의 실행마다 새로운 빈을 만들기 때문에지연 생성
이 가능합니다. @StepScope는 기본 프록시 모드가 반환되는 클래스 타입을 참조하기 때문에@StepScope를 사용하면 반드시 구현된 반환 타입을 명시해 반환해야 합니다.
위 예제에서는 QueueItemReader라고 명시했습니다.findByUpdatedDateBeforeAndStatusEquals()
메서드로 현재 날짜 기준 1년 전의 날짜값과 User의 상태값이 ACTIVE인 User 리스트를 조회하고, QueueItemReader 객체 생성 시 파라미터로 넣어서 Queue에 담도록 하고 있습니다.
public class QueueItemReader<T> implements ItemReader<T> {
private Queue<T> queue;
public QueueItemReader(List<T> data) {
this.queue = new LinkedList<>(data);
}
@Override
public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
return this.queue.poll();
}
}
위 코드를 보면 QueItemReader를 사용해 휴먼회원으로 지정될 타깃 데이터를 한번에 불러와 큐에 담아놓습니다.
그리고 read() 메서드를 호출할 때 큐의 poll() 메서드를 사용하여 큐에서 데이터를 하나씩 반환합니다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {
...
public ItemProcessor<User, User> inactiveUserProcessor() {
return new ItemProcessor<User, User>() {
@Override
public User process(User user) throws Exception {
return user.setInactive();
}
};
}
}
ItemReader에서 읽은 User를 휴먼 상태로 전환하는 processor 메서드를 추가하는 예입니다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {
...
public ItemWriter<User> inactiveUserWriter() {
return ((List<? extends User> users) -> userRepository.saveAll(users));
}
}
@Slf4j
@RequiredArgsConstructor
@Configuration
public class InactiveUserJobConfig {
private final UserRepository userRepository;
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job inactiveUserJob() {
return jobBuilderFactory.get("inactiveUserJob3")
.preventRestart()
.start(inactiveJobStep(null))
.build();
}
@Bean
public Step inactiveJobStep(@Value("#{jobParameters[requestDate]}") final String requestDate) {
log.info("requestDate: {}", requestDate);
return stepBuilderFactory.get("inactiveUserStep")
.<User, User>chunk(10)
.reader(inactiveUserReader())
.processor(inactiveUserProcessor())
.writer(inactiveUserWriter())
.build();
}
@Bean
@StepScope
public QueueItemReader<User> inactiveUserReader() {
List<User> oldUsers =
userRepository.findByUpdatedDateBeforeAndStatusEquals(
LocalDateTime.now().minusYears(1), UserStatus.ACTIVE);
return new QueueItemReader<>(oldUsers);
}
public ItemProcessor<User, User> inactiveUserProcessor() {
return new org.springframework.batch.item.ItemProcessor<User, User>() {
@Override
public User process(User user) throws Exception {
return user.setInactive();
}
};
}
public ItemWriter<User> inactiveUserWriter() {
return ((List<? extends User> users) -> userRepository.saveAll(users));
}
}
ItemWriter는 리스트 타입을 앞서 설정한 Chunk 단위
로 받습니다. Chunk 단위를 10으로 설정했으므로 users에는 휴먼회원 10개 가 주어지며 saveAll() 메서드를 사용해서 한번에 DB에 저장합니다.
실제 배치를 실행하기 전에 테스트 할 데이터를 만들기 위해 SQL 쿼리 파일을 생성하여 실행하였습니다.
기본적으로 /resources 하위 경로에 import.sql 파일을 생성하면 스프링 부트(정확히는 하이버네이트)가 실행될 때 자동으로 해당 파일의 쿼리를 실행합니다.
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1001, 'test@test.com', 'test1', 'test1', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2018-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1002, 'test@test.com', 'test2', 'test2', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2018-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1003, 'test@test.com', 'test3', 'test3', 'FACEBOOK', 'ACTIVE', 'VIP', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1004, 'test@test.com', 'test4', 'test4', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1005, 'test@test.com', 'test5', 'test5', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1006, 'test@test.com', 'test6', 'test6', 'FACEBOOK', 'ACTIVE', 'GOLD', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1007, 'test@test.com', 'test7', 'test7', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1008, 'test@test.com', 'test8', 'test8', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1009, 'test@test.com', 'test9', 'test9', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1010, 'test@test.com', 'test10', 'test10', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
insert into user (idx, email, name, password, social_type, status, grade, created_date, updated_date) values (1011, 'test@test.com', 'test11', 'test11', 'FACEBOOK', 'ACTIVE', 'FAMILY', '2016-03-01T00:00:00', '2016-03-01T00:00:00');
휴먼회원 배치 실행 결과 UPDATE_DATE 컬럼 값이 현재 시점에서 1년전이고 상태값이 ACTIVE인 회원들의 상태 값이 INAVTIVE로 변경되는 것을 확인할 수 있었습니다.
참조: https://jojoldu.tistory.com/331?category=902551, https://github.com/young891221/Spring-Boot-Community-Batch