
간단한 Job을 하나 만들어 보고 잘 동작하는지 테스트 해보고, Step의 Reader, Processor, Writer 별로 단위 테스트를 하는 방식에 대해 학습해보자.
공공 데이터 포털에서 csv File을 하나 다운 받고, csv 파일 내에 있는 정보를 DB에 넣는 Job을 만들 것이다. 2024-9-13일 기준 파일데이터 1위 였던 경기도 의왕시_도서관 문화강좌을 사용하려 한다.
먼저 해당 Job의 전체 코드는 다음과 같다.
@Configuration
class FlatFileBatchConfig(
private val platformTransactionManager: PlatformTransactionManager,
private val jobRepository: JobRepository,
private val lectureEntityRepository: LectureEntityRepository
) {
@Bean
fun lectureJob(lectureStep: Step): Job =
JobBuilder("lectureJob", jobRepository)
.incrementer(RunIdIncrementer())
.start(lectureStep)
.build()
@JobScope
@Bean
fun lectureStep(
flatFileLectureReader: FlatFileItemReader<Lecture>,
lectureToLectureEntityProcessor: ItemProcessor<Lecture, LectureEntity>,
lectureEntityDBWriter: RepositoryItemWriter<LectureEntity>
) = StepBuilder("lectureStep", jobRepository)
.chunk<Lecture, LectureEntity>(10, platformTransactionManager)
.reader(flatFileLectureReader)
.faultTolerant()
.skip(FlatFileParseException::class.java)
.skipLimit(15)
.processor(lectureToLectureEntityProcessor)
.writer(lectureEntityDBWriter)
.build();
@StepScope
@Bean
fun flatFileLectureReader(
@Value("#{jobParameters['file.input']}") input: String
): FlatFileItemReader<Lecture> = FlatFileItemReaderBuilder<Lecture>()
.name("flatFileLectureReader")
.resource(FileSystemResource(input))
.lineTokenizer(DelimitedLineTokenizer())
.fieldSetMapper(LectureFieldSetMapper())
.linesToSkip(1)
.build()
@Bean
@StepScope
fun lectureToLectureEntityProcessor(): ItemProcessor<Lecture, LectureEntity> {
return ItemProcessor<Lecture, LectureEntity> { lecture ->
LectureEntity(
library = lecture.library,
title = lecture.title,
applyStartDate = lecture.applyStartDate,
applyEndDate = lecture.applyEndDate,
target = lecture.target,
trainingStartDate = lecture.trainingStartDate,
trainingEndDate = lecture.trainingEndDate,
trainingStartTime = lecture.trainingStartTime,
trainingEndTime = lecture.trainingEndTime,
materialCost = lecture.materialCost,
applyMemberCnt = lecture.applyMemberCnt,
waitMemberCnt = lecture.waitMemberCnt
)
}
}
@StepScope
@Bean
fun lectureEntityDBWriter(): RepositoryItemWriter<LectureEntity> {
return RepositoryItemWriterBuilder<LectureEntity>()
.repository(lectureEntityRepository)
.methodName("save")
.build()
}
}
파일에서 읽어온 데이터를 보다 효율적으로 다룰 수 있도록 도와주는 Spring Batch의 추상화 클래스이다. 파일에서 읽어온 데이터를 일관되게 처리할 수 있고, 다양한 데이터 타입을 지원한다.
@StepScope
@Bean
fun flatFileLectureReader(
@Value("#{jobParameters['file.input']}") input: String
): FlatFileItemReader<Lecture> = FlatFileItemReaderBuilder<Lecture>()
.name("flatFileLectureReader")
.resource(FileSystemResource(input))
.lineTokenizer(DelimitedLineTokenizer())
.fieldSetMapper(LectureFieldSetMapper())
.linesToSkip(1)
.build()
@Value("#{jobParameters['file.input']}") input: String : 배치 작업에서 사용할 csv파일의 경로를 JobParameter로 받는다.
JobParameter: 배치 작업을 시작할 때 사용되는 파라미터의 집합을 가지고 있으며, 해당 파라미터들로 JobInstance를 식별한다.
name: ItemReader의 구현체가 여러 개 있을 수 있기 때문에, name을 통해서 ItemReader의 빈을 구분한다.
resource: .csv 파일 경로를 설정한다.
LineTokenizer: 각 줄을 FieldSet으로 변환하는 추강화로 다양한 파일 형식을 지원하기 위한 여러 구현체가 있다.
, 로 구분한다.FieldSetMapper: mapFieldSet 메서드를 통해서 FieldSet을 원하는 객체로 변환한다.
class LectureFieldSetMapper : FieldSetMapper<Lecture> {
override fun mapFieldSet(fieldSet: FieldSet): Lecture {
val library = fieldSet.readString(0)
...
val materialCost = fieldSet.readInt(9)
...
return Lecture(
...
)
}
}
linesToSkip(1): csv 파일의 첫번째 줄을 건너 뛴다. csv 파일을 보면 첫번째 줄은 column 명인 것을 볼 수 있다. 해당 내용은 필요없으니 건너뛴다.
@Bean
@StepScope
fun lectureToLectureEntityProcessor(): ItemProcessor<Lecture, LectureEntity> {
return ItemProcessor<Lecture, LectureEntity> { lecture ->
LectureEntity(
library = lecture.library,
title = lecture.title,
applyStartDate = lecture.applyStartDate,
applyEndDate = lecture.applyEndDate,
target = lecture.target,
trainingStartDate = lecture.trainingStartDate,
trainingEndDate = lecture.trainingEndDate,
trainingStartTime = lecture.trainingStartTime,
trainingEndTime = lecture.trainingEndTime,
materialCost = lecture.materialCost,
applyMemberCnt = lecture.applyMemberCnt,
waitMemberCnt = lecture.waitMemberCnt
)
}
}
ItemProcessor 인터페이스의 익명 객체를 만들어서 csv 파일에서 가져온 데이터를 DB에 저장하기 위한 Entity로 변환하는 과정을 진행한다.
@StepScope
@Bean
fun lectureEntityDBWriter(): RepositoryItemWriter<LectureEntity> {
return RepositoryItemWriterBuilder<LectureEntity>()
.repository(lectureEntityRepository)
.methodName("save")
.build()
}
Spring Batch에서 제공해주는 RepositoryItemWriter를 이용해서 Entity 객체를 DB에 저장한다.
@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing
class SpringBatchTestConfig {
}
@SpringBatchTest
@SpringBootTest(classes = [FlatFileBatchConfig::class, SpringBatchTestConfig::class])
class FlatFileBatchConfigTest {
@Autowired
lateinit var itemReader: FlatFileItemReader<Lecture>
@Autowired
lateinit var processor: ItemProcessor<Lecture, LectureEntity>
@Autowired
lateinit var itemWriter: RepositoryItemWriter<LectureEntity>
@Autowired
lateinit var lectureEntityRepository: LectureEntityRepository
@Autowired
lateinit var jobLauncherTestUtils: JobLauncherTestUtils
val lectures: List<Lecture> = listOf(...)
@Test
fun `Lecture Job E2E 테스트`() {
val execution = jobLauncherTestUtils.launchJob(defaultJobParameters())
assertThat(execution.exitStatus).isEqualTo(ExitStatus.COMPLETED)
}
@Test
fun `FlatFileItemReader 테스트`() {
val stepExecution = MetaDataInstanceFactory.createStepExecution(defaultJobParameters())
val expected = lectures.iterator()
StepScopeTestUtils.doInStepScope(stepExecution) {
var lecture: Lecture?
itemReader.open(stepExecution.executionContext)
while (itemReader.read().also { lecture = it } != null) {
assertThat(lecture).isEqualTo(expected.next())
}
itemReader.close()
}
}
@Test
fun `LectureProcessor 테스트`() {
val stepExecution = MetaDataInstanceFactory.createStepExecution(defaultJobParameters())
val lectureEntities: List<LectureEntity> = listOf(...)
StepScopeTestUtils.doInStepScope(stepExecution) {
val result = lectures.map { processor.process(it) }
// then
assertThat(result).isEqualTo(lectureEntities)
}
}
@Test
fun `LectureDBWriter 테스트`() {
val stepExecution = MetaDataInstanceFactory.createStepExecution(defaultJobParameters())
val lectureEntities: List<LectureEntity> = listOf(...)
StepScopeTestUtils.doInStepScope(stepExecution) {
itemWriter.write(Chunk(lectureEntities))
}
val result: List<LectureEntity> = lectureEntityRepository.findAll()
assertThat(result).isEqualTo(lectureEntities)
}
private fun defaultJobParameters(): JobParameters {
val paramsBuilder = JobParametersBuilder()
paramsBuilder.addString("file.input", "lectures-test.csv")
return paramsBuilder.toJobParameters()
}
}
1. 테스트 파일 생성
@SpringBatchTest : Spring Batch에서 제공하는 테스트를 위한 어노테이션으로 JobLauncherTestUtils , JobRepositoryTestUtils 등을 통해 배치 Job을 쉽게 실행하고 검증할 수 있다.@EnableAutoConfiguration : Reader객체를 Bean으로 등록한다.@EnableBatchProcessing : 배치를 실행할 때 필요한 JobLauncher, JobRepository 등과 컴포넌트들을 자동으로 설정해준다.@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing
class SpringBatchTestConfig {
}
@SpringBatchTest
@SpringBootTest(classes = [FlatFileBatchConfig::class, SpringBatchTestConfig::class])
class FlatFileBatchConfigTest
2. JobParameter 설정: 테스트 csv 파일 경로를 JobParameter로 설정한다.
private fun defaultJobParameters(): JobParameters {
val paramsBuilder = JobParametersBuilder()
paramsBuilder.addString("file.input", "lectures-test.csv")
return paramsBuilder.toJobParameters()
}
3. Job 테스트 코드 작성: JobLauncherTestUtils를 통해서 Job을 실행하고, 해당 Job이 잘 동작했는지 테스트한다.
@Autowired
lateinit var jobLauncherTestUtils: JobLauncherTestUtils
@Test
fun `Lecture Job E2E 테스트`() {
val execution = jobLauncherTestUtils.launchJob(defaultJobParameters())
assertThat(execution.exitStatus).isEqualTo(ExitStatus.COMPLETED)
}
4. Reader, Processor, Writer 테스트 코드 작성: MetaDataInstanceFactory를 통해서 stepExecution을 생성하고, 특정 Step의 reader, processor, writer를 테스트한다.
@Test
fun `FlatFileItemReader 테스트`() {
val stepExecution = MetaDataInstanceFactory.createStepExecution(defaultJobParameters())
val expected = lectures.iterator()
StepScopeTestUtils.doInStepScope(stepExecution) {
var lecture: Lecture?
itemReader.open(stepExecution.executionContext)
while (itemReader.read().also { lecture = it } != null) {
assertThat(lecture).isEqualTo(expected.next())
}
itemReader.close()
}
}
@Test
fun `LectureProcessor 테스트`() {
val stepExecution = MetaDataInstanceFactory.createStepExecution(defaultJobParameters())
val lectureEntities: List<LectureEntity> = listOf(...)
StepScopeTestUtils.doInStepScope(stepExecution) {
val result = lectures.map { processor.process(it) }
// then
assertThat(result).isEqualTo(lectureEntities)
}
}
@Test
fun `LectureDBWriter 테스트`() {
val stepExecution = MetaDataInstanceFactory.createStepExecution(defaultJobParameters())
val lectureEntities: List<LectureEntity> = listOf(...)
StepScopeTestUtils.doInStepScope(stepExecution) {
itemWriter.write(Chunk(lectureEntities))
}
val result: List<LectureEntity> = lectureEntityRepository.findAll()
assertThat(result).isEqualTo(lectureEntities)
}
Job을 만들면서 Job E2E 테스트 코드와 Reader, Writer, Processor 별로 단위 테스트를 작성해볼 수 있었다.
다음에는 DB에서 데이터를 읽고 쓸 때, Paging 하면서 데이터를 불러오는 방법에 대해 학습 해보려 한다.