Spring Cloud with Apache Kafka - 7

최혜성·2024년 2월 16일
0

msa

목록 보기
1/7

Fallback

현재 예약 모듈 - 유저 모듈 / 예약 모듈 - 이벤트 모듈 간 서로 OpenFeign를 통해 통신을 진행한다.

interface EventAPIClient {
    /**
    * Check와 occupy를 한 호출에 진행하는 api.
     */
    @PostMapping("/api/event/occupy")
    fun occupySeat(@RequestBody seatDTO: SeatDTO) : Response<SeatDTO>

}

이런식으로 진행되는데 문제점이 몇가지 있다.

Exception Handling

이벤트 모듈에 좌석 예약을 신청할때, 해당 좌석이 이미 예약된경우 EventException이 발생되고, 이는 ExceptionHandler에 매핑되어서 Response<Nothing>의 형태로 클라이언트에게 반환된다.

물론 단순히 클라이언트가 다이렉트로 접근한다면 json으로 반환되는거니까 큰 문제는 없는데, 지금은 OpenFeign로 접근하는거라 기존 Response의 형식인 Response<SeatDTO>를 만족하지 못해 Exception이 발생한다.

이 또한 Spring의 컨트롤러에서 발생하는거라 ExceptionHandler로 처리하면 되지만, 그래도 hoxy.. 좀더 나은 처리 방법이 없을까 해서 찾아봤다.

https://mangkyu.tistory.com/289

CircuitBreak with fallback

서킷 브레이커는 말그대로 퓨즈처럼 해당 모듈 / API가 맛탱이 갔을때 끊어주거나, 다시 시도하는 역할을 담당한다. MSA 특성상 여러개의 서버가 존재하는데, 한 서버가 응답이 없을때 다른 서버 또한 맛탱이 가지 않도록 중간에서 차단 or 예외 메시지 전달을 가능하게 한다.

fallback은 이 중에서 예외 메시지를 보낼 수 있도록 하는건데 구현도 상당히 간단했다.

  • EventFallbackFactory
class EventFallbackFactory : FallbackFactory<EventFallback>
 override fun create(cause: Throwable?): EventFallback {
        val jsonExceptionMessage = deserialize(cause)
        if (jsonExceptionMessage != null)
            throw ReservationException(jsonExceptionMessage, null)
        return provideFallback()
    }

    // Response<T> 형식으로 예외가 발생한경우 메시지 반환.
    private fun deserialize(cause: Throwable?) : String? {
        if (cause !is FeignException)
            return null

        val response = cause.responseBody().getOrNull() ?: return null
        val deserializeResponse = kotlin.runCatching {
            objectMapper.readValue(String(response.array()), object : TypeReference<Response<Nothing>>() {})
        }.getOrNull()
        return deserializeResponse?.message
    }
  • EventAPIClient
@FeignClient(name = "EVENT-API", fallbackFactory = EventFallbackFactory::class)
interface EventAPIClient {
}

이렇게 fallback factory를 선언해두고 클래스만 잘 집어넣으면 된다.
EventFallback은 EventAPIClient를 구현해서 만약 API의 응답이 없거나 문제가 생기면 대신 반환될 값을 정의할 수 있다.
그래서 FallbackFactory를 쓰지 않고도 단순히 Fallback 클래스만 만들어도 되지만, 나는 Event에서 발생된 예외 json 메시지를 그대로 예약 모듈 응답에 사용하고 싶어 exception을 받을 수 있는 Factory를 선택했다.

EventFallbackFactory는 exception(throwable)을 파라미터로 가지므로 해당 값을 Response<Nothing>으로 변환할 수 있을경우 변환한 뒤 해당값의 message 필드를 가져온다.

Response<Nothing>으로 값을 받아 오는 이유는, Event, User 모듈의 경우 값이 없거나 찾을 수 없는경우 Exception을 발생하는데, 이 Exception을 가공해서 Response<Nothing>으로 반환하기 때문이다.
실질적으로 담긴 데이터는 메시지뿐이므로 제네릭값을 Nothing으로 지정하였다. 그래서 message 필드만 사용..

이후 파싱된 메시지를 Exception에 담아 throw함으로써 ExceptionHandler에게 처리를 위임한다.

Enhancement

위에서 말했다시피 Event, User등등 각 모듈의 예외는 문자형태로 제공된다고 했다. 그러면 Fallback의 구현도 거의 비슷해지지 않을까?

  • UserFallbackFactory
override fun create(cause: Throwable?): UserFallback {
        val jsonExceptionMessage = deserialize(cause)
        if (jsonExceptionMessage != null)
            throw ReservationException(jsonExceptionMessage, null)
        return provideFallback()
    }

    fun provideFallback() : UserFallback = UserFallback()

    // Response<T> 형식으로 예외가 발생한경우 메시지 반환.
    private fun deserialize(cause: Throwable?) : String? {
        if (cause !is FeignException)
            return null

        val response = cause.responseBody().getOrNull() ?: return null
        val deserializeResponse = kotlin.runCatching {
            objectMapper.readValue(String(response.array()), object : TypeReference<Response<Nothing>>() {})
        }.getOrNull()
        return deserializeResponse?.message
    }
  • EventFallbackFactory
override fun create(cause: Throwable?): EventFalblack {
        val jsonExceptionMessage = deserialize(cause)
        if (jsonExceptionMessage != null)
            throw ReservationException(jsonExceptionMessage, null)
        return provideFallback()
    }

    fun provideFallback() : EventFalblack = EventFalblack()

    // Response<T> 형식으로 예외가 발생한경우 메시지 반환.
    private fun deserialize(cause: Throwable?) : String? {
        if (cause !is FeignException)
            return null

        val response = cause.responseBody().getOrNull() ?: return null
        val deserializeResponse = kotlin.runCatching {
            objectMapper.readValue(String(response.array()), object : TypeReference<Response<Nothing>>() {})
        }.getOrNull()
        return deserializeResponse?.message
    }
  1. 예외가 발생했다 - 메시지가 있다 - throw
  2. 예외가 발생했다 - 메시지가 없다 - 적절한 fallback 호출

2번 과정은 각 fallback마다 달라진다 쳐도, 1번 과정에 중복되는 코드가 있다.
이를 추상클래스로 묶어서 상속구조로 변경한다.

  • AbstractFallbackFactory
abstract class AbstractFallbackFactory<T>(private val objectMapper: ObjectMapper) : FallbackFactory<T> {
    override fun create(cause: Throwable?): T {
        val jsonExceptionMessage = deserialize(cause)
        if (jsonExceptionMessage != null)
            throw ReservationException(jsonExceptionMessage, null)
        return provideFallback()
    }

    abstract fun provideFallback() : T

    // Response<T> 형식으로 예외가 발생한경우 메시지 반환.
    private fun deserialize(cause: Throwable?) : String? {
        if (cause !is FeignException)
            return null

        val response = cause.responseBody().getOrNull() ?: return null
        val deserializeResponse = kotlin.runCatching {
            objectMapper.readValue(String(response.array()), object : TypeReference<Response<Nothing>>() {})
        }.getOrNull()
        return deserializeResponse?.message
    }
}

제네릭을 받아서 실제 반환될 fallback을 정할 수 있도록 했다. 각 fallback별로 구현체만 바꿔주면 되겠다

  • EventFallbackFactory
@Component
class EventFallbackFactory(objectMapper: ObjectMapper) : AbstractFallbackFactory<EventFallback>(objectMapper) {
    override fun provideFallback(): EventFallback {
        return EventFallback()
    }
}

class EventFallback : EventAPIClient {

    override fun occupySeat(seatDTO: SeatDTO): Response<SeatDTO> {
        return Response.of(false, "좌석 정보 API서버가 응답하지 않습니다.", null)
    }

}

매우 간략해진 모습

Event

현재 좌석의 occupy 요청시 해당 좌석이 존재 하는지 체크하는 API (1), 해당 좌석의 점유 요청을 하는 API(2) 요렇게 2개의 호출이 발생한다.

하지만 1번과 2번 사이에 또다른 요청이 발생한다면?
물론 2번 요청시 좌석의 여부를 한번더 체크하긴 하지만, 1번을 통과한 다른 요청이 중복되어 발생할 수 있다.

따라서 occupy와 check를 하나로 통합했다.

  @Transactional
    suspend fun occupySeat(seatDTO: SeatDTO) : SeatDTO {
        val seat = findSeatWithEventId(seatDTO.eventId, seatDTO.seatId)
        if (seat == null)
            throw EventException("존재하지 않는 좌석 정보 입니다. id : ${seatDTO.seatId}", null)
        if (seat.status != Status.OPEN)
            throw EventException("예약할 수 없는 좌석입니다. id : ${seat.id}", null)

        val seatSaveDTO = SeatSaveDTO(seatDTO.eventId, seatDTO.seatId, Status.CLOSE) //Event와 유사한 DTO
        kafkaProducer.sendOccupyRequest(seatSaveDTO)
        return seatDTO //delete boolean (check랑 통합)
    }

JMeter

MSA의 꽃은 대용량 트래픽 처리가 아닐까

하지만, 이런 하꼬 수준의 프로젝트에서는 대용량 트래픽을 모집할 순 없다. 그래서 스트레스 테스트 툴 겸 테스팅에 유용한 JMeter를 사용해서 테스트를 해보기로 했다.

JMeter를 통해 하나의 좌석에 대해 1초안에 1000건의 점유 요청을 넣었다.

  • Request
{
	"userId" : 1,
	"eventId" : 5,
	"seatId" : 9
}

같은 좌석에다 1000건의 트래픽이 몰려 동시성 상황을 연출할겸 테스트를 수행했다.

Mistake

하지만 첫번째 시도만에 서버가 뻗었다.
모놀리틱 방식에서도 어느정도는 처리를 하던데 얘는 아예 뻗어서 원인을 확인했다.

Hibernate의 Transaction에서 30000ms정도 지연이 됐다는 로그가 있었다.
음...
생각없이 붙여놨던 Transcational 어노테이션에서 문제가 생겼던것이였다. Transcational 자체가 한 트랜잭션 내에서 진행된다는것을 보증해주는데, 이는 사실상 DB를 점유한다는거나 마찬가지였다.
근데 하필이면 해당 메소드에서 User, Event API를 호출하는 OpenFeign가 있었다는게 문제가 되었다.

IO를 트랜잭션 안에서 수행하니 걔가 뻗을 수 밖에..

보통 트랜잭션 어노테이션 붙일때 IO요청이 있을지 생각하고 주의해서 사용하긴 했는데, 코드 수정이 너무 많다보니 API 호출부를 코드에 포함시켰다는걸 깜빡했다.

suspend fun reserve(requestDTO: ReservationRequestDTO) : Response<Nothing> {
        val reservation = requestDTO.toEntity()
        // IO 디스패처 실행
        return withContext(Dispatchers.IO) {
            val userResponse = userAPIClient.check(reservation.userId)
            if (!userResponse.success)
                throw ReservationException(userResponse.message ?: "유저 API가 응답하지 않습니다.", null)
            // check랑 request 동시 호출
            val responseOccupy = eventAPIClient.occupySeat(SeatDTO(reservation.eventId, reservation.seatId))
            if (!responseOccupy.success)
                throw ReservationException(responseOccupy.message ?: "예약 API가 응답하지 않습니다.", reservation.seatId)
            else
                Response.of(true, responseOccupy.message, null)
        }
    }

    @Transactional
    suspend fun saveReservation(requestDTO: ReservationRequestDTO) : ReservationResponseDTO {
        return reservationRepository.save(requestDTO.toEntity()).toResponseDTO()
    }

그래서 db에 저장하는 로직과 IO 요청하는 로직을 분리해서 해결했다.

동시성

그렇게 돌린 결과는 나쁘지 않았다.

  • Response

    요청은 잘 날아갔고, 몇몇 Connection Reset 문제가 발생한걸 빼면 요청 전달은 되었다.
    Request중 대다수는 이미 예약된 좌석이라 뜨고 예약이 진행되지 않았지만, 몇개는 동시성 문제로 인해 예약이 중복되어 진행되었다.

kafka

DB


kafka로 요청도 잘 전송되었고, DB에도 저장은 잘 되었다. 다만 중복되었을뿐

그래서, 다음번에는 가장 중요한 동시성 문제 해결을 위한 Redis와, 나머지 reservation의 kafka consumer 적용 등등을 하지 않을까 싶다.

Redis에 값을 넣어놓고 그걸 확인한다 / DB에 Transcation 격리 수준을 정한다 등등 여러가지 방법을 생각해보고 있다.
동시성 문제가 항상 머리아픈 주제이긴 한데, 최대한 사이드이펙트가 덜하고, 효율적인 방법을 생각해봐야겠다

profile
KRW 채굴기

0개의 댓글