Oracle
에서 읽은 데이터를 Mssql
로 Insert Job이 있다.
에러가 발생하지 않지 않는다면 문제가 없겠지만,
procedure
에서 Exception이 발생한다면 그 결과를 사용자가 알 수 있도록 Update해야한다.
보통 에러가 발생했을 때 양쪽 DB 모두 Rollback을 하는 것이 일반적이다.
지금 발생한 케이스는 한쪽은 Rollback 처리를 하고, 다른 한쪽은 Commit를 해야 한다.
반대로 정상 처리되었을 때는 양쪽 다 Commit을 해야한다.
양쪽 DB간 에러가 발생 했을 때 고안했던 과정을 적어볼려고 합니다.
한 Step안에서 read, process, writer
가 처리 됨으로 같은 Step에서 읽고 쓸때 하나의 Transaction으로 관리되어야 한다.
Step에서 어떤점을 수정해야하는지 알아보자
ChainedTransactionManager
는 분리된 DB에 대해 Transaction을 하나로 묶어주는 역할을 한다.
Commit
등록된 TransactionManager에 대해 역순으로 처리합니다.
만약 여러 트랜잭션에 대해 Commit
중 에러 발생 시 미 Commit에 대해서 Rollback
을 한다
해당 메소드를 사용한다고 해도 완벽한 데이터 일괄성을 보여주기는 힘듭니다.
실제로 에러가 발생 시, 방어 로직이 필요합니다.
PlatformTransactionManager chainTxManager() {
ChainedTransactionManager txManager = new ChainedTransactionManager(oracleTransactionManager, mssqlTransactionManager);
return txManager;
}
분리된 DB에 다른 query
를 사용해야 하니 Delegates를 사용해서 하나의 Writer에서 다중 쿼리를 실행할 수 있도록 했습니다.
여기서 oracleWriter
는 에러가 미 발생 시 사용하는 쿼리 입니다.
public CompositeItemWriter<dtoModel> test_Writer() {
CompositeItemWriter<dtoModel> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(oracleWriter(), mssqlWriter()));
return compositeItemWriter;
}
Exception이 발생 했을 때 Step이 종료되지 않고 이후 처리가 가능
faultTolerant
활성화 필요
processor non transaction 영어 뜻 그대로 process에 따로 기입된 트랜잭션이 없다
만약 Chunk
단위로 처리 중 writer 중 에러 발생 시, Chunk
의 처음 데이터 부터 process
가 실행하게 된다.
위 옵션을 통해 에러가 발생해도 해당 row만 제거된 상태로 writer가 실햄됨으로 process
가 다시 시작하지 않는다.
SkipListener 를 사용해서 Skip
이 일어난 데이터에 대해 후 처리를 했습니다.
이 부분이 에러가 발생 시, 에러에 대한 내용을 Update하기 위해 만들어진 로직입니다.
SkipListener는 총 3개로 나눠져 있습니다.
Skip
발생 한 원인으로 부터 listener가 호출되게 됩니다.
전 write
중 에러가 발생함으로 후 처리 로직을 기입했습니다.
public class skipListener implements SkipListener<dtoModel, dtoModel> {
@Override
public void onSkipInRead(Throwable t) {
log.info("Skipped at Read due to: {}", t.getMessage());
}
@Override
public void onSkipInWrite(dtoModel item, Throwable t) {
log.info("Item {} was skipped at Writer due to: {}", item.getInfo().get(0).getTest(), t.getMessage());
SQLException se = (SQLException) ((DataAccessException) t).getRootCause();
while (se != null) {
log.debug("SQLException SQLState is " + se.getSQLState());
log.debug("SQLException SQLErrorCode is " + se.getErrorCode());
log.debug("SQLException Message is " + se.getMessage());
log.debug("SQLException StackTrace is " + se.getStackTrace());
se = se.getNextException();
}
testService.updateFail(item, t.getMessage());
}
@Override
public void onSkipInProcess(dtoModel item, Throwable t) {
log.info("Item {} was skipped at Process due to: {}", item.getInfo().get(0).getTest(), t.getMessage());
}
}
public Step test_step() {
return stepBuilderFactory
.get("test_step")
.transactionManager(chainTxManager())
.<dtoModel, dtoModel>chunk(100)
.reader(test_Reader())
.processor(test_Processor())
.writer(test_Writer())
.faultTolerant()
.skip(Exception.class)
.skipLimit(9999)
.listener(new skipListener())
.processorNonTransactional()
.build();
}
이렇게 정상 처리되었을 때와 에러가 발생했을 때를 한쪽은 Rollback이지만 다른 한쪽은 Update를 할 수 있도록 처리했습니다.