AWS SQS start, stop with 스프링 부트 3.0

1

back-end

목록 보기
10/18

회사에서 프로젝트를 하다가 Spring Boot 버전을 2.7에서 3.2로 올리면서 SQS 버전도 같이 올렸다.


SQS 버전이 올라가면서 기존에 SimpleMessageListenerContainer에서 SqsMessageListenerContainerFactory로 변경됐고, 변경에 따른 이슈가 생겼다.

기존 구현사항

Switch로 Queue를 일괄 Start, Stop하는 로직이 있었는데
기존에는 SimpleMessageListenerContainer를 주입받아 simpleMessageListenerContainer.start(),
simpleMessageListenerContainer.stop() 을 해주면 등록된 모든 queue에 대해 일괄 start, stop이 됐다.

private final SimpleMessageListenerContainer simplemessageListenerContainer;

public void start() {
	//SqsListener Annotation으로 등록된 모든 queue가 시작
	if(!simplemessageListenerContainer.isRunning()){
    	simplemessageListenerContainer.start();
    }
}

public void stop() {
	//SqsListener Annotation으로 등록된 모든 queue가 중지
	if(simplemessageListenerContainer.isRunning()){
    	simplemessageListenerContainer.stop();
    }
}

문제 인식

하지만, SqsMessageListenerContainerFactory 에서는 start(), stop() 메서드가 사라져있었다..
그 대신 SqsMessageListenerContainer에 start(), stop() 메서드가 있었고 해결의 실마리를 찾은 듯 했다.

문제 해결 1

Bean으로 SqsMessageListenerContainer를 등록해서 기존 로직 그대로 사용하면 되지 않을까 싶어서 구현해봤다.

//config
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient){
	return SqsMessageListenerContainerFactory
    .builder()
    .. 중략 
    .messageLisner((message) -> 로직 )
    .configure(... 중략)
    .build();
}

@Bean
public SqsMessageListenerContainer<Object> sqsMessageListenerContainer(SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory){
	return defaultSqsListenerContainerFactory.createContainer(...queue 이름 삽입);
}

SqsMessageListenerContainerFactory에서 createContainer라는 메서드를 사용하면 되는데, 이때 SqsMessageListenerContainerFactory를 만들 때 messageListener가 없으면 에러가 나서 추가해줬다.


private final SqsMessageListenerContainer<Object> sqsMessageListenerContainer;

public void start() {
	//SqsListener Annotation으로 등록된 모든 queue가 시작
	if(!sqsMessageListenerContainer.isRunning()){
    	sqsMessageListenerContainer.start();
    }
}

public void stop() {
	//SqsListener Annotation으로 등록된 모든 queue가 중지
	if(sqsMessageListenerContainer.isRunning()){
    	sqsMessageListenerContainer.stop();
    }
}

@SqsListener로 등록되어있는 queue이름을 전부 넣어주고 start, stop을 하면 기존 로직 그대로 사용할 수 있을 줄 알았다...

하지만 저기서 두가지 문제점이 있었다.

  1. SqsMessageListenerContainerFactory 에 messageLisner를 달면 기존 @SqsListener로 데이터를 받지 못하고 messageListener안에 있는 메서드에 데이터가 들어간다.. 즉 모든 queue의 데이터를 저기서 처리하게 된다.
  2. createContainer로 queue이름을 넣었었는데 @SqsListener로 연결된 queue와 별개로 생성됐고, 그 queue만 start, stop 가능했다.

즉 이 방법으로는 문제 해결이 사실상 불가능했다...

문제 해결 2

해결하기 위해 @SqsListener로 등록되는 queue는 어디서 관리하는지 정말 열심히 찾아봤다..
Spring Bean에 등록되나해서 Bean도 찾아보고 수단을 가리지 않고 내부 코드도 들어가서 동작 로직에 대해서 들여다봤다.

그러다 우연히 공식 문서에서 해결 실마리를 찾게 됐다.

MessageListenerContainer에서 container를 id값으로 찾아서 start, stop하는 방법이 있었다!

또한, @SqsListener 내부에도 id를 등록하는게 있었다!
해서 바로 시도해봤다.


@Autowired
MessageListenerContainerRegistry registry;

List<MessageListenerContainer<?>> getMessageListener(boolean isRunning){
	List<MessageListenerContainer<?>> containers = new ArrayList<>();
    // QueueNames 라는 클래스에 queue 이름을 등록 해줘야한다..
    for(String id : QueueNames.getQueue()){
    	MessageListenerContainer<?> container = registry.getContainerById(id);
        // Start, Stop 함수를 돌리기 전에 isRunning 된 queue만 가져오기 위해 로직을 추가했다.
        if (container != null && container.isRunning == isRunning) {
        	containers.add(container);
        }
    }
    return containers;
}

public void start() {
	//SqsListener Annotation으로 등록된 모든 queue가 시작
    // isRunning = false 인 queue만 가져와서 start
	List<MessageListenerContainer<?>> messageListeners = getMessageListener(false);
    for (MessageListenerContainer<?> messageListener : messageListeners) {
    	messageListener.start();
    }
}

public void stop() {
	//SqsListener Annotation으로 등록된 모든 queue가 중지
    // isRunning = true 인 queue만 가져와서 stop
	List<MessageListenerContainer<?>> messageListeners = getMessageListener(true);
    for (MessageListenerContainer<?> messageListener : messageListeners) {
    	messageListener.start();
    }
}

// SqsListener를 등록할 때, 같은 이름으로 id도 등록해줘야 한다.
@SqsListener(value = QueueName, id = QueueName)

이렇게 구현하니 내부적으로 등록되던 queue들이 id값으로 식별이 가능해지고, MessageListenerContainer에서 제어가 가능해졌다.

결과

getMessageListener 메서드 로직이 지저분하고, @SqsListener를 사용할 때 반드시 id를 등록해줘야 하지만 Annotation으로 등록된 queue를 일괄 제어할 수 있다는 점에서 굉장히 효율적인 방법임에 틀림 없다 (틀림 없다고 말하고 싶다)

일하면서 틈틈이 2~3일 정도 들여다 본 문제라 해결하고 나서 굉장히 기분이 좋았다.
래퍼런스가 많이 없어서 힘들었지만 다른 사람들은 나와 같은 고생은 안했으면 좋겠다

공식문서를 들여다보는 습관을 들여야겠다고 생각했고, 무엇인가 안되면 라이브러리 내부 클래스에 접근해서 그 동작 로직을 들여다보는 습관도 좋은 것 같다

--끝--

0개의 댓글