
Spring Batch를 적용해보았다. 적용 과정을 기록한다.
이 글에서 나는 이론적인 부분보다 간략하게 코드에 대한 정리를 남길 것이다.
implementation 'org.springframework.boot:spring-boot-starter-batch'
build.gradle에 다음과 같이 spring batch 의존성을 부여한다.
이렇게 하면 가장 최신 버전이 설치되게 된다.
Version은 5.x를 사용한다.
Spring Batch 5.x에서는 StepBuilderFactory, JobBuilderFactory가 Deprecated 되었다.
해당 Factory를 사용하여 Step과 Job을 구성하는 건 일반적으로 사용하는 방식이다.
하지만 나는 첫 작업을 할 때부터 Spring Batch 5.x를 썼기에 Migration을 하지 않아서 직접적으로 체감은 하지 못했다.
사용자가 문서를 읽어 인지하지 않는 한 Builder에서 JobRepository가 생성되고 설정된다는 사실을 숨기고 있다고 한다.
그래서 아래와 같이 JobRepository를 명시적으로 제공하는 방식을 사용하길 권장한다.
그러면서 transactionManager 또한 위와 같은 이유로 사용한다고 한다.
@Bean
public Job refreshTokenJob() {
return new JobBuilder("refreshTokenJob", jobRepository)
.start(refreshTokenStep())
.build();
}
대제인 Job을 먼저 형성한다.
이 처리할 Job의 이름은 refreshTokenJob 이다.
Job의 진행은 Step에서 정의하며 Chunk, Tasklet 방식이 있다.
@Bean
public Step refreshTokenStep() {
return new StepBuilder("refreshTokenStep", jobRepository)
.<Account, Account>chunk(10, transactionManager)
.reader(accountReader())
.processor(tokenRefreshProcessor())
.writer(accountWriter())
.faultTolerant()
.retry(Exception.class)
.retryLimit(3)
.build();
}
Step에서는 이 Job이 돌아갈 순서를 정의한다.
Account Entity를 사용하며 chunk 지향 처리를 하기로 했다.
Chunk의 size를 10으로 정했다.
.<Account, Account>chunk(10, transactionManager)
이 코드가 사실 이해가 잘 안갈 수 있다.
제네릭에 같은 타입을 2개를 넣어두었다. 이것은 무엇을 뜻할까?
.<Account, Account>chunk(10, transactionManager)는 Spring Batch에서 chunk 기반 처리 방식을 설정하는 부분이다.
<Account, Account>이 부분은 제네릭을 사용하여 해당 Step의 입력과 출력 타입을 지정합한다.
Account: Reader가 읽어오는 데이터 타입.Account 객체를 읽어온다.Account: Processor가 처리한 후 Writer로 전달할 데이터 타입.Account 객체를 그대로 사용한다.즉, Account 객체를 읽고, 처리한 후 다시 Account 타입으로 기록한다는 의미이다.
chunk(10, transactionManager)chunk()는 Spring Batch의 chunk 기반 처리 방식을 설정하는 메서드이다.
Chunk 데이터를 일정한 단위로 나누어 한 번에 처리하는 방식을 말한다.
청크 크기만큼 데이터를 읽고, 처리하고, 쓰는 작업이 트랜잭션 내에서 일어난다.
10: 청크의 크기.
즉, 한 번에 10개의 Account 데이터를 읽고, 처리하고, 기록한다.
transactionManager: 트랜잭션 관리자를 지정한다.
청크 단위의 처리가 트랜잭션 내에서 이루어지며, 이 트랜잭션 관리자가 트랜잭션을 관리한다.
만약 예외가 발생하면 트랜잭션이 롤백되고, 청크 단위로 재시도가 이루어질 수 있다.
Chunk 지향 처리 이유
@Bean
public RepositoryItemReader<Account> accountReader() {
return new RepositoryItemReaderBuilder<Account>()
.name("accountReader")
.repository(accountRepository)
.methodName("findByAccessTokenFetchedAtBefore")
.arguments(LocalDateTime.now().minusMinutes(50))
.pageSize(10)
.sorts(Collections.singletonMap("id", Sort.Direction.ASC))
.build();
}
Batch 작업할 Account를 읽어오는 부분이다.
accountRepository에서 findByAccessTokenFetchedAtBefore 에 메서드에 해당하는 Account들을 읽어온다.
이는 date가 필요한데 이때 arguments로 설정 값을 넣어준다.
@Bean
public ItemProcessor<Account, Account> tokenRefreshProcessor() {
return account -> {
try {
Map<String, String> newTokens = googleOAuthUtils.refreshAccessToken(account.getRefreshToken());
String newAccessToken = newTokens.get("access_token");
account.setAccessToken(newAccessToken);
account.setAccessTokenFetchedAt(LocalDateTime.now());
return account;
} catch (Exception e) {
throw e;
}
};
}
핵심 비즈니스 로직을 처리하는 부분이라고 할 수 있다.
이는 기존에 작성해두었던 GoogleOAuthUtils 클래스에서 새 액세스 토큰을 발급 받는 과정을 작성하였다.
새 액세스 토큰을 발급하고 그 액세스 토큰이 발급된 시간을 기록한 account를 반환한다.
@Bean
public ItemWriter<Account> accountWriter() {
return accounts -> {
for (Account account : accounts) {
if (account != null) {
accountRepository.save(account);
} else {
}
}
};
}
위에서 반환된 account를 DB에 덮어씌워주는 마지막 작업을 한다.
딱히 특별히 큰 기능은 없다.
명시적으로 Step에 맞춰 그에 주어진 역할에 맞춰 코드를 작성했다.