ItemProcessor is responsible for transforming or filtering data.
Everything it does can also be implemented in the ItemWriter, so a Processor is not required.
Still, using one is recommended because it keeps reading, processing, and writing separate.
It processes each individual item handed over by the ItemReader.
It can convert an item to the desired type, or filter the data so that only the wanted items are passed on.
ItemProcessor is defined as follows.
public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}
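I: the type of the item received from the ItemReader
O: the type of the item passed on to the ItemWriter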
Data read by the ItemReader passes through process() of the ItemProcessor and is then delivered to the ItemWriter.
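To make that flow concrete, here is a minimal plain-Java sketch of what happens to one chunk. It is not Spring Batch's actual implementation: `SketchItemProcessor` and `ChunkFlowSketch` are hypothetical stand-ins so the example runs without the framework, and filtering, skip/retry handling, and transactions are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same shape as Spring Batch's ItemProcessor.
interface SketchItemProcessor<I, O> {
    O process(I item) throws Exception;
}

class ChunkFlowSketch {
    // Items are processed one at a time; the collected outputs are what
    // the writer would receive together as a single chunk.
    static <I, O> List<O> processChunk(List<I> readItems,
                                       SketchItemProcessor<I, O> processor) throws Exception {
        List<O> outputs = new ArrayList<>();
        for (I item : readItems) {
            outputs.add(processor.process(item));
        }
        return outputs;
    }

    public static void main(String[] args) throws Exception {
        List<Integer> read = List.of(1, 2, 3);                        // what the reader returned
        List<String> toWrite = processChunk(read, n -> "item-" + n);  // Integer -> String
        System.out.println(toWrite); // [item-1, item-2, item-3]
    }
}
```

The point of the sketch is the asymmetry: process() sees a single item, while the writer sees the whole list.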
When the code grows, the processor is sometimes split out into its own class, but usually an anonymous function (lambda) is used.
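The two styles can be compared side by side. The sketch below assumes a stand-in interface (`NameProcessor`, hypothetical) with the same single-method shape as ItemProcessor, so it runs without Spring Batch; `TrimProcessor` and `ProcessorStyles` are likewise made up for illustration.

```java
// Hypothetical stand-in mirroring the shape of Spring Batch's ItemProcessor.
interface NameProcessor<I, O> {
    O process(I item) throws Exception;
}

// Style 1: a dedicated class, handy once the logic grows or needs its own tests.
class TrimProcessor implements NameProcessor<String, String> {
    @Override
    public String process(String item) {
        return item.trim();
    }
}

class ProcessorStyles {
    // Style 2: the same logic as a lambda. This works because the
    // interface declares exactly one abstract method.
    static NameProcessor<String, String> lambdaProcessor() {
        return item -> item.trim();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new TrimProcessor().process(" pay ")); // pay
        System.out.println(lambdaProcessor().process(" pay "));   // pay
    }
}
```

Both variants behave identically; the choice is mostly about how much logic the processor carries.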
The following example converts the type of the data read by the ItemReader and passes the result to the ItemWriter.
ProcessorConvertJobConfiguration.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorConvertJobConfiguration {

    public static final String JOB_NAME = "processorConvertBatch";
    public static final String BEAN_PREFIX = JOB_NAME + "_";

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    @Value("${chunkSize:1000}")
    private int chunkSize;

    @Bean(JOB_NAME)
    public Job job() {
        return jobBuilderFactory.get(JOB_NAME)
                .preventRestart()
                .start(step())
                .build();
    }

    @Bean(BEAN_PREFIX + "step")
    @JobScope
    public Step step() {
        return stepBuilderFactory.get(BEAN_PREFIX + "step")
                // <type read by the reader, type handed to the writer>
                .<Pay, String>chunk(chunkSize)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    // Bean names carry the job prefix so they cannot clash with other job configurations.
    @Bean(BEAN_PREFIX + "reader")
    public JpaPagingItemReader<Pay> reader() {
        return new JpaPagingItemReaderBuilder<Pay>()
                .name(BEAN_PREFIX + "reader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("SELECT t FROM Pay t")
                .build();
    }

    // Converts each Pay into its transaction name (Pay -> String).
    @Bean(BEAN_PREFIX + "processor")
    public ItemProcessor<Pay, String> processor() {
        return Pay::getTxName;
    }

    private ItemWriter<String> writer() {
        return items -> {
            for (String item : items) {
                log.info("Pay TxName={}", item);
            }
        };
    }
}
./gradlew build -x test
java -jar ./build/libs/*.jar --job.name=processorConvertBatch
The log output confirms that each Pay object is converted to a String by the ItemProcessor.
The following example passes only the filtered subset of the data read by the ItemReader on to the ItemWriter.
ProcessorNullJobConfiguration.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorNullJobConfiguration {

    public static final String JOB_NAME = "processorNullBatch";
    public static final String BEAN_PREFIX = JOB_NAME + "_";

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory emf;

    @Value("${chunkSize:1000}")
    private int chunkSize;

    @Bean(JOB_NAME)
    public Job job() {
        return jobBuilderFactory.get(JOB_NAME)
                .preventRestart()
                .start(step())
                .build();
    }

    @Bean(BEAN_PREFIX + "step")
    @JobScope
    public Step step() {
        return stepBuilderFactory.get(BEAN_PREFIX + "step")
                .<Pay, Pay>chunk(chunkSize)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean(BEAN_PREFIX + "reader")
    public JpaPagingItemReader<Pay> reader() {
        return new JpaPagingItemReaderBuilder<Pay>()
                .name(BEAN_PREFIX + "reader")
                .entityManagerFactory(emf)
                .pageSize(chunkSize)
                .queryString("SELECT t FROM Pay t")
                .build();
    }

    @Bean(BEAN_PREFIX + "processor")
    public ItemProcessor<Pay, Pay> processor() {
        return pay -> {
            boolean isIgnoreTarget = pay.getAmount() > 2000L;
            if (isIgnoreTarget) {
                log.info(">>>>>>>>> Pay txName={}, isIgnoreTarget={}", pay.getTxName(), isIgnoreTarget);
                // Returning null tells Spring Batch not to pass this item to the writer.
                return null;
            }
            return pay;
        };
    }

    private ItemWriter<Pay> writer() {
        return items -> {
            for (Pay item : items) {
                log.info("pay txName={}", item.getTxName());
            }
        };
    }
}
./gradlew build -x test
java -jar ./build/libs/*.jar --job.name=processorNullBatch
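Of the rows in the Pay table, those with an amount of 3000 or more are filtered out, and only the rest reach the ItemWriter. More precisely, the processor returns null for any amount above 2000, and a null result is not passed to the writer.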
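The null-means-skip rule can be tried out without a database. The sketch below is an assumption-laden illustration, not framework code: `FilteringProcessor` and `NullFilterSketch` are hypothetical stand-ins, plain `Long` amounts replace the Pay entity, and the sample amounts are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same shape as Spring Batch's ItemProcessor.
interface FilteringProcessor<I, O> {
    O process(I item) throws Exception;
}

class NullFilterSketch {
    static final long LIMIT = 2000L;

    // Same rule as the processor in the example: amounts above the limit become null.
    static final FilteringProcessor<Long, Long> IGNORE_LARGE = amount -> {
        if (amount > LIMIT) {
            return null;
        }
        return amount;
    };

    // Mimics the framework behaviour of dropping null results before writing.
    static List<Long> keepWritable(List<Long> amounts) throws Exception {
        List<Long> written = new ArrayList<>();
        for (Long amount : amounts) {
            Long result = IGNORE_LARGE.process(amount);
            if (result != null) {
                written.add(result);
            }
        }
        return written;
    }

    public static void main(String[] args) throws Exception {
        // Made-up sample amounts; 2000 is kept because the check is strictly "greater than".
        System.out.println(keepWritable(List.of(1000L, 2000L, 3000L, 4000L))); // [1000, 2000]
    }
}
```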
Now consider an ItemProcessor that has to cover several features.
The responsibility of that single ItemProcessor keeps growing.
The CompositeItemProcessor implementation exists for this case.
It supports chaining ItemProcessors, so each step can stay small.
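Conceptually, chaining means that the output of one delegate becomes the input of the next. The sketch below shows that idea in plain Java under stated assumptions: `ChainProcessor` and `CompositeSketch` are hypothetical stand-ins rather than Spring Batch classes, and the early return on null reflects the filtering convention (a null result ends processing for that item).

```java
import java.util.List;

// Hypothetical stand-in with the same shape as Spring Batch's ItemProcessor.
interface ChainProcessor<I, O> {
    O process(I item) throws Exception;
}

class CompositeSketch {
    // Runs the delegates in order; each output feeds the next delegate.
    // The types only line up at runtime, hence the unchecked cast.
    @SuppressWarnings("unchecked")
    static Object applyAll(Object item, List<ChainProcessor<?, ?>> delegates) throws Exception {
        Object result = item;
        for (ChainProcessor<?, ?> delegate : delegates) {
            if (result == null) {
                return null; // a filtered item skips the remaining delegates
            }
            result = ((ChainProcessor<Object, Object>) delegate).process(result);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        ChainProcessor<Integer, String> toName = n -> "tx" + n;                        // Integer -> String
        ChainProcessor<String, String> decorate = s -> "Transaction name is " + s;     // String -> String
        List<ChainProcessor<?, ?>> delegates = List.of(toName, decorate);
        System.out.println(applyAll(7, delegates)); // Transaction name is tx7
    }
}
```

Because each delegate may have a different input and output type, the list cannot be typed precisely, which is also why configurations often fall back to raw types.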
ProcessorCompositeJobConfiguration.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorCompositeJobConfiguration {

    public static final String JOB_NAME = "processorCompositeBatch";
    public static final String BEAN_PREFIX = JOB_NAME + "_";

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory emf;

    @Value("${chunkSize:1000}")
    private int chunkSize;

    @Bean(JOB_NAME)
    public Job job() {
        return jobBuilderFactory.get(JOB_NAME)
                .preventRestart()
                .start(step())
                .build();
    }

    @Bean(BEAN_PREFIX + "step")
    @JobScope
    public Step step() {
        return stepBuilderFactory.get(BEAN_PREFIX + "step")
                .<Pay, String>chunk(chunkSize)
                .reader(reader())
                .processor(compositeProcessor())
                .writer(writer())
                .build();
    }

    @Bean(BEAN_PREFIX + "reader")
    public JpaPagingItemReader<Pay> reader() {
        return new JpaPagingItemReaderBuilder<Pay>()
                .name(BEAN_PREFIX + "reader")
                .entityManagerFactory(emf)
                .pageSize(chunkSize)
                .queryString("SELECT t FROM Pay t")
                .build();
    }

    // Raw types are used on purpose: the delegates have different input/output types.
    @Bean(BEAN_PREFIX + "processor")
    public CompositeItemProcessor compositeProcessor() {
        List<ItemProcessor> delegates = new ArrayList<>(2);
        delegates.add(processor1()); // Pay -> String
        delegates.add(processor2()); // String -> String

        CompositeItemProcessor processor = new CompositeItemProcessor<>();
        processor.setDelegates(delegates);
        return processor;
    }

    public ItemProcessor<Pay, String> processor1() {
        return Pay::getTxName;
    }

    public ItemProcessor<String, String> processor2() {
        return name -> "The transaction name is " + name + ".";
    }

    private ItemWriter<String> writer() {
        return items -> {
            for (String item : items) {
                log.info("Pay Name={}", item);
            }
        };
    }
}
./gradlew build -x test
java -jar .\build\libs\demo-0.0.1-SNAPSHOT.jar --job.name=processorCompositeBatch
The output shows that Pay is converted to a String (txName), and that String is then converted to another String.
All of the source code has been uploaded to GitHub.
Reference: jojoldu's blog