JdbcPagingItemReader<T>
같은 ItemReader 를 사용한다.ItemProcessor<I, O>
같은 인터페이스로 I
타입의 객체를 받아 O
타입의 객체로 반환 처리한다.JdbcBatchItemWriterBuilder<T>
같은 ItemWriter 를 사용한다.implementation("org.springframework.boot:spring-boot-starter-batch")
@EnableBatchProcessing
을 선언해야 한다.@EnableBatchProcessing
@SpringBootApplication
class BatchApplication
fun main(args: Array<String>) {
runApplication<BatchApplication>(*args)
}
/**
* Tasklet Step 으로 Job 구성하기
* */
@Configuration
class TaskletStepJobConfig(
val jobBuilderFactory: JobBuilderFactory,
val stepBuilderFactory: StepBuilderFactory
) {
@Bean
fun job(): Job {
return jobBuilderFactory["taskletStepJob"]
.start(step1())
.next(step2())
.build()
}
@Bean
fun step1(): Step {
return stepBuilderFactory["step1"]
.tasklet { contribution, chunkContext ->
println(">> This is tasklet step1 <<")
RepeatStatus.FINISHED
}
.build()
}
@Bean
fun step2(): Step {
return stepBuilderFactory["step2"]
.tasklet { contribution, chunkContext ->
println(">> This is tasklet step2 <<")
RepeatStatus.FINISHED
}
.build()
}
}
/**
* Chunk Step 으로 Job 구성하기
* -> Person 의 age 가 20 미만이면 isStudent 가 True, 아니면 False 로 업데이트 해주는 Batch Job
* */
@Configuration
class JdbcChunkStepJobConfig(
val jobBuilderFactory: JobBuilderFactory,
val stepBuilderFactory: StepBuilderFactory,
val dataSource: DataSource
) {
companion object {
const val JOB_NAME = "JdbcChunkStepJob"
const val CHUNK_SIZE = 10;
}
@Bean(JOB_NAME)
fun job(): Job {
return jobBuilderFactory[JOB_NAME]
.start(step())
.build()
}
@Bean(JOB_NAME.plus("_STEP"))
fun step(): Step {
return stepBuilderFactory[JOB_NAME.plus("_STEP")]
.chunk<PersonAgeInfo, PersonIsStudentInfo>(CHUNK_SIZE)
.reader(reader())
.processor(processor())
.writer(writer())
.build()
}
@Bean(JOB_NAME.plus("_READER"))
fun reader(): JdbcPagingItemReader<PersonAgeInfo> {
println("BATCH-LOG >>> reader : ${LocalDateTime.now()} <<<")
return JdbcPagingItemReaderBuilder<PersonAgeInfo>()
.name(JOB_NAME.plus("_READER"))
.pageSize(CHUNK_SIZE)
.fetchSize(CHUNK_SIZE)
.dataSource(dataSource)
.queryProvider(query())
.rowMapper(resultMapper())
.build()
}
@Bean(JOB_NAME.plus("_PROCESSOR"))
fun processor(): ItemProcessor<PersonAgeInfo, PersonIsStudentInfo> {
println("BATCH-LOG >>> processor : ${LocalDateTime.now()} <<<")
return ItemProcessor {
if (it.age < 20) {
val personIsStudentInfo = PersonIsStudentInfo(it.id, true)
println("BATCH-LOG >>> processor - true : $it | $personIsStudentInfo <<<")
return@ItemProcessor personIsStudentInfo
} else {
val personIsStudentInfo = PersonIsStudentInfo(it.id, false)
println("BATCH-LOG >>> processor - false : $it | $personIsStudentInfo <<<")
return@ItemProcessor personIsStudentInfo
}
}
}
@Bean(JOB_NAME.plus("_WRITER"))
fun writer(): JdbcBatchItemWriter<PersonIsStudentInfo> {
println("BATCH-LOG >>> writer : ${LocalDateTime.now()} <<<")
return JdbcBatchItemWriterBuilder<PersonIsStudentInfo>()
.dataSource(dataSource)
.sql("update person set is_student = :studentFlag where person_no = :id")
.beanMapped()
.build()
}
private fun query(): PagingQueryProvider {
val spqpfb = SqlPagingQueryProviderFactoryBean()
spqpfb.setDataSource(dataSource)
spqpfb.setSelectClause("select person_no, age ")
spqpfb.setFromClause("from person ")
spqpfb.setWhereClause("where is_student is null ")
spqpfb.setSortKeys(mapOf("person_no" to Order.ASCENDING))
return spqpfb.`object`
}
private fun resultMapper() = RowMapper { rs, _ ->
PersonAgeInfo(
id = rs.getLong("person_no"),
age = rs.getInt("age")
)
}
}
data class PersonAgeInfo(val id: Long, val age: Int)
data class PersonIsStudentInfo(val id: Long, val studentFlag: Boolean)
@Entity
class Person(
val name: String,
val age: Int,
var isStudent: Boolean?
) {
@Id
@Column(name = "person_no")
@GeneratedValue(strategy = GenerationType.IDENTITY)
val id: Long = 0L
}
...
@StepScope
@Bean("${JOB_NAME}_reader")
fun reader(
@Value("#{jobParameters[requestAt]}") requestAt: String?
): JdbcPagingItemReader<MemberInfo> {
println("requestAt >>> $requestAt")
val format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
val requestDateTime = LocalDateTime
.parse(requestAt, format)
.atZone(ZoneId.of("Asia/Seoul"))
.withZoneSameInstant(ZoneId.systemDefault())
val parameterValues = mapOf("requestAt" to Timestamp.from(requestDateTime.toInstant()))
return JdbcPagingItemReaderBuilder<MemberInfo>()
.name("${JOB_NAME}_reader")
.pageSize(chunkSize)
.fetchSize(chunkSize)
.dataSource(dataSource)
.queryProvider(query())
.parameterValues(parameterValues)
.rowMapper(resultMapper())
.build()
}
...
@JopScope
는 Step 빈을 선언하는 메서드에 선언 가능@StepScope
는 Step 을 구성하는 빈들을 선언하는 메서드에 선언 가능ItemReader
, ItemProcessor
, ItemWriter
@JopScope
, @StepScope
를 선언한 빈은 어플리케이션 실행시점이 아닌 사용 시점에 빈을 구성하고 생성된다....
@Bean(JOB_NAME.plus("_COMPOSITE_WRITER"))
fun compositeWriter(): CompositeItemWriter<PersonIsStudentInfo> {
return CompositeItemWriterBuilder<PersonIsStudentInfo>()
.delegates(writer1(), writer2())
.build()
}
@Bean(JOB_NAME.plus("_WRITER1"))
fun writer1(): JdbcBatchItemWriter<PersonIsStudentInfo> {
println("BATCH-LOG >>> writer1 : ${LocalDateTime.now()} <<<")
return JdbcBatchItemWriterBuilder<PersonIsStudentInfo>()
.dataSource(dataSource)
.sql("update person set is_student = :studentFlag where person_no = :id")
.beanMapped()
.build()
}
@Bean(JOB_NAME.plus("_WRITER2"))
fun writer2(): JdbcBatchItemWriter<PersonIsStudentInfo> {
println("BATCH-LOG >>> writer2 : ${LocalDateTime.now()} <<<")
return JdbcBatchItemWriterBuilder<PersonIsStudentInfo>()
.dataSource(dataSource)
.sql("update person set name = 'DONE!!!!' where person_no = :id")
.beanMapped()
.build()
}
...
ItemWriter
들을 사용해야 할 때 사용하는 WriterCompositeItemWriter<T>
빈을 등록하는 메서드를 만들고 구성할 때 .delegates()
메서드를 사용하여 하위 Writer 객체들을 등록하면 사용할 수 있다.spring.batch.job.enabled: true
JobLauncher
를 사용하면 JobParameters 를 설정해주면서 Job 을 실행시킬 수 있다.@Configuration
@Slf4j
public class JobScheduler {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job ExampleJob;
@Scheduled(cron = "1 * * * * *")
public void jobSchduled() throws JobParametersInvalidException, JobExecutionAlreadyRunningException,
JobRestartException, JobInstanceAlreadyCompleteException {
Map<String, JobParameter> jobParametersMap = new HashMap<>();
SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
Date time = new Date();
String time1 = format1.format(time);
jobParametersMap.put("date",new JobParameter(time1));
JobParameters parameters = new JobParameters(jobParametersMap);
JobExecution jobExecution = jobLauncher.run(ExampleJob, parameters);
while (jobExecution.isRunning()) {
log.info("...");
}
log.info("Job Execution: " + jobExecution.getStatus());
log.info("Job getJobConfigurationName: " + jobExecution.getJobConfigurationName());
log.info("Job getJobId: " + jobExecution.getJobId());
log.info("Job getExitStatus: " + jobExecution.getExitStatus());
log.info("Job getJobInstance: " + jobExecution.getJobInstance());
log.info("Job getStepExecutions: " + jobExecution.getStepExecutions());
log.info("Job getLastUpdated: " + jobExecution.getLastUpdated());
log.info("Job getFailureExceptions: " + jobExecution.getFailureExceptions());
}
}
Spring Batch란? 이해하고 사용하기(예제소스 포함)
[Kotlin] Spring Batch를 사용한 배치 애플리케이션 작성
5. Spring Batch 가이드 - Spring Batch Scope & Job Parameter
JobParameter 활용 방법 (feat. LocalDate 파라미터 사용하기)
https://velog.io/@gongmeda/CompositeItemWriter를-사용하여-하나의-Step에-여러개의-ItemWriter-등록하기