Spring Cloud with Apache Kafka - 6

최혜성·2024년 2월 15일
0

msa

목록 보기
2/7

Reservation

우선 flow는 다음과 같다.

  • 유저가 예약 요청을 넣는다. 이때 데이터는 (유저 id, 이벤트 id, 좌석 id)이다.

  • Reservation 모듈은 유저 id 존재 여부를 Feign를 통해 체크한다.

  • Event 모듈은 이벤트 id와 좌석 id를 토대로 좌석이 비어있는지 확인한다.

  • 좌석이 비어있는경우 점유 요청을 보내 해당 seat를 close 상태로 변경한다.

  • 최종적으로 Reservation을 저장한다.

    그래서 구현된게 다음과 같다.

   @Transactional
    suspend fun reserve(requestDTO: ReservationRequestDTO) : ReservationResponseDTO {
        val reservation = requestDTO.toEntity()
        // IO 디스패처 실행
        withContext(Dispatchers.IO) {
            if (!userAPIClient.check(reservation.userId))
                throw ReservationException("존재하지 않는 유저입니다.", reservation.userId)

            val seatDTO = SeatDTO(reservation.eventId, reservation.seatId)
            if (!eventAPIClient.checkSeat(seatDTO))
                throw ReservationException("존재하지 않는 좌석 정보입니다.", reservation.seatId)
            if (!eventAPIClient.occupySeat(seatDTO))
                throw ReservationException("예약에 실패했습니다. 다시 시도해주세요.", reservation.seatId)
        }

        return reservationRepository.save(reservation).toResponseDTO()
    }
}

응답은 다음과 같다.

{
    "success": true,
    "message": "예약에 성공하였습니다.",
    "data": {
        "id": 1,
        "userId": 1,
        "seatId": 5,
        "eventId": 3
    }
}

To be Continued..?

MSA 할만한데? 나도 혹시 코딩 고수?

아직 고려해야할게 많이 남았다. 정작 중요한 kafka도 못써먹었지 않았는가

  1. API 서버(User, Event)가 boolean을 정상적으로 반환하면 Reservation 모듈에서 정상적으로 exception handling이 가능하지만, 만약 Exception이 발생해 boolean이 아닌 다른 자료형으로 반환된다면?

지금 코드만 봐도 User가 없을경우 UserException을 발생시키는데, 이는 클라이언트가 다른 형식으로 응답을 받을 수 있다는점이 있다.

우선적으로 Feign를 처리하는 엔드포인트 (/exist /occupy)등에는 throw 하지 않는 메소드로 구성해놨으나, 어떻게 될지 모르는일이기 때문에 대응책이 필요하다.

뭐, Retrofit도 ExceptionHandling이 되는데 얘도 될거라 믿고 스프링 자체에서 잘 처리하면 되겠다.

  1. OpenFeign 과의존

지금 kafka를 하나도 안쓰고 OpenFeign로 모든 요청을 처리하고 있다.
당장은 괜찮지만, 대용량 처리를 할때 OpenFeign를 그대로 쓰는순간? 로드밸런서 나사빠지는순간 다 터지는거다~

그래서.. 일단 Reservation - Event 과정에서 Event의 특정 좌석을 Occupy 하는 절차가 있다. 이 과정을 kafka를 쓰고자 한다.
현재 occupy는 다음과 같다.


    @Transactional
    suspend fun occupySeat(seatDTO: SeatDTO) : Boolean {
        val event = findEventEntityOrNull(seatDTO.eventId) ?: return false // 이벤트가 존재하지 않을경우 false
        val seat = event.seats.find { it.id == seatDTO.seatId && it.status != Status.CLOSE } ?: return false // 좌석이 존재하지 않거나 이미 예약된경우 false
        seat.status = Status.CLOSE
        eventRepository.save(event)
        return true
    }

여기서 eventRepository.save 부분에서 db 과부하가 발생할경우 문제가 생길 수 있으므로 이부분을 kafka로 produce한뒤 subscribe를 해서 저장하도록 변경한다.

근데 위에 find도 문제가 있을거 같은데.. 얘는 redis쓰거나, 체크로직을 하나로 통합하거나 해야할듯. (현재는 체크도 하고 occupy도 하고 둘다 함)

  1. (Miner) request 요청시 seat의 id값을 그대로 받음

seat의 좌석 번호인 A1, A3 이런 값을 넣을 수 있게 했는데 PK를 받아서 사용자가 신청할때 어렵지 않을까? 라는 생각이 듬 함 고려해보는것도

Kafka

Group id

topic만 중요하지 group 지정할일도 없겠지~ 하고 얘를 제외했더니 오류가 발생했다.
No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

group id는 topic을 처리할때 사용되는 각 consumer별 고유한 id라고 보면 되겠다. 이걸 지정 안해놨으니 작동안할 수 밖에..
그래서 각 consumer 별로 다른 group id를 지정하면 되겠다.
https://www.inflearn.com/questions/846016/kafka-consumer-groupid

ConsumerConfig.GROUP_ID_CONFIG to "occupy_group_1" 요거랑 Listener에 group id 명시해줬더니 잘된다.

Error

  • No Session (JPA)
Consumer
fun read(data : String) {
        log.info("Kafka Consumed Data. topic : occupy")
        kotlin.runCatching {
            val saveRequest = objectMapper.readValue(data, SeatSaveDTO::class.java)
            CoroutineScope(Dispatchers.IO).launch {
                eventService.saveSeat(saveRequest)
            }
        }
EventService
    @Transactional
    suspend fun saveSeat(seatSaveDTO: SeatSaveDTO) {
        if (existSeat(seatSaveDTO.eventId, seatSaveDTO.seatId)) {

분명히 Consumer에서 Transactional이 붙어있는 saveSeat를 호출하는데 자꾸 세션 없다고 Detach된거 가져올려 한다고 오류를 뱉었다.

그래서 read메소드에 Transaction도 붙여보고, 레포지토리에 직접 접근도 해보고, 다 해봤는데 문제가 해결되지 않았다.

에러는 정확하게 Event와 1 : N 관계를 갖고 있는 Seat에 접근할 수 없다는건데, Transaction이 붙어있는데도 왜 detach (준영속) 엔티티로 간주하는지 모르겠다.

후우.. 그래서 그냥 FetchType Eager로 해서 해결했다.

  • Classpath is empty. Please build the project first e.g. by running 'gradlew jarAll'

웨 다운받았는데 또 빌드하라 하지?
-> source가 아니라 binary로 받아서 하면 되겠다.

  • Bean Duplication

Retrofit처럼 유스케이스별로 api 따로 호출 할 수 있게 ExistSeat, OccupySeat 두개의 클래스를 선언해놓고 같은 Event-API를 호출하게 했더니 이미 클라이언트가 존재한다고 오류를 뱉었다. 그래서 EventClient로 합침

***************************
APPLICATION FAILED TO START
***************************

Description:

The bean 'EVENT-API.FeignClientSpecification' could not be registered. A bean with that name has already been defined and overriding is disabled.

Action:

Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true


Process finished with exit code 1

그래서 그냥 EventAPIClient 하나로 합쳐서 해결했다.

//Seat의 존재 여부를 체크
@FeignClient(name = "EVENT-API")
interface EventAPIClient {

    @PostMapping("/api/event/exist")
    fun checkSeat(@RequestBody seatDTO: SeatDTO) : Boolean

    @PostMapping("/api/event/occupy")
    fun occupySeat(@RequestBody seatDTO: SeatDTO) : Boolean
}

현황

Event module에서 OpenFeign로 요청받은 occupy시 엔티티를 저장하는게 아니라, 일단 요청을 받고 kafka한테 보낸 뒤, 이를 다시 consume 해서 저장하는 방식으로 했다.

이러면 db가 과부하 걸릴 일 없어 순차적으로 처리 할 수도 있고, kafka의 셋팅에 따라 처리량 조절도 가능하다.

물론 동시성 문제의 관점에서 보면 consume하기 전에 요청이 들어오면 비어있는 상태로 나오므로 추가적인 요청이 도달할 수 있다.
물론 저장또한 kafka를 거쳐서 하기 때문에 db에서 중첩될 일은 없지만, 클라이언트는 이를 모르기 때문에 문제가 될 수 있다.

따라서 redis를 연계, 해당 좌석의 예약 여부를 빠르게 확인할 수 있도록 해야한다고 본다.

profile
KRW 채굴기

0개의 댓글