Redis 로고가...바뀌었네요???????? 공식은 왼쪽을 밀고 있지만 저는 여전히 오른쪽이 공식 아이콘이라 생각합니다.
글 쓰려고 공식 사이트 좀 보니까 생성형 AI 시대에 맞춰서 Vector DB 로써 타사 제품과 비교한 글도 있네요. 실시간성이 중요한 데이터 플랫폼이 필요한 경우 하나의 선택지가 될 수 있을 것 같습니다.
+) Redis의 벡터 검색 지원은 7.2 버전부터 입니다.
1편에서 Redis의 Cache-Aside, 분산 락, Rate Limiting 에 대해 살펴봤습니다. Redis를 활용해서 Read 성능, 안정성, API 보호까지 다양한 활용 방법에 대해 알아봤는데 더욱 기발한 활용법이 많습니다.
그 중 예약 이체를 구현하는 예제를 통해 토큰, 작업 큐 & 지연 큐, 실시간 Pub/Sub (vs Stream), 순위/리더보드에 대해 알아보겠습니다.
인증 후 발급되는 세션 ID나 JWT Refresh Token을 빠르고 안전하게 저장하는 용도로 Redis가 자주 사용됩니다.
| Key | Type | TTL | Value | 설명 |
|---|---|---|---|---|
auth:rt:{userId}:{deviceId} | String | Refresh Token 만료와 동일 | 해시된 Refresh Token | 기기별 리프레시 토큰 저장 |
auth:devices:{userId} | Set | - | deviceId | 사용자 활성 기기 목록 |
auth:bl:at:{jti} | String | Access Token 남은 만료 | "1" | Access Token 블랙리스트 |
1️⃣ Refresh Token 저장, 검증, 삭제
@Service
@RequiredArgsConstructor
public class TokenService {
private final StringRedisTemplate redis;
private static final String RT_PREFIX = "auth:rt:"; // auth:rt:{userId}:{deviceId}
private String key(long userId, String deviceId) {
return RT_PREFIX + userId + ":" + deviceId;
}
// 저장 : HMAC 등으로 해시해 보관
public void saveRefreshToken(long userId, String deviceId, String hashedRt, Duration ttl) {
redis.opsForValue().set(key(userId, deviceId), hashedRt, ttl);
// 기기 목록 관리
redis.opsForSet().add("auth:devices:" + userId, deviceId);
}
// 검증
public boolean validateRefreshToken(long userId, String deviceId, String hashedRt) {
String saved = redis.opsForValue().get(key(userId, deviceId));
return saved != null && saved.equals(hashedRt);
}
// 삭제 (특정 기기 로그아웃)
public void deleteRefreshToken(long userId, String deviceId) {
redis.delete(key(userId, deviceId));
redis.opsForSet().remove("auth:devices:" + userId, deviceId);
}
}
2️⃣ Access Token 블랙리스트
jti(고유 ID)를 키로 사용하고 TTL을 남은 만료 시간과 동일하게 설정@Service
@RequiredArgsConstructor
public class AccessTokenBlacklist {
private final StringRedisTemplate redis;
// Blacklist = 즉시 차단
public void blacklist(String atJti, Duration remainingTtl) {
redis.opsForValue().set("auth:bl:at:" + atJti, "1", remainingTtl);
}
// 필터나 게이트웨이에서 사용되는 검사 메서드
public boolean isBlacklisted(String atJti) {
return Boolean.TRUE.equals(redis.hasKey("auth:bl:at:" + atJti));
}
}
+) JWT 필터에서의 사용 예시
// 요청마다 AT의 jti 추출 후 블랙리스트 조회
String jti = claims.getId(); // jti
if (accessTokenBlacklist.isBlacklisted(jti)) {
throw new Unauthorized("Token revoked");
}
3️⃣ Refresh Token Rotate + 재사용 감지
@Component
@RequiredArgsConstructor
public class RefreshTokenRotator {
private final StringRedisTemplate redis;
private static final String ROTATE_LUA =
// KEYS[1]=oldKey, KEYS[2]=newKey, ARGV[1]=oldHash, ARGV[2]=newHash, ARGV[3]=ttlSeconds
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" redis.call('del', KEYS[1]); " +
" redis.call('set', KEYS[2], ARGV[2], 'EX', ARGV[3]); " +
" return 1 " +
"else return 0 end";
public boolean rotate(long userId, String oldDeviceId, String newDeviceId,
String oldHashedRt, String newHashedRt, Duration ttl) {
String oldKey = "auth:rt:" + userId + ":" + oldDeviceId;
String newKey = "auth:rt:" + userId + ":" + newDeviceId;
Long ok = redis.execute((RedisCallback<Long>) conn ->
conn.scriptingCommands().eval(
ROTATE_LUA.getBytes(StandardCharsets.UTF_8),
ReturnType.INTEGER, 2,
oldKey.getBytes(StandardCharsets.UTF_8),
newKey.getBytes(StandardCharsets.UTF_8),
oldHashedRt.getBytes(StandardCharsets.UTF_8),
newHashedRt.getBytes(StandardCharsets.UTF_8),
String.valueOf(ttl.toSeconds()).getBytes(StandardCharsets.UTF_8)
)
);
return ok != null && ok == 1L;
}
}
spring-session-data-redis를 쓰면 세션 TTL, 직렬화, 정리 작업의 표준화가 자동으로 이루어집니다.예약 이체를 위해 사용자가 T 시각에 A 원 이체를 등록하면,
이러한 작업 큐를 Redis로 구현할 수 있습니다.
| 목적 | 키 예시 | 타입 | 값/설명 |
|---|---|---|---|
| 예약 작업 인덱스 | jobs:delay:payments | ZSET | score=executeAtMillis, member=job:{id} |
| 작업 페이로드 | job:payload:{id} | Hash | userId, amount, toAccount, attempts, … |
| 멱등성 키 | idem:payment:{id} | String | "1", EX=보존기간 |
| 결과 이벤트 | txn:stream:ledger | Stream | 이후 단계: 원장/감사/리스크 파이프라인 |
| 실패 DLQ | jobs:dlq:payments | Stream/리스트 | 최대 재시도 초과·영구 실패 건 |
1️⃣ 예약 이체 등록 (Producer)
@Service
@RequiredArgsConstructor
public class PaymentScheduler {
private final StringRedisTemplate redis;
private static final String DELAY_KEY = "jobs:delay:payments";
// 예약 생성 Produce
// 작업 페이로드를 Hash에 저장하고, ZSET에 실행시각을 점수로 등록
public void schedulePayment(String jobId, long executeAtMillis,
String userId, long amount, String toAccount) {
// 1) 저장 (Hash) (필요 시 TTL 부여)
String payloadKey = "job:payload:" + jobId;
Map<String,String> payload = Map.of(
"userId", userId,
"amount", String.valueOf(amount),
"to", toAccount,
"attempts", "0"
);
redis.opsForHash().putAll(payloadKey, payload);
// 2) ZSET에 스케줄링
redis.opsForZSet().add(DELAY_KEY, "job:" + jobId, executeAtMillis);
}
}
executeAtMillis를 통해 score 부여redis.expire(payloadKey, Duration.ofDays(N)2️⃣ 만기 작업 꺼내기 (Consumer)
ZRANGEBYSCORE : 범위 내에 해당하는 데이터 반환ZREM : 해당 데이터 삭제@Component
@RequiredArgsConstructor
public class DueJobPopper {
private final StringRedisTemplate redis;
private static final String DELAY_KEY = "jobs:delay:payments";
// nowMillis 이하 작업을 최대 N개까지 pop (원자성 보장)
private static final String POP_DUE_LUA =
// KEYS[1]=ZSET, ARGV[1]=nowMillis, ARGV[2]=limit
"local r = {} " +
"local vals = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, ARGV[2]) " +
"for i,v in ipairs(vals) do " +
" redis.call('ZREM', KEYS[1], v); table.insert(r, v) " +
"end; return r";
// 작업 꺼내기 Consume
public List<String> popDue(long nowMillis, int limit) {
List<String> res = redis.execute((RedisConnection conn) -> {
@SuppressWarnings("unchecked")
List<byte[]> raw = (List<byte[]>) conn.scriptingCommands().eval(
POP_DUE_LUA.getBytes(StandardCharsets.UTF_8),
ReturnType.MULTI, 1,
DELAY_KEY.getBytes(StandardCharsets.UTF_8),
String.valueOf(nowMillis).getBytes(StandardCharsets.UTF_8),
String.valueOf(limit).getBytes(StandardCharsets.UTF_8)
);
if (raw == null) return List.of();
return raw.stream().map(b -> new String(b, StandardCharsets.UTF_8)).toList();
});
return res == null ? List.of() : res;
}
}
3️⃣ ⭐⭐⭐ 실행
@Service
@RequiredArgsConstructor
public class PaymentWorker {
private final StringRedisTemplate redis;
private final DueJobPopper popper;
private final PaymentExecutor executor; // 외부 Executor 호출, 결과 반환
private static final int BATCH = 100;
@Scheduled(fixedDelay = 300) // 300ms마다 폴링
public void tick() {
long now = System.currentTimeMillis();
for (String member : popper.popDue(now, BATCH)) {
String jobId = member.substring("job:".length());
process(jobId);
}
}
private void process(String jobId) {
// 1) 멱등성 키: 이미 처리 중 or 완료면 스킵
boolean first = Boolean.TRUE.equals(
redis.opsForValue().setIfAbsent("idem:payment:" + jobId, "1", Duration.ofMinutes(10))
);
if (!first) return;
String payloadKey = "job:payload:" + jobId;
Map<Object,Object> p = redis.opsForHash().entries(payloadKey);
String userId = (String) p.get("userId");
long amount = Long.parseLong((String)p.get("amount"));
String to = (String) p.get("to");
int attempts = Integer.parseInt((String)p.get("attempts"));
try {
PaymentResult r = executor.execute(jobId, userId, amount, to); // 외부 시스템 (PaymentExecutor)
appendLedgerEvent(r); // → 3단계에서 스트림으로 영속/분석
publishUserNotify(r); // → Pub/Sub 즉시 알림
} catch (TransientException te) {
// 일시 오류 → 재시도 예약 (지수 백오프)
int next = attempts + 1;
long delayMs = backoffMs(next); // 예: min(2^next*1000, 5분)
redis.opsForHash().put(payloadKey, "attempts", String.valueOf(next));
long requeueAt = System.currentTimeMillis() + delayMs;
redis.opsForZSet().add("jobs:delay:payments", "job:" + jobId, requeueAt);
} catch (Throwable fatal) {
// 영구 실패 → DLQ로 이동
Map<String,String> dlq = Map.of(
"jobId", jobId, "userId", userId, "amount", String.valueOf(amount),
"to", to, "error", fatal.getClass().getSimpleName()
);
redis.opsForStream().add("jobs:dlq:payments", dlq);
} finally {
}
}
private long backoffMs(int attempt) {
long base = (long)Math.min(Math.pow(2, attempt) * 1000, 5 * 60 * 1000);
// Jitter로 몰림 완화
double jitter = 0.8 + Math.random() * 0.4;
return (long)(base * jitter);
}
private void appendLedgerEvent(PaymentResult r) {
Map<String,String> body = Map.of(
"txnId", r.txnId(), "userId", r.userId(),
"amount", String.valueOf(r.amount()), "to", r.toAccount(),
"status", r.status().name(), "ts", String.valueOf(System.currentTimeMillis())
);
redis.opsForStream().add("txn:stream:ledger", body);
}
private void publishUserNotify(PaymentResult r) {
redis.convertAndSend("notify:pub:" + r.userId(),
"{\"type\":\"PAYMENT\",\"status\":\""+r.status()+"\",\"amount\":"+r.amount()+"}");
}
}
멱등성 키
SET key value NX EX ttl 패턴 사용idem:transfer:{reservationId}라는 키를 걸어두면 같은 예약 ID로 중복 실행 시도 시 Redis가 첫 실행 이후에는 무시실패 처리 : 지수 백오프 (Exponential Backoff) + DLQ
Stream 기반 결과 로그
추가) 지연 큐 처리 시간을 지수 백오프로 구현한 예제이지만 지연 큐 전용 라이브러리가 있다! 깃허브 링크
1️⃣ 발행 (Publish)
@Service
@RequiredArgsConstructor
public class TransferPublisher {
private final StringRedisTemplate redisTemplate;
public void notifyTransfer(long userId, String status) {
String message = String.format("{\"userId\":%d,\"status\":\"%s\"}", userId, status);
// 채널 용도별/사용자별 분리 가능: transfer:notify[:{userId}]
redisTemplate.convertAndSend("transfer:notify", message);
}
}
2️⃣ 구독 (Subscrive)
@Component
@RequiredArgsConstructor
public class TransferSubscriber implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("알림: " + body);
// 여기서 WebSocket/SSE를 통해 클라이언트에 전달
}
}
ZADD로 fraud:rank:alerts (ZSET)에 (score=리스크, member=alertId) 반영| 목적 | 키 | 타입 | 값/설명 |
|---|---|---|---|
| 우선순위 큐(실시간) | fraud:rank:{bucket} | ZSET | score=리스크 점수, member=alert:{id} |
| 선점 상태 | fraud:claimed:{alertId} | String | analystId, EX=작업 타임아웃 |
| 이력/아카이브 | fraud:stream:resolved | Stream | 처리 결과(누가, 언제, 결과) |
추가) 버킷 분할{bucket}은 트래픽에 따라 월별, 리전 등으로 나누면 효과적
1️⃣ 점수 반영
@Service
@RequiredArgsConstructor
public class FraudPriorityWriter {
private final StringRedisTemplate redis;
// 리스크 점수 산출 결과를 반영
public void upsert(String bucket, String alertId, double riskScore) {
String key = "fraud:rank:" + bucket;
redis.opsForZSet().add(key, "alert:" + alertId, riskScore);
}
// 누적 가중이 필요한 경우
public Double incr(String bucket, String alertId, double delta) {
String key = "fraud:rank:" + bucket;
return redis.opsForZSet().incrementScore(key, "alert:" + alertId, delta);
}
}
2️⃣ 조회: 상위 N, 순위, 점수 구간
@Service
@RequiredArgsConstructor
public class FraudPriorityReader {
private final StringRedisTemplate redis;
public List<String> topN(String bucket, int n) {
var key = "fraud:rank:" + bucket;
var r = redis.opsForZSet().reverseRange(key, 0, n - 1);
return r == null ? List.of() : r.stream().toList();
}
public Long rankOf(String bucket, String alertId) {
var key = "fraud:rank:" + bucket;
return redis.opsForZSet().reverseRank(key, "alert:" + alertId); // 0 = 1위
}
public Set<ZSetOperations.TypedTuple<String>> byScore(String bucket, double min, double max, int limit) {
var key = "fraud:rank:" + bucket;
return redis.opsForZSet().reverseRangeByScoreWithScores(key, min, max, 0, limit);
}
}
3️⃣ 선점(claim) : 경쟁 없이 한 건 뽑아 배정
@Component
@RequiredArgsConstructor
public class FraudClaimer {
private final StringRedisTemplate redis;
// KEYS[1]=ZSET fraud:rank:{bucket}
// ARGV[1]=nowMillis, ARGV[2]=analystId, ARGV[3]=claimTtlSeconds
private static final String CLAIM_LUA =
"local k=KEYS[1] " +
"local top = redis.call('ZREVRANGE', k, 0, 0) " +
"if #top == 0 then return nil end " +
"local member = top[1] " +
"redis.call('ZREM', k, member) " + -- 우선순위 큐에서 제거
"redis.call('SET', 'fraud:claimed:'..member, ARGV[2], 'EX', ARGV[3]) " + -- 선점 상태 기록
"return member";
// 상위 1건을 원자적으로 꺼내 선점. 없으면 null
public String claimTopOne(String bucket, String analystId, Duration claimTtl) {
String zset = "fraud:rank:" + bucket;
return redis.execute((RedisConnection c) ->
(String) c.scriptingCommands().eval(
CLAIM_LUA.getBytes(StandardCharsets.UTF_8),
ReturnType.VALUE, 1,
zset.getBytes(StandardCharsets.UTF_8),
String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8),
analystId.getBytes(StandardCharsets.UTF_8),
String.valueOf(claimTtl.toSeconds()).getBytes(StandardCharsets.UTF_8)
)
);
}
}
claimed키 삭제@Service
@RequiredArgsConstructor
public class FraudArchive {
private final StringRedisTemplate redis;
public String resolve(String alertMember, String analystId, String decision, String reason) {
// 1) 선점 해제
redis.delete("fraud:claimed:" + alertMember);
// 2) 처리 이력 스트림에 기록(누가/언제/결과)
Map<String, String> body = Map.of(
"alert", alertMember,
"analyst", analystId,
"decision", decision, // APPROVE / REJECT / ESCALATE ...
"reason", reason,
"ts", String.valueOf(System.currentTimeMillis())
);
return redis.opsForStream().add("fraud:stream:resolved", body);
}
}

이번에는 1탄과 다르게 은행권이라는.. 도메인을 설정해서 범위를 좀 좁혀보고 이해에 도움이 되도록 노력해봤습니다. Redis는 ZSET, Lua 등 활용 방법을 넓혀주는 다양한 기술이 있어서 무궁무진하게 사용할 수 있는 NoSQL이라 생각합니다.
캐싱을 위해서만이 아닌 다른 기능으로 Redis 한번 사용해보시는 건 어떨까요.
다음엔 저도 프로젝트 리팩토링 해보며 어떻게 활용되는지 더 자세한 예제 코드와 함께 돌아오겠습니다.