이전 데이터에 success_state 컬럼이 추가되었고 우리는 false 상태의 거래건을 모두 가져와 true로 변경해야한다.
@ToString
@Getter
@Setter
@NoArgsConstructor
public class Pay {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");
private Long id;
private Long amount;
private String txName;
private LocalDateTime txDateTime;
private boolean successState;
public Pay(Long amount, String txName, String txDateTime) {
this.amount = amount;
this.txName = txName;
this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
}
public Pay(Long id, Long amount, String txName, String txDateTime) {
this.id = id;
this.amount = amount;
this.txName = txName;
this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
}
public Pay(Long id, Long amount, String txName, LocalDateTime txDateTime, boolean successState) {
this.id = id;
this.amount = amount;
this.txName = txName;
this.txDateTime = txDateTime;
this.successState = successState;
}
}
@Slf4j
@RequiredArgsConstructor
@Configuration
public class SuccessStateJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource; //db
private static final int chunkSize = 10;
@Bean
public Job SuccessStateJob() throws Exception{
return jobBuilderFactory.get("successStateJob")
.start(successStateStep())
.build();
}
@Bean
public Step successStateStep() throws Exception{
return stepBuilderFactory.get("successStateStep")
.<Pay, Pay>chunk(chunkSize)
.reader(successStateReader())
.processor(successStateProcessor()) //필요할 경우 데이터 처리
.writer(successStateWriter())
.build();
}
@Bean
public ItemReader<? extends Pay> successStateReader() throws Exception {
Map<String, Object> param = new HashMap<>();
param.put("success_state","false");
return new JdbcPagingItemReaderBuilder<Pay>()
.pageSize(chunkSize)
.fetchSize(chunkSize)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Pay.class))
.queryProvider(createProvider())
.parameterValues(param)
.name("jdbcPagingItemReader")
.build();
}
@Bean
public PagingQueryProvider createProvider() throws Exception{
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setDataSource(dataSource);
provider.setSelectClause("id, amount, tx_name, tx_date_time, success_state");
provider.setFromClause("from pay");
provider.setWhereClause("where success_state = :success_state");
Map<String, Order> sortKey = new HashMap<>(1);
sortKey.put("id", Order.ASCENDING);
provider.setSortKeys(sortKey);
return provider.getObject();
}
@Bean
@StepScope
public ItemProcessor<? super Pay, ? extends Pay> successStateProcessor() { //success_state를 true 처리
return item -> {
item.setSuccessState("true");
return item;
};
}
@Bean
public JdbcBatchItemWriter<? super Pay> successStateWriter() {
return new JdbcBatchItemWriterBuilder<Pay>()
.dataSource(dataSource)
.sql("update pay set success_state = :successState where id = :id")
.beanMapped()
.build();
}
}
실행해보자
데이터가 내가 예상했던대로 변경된 것을 확인할 수 있다.