
스프링배치에서는 청크 프로세스에서 ItemWriter를 사용할 수있는데, ItemWriter에는 여러 구현체들이 존재한다.
오늘은 이중에서 MongoItemWriter를 살펴보려고 한다.
MongoItemWriter를 사용해서 청크 프로세서의 writer를 구현할 수있는데,
MongoItemWriter는 3가지 동작을 가지고 있다.
//MongoItemWriter.java
protected void doWrite(Chunk<? extends T> chunk) {
if (!CollectionUtils.isEmpty(chunk.getItems())) {
switch (this.mode) {
case INSERT:
this.insert(chunk);
break;
case REMOVE:
this.remove(chunk);
break;
default:
this.upsert(chunk);
}
}
}
위 코드는 실제 MongoItemWriter 코드중 doWrite 함수에 해당한다.
위 코드는 INSERT, REMOVE, 그리고 UPSERT로 구성되어 있는걸 확인할 수 있다.
여기에서 생각해보면 UPDATE가 존재하지 않고 UPSERT가 존재한다.
그러나... UPSERT에는 엄청난 제약이 존재한다.
아래는 MongoItemWriter에서 실제 upsert 구현이다.
private void upsert(Chunk<? extends T> chunk) {
BulkOperations bulkOperations = this.initBulkOperations(BulkMode.ORDERED, chunk.getItems().get(0));
MongoConverter mongoConverter = this.template.getConverter();
FindAndReplaceOptions upsert = (new FindAndReplaceOptions()).upsert();
Chunk.ChunkIterator var5 = chunk.iterator();
while(var5.hasNext()) {
Object item = var5.next();
Document document = new Document();
mongoConverter.write(item, document);
Object objectId = document.get("_id") != null ? document.get("_id") : new ObjectId();
Query query = (new Query()).addCriteria(Criteria.where("_id").is(objectId));
bulkOperations.replaceOne(query, document, upsert);
}
bulkOperations.execute();
}
위 코드를 잘보면.. _id, 즉 ObjectId가 존재해야만 정상적인 upsert가 가능하다.
만약 mongodb 쿼리에서 _id를 가져오지 않는다면 upsert가 불가능하다고 볼 수 있다.
_id를 사용하지 않고 upsert를 구현해주기 위해서는 다른 방법이 필요하다.
mongotemplate에서 지원해주는 upsert를 이용할 수도 있지만, 여기에도 문제가 있다.
샤딩된 테이블을 대상으로는 반드시 _id가 Query에 지정되어야 하기 때문이다.
_id가 지정되지 않으면 오류가 발생한다.(샤딩되지 않은 테이블은 문제 없음)
따라서.. _id를 사용하지 않고 upsert를 구현하기 위해서는 직접 find, update, insert를 구현해야한다.