실제 서비스에서 Redis Pub/Sub을 사용할 때 단순히 "구독자에게 메시지를 퍼뜨린다"로만 이해하면 부족하다. Redis 내부에서는 어떻게 동작하고 있을까?
Redis Pub/Sub은 발행자-구독자 패턴을 따르며, 중간에 Redis가 브로커 역할을 수행한다.
메시지는 Redis 서버를 거쳐 해당 채널을 구독 중인 모든 클라이언트에게 fan-out 된다.
📌 이 구조는 트위터의 일반 유저 알림 시스템에 적용된적이 있다고 한다 (인플루언서는 별도 처리)
Redis는 채널 기반 fan-out 시스템으로 동작한다.
예를 들어:
하나의 채널에 대해 여러 클라이언트가 구독하고 있으면, 메시지는 모두에게 전파된다. → 이것이 바로 1:N 구조다.
Redis는 Pub/Sub을 다음과 같이 관리한다:
dict<string, list<client>> pubsub_channels;
pubsub_channels = {
"chatting-room-1": [clientA, clientB],
"chatting-room-2": [clientC]
}
ClientD가 "chatting-room-2"를 구독하면?
pubsub_channels = {
"chatting-room-1": [clientA, clientB],
"chatting-room-2": [clientC, clientD]
}
❗ 중복 구독은 어떻게 될까?
ㄱㅊ Redis는 Set처럼 동작해서, 같은 채널을 여러 번 구독해도 한 번만 등록된다.
코드로 보면 다음과 같다:
int pubsubSubscribeChannel(client *c, robj *channel) {
if (dictAdd(c->pubsub_channels, channel, NULL) == DICT_OK) {
...
listAddNodeTail(clients, c);
}
}
그럼 다시 자료구조로 돌아와서 시간복잡도를 알아보자
redis는 싱글 스레드(워커) 기반이기 때문에 시간복잡도가 중요함
| 연산 | 동작 | 시간복잡도 |
|---|---|---|
| 채널 추가 | dictAdd | O(1) 평균 / O(N) 최악 |
| 채널 제거 | dictDelete | O(1) 평균 / O(N) 최악 |
| 채널 탐색 | dictFind | O(1) 평균 / O(N) 최악 |
| 구독자 추가 | listAddNodeTail | O(1) |
| 구독자 제거 | listDelNode | O(1) (노드 참조 필요) |
| 구독자 탐색 | listSearchKey | O(N) |
채널 추가 제거 탐색은 HashMap이기 때문에 시간복잡도가 좋음 (리해싱이나 해시 충돌은 나중에)
구독자 추가, 제거도 리스트기 때문에 간편함 하지만 구독자 탐색이 시간복잡도가 N임
그렇다면?? 하나의 채널에 구독자가 많으면 많을수록 안좋다,
Redis의 해시 테이블은 성능에 민감하기 때문에 다음과 같은 조건을 만족해야 한다:
| 조건 | 이유 |
|---|---|
| 빠른 해시 연산 | O(1) 삽입/조회 보장 |
| 낮은 충돌률 | 성능 저하 방지 |
| 암호화 불필요 | 보안보다 속도가 중요 |
그래서 Redis는 SHA256, MD5 대신 murmurhash2 기반의 dictGenHashFunction을 사용한다.
murmurhash2 특징:
murmurhash2는 Google 출신 개발자인 Austin Appleby가 만든 비암호화(non-cryptographic) 해시 함수
Redis의 내부 해시 테이블은 어떻게 충돌을 처리할까?
Java의 HashMap과 비교하며 Redis의 설계를 들여다보자.
해시 충돌은 서로 다른 키가 동일한 해시 슬롯으로 매핑되는 상황이다.
Java(8이상) 와 Redis는 이를 어떻게 처리할까?
| 항목 | Java (HashMap) | Redis |
|---|---|---|
| 충돌 처리 방식 | 체이닝 (LinkedList), 이후 Tree 전환 | 체이닝 (단순 LinkedList) |
| 트리 전환 조건 | 동일 슬롯에 8개 이상 엔트리 | 없음 |
| 트리 자료구조 | Red-Black Tree | ❌ |
| 이유 | O(N) → O(log N) 성능 개선 | 메모리 절약 + 단순성 |
간단한 해시인덱스 - 연결리스트 기반의 해시 체이닝을 기반
그렇다면? 엔트리 개수가 많아질수록 한 해시인덱스의의 연결리스트안의 엔트리가 많이 추가되므로 해시 충돌 확률도 올라감
어떻게 해쉬 충돌을 해결했을까? → 해시 리사이징과 점진적 리해싱으로 해결
hash % 4 로 간단하게 슬롯을 나눈다고 가정
| 순서 | key | hash % 4 | 삽입 위치 | 상태 | 테이블 상태 (ht[0]) |
|---|---|---|---|---|---|
| 1 | "apple" | 1 | 1 | 삽입 | [1] apple |
| 2 | "banana" | 1 | 1 (충돌) | 체이닝 | [1] banana → apple |
| 3 | "melon" | 2 | 2 | 삽입 | [2] melon |
| 4 | "grape" | 0 | 0 | 삽입 + 리해싱 트리거 | [0] grape |
❓ 왜 banana가 apple 앞에 왔을까?
Redis는 최근 삽입된 데이터를 리스트의 앞쪽에 배치한다.
최근 데이터가 자주 조회될 가능성이 높기 때문이다.
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
int htidx;
if (dictIsRehashing(d)) _dictRehashStep(d);
/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
htidx = dictIsRehashing(d) ? 1 : 0;
size_t metasize = dictMetadataSize(d);
entry = zmalloc(sizeof(*entry) + metasize);
if (metasize > 0) {
memset(dictMetadata(entry), 0, metasize);
}
entry->next = d->ht_table[htidx][index];
d->ht_table[htidx][index] = entry;
d->ht_used[htidx]++;
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
}
여기서
entry->next = d->ht_table[htidx][index];
d->ht_table[htidx][index] = entry;
이부분을 보면 맨 앞으로 삽입한것을 볼 수 있다. 최근에 추가한 놈을 찾을 확률이 높기 때문에 앞에다 배치 이런식으로 redis는 해시 충돌을 링크드 리스트 체이닝 방식으로 해결한다.
근데 자바는 충돌 횟수가 8개 즉 동일 슬롯에 8개 이상 충돌 시 링크드 리스트를 red black tree로 변경한다. 왜? 조회성능을 올리려고 O(N) → O(log N)
하지만 redis는 충돌이 많아져도 트리 변환을 안한다,
왜와이?
| 순서 | key | hash % 4 | 삽입 위치 | 상태 변경 | 테이블 상태 (ht[0]) | used | load factor |
|---|---|---|---|---|---|---|---|
| 1 | "apple" | 1 | 1 | 삽입 | [0] null[1] apple[2] null[3] null | 1 | 0.25 |
| 2 | "banana" | 1 | 1 (충돌) | 체이닝 추가 | [0] null[1] banana → apple[2] null[3] null | 2 | 0.5 |
| 3 | "melon" | 2 | 2 | 삽입 | [0] null[1] banana → apple[2] melon[3] null | 3 | 0.75 |
| 4 | "grape" | 0 | 0 | 삽입 + 트리거 | [0] grape[1] banana → apple[2] melon[3] null | 4 | 1.0 ✅ |
이제 4번 순서로 가보자
여기서 Load Factor = used / size 이다
4번째 grape를 넣는 순간 load factor가 1.0이 되면서 리사이징 조건을 충족하면서 리해싱을 시작함
struct dict {
dictType *type;
dictEntry **ht_table[2];
unsigned long ht_used[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
/* Keep small vars at end for optimal (minimal) struct padding */
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
};
처음에 dict를 만들때부터 해시테이블은 두개 만들어놨음
그래서 두번쨰 해시테이블에 조금씩 점직적으로 요청 처리시마다 조금씩 옮김
새로운 테이블은 사이즈를 늘려서 만듬
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
이부분이 리해싱하고 잇는 인덱스라고 보면 됨
이제 리해싱 시작: 점진적 이동
새 테이블 생성:
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (dict_can_resize == DICT_RESIZE_FORBID || !dictIsRehashing(d)) return 0;
if (dict_can_resize == DICT_RESIZE_AVOID &&
(DICTHT_SIZE(d->ht_size_exp[1]) / DICTHT_SIZE(d->ht_size_exp[0]) < dict_force_resize_ratio))
{
return 0;
}
while(n-- && d->ht_used[0] != 0) { // -> n다쓸때까지, 해시테이블[0]에 아무것도 없을 때 까지
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(DICTHT_SIZE(d->ht_size_exp[0]) > (unsigned long)d->rehashidx);
while(d->ht_table[0][d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht_table[0][d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & DICTHT_SIZE_MASK(d->ht_size_exp[1]);
de->next = d->ht_table[1][h];
d->ht_table[1][h] = de;
d->ht_used[0]--;
d->ht_used[1]++;
de = nextde;
}
d->ht_table[0][d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
if (d->ht_used[0] == 0) {
zfree(d->ht_table[0]);
/* Copy the new ht onto the old one */
d->ht_table[0] = d->ht_table[1];
d->ht_used[0] = d->ht_used[1];
d->ht_size_exp[0] = d->ht_size_exp[1];
_dictReset(d, 1);
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
de->next = d->ht_table[1][h];
d->ht_table[1][h] = de;
d->ht_used[0]--;
d->ht_used[1]++;
| Table | Slot | Entry |
|---|---|---|
| ht[0] | 0 | apple ← 아직 안 옮김 |
| 1 | melon | |
| 2 | grape | |
| ht1 | 0~7 | [4] banana, 나머지는 null |
그럼 다음 요청에선 apple 리해싱함
int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
{
if (malloc_failed) *malloc_failed = 0;
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
if (dictIsRehashing(d) || d->ht_used[0] > size)
return DICT_ERR;
/* the new hash table */
dictEntry **new_ht_table;
unsigned long new_ht_used;
signed char new_ht_size_exp = _dictNextExp(size);
/* Detect overflows */
size_t newsize = 1ul<<new_ht_size_exp;
if (newsize < size || newsize * sizeof(dictEntry*) < newsize)
return DICT_ERR;
/* Rehashing to the same table size is not useful. */
if (new_ht_size_exp == d->ht_size_exp[0]) return DICT_ERR;
/* Allocate the new hash table and initialize all pointers to NULL */
if (malloc_failed) {
new_ht_table = ztrycalloc(newsize*sizeof(dictEntry*));
*malloc_failed = new_ht_table == NULL;
if (*malloc_failed)
return DICT_ERR;
} else
new_ht_table = zcalloc(newsize*sizeof(dictEntry*));
new_ht_used = 0;
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht_table[0] == NULL) {
d->ht_size_exp[0] = new_ht_size_exp;
d->ht_used[0] = new_ht_used;
d->ht_table[0] = new_ht_table;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
d->ht_size_exp[1] = new_ht_size_exp;
d->ht_used[1] = new_ht_used;
d->ht_table[1] = new_ht_table;
d->rehashidx = 0;
return DICT_OK;
}
리해싱은 슬롯 단위로, 순차적(0 → 1 → 2 → …)으로 진행됨 (다음 요청에는 1번 슬롯이 옮겨지겠지?)
d->ht_size_exp[1] = new_ht_size_exp;
d->ht_used[1] = new_ht_used;
d->ht_table[1] = new_ht_table;
d->rehashidx = 0; -> 0번 슬롯부터 시작
return DICT_OK;
삽입은 항상 ht[1]에만 들어감
조회는 ht[0]과 ht[1] 둘 다 탐색함
/* Check if we already rehashed the whole table... */
if (d->ht_used[0] == 0) {
zfree(d->ht_table[0]);
/* Copy the new ht onto the old one */
d->ht_table[0] = d->ht_table[1];
d->ht_used[0] = d->ht_used[1];
d->ht_size_exp[0] = d->ht_size_exp[1];
_dictReset(d, 1);
d->rehashidx = -1;
return 0;
}
rehashidx == ht[0].size (예: 4) 되면:
그럼 그 사이에 추가되는 값 추가는 어떻게 될까?
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
int htidx;
if (dictIsRehashing(d)) _dictRehashStep(d);
/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
htidx = dictIsRehashing(d) ? 1 : 0;
size_t metasize = dictMetadataSize(d);
entry = zmalloc(sizeof(*entry) + metasize);
if (metasize > 0) {
memset(dictMetadata(entry), 0, metasize);
}
entry->next = d->ht_table[htidx][index];
d->ht_table[htidx][index] = entry;
d->ht_used[htidx]++;
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
}
htidx = dictIsRehashing(d) ? 1 : 0; → 리해싱 중이라면 1, 아니면 0
redis는 기본적으로 단일 스레드 구조다 모든 명령은 하나의 메인 스레드에서 실행된다 → pub/sub의 메시지 발행, 전송도 하나의 스레드에서 처리한다.
핵심은 논블로킹 I/O, 이벤트루프 이다
자세히 설명해보면
리눅스에서 FD → 모든 것은 파일이다
소켓 또한 파일이다
redis는 모든 클라이언트 소켓을 non-blocking으로 설정한다 따라서 어떤 클라이언트가 느리더라도, 그 클라이언트 때문에 Redis 전체가 멈추지 않는다
또한 enpoll 기반의 이벤트 루프로 대기중인 수천개의 소켓(파일)을 효율적으로 모니터링 한다
간단하게 설명하면 select 시스템 콜은, for문 처럼 모든 소켓을 순회한다면 (O(n))
enpoll은
간단하게 소켓중에 준비되거나 보낼게 있는 놈들만 조사할게! 이다
이벤트 감지를 select처럼 for문으로 순회하는게 아닌 LinkedList에서 이벤트 감지 하기 때문에 O(n) FD등록은 red black tree에 하기 때문에 O(log n)
또한 메시지 전파는 매우 빠르다 왜? 사실 아까 말했듯이 구독자 리스트 조회할때, O(n)이긴 하다, 하지만 구독자의 숫자는 적게 설정하기 때문에 n이 작다
또한 실제 메시지를 바로 TCP로 전송하는게 아니라 클라이언트 버퍼에 넣어둔다, 그 후 write 시스템 콜이 가능할때(이벤트 루프에서 클라이언트가 writeable 상태로 감지되면), OS가 알려주면 버퍼에서 꺼내서 전송한다
메인 스레드는 list 순회만 하면 끝난다
[PUBLISH foo "hello"]
↓
[Redis 단일 스레드]
↓
(pubsub_channels["foo"] → clientA, clientB, clientC)
↓
각 client의 reply 버퍼에 메시지 push
↓
event loop가 write 가능할 때 TCP로 전송
아까 말했듯이
Redis는 메시지를 "바로 소켓으로 보내는 게 아니라, 먼저 출력 버퍼에 넣고, 소켓에 write() 시도함"
그리고 클라이언트가 읽지 않으면, 그 write는 지연되고, 버퍼가 점점 차게 됨.
예시)
Redis 서버 입장에서 “클라이언트에게 보낼 데이터를 임시로 쌓아두는 공간”인데. 클라이언트가 느리거나 받지 않으면, 이 공간에 계속 메시지가 쌓임
Redis는 메시지를 전송할 준비가 되면,클라이언트 소켓에 write() 하기 전에 → buf에 먼저 넣어둠 → output buffer
문제가 되는 상황은
typedef struct client {
...
list *reply; // 💬 일반적인 응답 메시지 큐 (linked list)
char buf[PROTO_REPLY_CHUNK_BYTES]; // 🔄 작고 빠른 응답 캐시 (16KB 정도)
size_t sentlen; // 현재까지 write된 양
...
}
동작 흐름
buf[]에 저장reply list에 저장sendReplyToClient() 호출write()로 전송, 전송된 만큼 buf/list에서 지움근데만약에 버퍼가 계속 쌓이면?
ex) 느린 클라이언트에게 메시지를 꼐속 보내는 경우 Pub/Sub
client → reply가 너무 커지면, 메모리 누수, 서버 과부하
redis는 이를 감지하고 자동 차단
# redis.conf
client-output-buffer-limit pubsub 32mb 8mb 60
| 항목 | 의미 |
|---|---|
| 32mb | 절대 최대치: 넘으면 즉시 연결 종료 |
| 8mb 60 | 최근 60초 동안 평균 8MB 이상이면 연결 종료 |
스레드 하나당, 클라이언트와 연결하나일까? 놉 자바는 selector 기반의 이벤트 루프가 들어있음
webflux가 아닌데도 어떻게 이벤트 루프가 된다는거지?
스프링 자체가 이벤트 루프를 갖고 있찌 않지만, 서블릿 컨테이너 (tomcat)는 내부적으로 이벤트루프를 갖고있기 때문
웹소켓 연결 흐름 (Spring + Tomcat):
연결 상태 유지
메시지 수신 시 처리
연결 수 제한
스레드 수 제한
[Client] → WebSocket → [Tomcat 스레드 처리] → RedisTemplate.convertAndSend() ← 동기
↓
Redis (PUBLISH)
↓
[RedisMessageListenerContainer 스레드] ← 비동기 → RedisSubscriber.sendMessage()
이 RedisMessageListenerContainer는 백그라운드 스레드에서 돌아가며
Redis Pub/Sub 메시지를 비동기적으로 수신하고,
우리가 등록한 RedisSubscriber.sendMessage() 메서드를 호출 즉, 수신은 완전히 비동기로 동작하고, 서버는 따로 요청 기다릴 필요 없음
서버가 Redis에게 "나 여기에 구독할게" 하고 등록만 해놓고, 이후에는 직접 요청하지 않아도, Redis에서 메시지가 오면 → 자동으로 호출되는 방식
즉,
서버는 수동으로 받을 준비를 하지 않음
Redis가 알아서 메시지 줄 때 → 리스너 스레드가 대신 받고 처리