[DB] Redis 활용 패턴 2️⃣

cup-wan·2025년 8월 15일
3

Database

목록 보기
3/3

Intro

Redis 로고가...바뀌었네요???????? 공식은 왼쪽을 밀고 있지만 저는 여전히 오른쪽이 공식 아이콘이라 생각합니다.
글 쓰려고 공식 사이트 좀 보니까 생성형 AI 시대에 맞춰서 Vector DB 로써 타사 제품과 비교한 글도 있네요. 실시간성이 중요한 데이터 플랫폼이 필요한 경우 하나의 선택지가 될 수 있을 것 같습니다.
+) Redis의 벡터 검색 지원은 7.2 버전부터 입니다.

1편에서 Redis의 Cache-Aside, 분산 락, Rate Limiting 에 대해 살펴봤습니다. Redis를 활용해서 Read 성능, 안정성, API 보호까지 다양한 활용 방법에 대해 알아봤는데 더욱 기발한 활용법이 많습니다.
그 중 예약 이체를 구현하는 예제를 통해 토큰, 작업 큐 & 지연 큐, 실시간 Pub/Sub (vs Stream), 순위/리더보드에 대해 알아보겠습니다.

1. 토큰

인증 후 발급되는 세션 ID나 JWT Refresh Token을 빠르고 안전하게 저장하는 용도로 Redis가 자주 사용됩니다.

  • TTL로 자동 만료를 쉽게 다룰 수 있어 RDBMS 보다 효휼이 좋습니다.
  • 로그아웃 / 강제 만료 / 블랙리스트가 필요한 보안에 민감한 서비스에 적합합니다.

키 설계

KeyTypeTTLValue설명
auth:rt:{userId}:{deviceId}StringRefresh Token 만료와 동일해시된 Refresh Token기기별 리프레시 토큰 저장
auth:devices:{userId}Set-deviceId사용자 활성 기기 목록
auth:bl:at:{jti}StringAccess Token 남은 만료"1"Access Token 블랙리스트

구현

1️⃣ Refresh Token 저장, 검증, 삭제

  • 저장 시 해싱 후 저장
  • 검증 로직 추가 가능
  • 삭제 시 특정 디바이스에서만 로그아웃, 모든 디바이스 로그아웃 등 구현 가능
@Service
@RequiredArgsConstructor
public class TokenService {
    private final StringRedisTemplate redis;
    private static final String RT_PREFIX = "auth:rt:"; // auth:rt:{userId}:{deviceId}

    private String key(long userId, String deviceId) {
        return RT_PREFIX + userId + ":" + deviceId;
    }

    // 저장 : HMAC 등으로 해시해 보관
    public void saveRefreshToken(long userId, String deviceId, String hashedRt, Duration ttl) {
        redis.opsForValue().set(key(userId, deviceId), hashedRt, ttl);
        // 기기 목록 관리
        redis.opsForSet().add("auth:devices:" + userId, deviceId);
    }

    // 검증
    public boolean validateRefreshToken(long userId, String deviceId, String hashedRt) {
        String saved = redis.opsForValue().get(key(userId, deviceId));
        return saved != null && saved.equals(hashedRt);
    }

    // 삭제 (특정 기기 로그아웃)
    public void deleteRefreshToken(long userId, String deviceId) {
        redis.delete(key(userId, deviceId));
        redis.opsForSet().remove("auth:devices:" + userId, deviceId);
    }
}

2️⃣ Access Token 블랙리스트

  • 로그아웃/탈취 의심 등으로 이미 발급된 Access Token을 즉시 무효화 하기 위해 사용
  • 토큰의 jti(고유 ID)를 키로 사용하고 TTL을 남은 만료 시간과 동일하게 설정
@Service
@RequiredArgsConstructor
public class AccessTokenBlacklist {
    private final StringRedisTemplate redis;

    // Blacklist = 즉시 차단
    public void blacklist(String atJti, Duration remainingTtl) {
        redis.opsForValue().set("auth:bl:at:" + atJti, "1", remainingTtl);
    }

    // 필터나 게이트웨이에서 사용되는 검사 메서드
    public boolean isBlacklisted(String atJti) {
        return Boolean.TRUE.equals(redis.hasKey("auth:bl:at:" + atJti));
    }
}

+) JWT 필터에서의 사용 예시

// 요청마다 AT의 jti 추출 후 블랙리스트 조회
String jti = claims.getId(); // jti
if (accessTokenBlacklist.isBlacklisted(jti)) {
    throw new Unauthorized("Token revoked");
}

3️⃣ Refresh Token Rotate + 재사용 감지

  • 사용자가 Access Token 재발급 요청 후 기존 Refresh Token으로 인증되면 새 Refresh Token을 발급하고 이전 Refresh Token은 즉시 폐기
  • 원자성 보장을 위해 사용
    • 이전 Refresh Token 삭제 -> 새로운 Refresh Token 발급이 동시에 이루어져야 경쟁 조건 X
  • 검증 -> 교체 과정을 Lua 스크립트로 한 번에 처리하면 경쟁 조건 방지 가능
@Component
@RequiredArgsConstructor
public class RefreshTokenRotator {
    private final StringRedisTemplate redis;

    private static final String ROTATE_LUA =
        // KEYS[1]=oldKey, KEYS[2]=newKey, ARGV[1]=oldHash, ARGV[2]=newHash, ARGV[3]=ttlSeconds
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "  redis.call('del', KEYS[1]); " +
        "  redis.call('set', KEYS[2], ARGV[2], 'EX', ARGV[3]); " +
        "  return 1 " +
        "else return 0 end";

    public boolean rotate(long userId, String oldDeviceId, String newDeviceId,
                          String oldHashedRt, String newHashedRt, Duration ttl) {
        String oldKey = "auth:rt:" + userId + ":" + oldDeviceId;
        String newKey = "auth:rt:" + userId + ":" + newDeviceId;
        Long ok = redis.execute((RedisCallback<Long>) conn ->
            conn.scriptingCommands().eval(
                ROTATE_LUA.getBytes(StandardCharsets.UTF_8),
                ReturnType.INTEGER, 2,
                oldKey.getBytes(StandardCharsets.UTF_8),
                newKey.getBytes(StandardCharsets.UTF_8),
                oldHashedRt.getBytes(StandardCharsets.UTF_8),
                newHashedRt.getBytes(StandardCharsets.UTF_8),
                String.valueOf(ttl.toSeconds()).getBytes(StandardCharsets.UTF_8)
            )
        );
        return ok != null && ok == 1L;
    }
}

고려 사항

  • 세션 관리를 위해서는 Spring Session Redis 사용 시 spring-session-data-redis를 쓰면 세션 TTL, 직렬화, 정리 작업의 표준화가 자동으로 이루어집니다.
  • 클러스터/복제 : 인증은 서비스 핵심이므로 Redis는 클러스터 + 복제 + Sentinel/Cloud HA 구성을 권장합니다.
  • 백업 전략 : 장기 세션/토큰은 가급적 짧게 운용하고 (TTL 설정 적당히), 필요 시 감사용 이벤트 로그를 별도 저장소에 저장합니다.

2. 작업 큐 & 지연 큐

목적

예약 이체를 위해 사용자가 T 시각에 A 원 이체를 등록하면,

  • 현재 : 대기 상태
  • T 시간 : 정확히 한 번 실행
  • 실패 시 : 재시도/만기 처리(DLQ)
    의 흐름이 됩니다.

이러한 작업 큐를 Redis로 구현할 수 있습니다.

키 설계

목적키 예시타입값/설명
예약 작업 인덱스jobs:delay:paymentsZSETscore=executeAtMillis, member=job:{id}
작업 페이로드job:payload:{id}HashuserId, amount, toAccount, attempts, …
멱등성 키idem:payment:{id}String"1", EX=보존기간
결과 이벤트txn:stream:ledgerStream이후 단계: 원장/감사/리스크 파이프라인
실패 DLQjobs:dlq:paymentsStream/리스트최대 재시도 초과·영구 실패 건
  • ZSET?
    • Redis의 정렬된 집합 자료 구조형
    • score에 대한 member의 매핑 값 저장 가능
    • member + score로 지금 처리해야 할 작업을 효율적으로 pop 가능

구현

1️⃣ 예약 이체 등록 (Producer)

@Service
@RequiredArgsConstructor
public class PaymentScheduler {
    private final StringRedisTemplate redis;
    private static final String DELAY_KEY = "jobs:delay:payments";

    // 예약 생성 Produce
    // 작업 페이로드를 Hash에 저장하고, ZSET에 실행시각을 점수로 등록
    public void schedulePayment(String jobId, long executeAtMillis,
                                String userId, long amount, String toAccount) {
        // 1) 저장 (Hash) (필요 시 TTL 부여)
        String payloadKey = "job:payload:" + jobId;
        Map<String,String> payload = Map.of(
            "userId", userId,
            "amount", String.valueOf(amount),
            "to", toAccount,
            "attempts", "0"
        );
        redis.opsForHash().putAll(payloadKey, payload);
        
        // 2) ZSET에 스케줄링
        redis.opsForZSet().add(DELAY_KEY, "job:" + jobId, executeAtMillis);
    }
}
  • ZSET에 추가할 때 executeAtMillis를 통해 score 부여
  • 페이로드는 Hash에 저장
    • 만기 후 데이터 보존 기간이 있다면 payload TTL 설정
    • redis.expire(payloadKey, Duration.ofDays(N)

2️⃣ 만기 작업 꺼내기 (Consumer)

  • ZSET 활용
    • ZRANGEBYSCORE : 범위 내에 해당하는 데이터 반환
    • ZREM : 해당 데이터 삭제
    • 두번의 호출로 경쟁 상태 가능
  • 경쟁..상태? = Lua로 해결 가능
@Component
@RequiredArgsConstructor
public class DueJobPopper {
    private final StringRedisTemplate redis;
    private static final String DELAY_KEY = "jobs:delay:payments";

    // nowMillis 이하 작업을 최대 N개까지 pop (원자성 보장)
    private static final String POP_DUE_LUA =
        // KEYS[1]=ZSET, ARGV[1]=nowMillis, ARGV[2]=limit
        "local r = {} " +
        "local vals = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, ARGV[2]) " +
        "for i,v in ipairs(vals) do " +
        "  redis.call('ZREM', KEYS[1], v); table.insert(r, v) " +
        "end; return r";

	// 작업 꺼내기 Consume
    public List<String> popDue(long nowMillis, int limit) {
        List<String> res = redis.execute((RedisConnection conn) -> {
            @SuppressWarnings("unchecked")
            List<byte[]> raw = (List<byte[]>) conn.scriptingCommands().eval(
                POP_DUE_LUA.getBytes(StandardCharsets.UTF_8),
                ReturnType.MULTI, 1,
                DELAY_KEY.getBytes(StandardCharsets.UTF_8),
                String.valueOf(nowMillis).getBytes(StandardCharsets.UTF_8),
                String.valueOf(limit).getBytes(StandardCharsets.UTF_8)
            );
            if (raw == null) return List.of();
            return raw.stream().map(b -> new String(b, StandardCharsets.UTF_8)).toList();
        });
        return res == null ? List.of() : res;
    }
}

3️⃣ ⭐⭐⭐ 실행

@Service
@RequiredArgsConstructor
public class PaymentWorker {
    private final StringRedisTemplate redis;
    private final DueJobPopper popper;
    private final PaymentExecutor executor; // 외부 Executor 호출, 결과 반환
    private static final int BATCH = 100;

    @Scheduled(fixedDelay = 300) // 300ms마다 폴링
    public void tick() {
        long now = System.currentTimeMillis();
        for (String member : popper.popDue(now, BATCH)) {
            String jobId = member.substring("job:".length());
            process(jobId);
        }
    }

    private void process(String jobId) {
        // 1) 멱등성 키: 이미 처리 중 or 완료면 스킵
        boolean first = Boolean.TRUE.equals(
            redis.opsForValue().setIfAbsent("idem:payment:" + jobId, "1", Duration.ofMinutes(10))
        );
        if (!first) return;

        String payloadKey = "job:payload:" + jobId;
        Map<Object,Object> p = redis.opsForHash().entries(payloadKey);
        String userId = (String) p.get("userId");
        long amount = Long.parseLong((String)p.get("amount"));
        String to = (String) p.get("to");
        int attempts = Integer.parseInt((String)p.get("attempts"));

        try {
            PaymentResult r = executor.execute(jobId, userId, amount, to); // 외부 시스템 (PaymentExecutor)
            appendLedgerEvent(r); // → 3단계에서 스트림으로 영속/분석
            publishUserNotify(r); // → Pub/Sub 즉시 알림
        } catch (TransientException te) {
            // 일시 오류 → 재시도 예약 (지수 백오프)
            int next = attempts + 1;
            long delayMs = backoffMs(next); // 예: min(2^next*1000, 5분)
            redis.opsForHash().put(payloadKey, "attempts", String.valueOf(next));
            long requeueAt = System.currentTimeMillis() + delayMs;
            redis.opsForZSet().add("jobs:delay:payments", "job:" + jobId, requeueAt);
        } catch (Throwable fatal) {
            // 영구 실패 → DLQ로 이동
            Map<String,String> dlq = Map.of(
              "jobId", jobId, "userId", userId, "amount", String.valueOf(amount),
              "to", to, "error", fatal.getClass().getSimpleName()
            );
            redis.opsForStream().add("jobs:dlq:payments", dlq);
        } finally {
        }
    }

    private long backoffMs(int attempt) {
        long base = (long)Math.min(Math.pow(2, attempt) * 1000, 5 * 60 * 1000);
        // Jitter로 몰림 완화
        double jitter = 0.8 + Math.random() * 0.4;
        return (long)(base * jitter);
    }

    private void appendLedgerEvent(PaymentResult r) {
        Map<String,String> body = Map.of(
          "txnId", r.txnId(), "userId", r.userId(),
          "amount", String.valueOf(r.amount()), "to", r.toAccount(),
          "status", r.status().name(), "ts", String.valueOf(System.currentTimeMillis())
        );
        redis.opsForStream().add("txn:stream:ledger", body);
    }

    private void publishUserNotify(PaymentResult r) {
        redis.convertAndSend("notify:pub:" + r.userId(),
            "{\"type\":\"PAYMENT\",\"status\":\""+r.status()+"\",\"amount\":"+r.amount()+"}");
    }
}
  • 멱등성 키

    • 예약 이체 실행 시 같은 요청 중복 실행 방지
    • SET key value NX EX ttl 패턴 사용
      • NX : 키 없을 때만 생성 (중복 방지)
      • EX : TTL 설정 (멱등성 보장 기간 설정)
    • ex) idem:transfer:{reservationId}라는 키를 걸어두면 같은 예약 ID로 중복 실행 시도 시 Redis가 첫 실행 이후에는 무시
  • 실패 처리 : 지수 백오프 (Exponential Backoff) + DLQ

    • 은행 송금 API나 타 금융 기관 API 호출 실패 시
    • Exponential Backoff로 재시도 시점 간격을 점점 늘려주면, 시스템 부하가 줄고 일시적인 장애 회복 가능성 상승
    • 재시도 횟수가 일정 한도 초과 시 Dead Letter Queue (DLQ로 이동
      • DLQ에 쌓인 데이터는 운영자가 수동 처리 or 별도 재처리 파이프라인으로 복구
  • Stream 기반 결과 로그

    • 예약 이체 성공/실패 여부를 Redis Stream에 기록
      • 순차적으로 안전한 기록
      • 이후 단계에서 비동기 처리 (기록, 감사 로그 생성, 리스크 모니터링 등)
      • 장애 발생 시 처리 지점부터 재시작 가능

추가) 지연 큐 처리 시간을 지수 백오프로 구현한 예제이지만 지연 큐 전용 라이브러리가 있다! 깃허브 링크

3. Pub/Sub

목적

  • Pub/Sub 모델은 발행자(Pub)가 특정 채널에 메시지 발행 시 해당 채널을 구독한 모든 구독자(Sub)가 메시지를 실시간으로 받는 구조
  • 메시지 전송 시 Redis 저장 X (휘발성)
  • 예약 이체 결과를 실시간으로 알림 (송금 완료, 이체 실패 - 잔액 부족 등)
  • Stream과 차이점
    • Strema : 안전한 저장/재처리가 필요한 데이터
    • Pub/Sub : 일시적이고 빠른 알림

구현

1️⃣ 발행 (Publish)

@Service
@RequiredArgsConstructor
public class TransferPublisher {
    private final StringRedisTemplate redisTemplate;

    public void notifyTransfer(long userId, String status) {
        String message = String.format("{\"userId\":%d,\"status\":\"%s\"}", userId, status);
        // 채널 용도별/사용자별 분리 가능: transfer:notify[:{userId}]
        redisTemplate.convertAndSend("transfer:notify", message);
    }
}

2️⃣ 구독 (Subscrive)

@Component
@RequiredArgsConstructor
public class TransferSubscriber implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("알림: " + body);
        // 여기서 WebSocket/SSE를 통해 클라이언트에 전달
    }
}

4. 순위/리더보드

목적

  • 거래별 리스크 점수를 산출 중
  • ZADDfraud:rank:alerts (ZSET)에 (score=리스크, member=alertId) 반영
  • 거래 분석 시 상위 N건을 즉시 확인 및 선점해 중복 처리 방지

키 설계

목적타입값/설명
우선순위 큐(실시간)fraud:rank:{bucket}ZSETscore=리스크 점수, member=alert:{id}
선점 상태fraud:claimed:{alertId}StringanalystId, EX=작업 타임아웃
이력/아카이브fraud:stream:resolvedStream처리 결과(누가, 언제, 결과)

추가) 버킷 분할{bucket}은 트래픽에 따라 월별, 리전 등으로 나누면 효과적

구현

1️⃣ 점수 반영

@Service
@RequiredArgsConstructor
public class FraudPriorityWriter {
    private final StringRedisTemplate redis;

    // 리스크 점수 산출 결과를 반영
    public void upsert(String bucket, String alertId, double riskScore) {
        String key = "fraud:rank:" + bucket;
        redis.opsForZSet().add(key, "alert:" + alertId, riskScore);
    }

    // 누적 가중이 필요한 경우
    public Double incr(String bucket, String alertId, double delta) {
        String key = "fraud:rank:" + bucket;
        return redis.opsForZSet().incrementScore(key, "alert:" + alertId, delta);
    }
}

2️⃣ 조회: 상위 N, 순위, 점수 구간

@Service
@RequiredArgsConstructor
public class FraudPriorityReader {
    private final StringRedisTemplate redis;

    public List<String> topN(String bucket, int n) {
        var key = "fraud:rank:" + bucket;
        var r = redis.opsForZSet().reverseRange(key, 0, n - 1);
        return r == null ? List.of() : r.stream().toList();
    }

    public Long rankOf(String bucket, String alertId) {
        var key = "fraud:rank:" + bucket;
        return redis.opsForZSet().reverseRank(key, "alert:" + alertId); // 0 = 1위
    }

    public Set<ZSetOperations.TypedTuple<String>> byScore(String bucket, double min, double max, int limit) {
        var key = "fraud:rank:" + bucket;
        return redis.opsForZSet().reverseRangeByScoreWithScores(key, min, max, 0, limit);
    }
}

3️⃣ 선점(claim) : 경쟁 없이 한 건 뽑아 배정

@Component
@RequiredArgsConstructor
public class FraudClaimer {
    private final StringRedisTemplate redis;

    // KEYS[1]=ZSET fraud:rank:{bucket}
    // ARGV[1]=nowMillis, ARGV[2]=analystId, ARGV[3]=claimTtlSeconds
    private static final String CLAIM_LUA =
        "local k=KEYS[1] " +
        "local top = redis.call('ZREVRANGE', k, 0, 0) " +
        "if #top == 0 then return nil end " +
        "local member = top[1] " +
        "redis.call('ZREM', k, member) " +                       -- 우선순위 큐에서 제거
        "redis.call('SET', 'fraud:claimed:'..member, ARGV[2], 'EX', ARGV[3]) " + -- 선점 상태 기록
        "return member";

    // 상위 1건을 원자적으로 꺼내 선점. 없으면 null
    public String claimTopOne(String bucket, String analystId, Duration claimTtl) {
        String zset = "fraud:rank:" + bucket;
        return redis.execute((RedisConnection c) ->
            (String) c.scriptingCommands().eval(
                CLAIM_LUA.getBytes(StandardCharsets.UTF_8),
                ReturnType.VALUE, 1,
                zset.getBytes(StandardCharsets.UTF_8),
                String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8),
                analystId.getBytes(StandardCharsets.UTF_8),
                String.valueOf(claimTtl.toSeconds()).getBytes(StandardCharsets.UTF_8)
            )
        );
    }
}
  • 상위 항목 분석 요청 겹칠 때 가장 높은 건수 1건을 꺼낸 후 선점하는 것을 Atomic 하게 처리
  • Lua로 구현
  • 선점 TTL이 지나면 fraud:claimed:alert:{id} 키가 만료되어 자동 반환
  • 선점 후 실제 처리 완료 시 아카이브(Stream)에 기록 후 claimed키 삭제
@Service
@RequiredArgsConstructor
public class FraudArchive {
    private final StringRedisTemplate redis;

    public String resolve(String alertMember, String analystId, String decision, String reason) {
        // 1) 선점 해제
        redis.delete("fraud:claimed:" + alertMember);

        // 2) 처리 이력 스트림에 기록(누가/언제/결과)
        Map<String, String> body = Map.of(
            "alert", alertMember,
            "analyst", analystId,
            "decision", decision,      // APPROVE / REJECT / ESCALATE ...
            "reason", reason,
            "ts", String.valueOf(System.currentTimeMillis())
        );
        return redis.opsForStream().add("fraud:stream:resolved", body);
    }
}

Outro

이번에는 1탄과 다르게 은행권이라는.. 도메인을 설정해서 범위를 좀 좁혀보고 이해에 도움이 되도록 노력해봤습니다. Redis는 ZSET, Lua 등 활용 방법을 넓혀주는 다양한 기술이 있어서 무궁무진하게 사용할 수 있는 NoSQL이라 생각합니다.
캐싱을 위해서만이 아닌 다른 기능으로 Redis 한번 사용해보시는 건 어떨까요.
다음엔 저도 프로젝트 리팩토링 해보며 어떻게 활용되는지 더 자세한 예제 코드와 함께 돌아오겠습니다.

profile
아무것도 안해서 유죄 판결 받음

0개의 댓글