[Redis] Publish/ Subscribe

Vorhandenheit ·2022년 8월 21일
0

Redis

목록 보기
5/5

[Redis] Publish/Subscribe

얼마전에 MQTT에 대략적으로 알아보았지만 REDIS로도 이것이 가능하다는 걸 알게되었습니다. 회사에서 사용을 해서, 이 코드를 발전시켜가야하기 때문에 어떤건지 파악하기 위해서 공부를 해야합니다.

다른 message 시스템도 있지만(RabbitMQ, Kafka) redis에서 이 Pub/sub를 사용하는 이유는 인 레디스가 인 메모리 형식이라 pub/sub 모델을 구현하면 가볍고 빠르다는 장점이 있습니다.
차이점이 있다면 Redis는 메세지를 별도로 저장하지않고 메세지를 한번 보내면 땡 입니다.

실시간 데이터처리에는 매우 적합하지만, 저장되지않는다는 걸 반드시 알고있어야합니다.
Redis stream이 그 대안이 될 수 있다고 합니다.

1. Pub/Sub 명령어

  • subscribe (channel, message)
    지정한 채널로 보내진 메세지를 받습니다.

  • publish (channel, message)
    메세지를 지정한 채널로 보냅니다.

  • pubsub (
    subscribe를 관리할 수 있는 명령어입니다.

  • psubscribe (pattern)
    채널을 패턴으로 등록합니다.
    - '?' 한 글자를 대치합니다
    - '*' 공백이나 여러 글자를 대치합니다
    - h[ae[llo 'a'나 'e'만 올 수 있습니다.

  • unsubscribe (channel)
    subscribe로 등록한 채널을 삭제해서, 더 이상 메세지를 받지않습니다. 채널명을 입력하지않으면 해당 클라이언트에 등록된 모든 채널을 삭제합니다.

  • punsubscribe (pattern)
    psubscribe로 등록한 패턴을 삭제해서, 더 이상 메세지를 받지 않습니다. 패턴명을 입력하지않으면 해당 클라이언트에 등록된 모든 패턴을 삭제합니다.

  • pubsub subcommnad (argument)
    pubsub 명령어는 서버에 등록된 채널이나 패턴을 조회합니다. 세 가지 subcommand가 있습니다.
    - pubsub channels (pattern)
    pattern을 입력하지 않으면 해당 서버에 등록된 모든 채널명을 보여줍니다. 여기서 pattern은 채널명을 global-style로 찾습니다.
    - pubsub numsub (channel)
    해당 채널에 등록된 클라이언트 개수를 보여줍니다.
    - pubsub numpat
    서버에 등록된 pattern의 개수를 보여줍니다.

    각 명령어 내부구조

    SUBSCRIBE


    채널을 등록해서 메세지를 받는 명령입니다. 서버 구조체와 클라이언트 구조체에 채널을 등록합니다.

  1. dictAdd(clinet=> pubsub_channels, channel, NULL)을 실행하여 REDIScLIENT.PUBSUB_CHANNELS의 dict구조체에 채널명을 등록합니다. 세 번째 인수가 값으로 Null이 들어갑니다.

  2. dictFind(server.pubsub_channels, channel)을 수행하여 채널이 없으면 list = listCreate()를 수행하여 리스트를 생성합니다. Channel이 이미 등록되어있으면, list = dictGetVal(dictEntry)를 수행해서 리스트를 얻어오고, 4번으로 갑니다.

  3. dictAdd(server.pubsub_channels, channel, list)를 수행해서 채널과 리스트를 dict에 등록합니다.

  4. listAddNodeTail(list, client)를 수행해서 리스트에 클라이언트를 등록합니다,.

psubscribe


패턴을 등록해서 메세지를 받는 명령입니다. psubscribe 명령이 수행되면 서버와 클라이언트의 리스트에 패턴을 등록합니다.

  1. listSearchKey(client => pubsub_patterns, pattern)로 패턴이 이미 있는 지 확인합니다. 없으면 listAddNodeTail(client => pubsub_patterns, pattern)를 수행해서 클라이언트 리스트에 패턴을 등록합니다.

  2. listAddNodeTail(server.pubsub_patterns, pat)로 서버에 패턴을 등록하는데, 클라이언트에는 패턴만 등록하는 반면에, 서버에는 pubsubPattern 구조체에 client와 pattern을 담아서 같이 등록합니다.왜냐하면 패턴을 등록한 클라이언트에 메세지를 보내기위해서 입니다.

unsubscribe


등록한 채널을 삭제해서 더 이상 메세지를 받지않도록 합니다.
인수로 채널을 입력하지 않으면 pubsubUnsubscribeAllChannels()를 호출해서 클라이언트에 등록된 모든 채널을 삭제합니다.
pubsubUnsubscribeAllChannels()에서 while loop를 돌면서 pubsubUnsubscribeChannel()를 호출해서 dictNect()를 이용해서 채널을 하나씩 삭제합니다.
채널을 입력하면 입력한 채널 개수만큼 while loop를 돌면서 pubsubUnsubscribeChannel()를 호출해서 채널을 하나씩 삭제합니다.

  1. dictDelete(clinet => pubsub_channels,channel)를 수행해서 클라이언트에 채널을 삭제합니다.
  2. dictEntry = dictFind(server.pubsub_channels, channel)로 서버에서 dictEntry를 구합니다.
  3. list = dictGetVal(dictEntry)로 value에 저장된 list를 가져옵니다
  4. listNode = listSearchKey(list, client)를 수행해서 리스트 노드를 얻어옵니다
  5. listDelNode(list, listNode)를 수행해서 리스트에서 노드를 삭제합니다.
  6. 리스트에 노드가 하나도없으면 dictDelete(server.pubsub_channels, channel)를 수행해서 리스트를 삭제합니다

punsubscribe

등록한 패턴을 삭제해서 더 이상 메세지를 받지 않도록 합니다.
인수로 패턴을 입력하지 않으면 pubsubUnsubscribeAllPatterns()를 호출해서 클라이언트에 등록된 pubsubUnsubscribePattern()를 호출해서 listDelNode()를 이용해서 클라이언트에 등록된 모든 패턴을 삭제합니다.
pubsubUnsubscribeAllPatterns()에서 while loop를 돌면서 pubsubUnsubscribePattern()를 호출해서 listDelNode()를 이용해서 채널을 하나씩 삭제한다.
채널을 입력하면 입력한 채널 개수만큼 while loop를 돌면서 pubsubUnsubscribePattern()를 호출해서 채널을 하나씩 삭제한다.
1. 먼저 listSearchKey(client->pubsub_patterns,pattern)을 실행해서 클라이언트에서 패턴을 찾는다.
2. listDelNode(client->pubsub_patterns,listNode)를 수행해서 패턴을 삭제한다.
3. 다음 listSearchKey(server->pubsub_patterns,pubsubPattern)을 실행해서 서버에서 패턴을 찾는다.
4. listDelNode(server->pubsub_patterns,listNode)를 수행해서 패턴을 삭제한다.

publish

publish는 메세지를 보내는 명령입니다. 서버 내에서 클라이언트에게 메세지를 보내는 것과 서버가 클러스터 모드이면 다른 서버에게 전달해서 메세지를 각 서버에 접속해있는 클라이언트들에게 메세지를 보내는 것입니다.

서버 내
1. 먼저 서버에서 해당 채널을 찾습니다. dictFind(server.pubsub_channels, channel)
2. listNext(listNode)로 노드가 없을 때까지 While Loop를 돌면서 등록된 클라이언트에 addReplyBulk(client, message)로 메세지를 보냅니다
3. listLength(server.pubsub_patterns)으로 확인해서 서버에 등록된 패턴이 있다면
4. listNext(listNode)로 노드가 없을 때까지 While Loop를 돌면서 stringmatchlen() function으로 비교해서 맞으면 클라이언트에 addReplyBulk(pubsubPattern => client, message)로 메세지를 보냅니다

클러스터
클러스터 서버들에 대한 정보가 dict에 저장되어있고, dictNext()로 서버 정보를 얻어서 채널과 메세지를 보냅니다. 그럼 해당 서버는 채널과 메세지를 받아서 위와 같은 과정을 거쳐서 클라이언트들에게 메세지를 보냅니다

pubsub

  • Subcommand가 channels이면 server.pubsub_channels에서 dictNext()로 이동하면서 채널을 보여줍니다. 패턴이 입력되었다면 stringmatchlen()으로 패턴과 비교해서 맞는 채널들만 보여줍니다.
  • numsub이면 해당 채널에 등록된 클라이언트 개수를 listLength(list)로 구해서 보여줍니다
  • numpat면 리스트에 등록된 패턴 개수를 listLength(list)로 구해서 보여줍니다.

2. pub/sub 내부구조

(1) server.pubsub_channels

서버측에서 channel을 저장하는 구조

pubsub_channels는 채널을 저장하는 dict 구조체를 가리킵니다.
dict구조체는 여러개의 dictEntry를 가지고 있고 각 Entry는 key : robj channel, value: list 타입으로 linked list로 이루어져있습니다.

PUBLISH channel_name message => dict에서 channel을 찾고 그 value에 있는 linked list를 돌면서 client들에게 메세지를 보냅니다.

(2) server.pubsub_patterns

서버 측에서 패턴을 저장하는 구조

pubsub_patterns는 패턴을 저장하는 노드으 ㅣ리스트를 가리킵니다. 각각의 노드는 Pattern 구조체를 가리키고 이 구조체는 client와 pattern을 가집니다.

PUBLISH channel_name message => channel 명으로 클라이언트에 메세지를 보낸 다음에 pubsub_patterns을 돌며 패턴에 맞느 ㄴclient에게 메세지를 보냅니다.

(3) client.pubsub_channel

client측에서 channel을 저장하는 구조

SUBSCRIBE channel_name => server 구조체와 client 구조체 둘다 채널을 저장합니다. client 구조체는 해당 채널을 등록하고 클라이언트를 pubsub모드로 전환하는 역할을 합니다.
평소에는 normal모드이고 client buffer가 무제한입니다. 그러나 subscribe를 하게되면 pubsub모드로 변하게되면서 hard limit: 32mb, soft limit 8mb로 제한합니다.

(4) client.pubsub_patterns

client 측에서 pattern을 저장하는 구조

client가 pattern을 저장해야되는 구조체입니다. server.pubsub_patterns는 client와 pattern을 묶어서 저장해야했기 때문에 pubsubPattern이라는 robj를 하나 더 가졌지만 여기서 패턴만 기억하면 되기 때문에 노드에 direct로 robj pattern이 연결되어있습니다.

psubscribe pattern => list에 pattern을 저장하고 서버 구조체에도 패턴을 저장하며 클라이언트를 pubsub모드로 전환하는 역할도합니다.

출처

https://inpa.tistory.com/entry/REDIS-%F0%9F%93%9A-PUBSUB-%EA%B8%B0%EB%8A%A5-%EC%86%8C%EA%B0%9C-%EC%B1%84%ED%8C%85-%EA%B5%AC%EB%8F%85-%EC%95%8C%EB%A6%BC?category=918728
https://brunch.co.kr/@springboot/374
http://redisgate.kr/redis/command/pubsub_intro.php
https://velog.io/@ekzm8523/Redis-pubsub

profile
읽고 기록하고 고민하고 사용하고 개발하자!

0개의 댓글