Spring에서 Redis Stream 활용하기

KIYOUNG KWON·2022년 3월 6일
2

Redis Stream을 활용하기 위한 간단한 프로젝트를 구상해 보았다. Stream을 활용하기 위해선 우선 Producer와 Consumer의 역할을 할 구성요소가 필요하다. 나는 다음과 같은 프로젝트를 구상해 보았다.


(흐름도의 퀄리티는 양해를..)

텔레그램의 봇을 등록할 수 있는 API를 만들고 이 봇에 내가 알림을 하고자 하는 생일의 정보를 등록하면 이 API 서버가 Producer로서 알람을 하라는 메시지를 Redis를 통해 전달하고 이를 Consumer에서 읽어가 확인한 뒤에 텔레그램 API로 봇에게 메시지를 전달하라는 요청을 보낸다.

우선 Redis Stream에서 알아야 할 요소를 간단하게 확인해보자.

Redis Stream

redis에서 stream은 string, hash table, sorted set과 같이 key에 상응하는 자료형의 일종이다. 그렇다는 것은 이 stream도 redis에서 key값을 갖고 이 key값을 통해 무언가 상호작용을 하는 것이라고 보면된다.

Redis Stream은 크게 시계열 데이터의 분석과 메시지 브로커로 사용이 가능한데 나는 메시지 브로커로서 활용을 해보려고 한다. 메시지 브로커로 활용하기 위해선 아래와 같은 명령어를 알아두면 된다.

명령어기능예시
XADDstream에 value 삽입XADD bot-stream * message hello
XGROUPstream에 group을 생성XGROUP CREATE bot-stream bot-group $
XREADGROUPstream으로 부터 consumer group 단위로 아직 읽은 적이 없는 첫번째 데이터를 읽어옴XREADGROUP GROUP bot-group bot-number1 count 1 STREAMS bot-stream >
XACKxreadgroup으로 데이터를 읽은 경우 이 데이터를 처리했다고 consumer group에 알려주는 명령어XACK bot-stream bot-group 1234568726521-0

크게 위 4개의 명령어를 알아두면 메시지 브로커로 활용이 가능해진다. 그러면 실제로 내가 구현하려는 프로젝트와 매칭하여 명령어의 활용예시를 살펴보자.

활용예시

우선 Producer에서 DB에 등록된 생일과 일치하는 날이 되면 Stream을 통해 메시지를 전달해야 할 것이다. 메시지의 format은 다음과 같이 정의했다.

{
  "name":"birthday~",
  "token":"sdkjbihssia123",
  "chatId":"12345",
  "birthdayDateTime":"2022-03-06T00:00:00",
}

위와 같이 알람의 이름과 telegram의 bot이 token그리고 chat방의 id를 갖고 있다. 그러면 우선 stream에 위의 정보를 전달해보자. xadd 명령어를 사용하면 된다. stream이 생성되있지 않다면 생성이 될 것이다.

XADD bot-stream * name birthday~ token sdkjbihssia123 chatId 12345 birthdayDateTime 2022-03-06T00:00:00
'1234568726521-0'

xadd를 할때 2번째 매개변수에 *를 넣어준 경우 자동으로 redis가 시간을 key값으로 넣어준다. 실행 후 표시되는 '1234568726521-0'가 데이터의 id이다.

그 다음 xgroup 명령어를 사용해서 consumer group을 생성해주자. consumer group에 대해서 이야기 하기 전에 xread라는 명령어에 대해서 이야기를 해야하는데 xread의 경우에는 모든 client에 대해서 broadcast방식으로 stream을 소비할 수 있게 해준다. 하지만 설계에 따라서 여러개의 client에 대해서 각각 병렬적으로 처리를 해주기를 원할 수 있다. 이 때 사용하는 것이 xreadgroup이다.

같은 xreadgroup을 통해 동일한 consumer group은 서로 병렬적으로 stream을 처리할 수 있다. 같은 consumer group에 C1, C2, C3에서 데이터를 처리하는 예시이다. 번호는 stream 내부 데이터의 index라고 생각하면 된다. 자세한 내용은 레디스 공식 소개페이지에서 참고하기 바란다.

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
XGROUP CREATE bot-stream bot-group $ # bot-stream이라는 stream에 bot-group 이라는 consumer group을 생성
XREADGROUP GROUP bot-group consumer1 count 1 STREAMS bot-stream > # consumer1이라는 클라이언트가 bot-group에서 소비되지 않은 가장 오래된 1개의 데이터를 요청

xreadgroup으로 읽은 후 consumer client에서 적절히 처리(해당 예제에선 telegram api에 요청을 날리는 것)를 한뒤에 XACK를 전달해주면 완료가 된다. xreadgroup만 수행한 경우 pending이라는 상태가 되는데 xreadgroup을 수행한 특정 consumer client에서 처리에 실패했을 경우 xclaim이라는 명령어로 다른 consumer에게 넘기기 위함이다.

XACK bot-stream bot-group 1234568726521-0 # 처리완료

여기서 xack의 3번째 매개변수는 xadd했을 때 생성된 데이터의 id이다.

자 그러면 위와 같은 과정을 spring application과 kotlin application으로 구현해보자.

구현

Producer : Spring Application

  • Spring Batch
  • Spring quartz
  • Spring data jpa
  • kt-arrow (이건 연습삼아 한번 사용해봄..)

Consumer : JVM Console Application

  • lettuce
  • ktor client

언어는 현재 공부중인 Kotlin을 사용하였다. 필요한 부분만 살펴보겠다.

class ProducerTask(
    private val birthdayAlarmRepository: BirthdayAlarmRepository,
    private val redisTemplate: RedisTemplate<String, Any>
) : Tasklet {

    override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus {
        val localDateNow = LocalDate.now()
        val localDateTimeNowStart = localDateNow.atTime(LocalTime.MIN)
        val localDateTimeNowEnd = localDateNow.atTime(LocalTime.MAX)

        val birthdayAlarmList =
            birthdayAlarmRepository.findAllByBirthdayDateTimeBetween(localDateTimeNowStart, localDateTimeNowEnd)
        birthdayAlarmList.map {
            val record: ObjectRecord<String, RedisBirthdayBotPacket> = StreamRecords.newRecord()
                .`in`<String>("bot-stream")
                .ofObject<RedisBirthdayBotPacket>(
                    RedisBirthdayBotPacket(
                        it.name,
                        it.telegramBot?.token ?: "",
                        it.telegramBot?.chatId ?: "",
                        it.birthdayDateTime
                    )
                )
                .withId(RecordId.autoGenerate())

            //TODO : 전달관련 로그 추가 해야함
            val recordId: RecordId? = redisTemplate.opsForStream<String, RedisBirthdayBotPacket>()
                .add(record)

            println(recordId.toString())
        }

        return RepeatStatus.FINISHED
    }

}

Spring batch job을 사용하여 일정 주기 혹은 일정 시간에 위와 같은 코드를 실행시켜 주기로 했다. BirthdayAlarmRepository로부터 오늘 생일인 사람의 목록을 불러와 bot-stream이라는 이름의 stream에 RedisBirthdayBotPacket(위에서 정의한 데이터의 구조)이라는 형태로 xadd를 해준다고 보면 된다.

suspend fun main(args: Array<String>) {
    val redisClient: RedisClient = RedisClient.create("redis://localhost:6379")
    val client = HttpClient(CIO)

    val connection: StatefulRedisConnection<String, String> = redisClient.connect()
    val syncCommands: RedisCommands<String, String> = connection.sync()

    try {
        syncCommands.xgroupCreate(XReadArgs.StreamOffset.from("bot-stream", "0-0"), "bot-group")
    } catch (redisBusyException: RedisBusyException) {
        println(String.format("\t Group '%s' already exists", "bot-group"))
    }

    while (true) {
        val messages: MutableList<StreamMessage<String, String>> = syncCommands.xreadgroup(
            Consumer.from("bot-group", "consumer_1"),
            XReadArgs.StreamOffset.lastConsumed("bot-stream")
        )
        if (messages.isNotEmpty()) {
            for (message in messages) {
                syncCommands.xack("bot-stream", "application_1", message.id)

                val birthdayBotPacket = RedisBirthdayBotPacket(
                    name = message.body["name"]!!,
                    token = message.body["token"]!!,
                    birthdayDateTime = LocalDateTime.parse(message.body["birthdayDateTime"])
                )
                println(birthdayBotPacket)


                client.request<HttpResponse>("https://api.telegram.org/bot${birthdayBotPacket.token}/sendmessage?") {
                    method = HttpMethod.Get
                    parameter("text", birthdayBotPacket.name)
                    parameter("chat_id", birthdayBotPacket.chatId)
                }
            }
        }
    }

    client.close()
    connection.close()
    redisClient.shutdown()
}

그리고 Consumer Application에서 이를 while문내에서 데이터가 있는경우 xreadgroup으로 읽고 telegram api에 전달을 한다. 자세한 코드는 링크에서 참조바란다.

위 코드에 한가지 문제가 있는데 생일이라는 메시지를 보내려면 년도까지 맞아야한다는 것이다.. 월일만 봐야하는데 글을 거의다 작성하고 눈치챘다. 일단 stream을 사용하는 코드에는 크게 문제가 없으니 나중에 안귀찮을 때 수정해놔야 겠다.

실무에서도 kafka 든 redis stream 이든 뭐가 되었든 제대로 활용을 해보고 싶다.

0개의 댓글