이 포스팅은 gemini의 도움을 받았음을 미리 밝힙니다.
지금은 프로젝트 규모가 작아 유저 관리가 매우 쉽습니다.
가입일 기준 또는 유튜브 접근 권한 기준으로 이 유저가 활성화 된 상태인지 아닌지를 확인할 수 있습니다.
하지만 유저의 수가 급격히 늘어난다면요? 막 200명, 300명 이렇게 된다면요?(희망사항) 이때는 DB를 계속 들여다보면서 할 수 있을까요?
이러한 상황이 오기 전 미리 유저 정리 로직을 짜면 더 좋겠다는 마음에 batch + cron을 도입하게 되었습니다.
Spring docs에 보면 아래와 같이 적혀있습니다.
Spring Batch는 로깅/추적, 트랜잭션 관리, 작업 처리 통계, 작업 재시작, 건너뛰기 및 리소스 관리를 포함하여 많은 양의 레코드를 처리하는 데 필수적인 재사용 가능한 기능을 제공합니다.
여기서 가장 중요한 것은 많은 양의 레코드(데이터)를 재사용 가능한 기능으로 처리할 수 있다
입니다. 데이터가 얼마나 크든 가벼운 프레임워크라 간단히 처리할 수 있다는 것이 장점이죠.
@Bean
ItemReader<From> itemReader()
@Bean
ItempProcessor<From, To> itemPrcessor()
@Bean
ItemWriter<To> itemWriter()
@Bean
Step step(ItemReacer, ItemProcessor, ItemWriter)
@Bean
Job job(Step)
수행하는 Job에 따라 다를 수 있겠지만, 주로 아래와 같이 실행됩니다. 하나의 job에는 여러 step이 있을 수 있고, 이 step들은 주로 2가지로 구성됩니다.
Job
└── Step(s)
├── Chunk-oriented Processing
│ ├── ItemReader
│ ├── ItemProcessor (Optional)
│ └── ItemWriter
└── Tasklet
그리고 하나의 Job은 아래와 같이 여러 step으로 구성될 수 있습니다.
new JobBuilder(JOB_NAME, jobRepository)
.start(step1)
.next(step2)
.next(step3)
...
.build();
from
, on
, to
등을 활용하여 더 복잡하게 할 수도 있죠!
new JobBuilder("conditionalEndJob", jobRepository)
.start(processStep)
.on("COMPLETED").to(successStep) // processStep 성공 시 successStep으로
.on("FAILED").to(failureStep) // processStep 실패 시 failureStep으로
.from(successStep)
.end() // successStep이 끝나면 Job 성공적으로 종료
.from(failureStep)
.fail() // failureStep이 끝나면 Job을 FAILED 상태로 종료
.build();
당신이 생각하는 모든 것을 할 수 있습니다. 예를 들어보죠.
당신은 Google OAuth로만 사이트를 로그인하게 할 수 있게 페이지를 구성하였습니다.
하지만 모종의 이유로 유저가 OAuth 권한을 회수했습니다!
하지만 데이터베이스에는 해당 유저의 정보가 남아있군요...
만약 이 유저가 다시 로그인을 한다면 회원가입 페이지로 가야할까요? 아니면 기존의 정보를 그대로 써야할까요?
이 경우에는 어떻게 할 수 있을까요?
위의 사례에서는
이와 같은 비즈니스 로직이 필요할 수 있습니다.
또 다른 예를 들어보죠.
유저가 Google 프로필 이미지와 닉네임을 수정하였습니다.
우리 사이트는 이 프로필 이미지와 닉네임을 바탕으로 유저 프로필이 생성됩니다.
그런데 최신화가 되지 않았네요? 어떻게 해야할까요?
매일매일 쌓이는 로그들을 다른 곳으로 옮기고 이 로그들을 삭제하려고 합니다.
물론 수작업으로 할 수도 있지만, 인력 낭비가 너무 심해요.
위와 같은 단순 반복 노동을 없앨 수 있다는 장점이 있습니다!
실제 사용하는 코드의 일부를 가져왔습니다.
@Bean
// 쿼리 시 매번 달라져야 하는 값이 있다면 StepScope를 활용하여 값을 수정하여 쿼리하자!
// 물론 매개변수로 받아서 처리하는게 가장 좋긴 합니다.
@StepScope
public JpaPagingItemReader<Member> memberReader() {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("cutoffTime", LocalDateTime.now().minusMinutes(30));
return new JpaPagingItemReaderBuilder<Member>()
.name("memberReader")
.entityManagerFactory(entityManagerFactory)
.pageSize(100) // 대량의 데이터를 batch 처리!
.queryString("SELECT m FROM Member m WHERE m.createdAt <= :cutoffTime ORDER BY m.createdAt ASC")
.parameterValues(parameterValues)
.build();
}
process는 service 등과 같은 비즈니스 로직을 담고있습니다.
public MemberToBatchDao toBatchDao() {
return MemberToBatchDao.builder()
.id(id)
.build();
}
@Bean
public ItemProcessor<Member, MemberToBatchDao> memberItemProcessor() {
return member -> {
Map<String, Object> requestBody = new HashMap<>(baseBody);
try {
...
if (!hasYoutubeAccess(scope)) throw new Exception();
return null; // null이면 관심사 분리로 인해 write로 넘어가지 않음!
} catch (Exception e) {
return DeleteTargetMemberDAO.builder().id(member.getId()).build();
}
};
}
처리한 데이터를 어디에 저장할 것인가를 구현하는 곳입니다.
파일에 쓰거나, DB에서 삭제하거나 하는 쓰기 작업을 하는 곳이죠.
@Bean
public ItemWriter<DeleteTargetMemberDAO> memberItemWriter() {
return members -> {
for (DeleteTargetMemberDAO targetMember : members) {
memberRepository.deleteById(targetMember.getId());
}
};
}
지금은 db 조작이므로 chunk를 나눠서 하고 있습니다.
@Bean
public Step processMemberStep(
ItemReader<Member> memberItemReader,
ItemProcessor<Member, DeleteTargetMemberDAO> memberItemProcessor,
ItemWriter<DeleteTargetMemberDAO> memberItemWriter
) {
return new StepBuilder("processMemberStep", jobRepository)
.<Member, DeleteTargetMemberDAO>chunk(100, transactionManager)
.reader(memberItemReader)
.processor(memberItemProcessor)
.writer(memberItemWriter)
.build();
}
실제로 사용 가능한 job을 만드는 과정입니다.
@Bean
public Job removeUnlinkedMemberRequestsJob(Step processMemberStep) {
return new JobBuilder("removeUnlinkedMemberRequestsJob", jobRepository)
.start(processMemberStep)
.build();
}
@EnableBatchProcessing
이 어노테이션은 무조건 빼셔야합니다. deprecated 되었어요!
어느 한 component scan
이 가능한 어노테이션에서 적혀있다면 모든 설정이 Spring에서 설정한 기본값
으로 설정됩니다.
기본값으로 되면 job에 필요한 table들을 자동으로 생성하지 않아요..
아래를 application.yml
에 추가해주면 됩니다.
spring:
batch:
jdbc:
# jdbc를 참고하여 맞는 db의 sql 실행
initialize-schema: always
job:
# Spring Batch는 항상 Job을 실행하는 것을 보장한다!
# 따라서 이 옵션을 끄는 것을 추천. 아니면 Job 오류 뜬다!!!!
enabled: false
생성한 Job을 특정 시각마다 실행하게 하고싶다면 cron밖에 답이 없습니다.
ApplicationRunner
와 CommandLineRunner
가 있긴 하지만, 이 2개의 특장은 각각 아래와 같습니다.
공통점은 아래와 같습니다.
하지만 단 한번만 실행
이라는 제약조건 때문에 매번 로직을 검사해야 할 때는 적절하지 않습니다. 매번 서버를 껐다 켰다 할 수는 없잖아요?
이미 Batch가 적용되어있으므로 따로 해줄 것은 없습니다.
하나만 추가해주면 돼요!
...
@EnableScheduling
public class SpringApplication {}
일단 Job 기반이니 Job을 적용하는 방법을 해봅시다.
이때는 생성자를 만들어줘야되므로 최대한 Service Logic과는 분리하는 것이 좋겠죠?
Service의 특정 로직이 무조건 필요한 상황이라면 Service를 의존성 주입하여 사용하는 것이 좋겠습니다.
@Component
public class MemberAccessGrantedChecker {
private final JobLauncher jobLauncher;
private final Job job;
private final SomeService someService;
// 생성자 구성
public MemberAccessGrantedChecker(
JobLauncher jobLauncher,
// Job이 여러개인 경우!
// 단일 Job Bean인 경우에는 Qualifier를 빼도 무방합니다!
@Qualifier("JOB_NAME") Job job,
// 특정 Service의 로직이 필요한 경우
SomeService someService
) {
this.jobLauncher = jobLauncher;
this.job = job;
this.someService = someService;
}
// cron은 "초 분 시 일 월 요일" 으로 구성
@Scheduled(cron = "0 0 */12 * * *")
public void run() {
try {
String dynamicJobName = "jobPrefix-" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("mm-ss-SSS"));
// 더 나은 활용 방법은 https://jojoldu.tistory.com/490 이곳 침고
// 지금은 따로 쓰고있는 타입이 없어서 기본값으로 넣습니다!
JobParameters jobParameters = new JobParametersBuilder()
.toJobParameters();
jobLauncher.run(job, jobParameters);
} catch (Exception e) {
System.out.println(e);
}
}