: Spring의 기본 @Transactional은 RDBMS 트랜잭션만 관리하고, MongoDB 트랜잭션은 별도로 관리하지 않기 때문
@Transactional // 트랜잭션 매니저를 명시하지 않으면 기본적으로 JpaTransactionManager만 적용됨
public void doSomething() {
rdbmsRepository.save(...); // 트랜잭션 관리됨
mongoRepository.save(...); // 트랜잭션 관리 안됨 (그냥 일반 커밋됨)
throw new RuntimeException(); // → RDBMS만 롤백됨
}
=> 그러므로 예외가 발생해서 던져지면, 트랜잭션 매니저에 의해 관리되는 RDBMS 작업만 롤백되는 것.
MongoDB에서 단일 문서에 대한 작업은 원자적으로 이루어집니다. 임베디드 문서와 배열을 사용하면 여러 문서와 컬렉션에 걸쳐 정규화하는 대신 단일 문서 구조에서 데이터 간의 관계를 캡처할 수 있으므로 이러한 단일 문서 원자성은 많은 실제 사용 사례에서 분산 트랜잭션의 필요성을 없애줍니다.
여러 문서 (단일 또는 여러 컬렉션)에 대한 읽기 및 쓰기의 원자성이 필요한 상황의 경우, MongoDB는 분산 트랜잭션을 지원합니다. 분산 트랜잭션을 사용하면 여러 작업, 컬렉션, 데이터베이스, 문서 및 샤드에서 트랜잭션을 사용할 수 있습니다.
출처 - https://www.mongodb.com/ko-kr/docs/manual/core/transactions/ MongoDB 공식문서
MongoDB 트랜잭션 도입 실전 가이드 참고자료 (replica set 설정 방법 등)
https://jh2021.tistory.com/24
https://oliveyoung.tech/2024-12-17/catalog-mongo-transaction-2/
SAGA 패턴 (보상 트랜잭션)
// SubscriptionService
@Transactional
public SubscriptionDto create(Long interestId, Long userId) {
Subscription subscription = new Subscription(user, updatedInterest);
subscriptionRepository.save(subscription);
SubscriptionDto subscriptionDto = subscriptionMapper.toDto(subscription);
eventPublisher.publishEvent(SubscriptionCreateEvent.builder()
.subscriptionDto(subscriptionDto)
.userId(userId)
.build());
return subscriptionDto;
}
// SubscriptionEventListener
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async
public void handleSubscriptionCreateEvent(SubscriptionCreateEvent subscriptionCreateEvent) {
Query query = Query.query(Criteria.where("_id").is(subscriptionCreateEvent.userId()));
Update update = new Update()
.push("subscriptions", subscriptionCreateEvent.subscriptionDto())
.set("updatedAt", LocalDateTime.now());
mongoTemplate.updateFirst(query, update, SubscriptionActivity.class);
}
// CompensationService (신규 생성)
@Service
@RequiredArgsConstructor
public class SubscriptionCompensationService {
private final SubscriptionRepository subscriptionRepository;
private final InterestRepository interestRepository;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void rollbackSubscription(Long subscriptionId, Long interestId) {
// 1. 구독 삭제 (멱등성 보장)
if (subscriptionRepository.existsById(subscriptionId)) {
subscriptionRepository.deleteById(subscriptionId);
}
// 2. 관심사 카운트 복구
interestRepository.decrementSubscriberCount(interestId);
}
}
// SubscriptionEventListener.java
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async
public void handleSubscriptionCreateEvent(SubscriptionCreateEvent event) {
try {
// MongoDB 활동 내역 저장
Query query = Query.query(Criteria.where("_id").is(event.userId()));
Update update = new Update()
.push("subscriptions", event.subscriptionDto())
.set("updatedAt", LocalDateTime.now());
mongoTemplate.updateFirst(query, update, SubscriptionActivity.class);
} catch (Exception e) {
// MongoDB 작업이 실패했다면 -> SAGA 보상 트랜잭션 실행
compensationService.rollbackSubscription(
event.subscriptionId(),
event.interestId()
);
log.error("활동 내역 생성 실패 및 구독 롤백 - userId={}, subscriptionId={}",
event.userId(), event.subscriptionId(), e);
}
}
// SubscriptionService
@Transactional
public SubscriptionDto create(Long interestId, Long userId) {
Subscription subscription = new Subscription(user, updatedInterest);
subscriptionRepository.save(subscription);
SubscriptionDto subscriptionDto = subscriptionMapper.toDto(subscription);
eventPublisher.publishEvent(SubscriptionCreateEvent.builder()
.subscriptionDto(subscriptionDto)
.userId(userId)
.build());
return subscriptionDto;
}
// SubscriptionEventListener
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async
public void handleSubscriptionCreateEvent(SubscriptionCreateEvent subscriptionCreateEvent) {
Query query = Query.query(Criteria.where("_id").is(subscriptionCreateEvent.userId()));
Update update = new Update()
.push("subscriptions", subscriptionCreateEvent.subscriptionDto())
.set("updatedAt", LocalDateTime.now());
mongoTemplate.updateFirst(query, update, SubscriptionActivity.class);
}
@Slf4j
@Service
@AllArgsConstructor
public class CommentActivityBatchService {
private final UserRepository userRepository;
private final CommentRepository commentRepository;
private final MongoTemplate mongoTemplate;
private final RetryTemplate retryTemplate;
@Transactional
public void syncAll() {
log.info("[배치 시작] 작성한 댓글 MongoDB 동기화");
List<User> users = userRepository.findAll();
List<WriteModel<Document>> operations = new ArrayList<>();
List<Long> failedUserIds = new ArrayList<>();
for (User user : users) {
try {
List<Comment> comments = commentRepository.findByUser_IdAndIsDeletedFalseOrderByCreatedAtDesc(user.getId());
List<CommentActivityDto> commentDtos = comments.stream()
.map(comment -> CommentActivityDto.builder()
.id(comment.getId())
.articleId(comment.getArticle().getId())
.articleTitle(comment.getArticle().getTitle())
.userId(user.getId())
.userNickname(user.getNickname())
.content(comment.getContent())
.likeCount(comment.getLikeCount())
.createdAt(comment.getCreatedAt())
.build())
.toList();
Document document = new Document();
document.put("_id", user.getId());
document.put("comments", commentDtos);
LocalDateTime now = LocalDateTime.now();
document.put("createdAt", now);
document.put("updatedAt", now);
Query query = Query.query(Criteria.where("_id").is(user.getId()));
ReplaceOneModel<Document> replaceModel = new ReplaceOneModel<>(
query.getQueryObject(),
document,
new ReplaceOptions().upsert(true)
);
operations.add(replaceModel);
} catch (Exception e) {
log.error("작성한 댓글 활동 동기화 실패 - userId: {}, reason: {}", user.getId(), e.getMessage(), e);
failedUserIds.add(user.getId());
}
}
if (!operations.isEmpty()) {
try {
retryTemplate.execute(context -> {
mongoTemplate.getCollection("comment_activities")
.bulkWrite(operations, new BulkWriteOptions().ordered(false));
log.info("총 {}건의 CommentActivity 문서가 bulkWrite 되었습니다.", operations.size());
return null;
});
} catch (Exception e) {
log.error("bulkWrite 재시도 실패 - 최대 재시도 횟수 초과", e);
throw new RestException(
MAX_RETRY_EXCEEDED,
Map.of(
"detail", "댓글 활동 내역 bulkWrite 최종 실패",
"skippedUserIds", failedUserIds.toString()
)
);
}
}
log.info("[배치 종료] 작성한 댓글 MongoDB 동기화 완료");
log.info("[요약] 전체 사용자 수: {}, 처리 성공 수: {}, 실패 사용자 수: {}",
users.size(), operations.size(), failedUserIds.size());
}
}