요약
이 서비스는 매 10분마다 알림메시지를 생성해 유저들에게 보냅니다. 메시지를 생성하는 스레드와 사용자에게 메시지를 전송하는 스레드를 구분하는 게 작업도 분리되고 빠를 것이라고 생각해, 메시지큐를 사용했습니다. 메시지큐 엔진을 사용할 수도 있겠지만, 앱서버 내에 직접 빈으로 만들어 관리하는 게 서버 분리가 필요 없고 네트워킹 비용이 없고 가벼워서 좋을 것 같다 생각해서 직접 구현했습니다.
ConcurrentLinkedQueue를 이용해 상호배타적인 메시지큐 클래스를 만들었고, 메시지 컨슈머라는 객체가 스레드풀을 생성해 메시지들을 비동기적으로 병렬 처리합니다. 더 이상 처리할 메시지가 없다면 스레드풀을 제거합니다.
만약 메시지를 처리하는 도중 예외가 발생하면 MessageFailHandler라는 클래스가 처리하게 되는데, 해당 메시지를 다시 메시지큐에 넣고, 설정된 용인 횟수를 초과하면 예외로그를 출력하고 메시지 처리를 포기합니다.
이 기능들이 정상작동함을 모두 테스트로 증명되었습니다.
멀티스레드에서 동시다발적으로 발행하는 메시지를 문제 없이 처리하는 클래스들을 만들어보면서, 동시성 이슈와 멀티스레딩, 스레드풀, 모니터링 스레드에 대해 직접 경험해볼 수 있어서 좋았고, 직접 만들어보며 작은 기능들을 추가하거나 작동 구조를 개선하는 일 자체가 그냥 즐거웠습니다.
'날씨 알리미' 서비스는 매 10분마다 알림메시지를 생성해 유저들에게 보냅니다. 메시지를 생성하는 스레드와 사용자에게 메시지를 전송하는 스레드를 구분하는 게 작업도 분리되고 빠를 것이라고 생각해, 메시지큐를 사용했습니다. 메시지큐 엔진을 사용할 수도 있겠지만, 앱서버 내에 직접 빈으로 만들어 관리하는 게 서버 분리가 필요 없고 네트워킹 비용이 없고 가벼워서 좋을 것 같다 생각해서 직접 구현했습니다.
메시지큐는 큐를 필드로 하는 빈으로 만들었고, 메시지 컨슈머라는 빈이 메시지큐에서 메시지를 꺼내와 처리합니다. 메시지큐에 메시지가 있는지 매초마다 검사하는 스케줄러를 통해(@Scheduled 활용), 현재 메시지가 있고 메시지를 처리하는 스레드풀이 작동중이지 않다면, 스레드풀을 생성해 메시지큐의 메시지들을 처리합니다. 메시지를 처리하는 스레드풀을 항상 유지하지 않고, 메시지를 처리할 때만 잠깐 스레드풀을 생성하고 메시지를 다 처리하고 난 뒤엔 소멸시키는 방식을 택했습니다. 왜냐하면 이 서비스에선 메시지가 10분에 한번 대용량으로 전송되기 때문에, 그 때를 제외하고는 스레드풀이 제거되어 리소스가 다른 연산에 쓰일 수 있도록 한 것입니다.
+ 지금 생각해보니 스레드풀 자체를 만들었다 지웠다 할 필요 없이, 스레드풀의 core size를 0으로 만들어서, 작업이 없을 땐 스레드풀 크기가 0이 되게끔 하면 되지 않나 싶긴 합니다.
// 메시지가 담길 메시지큐
@Component
@ToString
@RequiredArgsConstructor
public class MessageQ {
// Thread-safe하기 위해 ConcurrentLinkedQueue 사용
private final Queue<Message> queue = new ConcurrentLinkedQueue<>();
public void add(Message noti){
queue.add(noti);
}
public Message poll(){
return queue.poll();
}
public boolean isEmpty(){
return queue.isEmpty();
}
public int size(){ return queue.size(); }
}
// 메시지큐에서 메시지를 꺼내와 처리하는 메시지 컨슈머
@Slf4j
@Component
public class MessageConsumerImpl implements MessageConsumer {
private final List<MessageHandler> messageHandlerList;
private final MessageFailHandler failHandler;
private final MessageQ messageQ;
private final int threadPoolSize = 5;
private boolean isTerminated = true;
public MessageConsumerImpl(MessageQ messageQ, List<MessageHandler> messageHandlerList, MessageFailHandler failHandler) {
this.messageQ = messageQ;
this.messageHandlerList = messageHandlerList;
this.failHandler = failHandler;
}
@Scheduled(cron = "0/1 * * * * *")
public void consume(){
log.info("메시지큐에 메시지가 있는지 확인합니다.");
if (!messageQ.isEmpty() && isTerminated)
executeThreadPool();
}
private void executeThreadPool(){
isTerminated = false;
log.info(messageQ.size() + "개의 메시지가 있습니다.");
log.info(threadPoolSize + "개의 스레드 풀을 생성해 메시지를 처리합니다.");
ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
IntStream.range(0, threadPoolSize).forEach(
threadNum -> CompletableFuture.runAsync(this::process, executor));
executor.shutdown();
}
private void process() {
log.info("메시지 처리를 준비중입니다.");
Message message = null;
while ((message = messageQ.poll()) != null) {
log.info("메시지를 처리합니다.");
handleMessage(message);
}
isTerminated = true;
}
private void handleMessage(final Message message){
messageHandlerList.forEach(messageHandler -> {
try {
messageHandler.handle(message);
} catch(Exception e){
failHandler.handleFail(message, e);
}
});
}
}
메시지를 처리하는 방식은, MessageHandler 인터페이스를 구현한 모든 핸들러들의 handle(Message) 메서드에 메시지를 인자로 전달합니다. 메서드 내에선 해당 메시지 타입이 해당 핸들러가 처리해야할 타입이라고 판단되면 처리하게 됩니다.
이렇게 모든 핸들러에 일단 메시지를 전달하도록 할 수도 있고, 메시지타입과 핸들러를 매핑해 관리해서 해당 메시지타입을 처리하는 핸들러에게만 보낼수도 있겠습니다만, 전자가 더 구현하기 단순하고 매핑정보를 관리할 필요가 없는 게 간편해 그렇게 구현했습니다.
// 메시지 핸들러 인터페이스
public interface MessageHandler {
void handle(final Message message);
}
# 메시지 핸들러 예시
@Component
@Slf4j
@RequiredArgsConstructor
public class NotiMessageHandler implements MessageHandler {
private final NotiSender notiSender;
@Override
public void handle(final Message message) {
if (message.getMessageType() != MessageType.NOTI)
return;
notiSender.send((Notification) message.getContent());
log.info("메시지가 NotiMessageHandler에 의해 처리되었습니다.");
}
}
제가 만든 메시지큐 클래스의 내부엔 큐 자료구조가 쓰였는데, 동시성 이슈를 막기 위해 ConcurrentLinkedQueue를 사용했습니다. SynchronousQueue도 적용해보았지만 생성자 스레드가 전달한 메시지가 처리될 때까지 생성자 스레드가 대기하게 되는 문제도 있고, 스레드풀을 이용한 여러 개의 소비자를 큐에 연결하기 어려운 부분이 있었고 성능도 느려서 ConcurrentLinkedQueue를 사용해서 상호배타적으로 큐를 이용하는 방식을 사용했습니다. 멀티스레드에서 동시다발적으로 메시지가 생성되었을 때 동시성 문제없이 처리함이 테스트 로 증명되었습니다.
만약 메시지를 처리하는 도중 예외가 발생하면 MessageFailHandler라는 클래스가 처리하게 되는데, 해당 메시지를 다시 메시지큐에 넣고, 설정된 용인 횟수를 초과하면 예외로그를 출력하고 메시지 처리를 포기합니다. 이 역시 테스트 되었습니다.
// MessageFailHandler
@RequiredArgsConstructor
@Slf4j
@Component
public class MessageFailHandler {
private final MessageQ messageQ;
public void handleFail(Message message, Exception e){
message.addException(e);
message.increaseFailCount();
if(message.getFailCount() >= message.getTolerance())
handleTooManyFails(message);
else
messageQ.add(message);
}
private void handleTooManyFails(Message message){
log.info("최대 실패 횟수로 지정한 " + message.getTolerance() + "회를 초과했습니다.");
log.info("예외 리스트---------------------------------------");
List<Exception> exceptionList = message.getExceptionList();
log.info("실패한 메시지 : " + message);
for(int i = 0; i < message.getExceptionList().size(); i++){
Exception e = exceptionList.get(i);
e.printStackTrace();
log.info("----------------------------------");
}
log.info("예외 리스트 끝 -----------------------------------");
}
}
멀티스레드에서 동시다발적으로 발행하는 메시지를 문제 없이 처리하는 클래스들을 만들어보면서, 동시성 이슈와 멀티스레딩, 스레드풀, 모니터링 스레드에 대해 직접 경험해볼 수 있어서 좋았고, 직접 만들어보며 작은 기능들을 추가하거나 작동 구조를 개선하는 일 자체가 그냥 즐거웠습니다.