실무에서 consumer 어플리케이션을 배포할 때마다 에러가 발생해왔다.
일부 데이터의 누락도 발생했지만, 중요하지 않은 데이터이며
데이터 정합성 검증 배치 job + 보정하는 api를 만들어두었기에
크게 신경은 쓰지 않고 있었다.
그래도 땜빵(?) 보다는 근본적으로 문제를 해결해보자.
참고로 Spring-kafka 를 사용하면 graceful-shutdown 옵션으로 간단하게 되는걸로 알지만
Reactor-kafka-listener 를 사용하면 별도로 graceful-shutdown 을 구현해야 하는걸로 안다 ㅠㅠ
이번 포스팅에서는 spring의 동작 과정을 중심으로 아래 주제를 다뤄본다.
어플리케이션을 종료할 때
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.zookeeper.discovery.ZookeeperDiscoveryClientConfiguration': Initialization of bean failed; nested exception is org.springframework.boot.context.properties.ConfigurationPropertiesBindException: Error creating bean with name 'zookeeperDependencies': Could not bind properties to 'ZookeeperDependencies' : prefix=spring.cloud.zookeeper, ignoreInvalidFields=false, ignoreUnknownFields=true; nested exception is java.lang.IllegalStateException: org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext@4b80d15e has been closed already
Caused by: org.springframework.data.mongodb.ClientSessionException: state should be: open; nested exception is java.lang.IllegalStateException: state should be: open
Caused by: java.lang.IllegalStateException: state should be: open
등등...
기존에 사용하던 코드는 아래와 같다.
override fun subscribe(topicName: String, groupId: String) {
configuration.kafkaReceiver(topicName, groupId)
.receive()
.subscribe {
boundedElasticScope.launch { process(it, groupId) }
}
}
KafkaReceiver 를 만들고, 토픽이 들어올 때마다 처리하는 코드이다.
어플리케이션이 종료되기 전에 실행되는 @PreDestroy 어노테이션을 통해
실행중인 job이 완료될 때까지 기다려본다.
private lateinit var consumeCoroutineJob: Job
override fun subscribe(topicName: String, groupId: String) {
consumeCoroutineJob = configuration.kafkaReceiver(topicName, groupId)
.receive()
.asFlow()
.collect {
withContext(NonCancellable) { process(it, groupId) }
}
}
@PreDestroy
fun onApplicationEvent() {
runBlocking {
consumeCoroutineJob.cancelAndJoin()
}
}
아쉽게도 위 방법으로는 해결되지 않았다. (처음 에러가 그대로 발생한다.)
그래서 로깅을 해보기로 한다.
private lateinit var consumeCoroutineJob: Job
override fun subscribe(topicName: String, groupId: String) {
consumeCoroutineJob = configuration.kafkaReceiver(topicName, groupId)
.receive()
.asFlow()
.collect {
withContext(NonCancellable) { process(it, groupId) }
}
}
@PreDestroy
fun onApplicationEvent() {
logger.info("close 탔당~!")
runBlocking {
consumeCoroutineJob.cancelAndJoin()
}
logger.info("close 끝났당~!")
}
10:48:59.527 [restartedMain-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x1068162a2c2eb1e
10:48:59.527 [Thread-29] INFO org.apache.zookeeper.ZooKeeper - Session: 0x1068162a2c2eb1e closed
10:48:59.548 [Thread-29] INFO c.n.o.c.l.s.StatisticSubscriber - close 탔당~!
10:49:11.506 [FeignDispatcher2] WARN o.s.c.a.AnnotationConfigApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'zookeeperDiscoveryClientServiceInstanceListSupplier' defined...
10:49:11.945 [Thread-29] INFO c.n.o.c.l.s.StatisticSubscriber - close 끝났당~!
10:49:11.962 [Thread-29] INFO c.n.o.c.l.ordercs.OrderCsSubscriber - close 탔당~!
10:49:11.962 [Thread-29] INFO c.n.o.c.l.ordercs.OrderCsSubscriber - close 끝났당~!
10:49:11.963 [Thread-29] INFO c.n.o.c.l.mall.MallCreatedSubscriber - close 탔당~!
10:49:11.967 [Thread-29] INFO c.n.o.c.l.mall.MallCreatedSubscriber - close 끝났당~!
...
로그를 살펴보면, PreDestroy 로직을 타기도 전에 Zookeeper가 먼저 떨어진 것을 알 수 있다.
그 외에도, bean을 만들 수 없다는 로그도 함께 보인다.
그래서, 처리하는 로직 내 feign (internal API) 호출 은 다 실패한다.
Zookeeper와 다른 bean들은 왜 떨어질까?
@PreDestroy 의 어노테이션의 동작 시점에서 정답을 알 수 있다.
The PreDestroy annotation is used on a method as a callback notification to signal that the instance is in the process of being removed by the container.
즉, @PreDestroy는 실행중인 인스턴스가 종료될 때
실행된다.
하지만, zookeeper는 이보다 이전 시점
에 떨어진다.
우리 서비스는, 인스턴스를 종료할 때 actuator의 shutdown 명령어로 종료한다.
그래서 나도 테스트 환경에서 curl -X POST localhost:{PORT}/actuator/shutdown
를 사용하여 '실제 종료되는것 처럼' 종료했다.
private void performShutdown() {
try {
Thread.sleep(500L);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
this.context.close();
}
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
protected void doClose() {
// Check whether an actual close attempt is necessary...
if (this.active.get() && this.closed.compareAndSet(false, true)) {
... 중략
try {
// Publish shutdown event.
publishEvent(new ContextClosedEvent(this));
}
catch (Throwable ex) {
logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
}
... 중략
// Destroy all cached singletons in the context's BeanFactory.
destroyBeans();
... 중략
}
}
public void destroySingletons() {
... 중략
String[] disposableBeanNames;
synchronized (this.disposableBeans) {
disposableBeanNames = StringUtils.toStringArray(this.disposableBeans.keySet());
}
for (int i = disposableBeanNames.length - 1; i >= 0; i--) {
destroySingleton(disposableBeanNames[i]);
}
... 중략
}
중간중간 생략된 과정도 많지만, 정리해보자면 아래와 같다.
2번에서 살펴본 내용에서, Zookeeper Bean 해제 > @PreDestroy 순서
로 동작했기 때문에, 에러가 발생했음을 알 수 있다.
그렇다면, Zookeeper Bean 해제보다 먼저 동작
시키면서 해결할 수 있다.
@PreDestroy 대신 @EventListener(ContextClosedEvent::class) 를 사용해본다.
@EventListener(ContextClosedEvent::class)
fun onApplicationEvent(){
logger.info("close 탔당~!")
runBlocking {
consumeCoroutineJob.cancelAndJoin()
}
logger.info("close 끝났당~!")
}
작업중이던 Job이 모두 완료된 후, zookeeper가 bean에서 빠진다.
우리파트에는 (아마 다른 파트에서 사용하지 않을?) 특이한 Kafka Listener 가 있다.
바로 Blocking으로 동작하는 (ReactorKafkaListener를 사용하는) BlockingKafkaListener 이다.
특정 상황에서 통계/정산이 중복으로 쌓이는 이슈가 있어서,
중복 방지 로직을 적용하기 위해 사용한다.
코드는 아까와 비슷한데, runBlocking으로 둘러쌓여있다.
private lateinit var consumeCoroutineJob: Job
override fun subscribe(topicName: String, groupId: String) {
consumeCoroutineJob = runBlocking {
launch {
configuration.kafkaReceiver(topicName, groupId)
.receive()
.subscribe { runBlocking { withContext(NonCancellable) { process(it, groupId) } } }
}
}
}
@EventListener(ContextClosedEvent::class)
fun onApplicationEvent() {
runBlocking {
processJob.cancelAndJoin()
}
}
얘는 아무리 EventListener를 써도 cancelAndJoin()이 동작하지 않았다.
그래서. 현재 상태를 찍는 로그를 추가해본다.
@EventListener(ContextClosedEvent::class)
fun onApplicationEvent() {
logger.info("Blocking : isCompleted : ${processJob.isCompleted} isCancelled : ${processJob.isCancelled} isActive : ${processJob.isActive}")
runBlocking {
processJob.cancelAndJoin()
}
logger.info("Blocking : isCompleted : ${processJob.isCompleted} isCancelled : ${processJob.isCancelled} isActive : ${processJob.isActive}")
}
비교군으로, ReactiveKafkaListener에도 로깅을 추가했다.
@EventListener(ContextClosedEvent::class)
fun onApplicationEvent() {
logger.info("Reactive : isCompleted : ${consumeCoroutineJob.isCompleted} isCancelled : ${consumeCoroutineJob.isCancelled} isActive : ${consumeCoroutineJob.isActive}")
runBlocking {
consumeCoroutineJob.cancelAndJoin()
}
logger.info("Reactive : isCompleted : ${consumeCoroutineJob.isCompleted} isCancelled : ${consumeCoroutineJob.isCancelled} isActive : ${consumeCoroutineJob.isActive}")
}
그리고 놀라운 로그(?) 를 확인할 수 있었다.
12:27:43.778 [Thread-29] INFO c.n.o.c.l.{non-blocking-class-name} - Reactive : isCompleted : false isCancelled : false isActive : true
12:27:43.792 [Thread-29] INFO c.n.o.c.l.{non-blocking-class-name} - Reactive : isCompleted : true isCancelled : true isActive : false
12:27:43.795 [Thread-29] INFO c.n.o.c.l.{blocking-class-name} - Blocking : isCompleted : true isCancelled : false isActive : false
12:27:43.795 [Thread-29] INFO c.n.o.c.l.{blocking-class-name} - Blocking : isCompleted : true isCancelled : false isActive : false
Reactive는 기본 isActive 상태에, isCompleted가 false인 반면
Blocking은 항상 isCompleted 상태에 isActive가 false였다.
그래서 cancelAndJoin() 에서 join 이 안되고 있었다.
runBlocking을 사용하는 대신, 1개의 thread를 갖는 dispatcher를 새로 만들어본다.
그러면 runBlocking이랑 동일하게 동작하지 않을까?
(아직까지는 이슈가 없다,,)
private val SINGLE_THREAD_DISPATCHER = "SINGLE_THREAD_DISPATCHER"
override fun subscribe(topicName: String, groupId: String) {
processJob = CoroutineScope(newSingleThreadContext(SINGLE_THREAD_DISPATCHER)).launch {
configuration.kafkaReceiver(topicName, groupId)
.receive()
.asFlow()
.collect { withContext(NonCancellable) { process(it, groupId) } }
}
}
(거짓말 조금 보태서) 100번의 디버깅으로 삽질 ㅠㅠ