이전 포스트에서 읽기 부하를 분산시키기 위해 Redis Cache를 활용하였습니다. 이때, TTL을 5초로 두게 되었는데 캐싱과 관련된 내용들에 대해서 더 학습하게 되면서 Hot key에서 발생할 수 있는 여러 문제점들에 대해서 알게 되었습니다.
예를 들어 Cache에 Lazy Loading이 0.2s걸리는 요청이 있다고 한고 해당 요청은 10000TPS규모라고 한다면 만료된 순간부터 0.2s동안의 해당 key를 참조하려는 2000개의 요청은 DB로 쏟아지게 되며 조회 후 Redis에 중복해서 2000번의 Write가 발생할 것입니다.
Cache Stampede의 장단점 들을 고려하여 PER을 선택하기로 하였습니다. 해당 방법에 대해서 좀 더 자세하게 알아보려고 합니다.
PER의 핵심 개념은 다음 조건들을 만족해야합니다.
1. 만료가 도래하기 전에 갱신한다. 갱신하지 못하면 Cache Stampede가 발생한다.
2. 만료시간에 가까워질 수록 재갱신 확률이 상승한다.
3. 자주 참조되는 key일 수록 재갱신 시도가 증가하므로 재갱신 확률 또한 증가한다.
4. Recompute Time Interval이 클수록 남은 ttl 만료 시간 대비 재갱신 확률이 커야한다.
해당 조건들을 만족하기 위해서 XFetch라는 개념을 도입했습니다. 해당 알고리즘은 다음과 같습니다.
PER의 핵심은 TTL까지 남은 시간 <= -∆β log(rand()) 인 경우 해당 키를 갱신하는 것입니다.
그렇다면 이 식은 어떻게 유도 되었을까요? 해당 논문에 자세히 나와있습니다. 수식에 대해서 설명해보겠습니다.
https://cseweb.ucsd.edu/~avattani/papers/cache_stampede.pdf
TTL까지 남은 시간이 많은데 새로 갱신이 이루어지는 것은 비교적 비효율적입니다. 따라서 TTL 까지 남은 시간이 적을 수록 우변 값이 클 확률이 높아지기 때문에 자연스럽게 새로 갱신이 일어날 확률이 높아집니다.
한번의 Recompute가 오래 걸리고 무거운 경우 Cache Stampede가 발생했을 때 문제점도 커지게 됩니다. 따라서 해당 값에 비례하도록 구성하여 TTL 만료시간이 비교적 많이 남더라도 갱신이 이루어질 수 있습니다.
갱신이 자주 일어나길 바랄 경우 β를 수정하여 확률을 높일 수 있습니다. 높일 수록 갱신이 잘 발생합니다.
왜 꼭 log가 되는지 가장 의문이 들었던 부분입니다.
여기서 rand()는 0~1사이의 값을 가집니다. 따라서 해당 값은 0부터 마이너스 무한대까지의 범위를 가집니다.
포아송 분포
포아송 분포는 독립적으로 일어나는 특정 사건이 일어나는 횟수에 대한 확률분포입니다. 핵심은 사건들이 독립적이라는 점과 이때 사건간의 시간간격은 지수분포를 따른다는 점입니다.
command를 개별 실행시키지 않은 이유는 여러 커맨드들을 개별 실행할 경우 개별적으로 네트워크 통신을 하여 blocking되는 방식으로 동작하기 때문에 RTT가 증가하고 System call 횟수가 증가하기 때문입니다.
조회 시 Script를 사용한 이유는 Transaction, Pipeline의 경우 조회의 결과를 반환받을 수 없기 때문입니다.
다음과 같이 실행할 스크립트를 미리 작성해둡니다. 이때 모든 key를 인자로 입력받도록 해야하며 같은 클러스터 노드에 존재해야합니다.
return {redis.call('mget', KEYS[1], KEYS[2]), redis.call('pttl', KEYS[1])};
redis.call('mset', KEYS[1], ARGV[1], KEYS[2], ARGV[2]);
redis.call('expire', KEYS[1], ARGV[3]);
redis.call('expire', KEYS[2], ARGV[3]);
@Configuration
public class LuaScriptConfig {
@Bean
public DefaultRedisScript<List> cacheGetRedisScript(){
DefaultRedisScript<List> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/cache_get.lua")));
redisScript.setResultType(List.class);
return redisScript;
}
@Bean
public DefaultRedisScript<List> cacheSetRedisScript(){
DefaultRedisScript<List> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/cache_set.lua")));
redisScript.setResultType(List.class);
return redisScript;
}
}
private final DefaultRedisScript<List> cacheGetRedisScript;
private final DefaultRedisScript<List> cacheSetRedisScript;
public Object probabilisticEarlyRecomputationGet(String originKey, Function<List<Object>, Object> recomputer, List<Object> args, Integer ttl) {
redisTemplate.execute(cacheSetRedisScript, List.of(key, getDeltaKey(key)), data, computationTime, ttl);
}
return data;
}
private void setKeyAndDeltaWithPipeline(Integer ttl, String key, Object data, long computationTime) {
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
byte[] keyBytes = serializer.serialize(key);
byte[] deltaKeyBytes = serializer.serialize(getDeltaKey(key));
RedisSerializer<Object> valueSerializer = (RedisSerializer<Object>) redisTemplate.getValueSerializer();
byte[] dataBytes = valueSerializer.serialize(data);
byte[] computationTimeBytes = valueSerializer.serialize(computationTime);
connection.set(keyBytes, dataBytes);
connection.set(deltaKeyBytes, computationTimeBytes);
long ttlLong = Long.parseLong(ttl.toString());
Duration duration = Duration.of(ttlLong, ChronoUnit.SECONDS);
connection.expire(keyBytes, duration.getSeconds());
connection.expire(deltaKeyBytes, duration.getSeconds());
return null;
});
}
private final DefaultRedisScript<List> cacheGetRedisScript;
private final DefaultRedisScript<List> cacheSetRedisScript;
public Object probabilisticEarlyRecomputationGet(String originKey, Function<List<Object>, Object> recomputer, List<Object> args, Integer ttl) {
//같은 클러스터 key로 보장
String key = hashtags(originKey);
List<Object> ret = (List<Object>)redisTemplate.execute(cacheGetRedisScript, List.of(key, getDeltaKey(key)));
List<Object> valueList = (List<Object>) ret.get(0);
Object data = valueList.get(0);
Long delta = (Long)valueList.get(1);
Long remainTtl = (Long)ret.get(1);
log.debug("data: {}, delta: {}, remainTtl: {}", data, delta, remainTtl);
// 재 갱신을 해야하는 경우.
if (data == null || delta == null || remainTtl == null ||
- delta * BETA * Math.log(randomDoubleGenerator.nextDouble()) >= remainTtl) {
long start = System.currentTimeMillis();
data = recomputer.apply(args);
long computationTime = (System.currentTimeMillis() - start);
setKeyAndDeltaWithPipeline(ttl, key, data, computationTime);
}
return data;
}
@Slf4j
@SpringBootTest(classes = {ArticleCacheService.class, LettuceConnectionConfig.class,
LuaScriptConfig.class})
class ArticleCacheServiceTest {
@MockBean
private ProbabilisticEarlyRecomputationConfig.RandomDoubleGenerator randomDoubleGenerator;
@MockBean
private RedisMessageSubscriber redisMessageSubscriber;
@Autowired
private ArticleCacheService articleCacheService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@BeforeEach
void setUp() {
redisTemplate.getConnectionFactory().getConnection().flushAll();
}
@Test
void probabilisticEarlyRecomputationGet_whenCacheDataIsAbsent_thenComputeAndWrite() {
//given
String key = "target1";
// when(randomDoubleGenerator.nextDouble()).thenReturn(0.5);
//when
articleCacheService.probabilisticEarlyRecomputationGet(key, (x) -> {
try {
Thread.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "a";
}, List.of(), 5);
String value = (String) redisTemplate.opsForValue().get(hashtag(key));
Object delta = redisTemplate.opsForValue().get(getDelta(hashtag(key)));
//then
assertThat(value)
.isEqualTo("a");
assertThat(delta)
.isNotNull();
}
@Test
void probabilisticEarlyRecomputationGet_givenCacheDataPresent_whenRemaintTTLIsMuchLargerThanDelta_thenNotRecompute() {
//given
String key = "target1";
String hashtag = hashtag(key);
redisTemplate.opsForValue().set(hashtag, "a");
redisTemplate.opsForValue().set(getDelta(hashtag), 100l);
Duration duration = Duration.of(5, ChronoUnit.SECONDS);
redisTemplate.expire(hashtag, duration);
redisTemplate.expire(getDelta(hashtag), duration);
when(randomDoubleGenerator.nextDouble()).thenReturn(0.1);
//when
ArrayList<Object> args = new ArrayList<>();
ArrayList<String> check = new ArrayList<>();
args.add(check);
// 100 * betta log(0.1) >= 5000
articleCacheService.probabilisticEarlyRecomputationGet(key, (x) -> {
ArrayList<String> check1 = (ArrayList<String>)(x.get(0));
check1.add("3");
return "a";
}, args, 5);
//실행 되지 않았는지 확인.
assertThat(check).isEmpty();
}
@Test
void probabilisticEarlyRecomputationGet_givenCacheDataPresent_whenRemaintTTLIsSmallerThanDelta_thenRecompute() {
//given
String key = "target1";
String hashtag = hashtag(key);
redisTemplate.opsForValue().set(hashtag, "a");
redisTemplate.opsForValue().set(getDelta(hashtag), 5005l);
Duration duration = Duration.of(5, ChronoUnit.SECONDS);
redisTemplate.expire(hashtag, duration);
redisTemplate.expire(getDelta(hashtag), duration);
when(randomDoubleGenerator.nextDouble()).thenReturn(0.1);
//when
ArrayList<Object> args = new ArrayList<>();
ArrayList<String> check = new ArrayList<>();
args.add(check);
// 100 * betta log(0.1) >= 5000
articleCacheService.probabilisticEarlyRecomputationGet(key, (x) -> {
ArrayList<String> check1 = (ArrayList<String>)(x.get(0));
check1.add("3");
return "a";
}, args, 5);
//실행 되었는지 않았는지 확인.
assertThat(check).isNotEmpty();
}
private String hashtag(String key) {
return "{" + key + "}";
}
private String getDelta(String key) {
return key + "-" + "delta";
}
}
테스트 조건중 5%가 전체 요청의 80%에 해당한다고 가정하였다. Gaussian Distribution을 활용하여 PageNumber의 값을 정하도록 Jmeter를 설정하였다.
더 트래픽이 집중되도록 하여 TTL 만료로 인한 Hit Miss를 더 잘 보기 위해 이전 테스트보다 요청이 집중된다고 가정하였다.
평균 50, 표준편차 2인 경우 47.5 ~ 52.5이 나올 경우가 80%가 된다.
5분간 실행 후 CacheMiss, Cache Hit 비율 비교할 것입니다.
1.TPS 1700, 응답시간 0.53s
2.RDS CPU 45%
3.elasticache CacheHit 50k, CacheMiss 1.3k,
CacheHit Ratio 97%
4.WAS CPU 90%, DBCP 10~18(MAX 20)
CPU, HEAP
DBCP
1.TPS 1400, 응답시간 0.64s
2.RDS CPU 28%
3.elasticache CacheHit 133k, CacheMiss 0.3k, CacheHit Ratio 99.9%
4.WAS CPU 90%, DBCP 10~18(MAX 20)
1.TPS 1700, 응답시간 0.53s
2.RDS CPU 45%
3.elasticache CacheHit 50k, CacheMiss 1.3k,
CacheHit Ratio 97%
4.WAS CPU 90%, DBCP 10~18(MAX 20)
1.TPS 1400, 응답시간 0.64s
2.RDS CPU 28%
3.elasticache CacheHit 133k, CacheMiss 0.3k, CacheHit Ratio 99.9%
4.WAS CPU 95%, DBCP 10~18(MAX 20)
현재 테스트의 경우 상당히 제한적인 조건으로 테스트하고 있기 때문에 Hit Ratio가 높게 나왔으며 현실에서는 PER적용 이전, 이후 모두 더 낮을 것이다.
WAS는 부하가 한계상태에 도달하였다. WAS가 병목지점이다.
DBCP는 아직 여유가 남아있다.
현재 상황의 경우 1400~1700TPS로 테스트하여 CacheMiss로 인한 부하 상승이 덜 체감될 수 있지만 이보다 더 큰 트래픽이 몰려올 경우 Cache Stampede현상으로 인해서 DB, Redis에 부하가 크게 증가할 것이다.
https://cseweb.ucsd.edu/~avattani/papers/cache_stampede.pdf
https://engineering.linecorp.com/en/blog/redis-lua-scripting-atomic-processing-cache
https://redis.io/docs/manual/programmability/lua-api/#global-variables-and-functions