컨퍼런스 영상에서 도움이 될만한 내용을 기록합니다.
https://youtu.be/uk5fRLUsBfk
@Service
public class CreateTaskService implements CreateTaskUserCase {
@Transactional
public CreateTaskResponse createTask(CreateTaskCommand createTaskCommand) {
Task task = createTaskCommand.toTask();
taskRepository.save(task);
eventHandler.propagate(CreateTaskEvent.of(task));
return CreateTask.of(task);
}
}
@Transactional은 우리의 코드를 proxy로 감싸서 실행하는 형태이기 때문에 이벤트를 발생시켜 REST-API가 실행되고 이후에 DB 트랜잭션이 여러 이유로 rollback 되는 상황이 발생할 수 있음.
-> 데이터는 저장이 되지 않았는데 rest-api만 호출되는 상황
이 문제를 해결하기 위해 트랜잭션 commit 이벤트를 사용할 수 있다.
이 방법을 사용해도 문제는 여전히 발생할 수 있는데, DB 트랜잭션이 성공하더라도 REST-API 호출이 실패하는 상황이 발생할 수있다.
-> 데이터만 저장이 되고 전파는 못하는 상황
@TransactionalEventListener + @Retryable 붙여 해결할 수 있다.
그런데 retry도 실패한다면?
RDB에 저장할 데이터 모델
현재 진행중인 프로젝트에서 배치 서버가 차단 사용자 캐시를 확인하도록 할건데 이 데이터 모델 참고하면 좋을듯
Transactional Outbox 패턴을 적용한 코드
@Service
public class CreateTaskService implements CreateTaskUserCase {
@Transactional -> task, event 저장을 하나의 트랜잭션에서 처리하는게 핵심
public CreateTaskResponse createTask(CreateTaskCommand createTaskCommand) {
Task task = createTaskCommand.toTask();
taskRepository.save(task);
eventRepository.save(CreateTaskEvent.of(task));
return CreateTaskResponse.of(task);
}
}
Polling Publisher 패턴을 적용한 코드
@Service
public class MessagePublisher {
@Scheduler(cron = "0/5 * * * * *")
@Transactional
public void publish() {
LocalDateTime now = LocalDateTime.now();
eventRepository.findByCreatedBefore(now, EventStatus.READY)
.stream()
.map(event -> restTemplate.execute(event))
.map(event -> event.done())
.forEach(eventRepository::save);
}
}
@Sl4j
@Component
@RequiredArgsConstructor
public class Producer {
public void sendEvent(CreateTaskEvent event) {
ListenableFuture<SendResult<String, CreateTaskEvent>> futer = kafkaTemplate.send(TOPIC_TASK, event);
futre.addCallback(
result -> log.info("offset : {}", result.getRecordMetadata().offset()), // sucess callback
throwable -> log.error("fail to publish", throwable) // failure callback
);
}
}
@FunctionalInterface
public interface AcknowledgingMessageListener<K,V> extends MessageListener<K,V> {
default void onMessage(ConsumerRecord<K,V> data) {
throw new UnsupportedOperationException("Container should never call this");
void onMessage(ConsumerRecord<K,V> var1, Acknowledgment var2);
}
@Override
@KafkaListener(
// ...
containerFactory = "kafkaListenerContainerFactory"
)
public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
try {
acknowledgment.acknowledge();
} catch(Exception e) {
log.error("Error to receive messages.", e);
}
}
수동으로 ack, nack 날릴거라는 설정이 필요함
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// .....
return new DefaultKafkaConsumerFactory<>(props);
}