NHN 분산 시스템에서 데이터를 전달하는 효율적인 방법

hyng·2023년 1월 27일
0

smilegate-winter-dev-camp

목록 보기
10/15

컨퍼런스 영상에서 도움이 될만한 내용을 기록합니다.
https://youtu.be/uk5fRLUsBfk

RDB를 사용하는 애플리케이션에서 전달 방법

  1. DB 트랜잭션
  2. REST-API
@Service
public class CreateTaskService implements CreateTaskUserCase {
	@Transactional
    public CreateTaskResponse createTask(CreateTaskCommand createTaskCommand) {
    	Task task = createTaskCommand.toTask();
        taskRepository.save(task);
        eventHandler.propagate(CreateTaskEvent.of(task));
        return CreateTask.of(task);
   }
}

@Transactional은 우리의 코드를 proxy로 감싸서 실행하는 형태이기 때문에 이벤트를 발생시켜 REST-API가 실행되고 이후에 DB 트랜잭션이 여러 이유로 rollback 되는 상황이 발생할 수 있음.
-> 데이터는 저장이 되지 않았는데 rest-api만 호출되는 상황

이 문제를 해결하기 위해 트랜잭션 commit 이벤트를 사용할 수 있다.

  • @TransactionalEventListener -> event 사용
  • TransactionSynchronizationManager, TransactionSynchronization -> callback 이용

이 방법을 사용해도 문제는 여전히 발생할 수 있는데, DB 트랜잭션이 성공하더라도 REST-API 호출이 실패하는 상황이 발생할 수있다.
-> 데이터만 저장이 되고 전파는 못하는 상황

@TransactionalEventListener + @Retryable 붙여 해결할 수 있다.

그런데 retry도 실패한다면?

  • Transactional Outbox Pattern
    * RDB를 메시지큐처럼 사용하는 방법
    • 이벤트와 데이터를 모두 RDB에 저장하기 때문에 하나의 트랜잭션으로 묶어서 처리할 수 있음
  • Polling Publisher Pattern
    * 스케줄러를 띄워서 DB에 저장된 데이터를 주기적으로 폴링을 하고 api를 호출하는 방법

RDB에 저장할 데이터 모델
현재 진행중인 프로젝트에서 배치 서버가 차단 사용자 캐시를 확인하도록 할건데 이 데이터 모델 참고하면 좋을듯

Transactional Outbox 패턴을 적용한 코드

@Service
public class CreateTaskService implements CreateTaskUserCase {
	@Transactional -> task, event 저장을 하나의 트랜잭션에서 처리하는게 핵심
    public CreateTaskResponse createTask(CreateTaskCommand createTaskCommand) {
    	Task task = createTaskCommand.toTask();
        taskRepository.save(task);
        eventRepository.save(CreateTaskEvent.of(task));
        return CreateTaskResponse.of(task);
    }
}

Polling Publisher 패턴을 적용한 코드

@Service
public class MessagePublisher {
	@Scheduler(cron = "0/5 * * * * *")
    @Transactional
    public void publish() {
    	LocalDateTime now = LocalDateTime.now();
        eventRepository.findByCreatedBefore(now, EventStatus.READY)
        				.stream()
                        .map(event -> restTemplate.execute(event))
                        .map(event -> event.done())
                        .forEach(eventRepository::save);
    }
}
  • 장점
    • REST-API 환경에서 At-least-once를 구현할 수 있다.
  • 단점
    • Polling, Publisher 과정으로 의한 지연 처리 (최대 5초 이후 실행됨, 만약 처리 중 문제가 발생하면 무제한으로 연기됨 = 실시간이 중요하면 사용할 수 없음)
      • 데이터 베이스 부하
      • 데이터 베이스 비례한 처리속도

kafka를 사용한 전달 방법

@Sl4j
@Component
@RequiredArgsConstructor
public class Producer {
	public void sendEvent(CreateTaskEvent event) {
    	ListenableFuture<SendResult<String, CreateTaskEvent>> futer = kafkaTemplate.send(TOPIC_TASK, event);
        futre.addCallback(
        		result -> log.info("offset : {}", result.getRecordMetadata().offset()), // sucess callback
                throwable -> log.error("fail to publish", throwable) // failure callback
        );
   }
}
@FunctionalInterface
public interface AcknowledgingMessageListener<K,V> extends MessageListener<K,V> {
	default void onMessage(ConsumerRecord<K,V> data) {
    	throw new UnsupportedOperationException("Container should never call this");
	void onMessage(ConsumerRecord<K,V> var1, Acknowledgment var2);
}
@Override
@KafkaListener(
       // ...
       containerFactory = "kafkaListenerContainerFactory"
)
public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
	try {
    	acknowledgment.acknowledge();
    } catch(Exception e) {
    	log.error("Error to receive messages.", e);
    }
}

수동으로 ack, nack 날릴거라는 설정이 필요함

@Bean
public ConsumerFactory<String, String> consumerFactory() {
	Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 // .....
 
 return new DefaultKafkaConsumerFactory<>(props);
}
profile
공부하고 알게 된 내용을 기록하는 블로그

0개의 댓글