ReplyingKafkaTemplate를 사용하면서 겪었던 트랜잭션 관련 문제들과 해결방법

ysng_is_yosong·2024년 3월 8일
0
post-thumbnail

배경

제가 현재 진행하고 있는 개인 프로젝트는 음악 추천 SNS Wvv(wave)입니다.

음악을 즐기는 사람들이 자신의 취향에 맞는 음악을 추천하고 싶어하는 욕구와
인디 뮤지션들이 자신의 음악을 대중들에게 홍보하는 데 어려움을 겪는 상황을 해결하기 위해 시작했습니다.
이를 위해 본인이 좋아하는 뮤지션의 음악을 유튜브 링크로 추천할 수 있을 뿐만 아니라
뮤지션 본인의 작업물을 업로드해서 사용자들이 듣게하여 홍보할 수 있도록 서비스를 만들고 있습니다.

현재 프로젝트는 아래와 같은 4개의 백엔드 서버로 구성되어있습니다.
1. 전반적인 API 요청을 담당하는 API 서버
2. 미디어 파일을 담당하는 Media 서버
3. 알림과 이메일등의 메시지 전송을 담당하는 Message 서버
4. 핫 트랜드 랭킹 집계 및 배치 작업을 담당하는 Batch 서버

각각의 서버들은 MOM(Message Oriented Middleware)인
아파치 카프카를 통해 서로가 약하게 결합되어 있습니다.

유저가 포스트에 올라온 음원을 스트리밍 요청을 하면 아래와 같은 순서로 진행됩니다.

  1. 사용자는 API서버에 특정 포스트에 있는 음원에 대해 스트리밍 요청을 보낸다.
  2. API 서버는 Media 서버에 특정 포스트의 음원데이터 요청 메시지를 발행한다.
  3. Media 서버는 요청 메시지를 받고 음원 데이터를 찾은 후 데이터가 담긴 메시지를 카프카에 발행한다.
  4. API 서버는 음원 데이터가 담긴 메시지를 받은 후 사용자에게 응답한다.

제 음원 스트리밍 서비스는 웹소켓, SSE, 다른 외부 프로토콜을 이용한 방식이 아닌
HTTP의 Partial Request(206)를 활용하였습니다.

엄밀히 말하면 스트리밍은 아니지만 심화적인 학습 없이
HTTP와 로직만으로 빠르게 음원 스트리밍을 구현할 수 있다는 점에서 선택했습니다.

하지만 이렇게 되면 스트리밍 요청과 응답이 동기적으로 이루어져야 합니다.
HTTP는 클라이언트와 연결되어 있는 상태를 유지하지 않기 때문입니다.

따라서 미디어 서버와도 동기적으로 통신을 해야합니다.
카프카는 기본적으로 비동기적으로 메시지를 주고 받지만
특별한 설정을 통해서 동기적인 통신을 할 수 있습니다.

스프링 카프카에서는 설정만 잘 한다면
ReplyingKafkaTemplate를 통해 쉽게 사용할 수 있도록 만들어줬습니다.

그래서 현재 Wvv는 사용자에게 음원을 동기적으로 제공해줍니다.

하지만 저는 카프카 사용에 익숙하지 않습니다.
이로 인해 여러 문제들이 발행했는데,
그 문제들을 해결하기 위해 겪었던 과정을 공유하고자 합니다.

발생한 문제들과 해결방법

첫 번째


사용자가 음원 스트리밍을 요청한 후 성공적으로 응답을 받으면 위의 사진과 같은 로그가 발생합니다.


하지만 미디어 서버에 문제가 생겨 모두 다운된 경우에는 위와 같은 로그가 발생합니다.
여기서 주목할 점은 빨간색으로 표시한 commit 메시지입니다.

이 커밋 메시지는 카프카에 의해 발행된 커밋 메시지입니다.
분명 미디어 서버에 요청한 데이터를 받지 못해 타임아웃이 발생했습니다.
즉, 예외가 발생했음에도 불구하고 발행한 메시지가 커밋된 것입니다.

물론 카프카는 발행된 메시지를 파티션에 넣어놔서 다시 구독할 수 있도록 만들었습니다.
그래서 이렇게 커밋되더라도 문제가 없다고 생각하실 수 있습니다.
하지만 제 서비스의 음원 스트리밍은 비동기적이 아니라 동기적으로 동작합니다.
그렇기 때문에 추후에 미디어 서버가 메시지를 받아 성공적으로 음원데이터를 전송하더라도
이미 만료된 요청이기 때문에 API 서버는 사용자에게 응답할 수 없습니다.


위의 사진이 미디어 서버를 다시 실행했을 때의 로그입니다.
이미 만료된 요청임에도 불구하고 메시지를 받아 요청을 처리하고 있습니다.
심지어 똑같은 메시지를 두 번이나 받아서 처리하고 있네요?

저는 위와 같은 상황이 발생하는 걸 원치 않습니다.
물론 현재 상황에서는 데이터베이스에 접근해 DML을 수행하지 않기 때문에 데이터 정합성에 문제가 없니다만,
추후 다른 로직에서 커밋되지 않은 메시지를 읽어 데이터를 변경하는 상황이 발생할 수 있기 때문입니다.

커밋(commit)으로 출력된 로그를 통해 카프카에도 트랜잭션 레벨이 있을거라 추측했습니다.
그리고 카프카의 exactly once 설정에 관한 글을 참고하여 문제를 해결했습니다.

프로듀서 서버의 카프카 설정에서
TRANSACTIONAL_ID_CONFIG, @Transactional을 설정
하는 것이 핵심입니다.

위의 에러로그를 살펴보면 카프카에 메시지가 커밋되고 나서 에러가 발생했습니다.
즉, 원자적으로 이루어져야 할 작업이 나눠서 진행된 것입니다.
따라서 하나의 묶음으로 된 작업들이 부분적으로 성공하는 것이 아니라,
전부 성공하던지, 전부 실패해야만 합니다.
이러한 상황에서 TRANSACTIONAL_ID_CONFIG 설정합니다.

TRANSACTIONAL_ID_CONFIG은 트랜잭션을 식별하는데 사용되는 고유한 식별자를 정의합니다.
이를 통해 여러 메시지를 한 번에 전송하고, 전송이 완료되기 전에는 메시지가 커밋되지 않습니다.
이는 메시지를 안전하게 처리하고, 장애 발생 시 롤백할 수 있는 메커니즘을 제공합니다.


설정한 후 API 서버에서 똑같은 에러를 발생시키면 기존의 에러로그에서는 확인 할 수 없었던
rollback 로그가 찍힌 것을 확인 할 수 있습니다. 이로써 문제를 해결할 수 있었습니다.


저는 최종적으로 프로듀서를 위와 같이 설정했습니다.
위에서는 설명하지 않은 ENABLE_IDEPOTENCE_CONFIG란 설정이 보이는데요.
이 설정은 Exactly Once. 정확히 한번만 메시지를 처리하기 위해 해놓은 설정입니다.

잠깐, IDEMPOTENCE에 관해 알아봅시다. IDEMPOTENCE는'멱등성'입니다.
즉, 여러 번 적용하여도 결과가 달라지지 않도록 만드는 설정인데요.
여기서는 그 대상이 프로듀서가 생산하는 메시지입니다.

저의 상황을 들어 설명하면, 음원 스트리밍 요청에 관한 메시지가 Kafka broker에 들어갔지만
timeout같은 오류가 발생하여 실패로 응답을 받을 받을 경우 Producer는 메시지를 재발행합니다.
Producer 입장에서는 실패 응답을 받았기 때문에 Kafka broker에
메시지가 들어갔는지 안들어 갔는지 알 수 없기 때문입니다.
따라서 Consumer 입장에서는 중복 발행된 메시지(두 번)를 받게 됩니다.

여기서 ENABLE_IDEMPOTENCE_CONFIG 가 true 로 되어 있으면,
재발행되서 두번째 메세지가 갔을 때 같은 메세지인지 비교합니다.
그리고 두번째로 온 메세지가 이전과 같은 메세지라면 동일한 메시지이므로 버립니다.


저는 위와 같이 설정했습니다. 공통적인 카프카 설정과 더불어 API서버에서 사용할 설정을 추가했습니다.
이렇게 설정함으로써 아래의 사진과 같이 딱 한번의 메시지만 받을 수 있게 되었습니다.

프로듀서는 커밋된 데이터만 토픽으로 생산하도록 했지만,
컨슈머 서버의 카프카 설정에서도 동일하게 커밋된 데이터만 읽도록 만들어야 합니다.
이를 위해 ISOLATION_LEVEL_CONFIG를 "read_committed"로
ENABLE_AUTO_COMMIT_CONFIG를 false로 설정해야 합니다.

ENABLE_AUTO_COMMIT_CONFIG은 기본적으로 true로 되어있습니다.
이는 메시지를 읽고 토픽의 파티션에 offset을 자동으로 커밋합니다.
만일, 메시지를 읽은 후에 비즈니스 로직을 처리하다가 오류가 났다면 어떨까요?
메시지에 관한 작업을 모두 처리하지 않았음에도 처리된 것으로 나타납니다.
이에 관한 트랜잭션을 보장하기 위해 ENABLE_AUTO_COMMIT_CONFIG를 false로 처리합니다.

ISOLATION_LEVEL_CONFIG를 "read_committed"로 설정하는 것은
말 그대로 트랜잭션의 ISOLATION을 커밋된 데이터만 읽게 만드는 것입니다.
DB의 트랜잭션 격리 레벨에 관해 학습해 보셨다면 이해가 빠르실 겁니다.

최종적으로 아래와 같이 컨슈머 서버를 설정했습니다.


두 번째

이로써 문제가 모두 해결됐다고 생각했지만, 새로운 문제가 생겼습니다.
위와 같이 설정하고 나니 기존에 정상적으로 작동되던 스트리밍 기능이 오히려 먹통이 됐습니다.
API 서버에서 발행한 메시지를 Media 서버가 구독하지 못하는 상황이 발생한 것입니다.
조금 당황했지만, 침착하게 세팅했던 설정 값을 되돌려보며 디버그하기로 했습니다.
의외로 빠르게 원인을 찾았는데요. 바로 위에서 컨슈머 서버에 설정했던 ISOLATION_LEVEL 때문이었습니다.

컨슈머에서 커밋된 메시지만 읽도록 설정했는데, 아무런 메시지도 읽지 못했다는 것은
스트리밍 요청에 의해 발행된 메시지가 커밋되지 않았다는 말입니다.

API 서버에서 자세히 확인해보니 정말로 커밋이 되지 않았습니다.
제가 잘못 이해하고 있던 것을 먼저 말씀드리고 실제로 작동하는 방식과 해결방법을 설명해보겠습니다.

위의 첫 번째 그림이 제가 기존에 생각했던 진행입니다.
애플리케이션 레이어에서 트랜잭션이 시작되고 음원 데이터를 요청하면 트랜잭션이 커밋된다고 생각했습니다.
그래서 미디어 서버에서 ISOLATION_LEVEL을 read_committed로 설정해놓아도
커밋된 메시지를 소비할 수 있을거라 생각했지만 실제로는 그렇지 않았습니다.

이는 replyingKafka의 특성에 기인하는데요.
이를 사용하면 비동기로 작동하는 카프카에서 동기적으로 데이터를 받습니다. (참고링크)
동기적으로 데이터를 받는다는 말은 이벤트를 발행하고나서 트랜잭션이 끝나는 것이 아니라
응답 데이터가 올 때까지 트랜잭션을 잡고 기다린다는 것입니다.
따라서 미디어 서버의 격리수준을 read_committed로 설정해 놓으면 커밋된 메시지가 아니므로
발행된 요청 메시지를 컨슈머가 읽을 수 없는 것입니다.

그리고 문제해결 방법을 찾던 와중에 스택 오버플로우에서 아래와 같은 답변을 확인했습니다.


"이 방식은 ISOLATION_LEVEL=read_committed에서 전혀 작동하지 않을 겁니다.
왜냐하면 이 요청이 커밋되기 전까지 컨슈머가 요청을 볼 수 없고
우리는 절대로 일어나지 않을 응답을 받을 때까지 커밋하지 않기 때문입니다.
그래서 우리는 타임아웃되고 트랜잭션을 중단하게 됩니다"
(제가 설명한 것과 동일합니다.)

"간단하게 말하면 sendAndReceive는 트랜잭션 내에서 사용할 수 없습니다.
메시지를 보내는 유일한 방법은 자신만의 리스너를 추가해 응답을 받는 것입니다"
(이 뜻은 참고링크를 확인하면 이해하실 수 있습니다.)

하지만 저는 스트리밍에 트랜잭션이 필요합니다.
왜냐하면 도중에 오류가 날 경우 롤백을 통해 메시지가 토픽에 생산되지 않게 만들기 위함입니다.
그렇다면 어떻게 처리를 하면 좋을까요?

저는 프로듀서의 설정은 그대로 둔 채, 컨슈머의 설정을 변경했습니다.
결국 트랜잭션 격리수준이 read_committed이기 때문에 발생한 문제였습니다.
따라서 동기적인 데이터를 받아야 하는 요청에 관해서는 메시지를 읽을 때 격리수준을 적용하지 않고
그 외의 비동기적인 데이터를 받을 때는 read_committed 수준에서 받도록 만들었습니다.


위의 코드는 미디어 서버에서 사용하는 카프카 프로듀서 설정입니다.
트랜잭션을 사용하도록 설정된 KafkaTemplate이 위의 코드입니다.
아래의 코드는 컨슈머 동기적으로 데이터를 응답할 때 사용할 KafkaTemplate입니다.
여기서 트랜잭션 설정을 지워준 것에 주목하면 됩니다.

제 목적은 API 서버에서 동기적으로 메시지를 생성할 때 트랜잭션이 필요한 것이고
동기적으로 응답하는 컨슈머에서는 트랜잭션이 필요없을 뿐만 아니라 사용할 수도 없습니다.
(동기적으로 응답을 할때 트랜잭션 애노테이션이 적용조차 안됩니다.)


그리고 최종적으로 두 번째와 같은 KafkaListenerContainerFactory를 만들어줬습니다.
첫 번째 KafkaListenerContainerFactory는 격리레벨이 적용된 리스너 컨테이너입니다.
저는 격리레벨을 사용하면 안되기 때문에 속성에서 지운 것을 확인하시면 됩니다.

최종적으로 제 미디어 서버의 애플리케이션 레이어 코드는 아래와 같이 되었습니다.

containerFactory가 서로 다르게 세팅되었다는 점에 주목하시면 됩니다.
첫 번째 메서드가 동기적인 작업으로 데이터를 전달하는 것이고 두 번째 메서드가 비동기적인 작업을 하는 것입니다.


추가

마지막으로 발생했던 문제를 공유해보겠습니다.
이 문제는 위의 문제들을 해결하면서 사라졌습니다.

카프카 트랜잭션을 아예 적용하기 전에 발생한 문제입니다.
스트리밍 애플리케이션 레이어에 @Transactional 애노테이션만 붙여놓은 상태였습니다.

음원 데이터를 불러올 때 데이터를 불러오지 못해 timeout이 발생하고 나서 서버를 다운시킨 후
재실행하면 데이터베이스와 커넥션을 못해 서버가 실행되지 못하는 문제가 있었습니다.

이 문제는 오류가 났음에도 카프카 토픽에 커밋된 메시지에 의해 발생한 문제였습니다.
왜냐하면 그 메시지가 모두 소비된 후, 혹은 카프카를 정지시켰다가 다시 실행한 후에
다시 데이터베이스와 커넥션을 얻을 수 있었기 때문입니다.

아쉽게도 정확한 원인을 아직 발견하지 못해서 프로젝트를 진행하면서 찾아나갈 예정입니다.

++
여러 글에서 replyingKafkaTemplate로 카프카를 동기적(Request And Response)으로
사용하는 것을 비추천한다고 합니다. 이는 카프카에 어울리는 사용 방식이 아니라 그런거 같습니다.
그러나 피치 못할 경우에는 어쩔 수 없으니 사용하라고 합니다.

물론 저는 이 방법으로 해결했지만,
다시 고민해보니 음원 스트리밍에 관한 설계가 잘못 됐다는 생각이 듭니다.
그러니 혹시 이 글을 보고 계신 분들 중에 동기적으로 카프카를 사용하려고 하신다면,
정말로 현재 설계가 괜찮은 설계인지, 다른 방안은 없는지 생각해보시고
만일 정말로 피치 못하신 경우에 이 솔루션을 사용하면 좋을 거 같습니다.


개인프로젝트를 진행하면서 제가 만들고 싶은 서비스를 만들고 있다는 뿌듯함도 드는 한편,
서비스에 알맞는 기술선택과 애플리케이션 구조에 성급한 판단이 있어 아쉬움과 고민도 생겼습니다.

이 글의 주제가 되는 카프카에 관련해서도 문제를 해결하면서 트랜잭션에 관한 이해도가 높아졌지만
한편으로는 더 적절한 기술을 선택해서 간단하게 해결했다면 어땠을까란 생각을 하고 있습니다.

현재 관심있는 기술들로 학습하면서 프로젝트를 진행하고 있기 때문에 정답이 아닌 부분이 많습니다.
잘못된 부분에 관해서는 댓글로 알려주세요:)

profile
Get hands on dirty!🤺

0개의 댓글