[Spring, ReactorKafka, Coroutine] 어플리케이션 종료 시 zookeeper connection 이슈 해결

조갱·2023년 2월 12일
0

이슈 해결

목록 보기
10/15
post-thumbnail

실무에서 consumer 어플리케이션을 배포할 때마다 에러가 발생해왔다.

일부 데이터의 누락도 발생했지만, 중요하지 않은 데이터이며
데이터 정합성 검증 배치 job + 보정하는 api를 만들어두었기에
크게 신경은 쓰지 않고 있었다.

그래도 땜빵(?) 보다는 근본적으로 문제를 해결해보자.

참고로 Spring-kafka 를 사용하면 graceful-shutdown 옵션으로 간단하게 되는걸로 알지만
Reactor-kafka-listener 를 사용하면 별도로 graceful-shutdown 을 구현해야 하는걸로 안다 ㅠㅠ

이번 포스팅에서는 spring의 동작 과정을 중심으로 아래 주제를 다뤄본다.

  1. 어떤 에러가 발생했는가?
  2. 첫 번째 시도 (실패 사례)
    Q. 왜 해결이 되지 않았는가?
  3. 두 번째 시도 (성공 사례)
  4. *우리파트에서 사용하는 (특이한?) Kafka Listener

1. 어떤 에러가 발생했는가?

어플리케이션을 종료할 때

Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.zookeeper.discovery.ZookeeperDiscoveryClientConfiguration': Initialization of bean failed; nested exception is org.springframework.boot.context.properties.ConfigurationPropertiesBindException: Error creating bean with name 'zookeeperDependencies': Could not bind properties to 'ZookeeperDependencies' : prefix=spring.cloud.zookeeper, ignoreInvalidFields=false, ignoreUnknownFields=true; nested exception is java.lang.IllegalStateException: org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext@4b80d15e has been closed already

Caused by: org.springframework.data.mongodb.ClientSessionException: state should be: open; nested exception is java.lang.IllegalStateException: state should be: open

Caused by: java.lang.IllegalStateException: state should be: open

등등...

2. 첫 번째 시도 (실패 사례)

기존에 사용하던 코드는 아래와 같다.

override fun subscribe(topicName: String, groupId: String) {
	configuration.kafkaReceiver(topicName, groupId)
    	.receive()
	    .subscribe {
			boundedElasticScope.launch { process(it, groupId) }
		}
}

KafkaReceiver 를 만들고, 토픽이 들어올 때마다 처리하는 코드이다.

어플리케이션이 종료되기 전에 실행되는 @PreDestroy 어노테이션을 통해
실행중인 job이 완료될 때까지 기다려본다.

private lateinit var consumeCoroutineJob: Job

override fun subscribe(topicName: String, groupId: String) {
	consumeCoroutineJob = configuration.kafkaReceiver(topicName, groupId)
	    .receive()
	    .asFlow()    
	    .collect {
			withContext(NonCancellable) { process(it, groupId) }
		}
}

@PreDestroy
fun onApplicationEvent() {
    runBlocking {
        consumeCoroutineJob.cancelAndJoin()
    }
}

2-Q. 왜 해결이 되지 않았는가?

아쉽게도 위 방법으로는 해결되지 않았다. (처음 에러가 그대로 발생한다.)
그래서 로깅을 해보기로 한다.

private lateinit var consumeCoroutineJob: Job

override fun subscribe(topicName: String, groupId: String) {
	consumeCoroutineJob = configuration.kafkaReceiver(topicName, groupId)
    .receive()
    .asFlow()    
    .collect {
		withContext(NonCancellable) { process(it, groupId) }
	}
}

@PreDestroy
fun onApplicationEvent() {
	logger.info("close 탔당~!")
    runBlocking {
        consumeCoroutineJob.cancelAndJoin()
    }
    logger.info("close 끝났당~!")
}
10:48:59.527 [restartedMain-EventThread] INFO  org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x1068162a2c2eb1e
10:48:59.527 [Thread-29] INFO  org.apache.zookeeper.ZooKeeper - Session: 0x1068162a2c2eb1e closed
10:48:59.548 [Thread-29] INFO  c.n.o.c.l.s.StatisticSubscriber - close 탔당~!
10:49:11.506 [FeignDispatcher2] WARN  o.s.c.a.AnnotationConfigApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'zookeeperDiscoveryClientServiceInstanceListSupplier' defined...
10:49:11.945 [Thread-29] INFO  c.n.o.c.l.s.StatisticSubscriber - close 끝났당~!
10:49:11.962 [Thread-29] INFO  c.n.o.c.l.ordercs.OrderCsSubscriber - close 탔당~!
10:49:11.962 [Thread-29] INFO  c.n.o.c.l.ordercs.OrderCsSubscriber - close 끝났당~!
10:49:11.963 [Thread-29] INFO  c.n.o.c.l.mall.MallCreatedSubscriber - close 탔당~!
10:49:11.967 [Thread-29] INFO  c.n.o.c.l.mall.MallCreatedSubscriber - close 끝났당~!
...

잘 가 Zookeeper~~

로그를 살펴보면, PreDestroy 로직을 타기도 전에 Zookeeper가 먼저 떨어진 것을 알 수 있다.
그 외에도, bean을 만들 수 없다는 로그도 함께 보인다.

그래서, 처리하는 로직 내 feign (internal API) 호출 은 다 실패한다.

왜 가는거야?

Zookeeper와 다른 bean들은 왜 떨어질까?
@PreDestroy 의 어노테이션의 동작 시점에서 정답을 알 수 있다.

The PreDestroy annotation is used on a method as a callback notification to signal that the instance is in the process of being removed by the container.

즉, @PreDestroy는 실행중인 인스턴스가 종료될 때 실행된다.
하지만, zookeeper는 이보다 이전 시점에 떨어진다.

Application 종료 과정

우리 서비스는, 인스턴스를 종료할 때 actuator의 shutdown 명령어로 종료한다.
그래서 나도 테스트 환경에서 curl -X POST localhost:{PORT}/actuator/shutdown 를 사용하여 '실제 종료되는것 처럼' 종료했다.

  1. [actuator] - ShutdownEndpoint.java > performShutdown()
private void performShutdown() {
    try {
        Thread.sleep(500L);
    }
    catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
    this.context.close();
}
  1. [spring-context] - SimpleApplicationEventMulticaster.java > multicastEvent()
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
    ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
    Executor executor = getTaskExecutor();
    for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
        if (executor != null) {
            executor.execute(() -> invokeListener(listener, event));
        }
        else {
            invokeListener(listener, event);
        }
    }
}
  1. [spring-context] - AbstractApplicationContext.java > doClose()
protected void doClose() {
    // Check whether an actual close attempt is necessary...
    if (this.active.get() && this.closed.compareAndSet(false, true)) {
        ... 중략
        try {
            // Publish shutdown event.
            publishEvent(new ContextClosedEvent(this));
        }
        catch (Throwable ex) {
            logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
        }

        ... 중략
        // Destroy all cached singletons in the context's BeanFactory.
        destroyBeans();

        ... 중략
    }
}
  1. [spring-beans] - DefaultSingletonBeanRegistry.java > destroySingletons()
public void destroySingletons() {
    ... 중략
    String[] disposableBeanNames;
    synchronized (this.disposableBeans) {
        disposableBeanNames = StringUtils.toStringArray(this.disposableBeans.keySet());
    }
    for (int i = disposableBeanNames.length - 1; i >= 0; i--) {
        destroySingleton(disposableBeanNames[i]);
    }
    ... 중략
}
  1. ReactiveKafkaListener 빈이 빠지면서, @PreDestroy 가 실행된다.

중간중간 생략된 과정도 많지만, 정리해보자면 아래와 같다.

  1. Endpoint 에서 shutdown 명령을 받고, context 를 종료시킨다.
  2. 등록된 Event를 수행한다.
  3. ContextClosedEvent 수행 후, Bean 을 해제한다.
  4. 등록된 빈들을 해제한다.
  5. ReactiveKafkaListener 빈이 빠지면서, @PreDestroy 가 실행된다.

3. 두 번째 시도 (성공 사례)

2번에서 살펴본 내용에서, Zookeeper Bean 해제 > @PreDestroy 순서로 동작했기 때문에, 에러가 발생했음을 알 수 있다.
그렇다면, Zookeeper Bean 해제보다 먼저 동작시키면서 해결할 수 있다.

@PreDestroy 대신 @EventListener(ContextClosedEvent::class) 를 사용해본다.

@EventListener(ContextClosedEvent::class)
fun onApplicationEvent(){
    logger.info("close 탔당~!")
    runBlocking {
        consumeCoroutineJob.cancelAndJoin()
    }
    logger.info("close 끝났당~!")
}

작업중이던 Job이 모두 완료된 후, zookeeper가 bean에서 빠진다.

4. 우리파트에서 사용하는 (특이한?) Kafka Listener

우리파트에는 (아마 다른 파트에서 사용하지 않을?) 특이한 Kafka Listener 가 있다.
바로 Blocking으로 동작하는 (ReactorKafkaListener를 사용하는) BlockingKafkaListener 이다.

특정 상황에서 통계/정산이 중복으로 쌓이는 이슈가 있어서,
중복 방지 로직을 적용하기 위해 사용한다.

코드는 아까와 비슷한데, runBlocking으로 둘러쌓여있다.

private lateinit var consumeCoroutineJob: Job

override fun subscribe(topicName: String, groupId: String) {
    consumeCoroutineJob = runBlocking {
        launch {
            configuration.kafkaReceiver(topicName, groupId)
                .receive()
                .subscribe { runBlocking { withContext(NonCancellable) { process(it, groupId) } } }
        }
    }
}

@EventListener(ContextClosedEvent::class)
fun onApplicationEvent() {
    runBlocking {
        processJob.cancelAndJoin()
    }
}

얘는 아무리 EventListener를 써도 cancelAndJoin()이 동작하지 않았다.
그래서. 현재 상태를 찍는 로그를 추가해본다.

@EventListener(ContextClosedEvent::class)
fun onApplicationEvent() {
    logger.info("Blocking : isCompleted : ${processJob.isCompleted} isCancelled : ${processJob.isCancelled} isActive : ${processJob.isActive}")
    runBlocking {
        processJob.cancelAndJoin()
    }
    logger.info("Blocking : isCompleted : ${processJob.isCompleted} isCancelled : ${processJob.isCancelled} isActive : ${processJob.isActive}")
}

비교군으로, ReactiveKafkaListener에도 로깅을 추가했다.

@EventListener(ContextClosedEvent::class)
fun onApplicationEvent() {
    logger.info("Reactive : isCompleted : ${consumeCoroutineJob.isCompleted} isCancelled : ${consumeCoroutineJob.isCancelled} isActive : ${consumeCoroutineJob.isActive}")
    runBlocking {
        consumeCoroutineJob.cancelAndJoin()
    }
    logger.info("Reactive : isCompleted : ${consumeCoroutineJob.isCompleted} isCancelled : ${consumeCoroutineJob.isCancelled} isActive : ${consumeCoroutineJob.isActive}")
}

그리고 놀라운 로그(?) 를 확인할 수 있었다.

12:27:43.778 [Thread-29] INFO  c.n.o.c.l.{non-blocking-class-name} - Reactive : isCompleted : false isCancelled : false isActive : true
12:27:43.792 [Thread-29] INFO  c.n.o.c.l.{non-blocking-class-name} - Reactive : isCompleted : true isCancelled : true isActive : false
12:27:43.795 [Thread-29] INFO  c.n.o.c.l.{blocking-class-name} - Blocking : isCompleted : true isCancelled : false isActive : false
12:27:43.795 [Thread-29] INFO  c.n.o.c.l.{blocking-class-name} - Blocking : isCompleted : true isCancelled : false isActive : false

Reactive는 기본 isActive 상태에, isCompleted가 false인 반면
Blocking은 항상 isCompleted 상태에 isActive가 false였다.

그래서 cancelAndJoin() 에서 join 이 안되고 있었다.

그래가지고

runBlocking을 사용하는 대신, 1개의 thread를 갖는 dispatcher를 새로 만들어본다.
그러면 runBlocking이랑 동일하게 동작하지 않을까?
(아직까지는 이슈가 없다,,)

private val SINGLE_THREAD_DISPATCHER = "SINGLE_THREAD_DISPATCHER"

override fun subscribe(topicName: String, groupId: String) {
    processJob = CoroutineScope(newSingleThreadContext(SINGLE_THREAD_DISPATCHER)).launch {
        configuration.kafkaReceiver(topicName, groupId)
            .receive()
            .asFlow()
            .collect { withContext(NonCancellable) { process(it, groupId) } }
    }
}

출처

(거짓말 조금 보태서) 100번의 디버깅으로 삽질 ㅠㅠ

profile
A fast learner.

0개의 댓글