구동 목표 : Movie Open Api를 통해서 오늘의 인기 급상승 영화에 대한 리스트를 받아온 후 로컬 DB에 저장
- Spring boot 버전 : 2.6.6
- Java 버전 : openJdk 1.8
- 빌드 툴 : Gradle 7.4.1
- MongoDb 버전 : 5.0.6
spring:
data:
mongodb:
uri: mongodb://localhost:27017/local # 로컬 DB경로
batch:
job:
enabled: false # 애플리케이션 구동과 동시에 배치 작업이 실행되는 것을 방지
# Movie Open Api 관련
movie:
openApi:
trending:
uri: https://api.themoviedb.org/3 # Default 경로 설정
apiKey : cbfa4*007409cc**86bfeaf** # API키
[사용 API 사이트] 실제로 사용해보실 분들은 회원가입 후 API키를 발급받아서 입력해주세요
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.springframework.batch:spring-batch-test'
}
Open API에서 받아오는Response의 변수명과 바인딩할 객체의 변수명을 일치시켜주세요. 일치시키지 않을시 NULL로 들어옵니다.
@Getter
@ToString
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class TrendingObject {
private TrendingMovie[] results;
@Builder
public TrendingObject(TrendingMovie[] results) {
this.results = results;
}
}
@Data
@Document(collection = "TrendingMovie") // MongoDb에 저장되어 있는 collection을 정의해줍니다.
@NoArgsConstructor(access = AccessLevel.PROTECTED) // 빈 생성자가 만들어지는 것을 방지해줍니다.
public class TrendingMovie {
@Id
private int id;
private boolean adult;
private String overview;
private int[] genre_ids;
private String title;
private int vote_count;
private int vote_average;
@Builder
public TrendingMovie(int id, boolean adult, String overview, int[] genre_ids, String title, int vote_count,
int vote_average) {
this.id = id;
this.adult = adult;
this.overview = overview;
this.genre_ids = genre_ids;
this.title = title;
this.vote_count = vote_count;
this.vote_average = vote_average;
}
}
@Configuration
public class WebClientConfig {
@Bean
public WebClient.Builder webClientBuilder() {
return WebClient.builder();
}
}
이 게시물에서는 WebClient를 사용하였지만 한정된 데이터를 처리하는 배치 작업 특성 상 Non-Blocking 방식인 템플릿을 사용하는 것이 좋을 것 같습니다.
Spring Batch를 사용하기 전에 Batch의 뜻을 먼저 알아보면 다음과 같습니다.
데이터를 실시간으로 처리하는 것이 아닌 일괄적으로 모아서 처리하는 작업을 의미
Spring Batch는 이러한 Batch 기능을 구현할 때 사용하는 오픈소스 프레임워크
- 예외사항과 비정상 동작에 대한 방어 기능
- 작업을 실패하여 재시작을 하게 되었을 때 처음부터가 아닌 실패한 지점부터 실행하도록 지원
- 성공한 이력이 있는 Job에 대해 동일한 Parameter로 실행 시 중복 실행을 방지해주는 Exception을 발생시켜줌
배치 처리 과정을 하나의 단위로 만들어 놓은 객체로 Job은 배치 처리 과정에 대해서 가장 최상위 계층에 존재하는 개념입니다. 간단하게 상황으로 예를 들자면 특정 홈페이지에서 모든 회원들의 정보를 읽어 탈퇴를 한지 1년이 지난 회원에 대해 데이터 삭제 처리 라는 로직이 있다면 이 과정이 하나의 Job이 되겠습니다.
JobLauncher는 Job과 JobParameters를 사용하여 Job을 실행하는 객체입니다.
- 재실행 여부 검토, 실행방법, 파라미터 유효성 검증 등을 수행
- 별도의 설정이 없으면 애플리케이션 구동시 즉시 Job을 실행함
구동한 Job을 구분하기 위한 개념. 같은 파라미터로 Job이 실행될 경우 Exception을 발생
JobInstance의 구별 방법으로 동일한 파라미터로 들어올 경우 같은 객체로 인식합니다. 그리고 JobParameters는 String, Double, Long, Date 4가지 형식만을 지원하고 있습니다.
Job의 배치처리를 정의하고 순차적인 단계를 나눠줍니다.
위에서 이야기한 모든 배치 처리 정보를 담는 녀석. Job이 실행되게 되면 JobRepository에 JobExecution과 StepExecution을 생성합니다.
Job의 실행정보를 보존해 두는 오브젝트로 위의 예에서 1월 1일에 동일한 파라미터로 실행을 하면 Spring Batch는 같은 JobInstance를 실행을 하게됩니다. 하지만 이 2번째 실행에 대한 객체를 별도로 생성하여 DB에 저장을 하게 됩니다. 이렇게 과거에 실행했던 Job에 대해서 다시 작업을 시작을 하는 등의 조작이 가능합니다.
JobExecution과 동일하게 Step 실행 시도에 대한 객체를 나타지만 이전 단계의 Step이 실패할 경우 다음 Step에 대한 StepExecution을 생성하지 않습니다. StepExecution은 데이터를 읽은 횟수, 데이터를 작성(txt 파일을 작성한다거나 DB에 데이터를 저장), Step을 무시한 횟수들도 저장이 됩니다.
Job에서 데이터를 공유할 수 있는 데이터 저장소입니다. 이 ExecutionContext는 JobExecutionContext와 StepExecutionContext 2가지 종류가 있으나 저장이 되는 시점이 다릅니다. JobExecution의 경우 commit한 시점, StepExecution은 Step 사이사이에 저장이 됩니다.
@EnableBatchProcessing
어노테이션 사용@SpringBootApplication
@EnableBatchProcessing // 배치 인프라스트럭처 사용을 위한 대부분의 빈을 등록해줍니다.
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import lombok.RequiredArgsConstructor;
import mongo.movie.domain.TrendingMovie;
import reactor.core.publisher.Mono;
@Configuration
@RequiredArgsConstructor
public class OpenApiJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
// Job build 및 순서 정의
@Bean
public Job trendingMovieJob() {
Job trendingMovieJob = jobBuilderFactory.get("trendingMovieJob")
.start(openApiFristStep()) // 데이터를 한번만 받으면 되기 때문에 단일 스텝으로 구성
.build();
return trendingMovieJob;
}
@Bean
@JobScope // JobParameter를 보내므로 설정
public Step openApiFristStep() {
return stepBuilderFactory.get("openApiFristStep")
.<Mono<TrendingMovie[]>, TrendingMovie[]>chunk(1) // Input, Output, chunk 사이즈
.reader(openApiReader())
.processor(dataEditProcessor())
.writer(dataInsertWrite())
.build();
}
// 데이터를 읽어오는 ItemReader 인터페이스의 커스텀 구현체
@Bean
@StepScope
public OpenApiReader openApiReader(){
return new TrendingMovieItemReader();
}
// 읽어온 데이터를 가공 후 반환하는 ItemProcessor 인터페이스의 커스텀 구현체
@Bean
@StepScope
public OpenApiProcessor dataEditProcessor() {
return new OpenApiProcessor();
}
// 가공 되어진 데이터들(Chunk)를 DB 혹은 특정 파일에 작성하는 ItemWriter 인터페이스의 커스텀 구현체
@Bean
@StepScope
public OpenApiWriter dataInsertWrite() {
return new OpenApiWriter();
}
}
ItemReader
public class OpenApiReader implements ItemReader<Mono<TrendingMovie[]>> {
@Value("${movie.openApi.trending.uri}")
private final String TRENDING_MOVIE_URL;
@Value("${movie.openApi.apiKey}")
private final String API_KEY;
@Autowired
private WebClient.Builder wcBuilder;
private int cnt = 0;
@Override
public Mono<TrendingMovie[]> read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
cnt++;
return cnt == 1
? wcBuilder.build().get()
.uri(TRENDING_MOVIE_URL+"/trending/movie/day?api_key={API_KEY}",API_KEY)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(TrendingObject.class)
.map(trendingObject -> trendingObject.getResults())
: null;
}
}
Reader을 통해서 데이터를 읽었을 때 최종적으로 NULL을 반환하지 않으면 무한 루프가 돈다는 것을 모르고 있다가 상당히 애를 먹었습니다.
ItemProcessor
// exception이 발생하였을 때 Roll Back
// 적용된 범위에서는 트랜잭션 기능이 포함된 프록시 객체가 생성되어 자동으로 commit 혹은 rollback을 진행해준다.
@Transactional(rollbackFor = Exception.class)
public class OpenApiProcessor implements ItemProcessor<Mono<TrendingMovie[]>, TrendingMovie[]>{
@Override
public TrendingMovie[] process(Mono<TrendingMovie[]> item) throws Exception {
return item.block();
}
}
ItemWriter
@Slf4j
public class OpenApiWriter implements ItemWriter<TrendingMovie[]> {
@Autowired
private MongoOperations mongoOperations;
@Override
public void write(List<? extends TrendingMovie[]> items) throws Exception {
// chunk 사이즈가 1이므로 한번만 돌음
for(int i = 0; i < items.size(); i++) {
TrendingMovie[] movies = items.get(i);
for(TrendingMovie movie : movies) {
log.info("movie : {}", movie.toString());
mongoOperations.save(movie);
}
}
}
}
위에서 일괄처리를 도와주는 Spring Batch에 대해서 알아보았습니다. 그렇다면 Scheduler는 무엇일까요?
Bacth가 일괄처리라고 한다면 Scheduler는 일정한 시간간격 또는 일정한 시각에 특정 로직을 돌리기 위해서 사용하는 것입니다.
Spring에서 이러한 Scheduler 기능을 사용하기위한 방법으로 Spring Scheduler와 Spring Quartz 두 가지가 있는데 Spring Quartz는 별도의 의존성이 필요하며 조금 더 복잡하고 어렵다고 합니다.
@EnableScheduling
@EnableBatchProcessing
@SpringBootApplication
public class Application() {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
부트 스트랩 클래스에 @EnableScheduling
어노테이션을 설정해주면 준비작업은 끝납니다.
- method는 void의 return타입을 가져야합니다.
- method는 파라미터를 가질 수 없습니다.
@Slf4j
@Component
@RequiredArgsConstructor
public class JobScheduler {
private final JobLauncher jobLauncher;
private final OpenApiJob openApiJob;
@Scheduled(cron = "0 0/3 * * * ?") // 3분에 1번씩 실행
public void openApiRequestSchedule() {
// 넘기는 파라미터를 매번 다르게 해서 별개의 JobInstance로 인식하게 함
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters jobParameters = new JobParameters(confMap);
try {
JobExecution jobExecution = jobLauncher.run(openApiJob.trendingMovieJob(), jobParameters);
log.info("Job Execution: " + jobExecution.getStatus());
log.info("Job getJobConfigurationName: " + jobExecution.getJobConfigurationName());
log.info("Job getJobId: " + jobExecution.getJobId());
log.info("Job getExitStatus: " + jobExecution.getExitStatus());
log.info("Job getJobInstance: " + jobExecution.getJobInstance());
log.info("Job getStepExecutions: " + jobExecution.getStepExecutions());
log.info("Job getLastUpdated: " + jobExecution.getLastUpdated());
log.info("Job getFailureExceptions: " + jobExecution.getFailureExceptions());
} catch (JobExecutionAlreadyRunningException
|JobRestartException
|JobInstanceAlreadyCompleteException
|JobParametersInvalidException e) {
e.getMessage();
}
}
}
이미지가 너무 작게 들어가네