dataflow를 사용하기 위해서 대시보드를 구성했었는데, 배치를 실행하다가 실패한 이후로 Job executions메뉴에 들어가지지 않았다.
서버에 들어가보니 아래같은 에러가 남아있었다.
따로 설정을 변경하거나 한적은 없는데 왜 그럴까..
이 문제를 해결하려면 TASK - JOB의 관계가 어디서 설정되는지 알아야했다.
먼저 프로젝트에서는 spring boot이기때문에 starter를 사용하고 있었고, @EnableTask
를 이용했었다.
implementation("org.springframework.cloud:spring-cloud-starter-task:2.3.0")
@EnableTask
@Configuration
class TaskConfiguration {
}
문제 원인을 찾아보다 알게된건데 spring-cloud-starter-task
에서 사용하는 TaskBatchAutoConfiguration
에서는 TaskBatchExecutionListener
Bean을 생성하고, TaskBatchExecutionListenerBeanPostProcessor
라는 BeanPostProcessor를 통해서 생성되는 모든 Job타입 Bean에 대해 listener로 추가해주는 기능을 제공하고 있었다.
이 TaskBatchExecutionListener
가 Task와 Job사이의 연관관계와 관련된 DB를 관리해주는데, 문제는 TaskBatchExecutionListener
는 BeanPostProcessor에서 등록되기 때문에 가장 마지막 순서로 등록되고 빈 생성 과정에서 수동으로 등록한 listener가 우선 실행된다는 것이다.
그런데 수동으로 등록한 listener에서 에러가 발생하면 JOB(spring-batch)관련된 테이블엔 데이터가 등록되지만, TASK(spring-cloud-data)관련된 테이블엔 데이터가 등록되지 않아서 문제가 된 것이다.
코드를 통해 상황을 살펴보자
BatchConfiguration
@Configuration
class BatchConfiguration(
private val jobBuilderFactory: JobBuilderFactory,
private val stepBuilderFactory: StepBuilderFactory
) {
@Bean
fun printJob(alphabetPrintStep: Step): Job {
return jobBuilderFactory.get("print-job")
.start(alphabetPrintStep)
.listener(elapsedTimeListener())
.build()
}
@Bean
@JobScope
fun alphabetPrintStep(@Value("#{jobParameters['alphabets']}") alphabets: String): Step {
val chuckSize = 3
return stepBuilderFactory.get("alphabet-print-step")
.chunk<String, String>(chuckSize)
.reader(alphabetReader(alphabets))
.processor(duplicateItemProcessor())
.writer(printItemWriter())
.build()
}
fun alphabetReader(alphabets: String) = ListItemReader(alphabets.split(","))
fun duplicateItemProcessor() = ItemProcessor<String, String> { string ->
"$string$string"
}
fun printItemWriter() = ItemWriter<String> { list ->
println(list.joinToString())
}
fun elapsedTimeListener() = ElapsedTimeListener()
}
ElapsedTimeListener
class ElapsedTimeListener : JobExecutionListenerSupport() {
override fun beforeJob(jobExecution: JobExecution) {
println(System.currentTimeMillis())
}
override fun afterJob(jobExecution: JobExecution) {
println(System.currentTimeMillis())
}
}
실행결과
2021-04-17 09:40:44.930 INFO 33315 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: [alphabets=a,b,c,d,e]
2021-04-17 09:40:44.975 INFO 33315 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=print-job]] launched with the following parameters: [{alphabets=a,b,c,d,e}]
1618620044988
2021-04-17 09:40:44.988 INFO 33315 --- [ main] o.s.c.t.b.l.TaskBatchExecutionListener : The job execution id 1 was run within the task execution 1
2021-04-17 09:40:45.055 INFO 33315 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [alphabet-print-step]
aa, bb, cc
dd, ee
2021-04-17 09:40:45.072 INFO 33315 --- [ main] o.s.batch.core.step.AbstractStep : Step: [alphabet-print-step] executed in 16ms
1618620045074
2021-04-17 09:40:45.075 INFO 33315 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=print-job]] completed with the following parameters: [{alphabets=a,b,c,d,e}] and the following status: [COMPLETED] in 97ms
실행시간 전후로 시간을 출력하는 listener를 추가했다.
중간에 보면 TaskBatchExecutionListener
에서 로그를 찍은 부분이 있는데, 이 부분에서 job(id=1)과 task(id=1)의 연관관계가 설정되었음을 알 수 있다.
그런데 로그 순서로도 보면 알 수 있듯이 ElapsedTimeListener
가 TaskBatchExecutionListener
보다 먼저 실행되었음을 알 수 있다.
ElapsedTimeListener에서 에러를 발생시키도록 해보자.
ElapsedTimeListener
class ElapsedTimeListener : JobExecutionListenerSupport() {
override fun beforeJob(jobExecution: JobExecution) {
println(System.currentTimeMillis())
throw RuntimeException("ERROR")
}
override fun afterJob(jobExecution: JobExecution) {
println(System.currentTimeMillis())
}
}
실행결과
2021-04-17 10:25:18.652 INFO 36448 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: [alphabets=a,b,c,d,e]
2021-04-17 10:25:18.684 INFO 36448 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=print-job]] launched with the following parameters: [{alphabets=a,b,c,d,e}]
1618622718694
2021-04-17 10:25:18.698 ERROR 36448 --- [ main] o.s.batch.core.job.AbstractJob : Encountered fatal error executing job
...
1618622718703
2021-04-17 10:25:18.704 INFO 36448 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=print-job]] completed with the following parameters: [{alphabets=a,b,c,d,e}] and the following status: [FAILED] in 17ms
TaskBatchExecutionListener
가 등장하지 않는다.
이런 상황이되면 Job관련 테이블에만 데이터가 등록되고, Task에는 추가되지 않는것이다.
TaskBatchExecutionListener
를 가장 먼저 실행되도록 추가하는 방법으로 문제를 해결할 수 있다.
수동으로 등록해도 TaskBatchExecutionListenerBeanPostProcessor
에서 다시 listener를 추가하려고 하지만, Job에는 동일한 리스너가 중복해서 등록되지 않기 때문에 TaskBatchExecutionListener
가 중복해서 실행되지는 않는다.
BatchConfiguration
@Bean
fun printJob(alphabetPrintStep: Step): Job {
return jobBuilderFactory.get("print-job")
.start(alphabetPrintStep)
.listener(taskBatchExecutionListener)
.listener(elapsedTimeListener())
.build()
}
실행결과
2021-04-17 10:28:48.290 INFO 36700 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: [alphabets=a,b,c,d,e]
2021-04-17 10:28:48.326 INFO 36700 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=print-job]] launched with the following parameters: [{alphabets=a,b,c,d,e}]
2021-04-17 10:28:48.338 INFO 36700 --- [ main] o.s.c.t.b.l.TaskBatchExecutionListener : The job execution id 1 was run within the task execution 1
1618622928339
2021-04-17 10:28:48.343 ERROR 36700 --- [ main] o.s.batch.core.job.AbstractJob : Encountered fatal error executing job
...
1618622928348
2021-04-17 10:28:48.350 INFO 36700 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=print-job]] completed with the following parameters: [{alphabets=a,b,c,d,e}] and the following status: [FAILED] in 20ms
로그를 보면 정상적으로 TaskBatchExecutionListener
가 실행되고, 연관관계가 등록되었음을 알 수 있다.
만약 모든 Job에 TaskBatchExecutionListener
가 추가되는것이 싫고, 특정 Job에만 추가하고 싶다면 TaskBatchExecutionListenerBeanPostProcessor를 직접 추가하는 방법이 있으니 참고하도록 하자.
https://docs.spring.io/spring-cloud-task/docs/current/reference/html/#batch-association-override