Kotlin에서 DynamoDb SDK의 AttributeConverter 사용 시 발생했던 문제점 분석

KIYOUNG KWON·2023년 10월 3일
0

문제발생

새로운 배포를 진행 후 Spring Boot Application 서버가 몇 분뒤에 응답을 할 수 없는 상태가 발생하였다. stage단계에서 간단한 기능 테스트에선 문제가 없었는데 배포 후에 요청이 어느정도 발생하고 나면 문제가 발생하였던 것이다. 신규로 나간 기능 중에서 DynamoDb에서 조회를 하는 부분이 있었고 이 부분에서 문제가 발생하고 있는 것이 아닌가로 분석을 시작하였다.

문제원인

결론적으로 DynamoDb에서 조회를 하는 부분에서 발생한 것은 틀림이 없는 상황이었는데 이상한 것은 예외가 Sentry로 넘어오지 않는다는 것이었다. 롤백을 진행하면 프로세스가 종료되면서 한번에 여러개의 InterruptedException이 발생하였다. InterruptedException은 Thread를 종료하고 싶은 경우 발생을 시키는 것인데 프로세스가 종료되는 경우에 한번에 발생한 다는 것은 프로세스를 종료하는 시점까지 살아있던 Thread가 존재한다는 것이다.

Thread가 살아있다는 것은 결국 request에 의해 생성된 connection 혹은 thread가 살아있단 의미이고 해당 리소스가 쌓이면서 서버가 응답할 수 없는 상태가 된 것으로 추측하였다. 그렇다면 thread가 살아서 exception을 뱉지도 못하는 상황이 된 것은 왜일까..

이번 배포에 DynamoDbAsyncClient를 사용하여 조회를 수행하였는데 여기서 문제가 발생한 것으로 가정하고 원인을 파악해보기 시작하였다.

//DynamoDBTable의 AttributeConverter 구현체 코드 일부 예시
override fun transformTo(input: AttributeValue): MyData {
	return mapToObject(it.m(), MyData::class)
}

private fun <T : Any> mapToObject(map: Map<String, AttributeValue>, clazz: KClass<T>): T {
    val constructor = clazz.constructors.first()

    val args = constructor
        .parameters.associateWith { transformAttribute(map[it.name]) }

    return constructor.callBy(args)
}

배포가 진행되었던 코드에 위와 같은 코드가 존재하였는데 DynamoDB의 map을 reflection을 활용해서 컨버팅을 해주었다. 로컬에서 테스트를 진행해본 결과 reflection에서 예외가 발생 시 thread에서 응답이 오지 않고 무한대기를 하고 있었다.

예외디깅

DynamoDbAsyncClient의 내부를 살펴보니 CompletableFuture로 해당 비동기 블럭을 처리하고 join을 호출하고 있었다. 그러면 CompletableFuture의 reference에서 join 부분을 살펴보면 Returns the result value when complete, or throws an (unchecked) exception if completed exceptionally라고 되어있다. 여기서 이상한 부분은 unchecked exception을 던진다고 되어있다. 그렇다면 checked exception이 내부에서 발생하면 어떻게 되는 것일까? 혹시 checked exception을 Main Thread로 던지지 않아 Thread의 종류를 인지 하지 못해 발생한 문제가 아닐까?

실제로 위의 코드를 unchecked exception으로 변환해서 던지면 생각했던대로 바로 예외를 뱉고 종료되는 것을 확인할 수 있었다.

본래 Java라면 checked exception이 발생할 수 있는 경우 컴파일러에서 에러를 발생시켜 이 를 알려준다.(AttributeConverter처럼 상속을 받는 경우에도 처리를 강요함) 하지만 Kotlin의 경우 이를 구분하지 않기 때문에 uncheck exception이 발생할 수 있어도 따로 컴파일 시점에 확인할 수 있는 부분은 없었다.

실험

CompletableFuture 자체가 checked exception을 잡아서 다시 던지지 않는다 가정하고 한번 실험을 해보았다.

val future = CompletableFuture.runAsync {
        throw Exception()
    }

    future.join()

사실 위에까진 CompeletableFuture를 별도의 Thread에서 실행하면 Main Thread까지 전파가 안되는 건 아닐까로 가정하고 테스트를 해보았지만 결과는 Exception이 Main Thread로 throw되어 예외로 프로세스가 종료되었다. 위의 가정은 틀린 것 같다.

해당 이슈를 정확히 파악하려면 aws-sdk의 reactive 라이브러리 부분을 디깅을 해야할 것 같다. 우선 확실한 것은 Kotlin을 사용시 DynamoDB의 Converter에서 checked exception이 발생할 가능성이 있는지 확인하고 있다면 적절한 핸들링을 수행할 필요가 있을 것으로 보인다.

AttributeConverter

우선 확인해봐야 할 부분은 AttributeConverter에서 예외가 발생했기 때문에 SDK에서 요청값을 변환하는 부분을 찾아봐야 한다. DefaultDynamoDbAsyncTable.query부터 시작해서 호출하는 함수를 따라가보면 PaginatedOperation.executeAsync를 확인해볼 수 있다.

default PagePublisher<ItemT> executeAsync(TableSchema<ItemT> tableSchema,
                                               OperationContext context,
                                               DynamoDbEnhancedClientExtension extension,
                                               DynamoDbAsyncClient dynamoDbAsyncClient) {
        RequestT request = generateRequest(tableSchema, context, extension);
        SdkPublisher<ResponseT> response = asyncServiceCall(dynamoDbAsyncClient).apply(request);
        return PagePublisher.create(response.map(r -> transformResponse(r, tableSchema, context, extension)));
}

위에서 transformResponse라고 되있는 부분이 AttributeConverter를 호출하는 부분이다. 해당 로직은 publisher의 map의 매개변수로 전달되는데 그 말인즉슨 비동기 응답에 대한 결과를 받고 나면 해당 로직이 호출된다고 보면된다. 대충 해당 코드를 봤을 때 알수있는 부분은 이 코드가 reactive 라이브러리를 사용해서 비동기 로직을 구현하고 있고 문제가 되는 부분을 찾기 위해선 Publisher와 Subscriber의 구현체를 찾아야만 한다.

SdkPublisher

AWS SDK에서 비동기로 API를 호출하는 경우 SdkPublisher를 사용한다.(아마도 bulk로 조회하는 경우에만..) DynamoDb의 Query 명령어의 경우 QueryPublisher를 구현체로 사용하는 것으로 보인다. 해당 class의 subscribe를 살펴보면 다음과 같다.

@Override
public void subscribe(Subscriber<? super QueryResponse> subscriber) {
	subscriber.onSubscribe(ResponsesSubscription.builder().subscriber(subscriber).nextPageFetcher(nextPageFetcher).build());
}

Subscription을 생성할 때 nextPageFetcher라는 것을 설정하고 Subscriber에게 전달을 하게되는데 이 nextPageFetcher는 QueryPublisher의 inner class인 QueryResponseFetcher를 사용하는데 코드는 아래와 같다.

private class QueryResponseFetcher implements AsyncPageFetcher<QueryResponse> {
        @Override
        public boolean hasNextPage(final QueryResponse previousPage) {
            return PaginatorUtils.isOutputTokenAvailable(previousPage.lastEvaluatedKey());
        }

        @Override
        public CompletableFuture<QueryResponse> nextPage(final QueryResponse previousPage) {
            if (previousPage == null) {
                return client.query(firstRequest);
            }
            return client.query(firstRequest.toBuilder().exclusiveStartKey(previousPage.lastEvaluatedKey()).build());
        }
}

여기서 확인할 부분은 Subscriber가 다음 값을 요청했을 때 nextPage를 통해 값을 가져오고 더 가져올 값이 존재하는지 여부를 hasNextPage로 확인하는 부분이다. 그리고 API의 반환값은 CompletableFuture 형태로 받게 되는 것이다. 여기서 이전에 의심했던 CompletableFuture가 나온다. StackTrace에서 CompletableFuture가 호출되었던 것도 이 때문이었는데 주 원인은 아니었다. 여기선 이렇다 할만한 부분이 보이질 않는다. 이제 Subscriber를 찾아봐야 할 것 같다.

MappingSubscriber

DynamoDbAsyncTable.query를 호출하면 PagePublisher가 호출되는데 이를 바로 subscribe해서 사용해도 되고 items를 사용해서 한번에 받도록 subscribe하여 사용해도 되는데 우리는 items를 통해 전체를 받는 식으로 구현했었다. 이렇게 할 경우 값을 모두 받은 뒤 한번에 list형태로 subscriber에게 전달하게 된다. 그러면 우리의 Application에서 Subscribe하는 것과는 별도로 각 값에 대한 처리를 위해 MappingSubscriber에게 전달된 뒤에 처리가 되는데 이 class의 onNext 함수를 보면 대충 무엇이 문제인지 감이 올 것이다.

@Override
public void onNext(T t) {
     if (!isCancelled) {
         try {
             delegateSubscriber.onNext(mapFunction.apply(t));
         } catch (RuntimeException e) {
             // If the map function throws an exception, the subscription should be cancelled as the publisher will
             // otherwise not be aware it has happened and should have the opportunity to clean up resources.
             cancelSubscriptions();
             delegateSubscriber.onError(e);
         }
     }
}

여기가 바로 문제의 부분이었다! 저 mapFunction에서 Converter의 작업을 수행하게 된텐데 여기서 checked exception이 발생하게 되고 이를 catch하지 못해 cancelSubscriptions함수가 호출되지도 못하고 onError로 건져내지도 못하는 상황이 발생하게 된 것이고 이 때문에 무한대기에 빠지게 된 것으로 보인다.

결론

뭔가 thread에 따른 문제라거나 하는 것을 생각했지만 실상은 delegateSubscriber로 onError가 전달되지 못한 것이 가장 큰 문제가 된 것으로 보인다. 찾아보니 해당 이슈에 대한 PR을 올려둔 것이 있었는데 특별한 진전을 없어보인다.

사실 MappingSubscriber 말고도 Subscriber의 구현체들이 onNext 처리 시 checked exception을 따로 핸들링을 하지는 않는 것으로 보인다. Java에선 컴파일러가 경고를 줄테니 문제가 되는 일이 없겠지만 Kotlin의 경우 이 부분에 대한 주의가 필요할 것으로 보인다.

0개의 댓글