Thanks to BatchAutoConfiguration.java, adding the annotation yourself would result in duplicate configuration, so it does not need to be added.

@Configuration
public class DataSourceConfig {

    @Bean
    public DataSource dataSource() {
        return DataSourceBuilder.create()
                .type(HikariDataSource.class)
                .url("jdbc:h2:tcp://localhost/~/batch")
                .driverClassName("org.h2.Driver")
                .username("sa")
                .build();
    }
}
@Slf4j
@Configuration
@RequiredArgsConstructor
public class ItemJobConfig {

    private final JobRepository jobRepository;

    @Bean
    public Job itemClickThroughRateJob() {
        return new JobBuilder("itemClickThroughRateJob", jobRepository)
                .preventRestart()             // disable restartability
                .start(itemClickStep())       // first step
                .next(itemExposeStep())       // next step
                .next(itemCtrProcessStep())   // last step
                .build();
    }
    //.....
}
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;

@Bean
public Step itemClickStep() {
    return new StepBuilder("itemClickStep", jobRepository)
            .<AccessLog, ItemClickLog>chunk(3, transactionManager) // chunk-oriented
            .reader(accessLogReader())            // reader
            .processor(clickLogFilterProcessor()) // processor
            .writer(itemClickLogWriter())         // writer
            .build();
}
@Bean
public ItemReader<AccessLog> accessLogReader() {
    FlatFileItemReader<AccessLog> accessLogFileReader = new FlatFileItemReader<>();
    accessLogFileReader.setResource(new ClassPathResource("/item/access.log"));
    accessLogFileReader.setLineMapper(accessLogLineMapper);
    return accessLogFileReader;
}
LineMapper<AccessLog> accessLogLineMapper = (line, lineNumber) -> {
    try {
        return new AccessLog(line);
    } catch (Exception e) {
        log.error("error processing line:{} = {}", lineNumber, line, e);
        throw e;
    }
};
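The AccessLog constructor used by the line mapper is not shown above; a minimal sketch, assuming a comma-delimited line containing an item ID and an action type (both field names and the format are assumptions):

// Hypothetical AccessLog: the delimiter and field layout are assumptions.
public class AccessLog {

    private final String itemId;
    private final String action; // e.g. "CLICK" or "EXPOSE"

    public AccessLog(String line) {
        String[] tokens = line.split(",");
        this.itemId = tokens[0];
        this.action = tokens[1];
    }

    public String getItemId() { return itemId; }
    public String getAction() { return action; }
}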
@Bean
public ItemProcessor<AccessLog, ItemClickLog> clickLogFilterProcessor() {
    return new ClickLogFilterProcessor();
}
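ClickLogFilterProcessor itself is not shown. In Spring Batch, an ItemProcessor filters an item by returning null, so a sketch (assuming the AccessLog fields above and an ItemClickLog constructor taking an item ID) could look like this:

// Hypothetical filter processor: returning null drops the item before it reaches the writer.
public class ClickLogFilterProcessor implements ItemProcessor<AccessLog, ItemClickLog> {

    @Override
    public ItemClickLog process(AccessLog accessLog) {
        if (!"CLICK".equals(accessLog.getAction())) {
            return null; // filtered out
        }
        return new ItemClickLog(accessLog.getItemId());
    }
}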
@Bean
public ItemWriter<ItemClickLog> itemClickLogWriter() {
    FlatFileItemWriter<ItemClickLog> writer = new FlatFileItemWriter<>();
    writer.setName("itemClickLogWriter");
    writer.setResource(new FileSystemResource("~/output/clickLog.json"));
    writer.setLineAggregator(memberAggregator);
    return writer;
}
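memberAggregator is not defined in the snippet; since the output file is clickLog.json, it is presumably a LineAggregator that turns each ItemClickLog into one JSON line. A sketch (the getter is an assumption):

// Hypothetical line aggregator writing one JSON object per line.
LineAggregator<ItemClickLog> memberAggregator = item ->
        String.format("{\"itemId\":\"%s\"}", item.getItemId());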
spring.batch.jdbc.initialize-schema=always

Alternatively, leave spring.batch.jdbc.initialize-schema at none and create the metadata tables in advance. Batch metadata is handled through the datasource Spring bean and the transactionManager Spring bean.

JobExecution attributes
- status: started, failed, completed
- startTime, endTime, exitStatus, createTime, lastUpdated
- executionContext: a collection of properties holding user data that must be persisted between executions
- failureExceptions: the list of exceptions thrown during execution

StepExecution attributes
- status, startTime, endTime, exitStatus
- executionContext, readCount, writeCount, commitCount, rollbackCount
- readSkipCount, processSkipCount, writeSkipCount
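These attributes can be read at runtime, for example from a StepExecutionListener; a minimal sketch:

// Sketch: log a few StepExecution attributes once the step finishes.
@Slf4j
public class LoggingStepListener implements StepExecutionListener {

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        log.info("status={}, readCount={}, writeCount={}, commitCount={}, rollbackCount={}",
                stepExecution.getStatus(),
                stepExecution.getReadCount(),
                stepExecution.getWriteCount(),
                stepExecution.getCommitCount(),
                stepExecution.getRollbackCount());
        return stepExecution.getExitStatus();
    }
}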
//JdbcPagingItemReader
@Bean
public JdbcPagingItemReader<WeeklyStat> aggregatedWeeklyStatReader() throws Exception {
    Map<String, Object> paramValues = Map.of(
            "startDate", "20230101",
            "beginDate", "20230101",
            "endDate", "20230107");
    return new JdbcPagingItemReaderBuilder<WeeklyStat>()
            .pageSize(PAGE_SIZE)
            .fetchSize(PAGE_SIZE)
            .dataSource(dataSource)
            .rowMapper(new BeanPropertyRowMapper<>(WeeklyStat.class))
            .queryProvider(createQueryProvider())
            .parameterValues(paramValues)
            .name("dailyStatReader")
            .build();
}
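createQueryProvider() is referenced but not shown; one common way to build it is with SqlPagingQueryProviderFactoryBean. A sketch under the assumption that the weekly stats are aggregated from a STAT_DAILY table (the actual SQL is not shown here):

private PagingQueryProvider createQueryProvider() throws Exception {
    SqlPagingQueryProviderFactoryBean factoryBean = new SqlPagingQueryProviderFactoryBean();
    factoryBean.setDataSource(dataSource);
    // Assumed query shape; the real select/from/where clauses are not shown in the original.
    factoryBean.setSelectClause("SELECT STAT_DATE, ITEM_ID, CLICK_COUNT, EXPOSE_COUNT, CTR");
    factoryBean.setFromClause("FROM STAT_DAILY");
    factoryBean.setWhereClause("WHERE STAT_DATE BETWEEN :beginDate AND :endDate");
    factoryBean.setSortKey("ITEM_ID");
    return factoryBean.getObject();
}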
//JdbcCursorItemReader
@Bean
public JdbcCursorItemReader<WeeklyStat> weeklyStatReader() {
    return new JdbcCursorItemReaderBuilder<WeeklyStat>()
            .fetchSize(10)
            .dataSource(dataSource)
            .rowMapper(new BeanPropertyRowMapper<>(WeeklyStat.class))
            .sql("SELECT STAT_DATE, ITEM_ID, CLICK_COUNT, EXPOSE_COUNT, CTR FROM STAT_WEEKLY")
            .name("weeklyStatReader")
            .build();
}
@Bean
public ItemWriter<WeeklyStat> weeklyStatWriter() {
    String sql = "insert into STAT_WEEKLY(STAT_DATE, ITEM_ID, CLICK_COUNT, EXPOSE_COUNT, CTR) "
            + "values "
            + "(:statDate, :itemId, :clickCount, :exposeCount, :ctr)";
    return new JdbcBatchItemWriterBuilder<WeeklyStat>()
            .dataSource(dataSource)
            .sql(sql)
            .beanMapped()
            .build();
}
1) Set the job name in application.properties
spring.batch.job.name={jobName}
spring.batch.job.enabled=true (default)

2) Run with a program argument
--spring.batch.job.name={jobName}
3) Using a JobLauncher
spring.batch.job.enabled=false
JobLauncher jobLauncher = ctxt.getBean(JobLauncher.class);
Job weeklyStatJob = ctxt.getBean("weeklyStatJob", Job.class);
jobLauncher.run(weeklyStatJob, new JobParametersBuilder().toJobParameters());
4) Using @Scheduled
@Scheduled(cron = "0/10 * * * * *")
public void launchWeeklyStatJob() {
    try {
        System.out.println("Launch WeeklyStatJob --------------------------------------");
        jobLauncher.run(weeklyStatJob(null), new JobParameters());
    } catch (Exception e) {
        // Error Handling
    }
}
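Note that a completed job instance cannot be run again with the same identifying parameters, so an empty JobParameters only launches successfully once. For repeated scheduled runs, it is common to add a unique parameter such as a timestamp (a sketch):

JobParameters params = new JobParametersBuilder()
        .addLong("launchTime", System.currentTimeMillis()) // unique per run
        .toJobParameters();
jobLauncher.run(weeklyStatJob(null), params);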
5) Using a REST API
spring.main.web-application-type=servlet
@RestController
public class JobLaunchController {

    @PostMapping("/v1/jobs/weeklyStat/execute")
    public void launchWeeklyStatJob() {
        try {
            jobLauncher.run(weeklyStatJob(null), new JobParameters());
        } catch (Exception e) {
            // Error Handling
        }
    }
}
6) Using crontab
$ crontab -l
35 * * * * /home1/irteam/apps/job/weeklyStatJob.sh
$ cat /home1/irteam/apps/job/weeklyStatJob.sh
#!/bin/bash
/home1/irteam/apps/jdk/bin/java -Dspring.profiles.active=real -jar /home1/irteam/apps/job/JOB-SNAPSHOT.jar --spring.batch.job.name=weeklyStatJob > error.log 2>&1 &
7) With the help of an external application

Job parameters can be validated when the job is defined and injected into @StepScope beans:
@Bean
public Job weeklyStatJob(Step aggregateWeeklyStatStep) {
    return new JobBuilder("weeklyStatJob", jobRepository)
            .start(aggregateWeeklyStatStep)
            .validator(new WeeklyStatJobValidator())
            .build();
}
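WeeklyStatJobValidator is not shown; a JobParametersValidator sketch that checks the date parameters injected into the reader below might look like this:

// Hypothetical validator; parameter names match the @Value expressions used in the reader.
public class WeeklyStatJobValidator implements JobParametersValidator {

    @Override
    public void validate(JobParameters parameters) throws JobParametersInvalidException {
        if (parameters == null
                || parameters.getString("beginDate") == null
                || parameters.getString("endDate") == null) {
            throw new JobParametersInvalidException("beginDate and endDate are required");
        }
    }
}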
@Bean
@StepScope
public JdbcPagingItemReader<WeeklyStat> aggregatedWeeklyStatReader(
        @Value("#{jobParameters[beginDate]}") String beginDate,
        @Value("#{jobParameters[endDate]}") String endDate
) throws Exception {
    // .....
}
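With @StepScope, the @Value expressions are resolved when the step runs, so the dates come from the JobParameters supplied at launch, for example:

JobParameters params = new JobParametersBuilder()
        .addString("beginDate", "20230101")
        .addString("endDate", "20230107")
        .toJobParameters();
// weeklyStatJob obtained as a Spring bean, as in the JobLauncher example above
jobLauncher.run(weeklyStatJob, params);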
The standard singleton and prototype bean scopes also exist.

If there is no need to store batch job information in a database, an in-memory store can be used.
- Before Spring Batch 5.0: override DefaultBatchConfigurer's createJobRepository() and replace the JobRepository with a MapJobRepositoryFactoryBean.
- Spring Batch 5.0 and later: using a JobRepositoryFactoryBean with an in-memory database is recommended.
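For example, with Spring Batch 5 the metadata tables can live in an embedded H2 database; a sketch (whether this is exposed as the application's only DataSource or a separate batch DataSource depends on the setup):

// Sketch: an embedded H2 database used as the batch metadata store.
@Bean
public DataSource batchDataSource() {
    return new EmbeddedDatabaseBuilder()
            .setType(EmbeddedDatabaseType.H2)
            .addScript("/org/springframework/batch/core/schema-h2.sql")
            .generateUniqueName(true)
            .build();
}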
package org.springframework.batch.core.step.tasklet;

@FunctionalInterface
public interface Tasklet {

    @Nullable
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}
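For simple, non-chunk work a step can be built directly from a tasklet; a minimal sketch (the step name and tasklet body are placeholders):

@Bean
public Step cleanupStep() {
    Tasklet cleanupTasklet = (contribution, chunkContext) -> {
        log.info("cleaning up temporary output"); // placeholder work
        return RepeatStatus.FINISHED;             // run once, then finish
    };
    return new StepBuilder("cleanupStep", jobRepository)
            .tasklet(cleanupTasklet, transactionManager)
            .build();
}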