Redis Transaction, Pipeline의 개념과 동기화 문제 해결

HyunKyu Lee·2023년 11월 23일
0

42gg

목록 보기
1/4

문제상황

  • 42gg 서비스 운영중 redis, mysql간의 transaction 동기화 문제 발견
  • 탁구경기 결과 입력 현재 로직은 유저 승패정보 mysql → redis모두에 저장하는 식으로 되어있다, 만약 둘중 하나라도 실패하거나 중간에 로직에 문제가 생긴다면 mysql과 redis에 모두 데이터가 들어가서는 안된다. 현재 redis transaction을 적용하지 않아 문제가 생겼을 때 mysql에서만 데이터가 롤백되는 문제점을 발견, redis transaction을 프로젝트에 적용하기로 했다.

Redis Transaction이란?


  • 트랜잭션이란 여러가지 명령어들을 처리하는 하나의 단위이다.
  • rdb에서 기본적으로 제공되는 트랜잭션과는 다른 방식으로 redis에서는 트랜잭션을 지원한다.

transaction이 성공하는 예시

  • redis는 기본적으로 multi, exec command로 트랜잭션을 지원하는데 여기서의 multi는 transaction의 시작을 의미한다.
  • multi를 실행하고 나면 이후 실행되는 모든 명령을 queue에 쌓아뒀다가 exec 실행시에 한꺼번에 실행시킨다.

discord 명령어

  • discord 명령어는 queue에 보관되어있는 command들을 실행하지 않고 날리는 명령어다.
  • foo라는 키의 값을 증가시키라는 명령을 실행시켰지만 discord 명령어로 인해서 실행되지 않고 1이 유지되는 것을 볼 수 있다.

transaction이 실패하는 예시1

  • transaction 안에서 에러를 만들어보았다. exec실행시 EXECABORT error와 함께 트랜잭선이 discord되었고, a 값은 그대로 125를 유지하고 있다.

transaction이 실패하는 예시2

  • 이번에는 unknown command 에러가 아닌, command를 실행시에만 error인지 아닌지를 판단할 수 있는 명령어를 transaction queue에 넣고 exec를 실행해봤다
  • 이 상황에서는 첫번째 명령어는 정상적으로 실행되고 두번째 명령어만 실행되지 않는 것을 확인할 수 있다. 이 부분이 redis transaction에서 가장 중요한 부분이다. (redis docs를 확인해보면 It's important to note that even when a command fails, all the other commands in the queue are processed – Redis will not stop the processing of commands. 이런 문구를 확인할 수 있다.)

정리하자면

  1. 트랜잭션 내부에서 완전히 잘못된(사용할 수 없는) 명령어(syntax error)를 사용하면 트랜잭션은 Discard된다.

  2. 트랜잭션 내부에서 레디스 자료구조를 잘못 사용한 명령어는 트랜잭션에 영향을 주지 않는다. (다른 명령들은 정상적으로 실행, 반환됩니다.)

Redis Transaction을 Spring Project에 적용하기


  • spring docs를 확인해보면 spring에서 redis transaction을 사용하는 방법은 SessionCallback을 사용하는 방법과 @Transactional 어노테이션을 사용하는 방법이있다, 나는 비즈니스 로직 전반적으로 mysql과 redis의 트랜잭션이 동기화되기를 원하고 Spring transaction의 propagation의 이점도 얻어갈 수 있도록 @Transactional 방식을 사용하기로 하였다.
  • Spring Data Redis에서는 PlatformTransactionManager가 구현되어 있지 않지만 mysql을 사용하기때문에 이미 PlatformTransactionManager가 spring boot autoconfiguration의 도움을 받아bean으로 등록되어있다.

redis config

@Bean
    public RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory redisConnectionFactory){
        final RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();

        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());

        redisTemplate.setHashKeySerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setEnableTransactionSupport(true); // <=

        return redisTemplate;
    }
  • 이제 spring에서 redis 자료구조를 사용하기위한 Redis Template를 등록하자, 중요한 부분은 redisTemplate.setEnableTransactionSupport(true) 이부분이다.
  • 이제 @Transactional annotation으로 redis transaction을 지원받을 수 있다.

test code

  • RedisTestService
@Service
@Transactional
@RequiredArgsConstructor
public class RedisTestService {
    private final RedisTemplate<String, Object> redisTemplate;

    public void addStringWithError(String key, Object value) {
        redisTemplate.opsForValue().set(key, value);
        throw new RuntimeException();
    }

    public void addString(String key, Object value) {
        redisTemplate.opsForValue().set(key, value);
    }

    public Object getFromString(String key) {
        return redisTemplate.opsForValue().get(key);
    }
}
  • RedisTransactionTest
@SpringBootTest
public class RedisTransactionTest {

    @Autowired
    RedisTestService redisTestService;

    @Autowired
    RedisTemplate<String, Object> redisTemplate;

    @Test
    @DisplayName("transaction안에서 exception이 발생한다면 해당 transaction이 discord되는지 확인")
    public void rollbackTest () throws Exception
    {
        String key = "hello";

        Assertions.assertThatThrownBy(() ->{
            redisTestService.addStringWithError(key, "aaa");
        }).isInstanceOf(RuntimeException.class);

        Object result = redisTemplate.opsForValue().get(key);
        Assertions.assertThat(result).isNull();
    }

    @Test
    @DisplayName("transaction이 올바르게 exec를 실행시키는지 테스트")
    public void commitTest () throws Exception
    {
        String key = "hello1";
        String value = "aaa";

        redisTestService.addString(key, value);

        Object result = redisTemplate.opsForValue().get(key);
        Assertions.assertThat(result).isNotNull();
        Assertions.assertThat(result.toString()).isEqualTo(value);
    }

    @Test
    @DisplayName("transaction안에서 새로 집어넣은 key에 대해서 조회시 null조회 test")
    @Transactional
    public void nullTest () throws Exception
    {
        String key = "test";
        String value = "value";

        redisTestService.addString(key, value);

        Object result = redisTestService.getFromString(key);
        Assertions.assertThat(result).isNull();
    }

    @Test
    @DisplayName("transaction안에서 기존에 존재하던 key에 대해서 조회시 not null test")
    public void nonNullTest () throws Exception
    {
        String key = "test";
        String value = "value";

        redisTestService.addString(key, value);

        Object result = redisTestService.getFromString(key);
        Assertions.assertThat(result).isNotNull();
        Assertions.assertThat(result.toString()).isEqualTo(value);
    }

}
  • rollbackTest
    • addStringWithError에서 인위적으로 RuntimeException을 던져봤고 해당 key가 들어가지 않는 것을 검증
  • commitTest
    • 코드에 아무 이상이 없을때 정상적으로 key가 들어가고 조회되는지 검증
  • nullTest
    • test함수 위에 @Transactional어노테이션을 활용하여 addString함수와 getFromString 함수를 하나의 transaction으로 묶었다.
    • 따라서 현재 test에서 하나의 트랜잭션 안에서 insert한 key를 조회하려고 할 때 null이 조회되는지를 확인한다.
    • 생각해보면 당연한 결과다. redis transaction은 exec가 호출되기 전까지(즉 transaction이 종료되기 전까지) command를 날리지 않기 때문에 transaction이 종료되기 전에 새로 삽입한 key를 조회하려고하면 없는것이 맞다.
  • nonNullTest
    • 그렇다면 기존에 이미 redis에 존재하는 key를 transaction안에서 조회하려고 하면 정상적으로 조회된다.
    • 테스트코드는 함수위에 @Transaction을 제거하여 addStringgetFromString 를 별개의 트랜잭션으로 분류하였고 getFromString 호출시에 addString 의 트랜잭션은 이미 종료되었을 것으로 생각, 해당 key가 이미 redis에 정상적으로 insert된 것을 기대하고 두번째 getFromString 호출, 정상적으로 데이터가 조회되는 것을 확인하였다.

Redis Pipeline


  • Redis Transaction을 프로젝트에 적용하려고 공부하던 도중 redis pipeline의 존재를 알게되었다. 이 기술 또한 프로젝트에 적용한다면 server 부팅시간을 크게 줄일 수 있을 것 같아서 이 또한 프로젝트에 적용하기로 하였다.
  • 현재 프로젝트 구조상 redis를 user ranking관리하는데 사용하고 있고 redis 특성상 데이터가 날아갈 위험이 있기 때문에 항상 mysql에 백업을 만들어놓고 서버가 올라갈 때 redis가 비어있다면 mysql에서 데이터를 잃어와 redis에 업로드하는 식이다.
  • 이러한 구조에서 redis에 insert되는 데이터의 량이 수천개, 혹은 시간이 지날수록 늘어날 수 있기 때문에 redis pipeline 기술을 적용하여 부하를 줄이기로 하였다.

Redis Pipeline이란?

  • Redis 또한 TCP프로토콜을 통해서 요청과 응답을 주고받는다.
  • 그래서 하나의 요청을 보낸다면, 하나의 응답을 반환하게 되고 TCP프로토콜상 이러한 구조는 서버에 부하를 가져오게 된다.
  • 그래서 여러 요청을 한번에 전송하고, 한번에 받도록 지원하는 것이 Pipeline이다, 마치 http의 persistent connection처럼.
  • Redis 서버만으로는 pipeline을 지원하지 않고 spring의 도움을 받아야한다.

executePipelined

//pop a specified number of items from a queue
List<Object> results = stringRedisTemplate.executePipelined(
  new RedisCallback<Object>() {
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
      StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
      for(int i=0; i< batchSize; i++) {
        stringRedisConn.rPop("myqueue");
      }
    return null;
  }
});
  • redis template의 executePipeline, RedisCallback을 사용하면 된다. 이때 직접적으로 redis connection을 가져다 쓰기 때문에 redis 에 데이터를 넣을때 serialize에 신경써주어야한다. connection을 사용할 때 사용한 serializer와 , redisTemplate이 사용하는 serializer 다르다면 후에 redis 데이터를 redis template로 읽어들일 때 문제가 될 수 있다.
private void upload() {

        redisTemplate.executePipelined((RedisCallback<Object>) connection ->{
            seasonRepository.findAll().forEach(season -> {
                String hashKey = RedisKeyManager.getHashKey(season.getId());
                String zSetKey = RedisKeyManager.getZSetKey(season.getId());
                rankRepository.findAllBySeasonId(season.getId()).forEach(rank -> {
                    RankRedis rankRedis = RankRedis.from(rank);
                    connection.hSet(keySerializer().serialize(hashKey),
                            hashKeySerializer().serialize(rank.getUser().getId().toString()),
                            hashValueSerializer().serialize(rankRedis));
                    if (rank.getWins() + rankRedis.getLosses() != 0){
                        connection.zAdd(keySerializer().serialize(zSetKey), rank.getPpp(),
                                valueSerializer().serialize(rank.getUser().getId().toString()));
                    }
                });
            });
            return null;
        });
    }
  • spring docs를 참고하고 이 프로젝트에 맞도록 커스텀해서 RedisCallback를 구현했다. 핵심은 hSet, zAdd 함수 사용시 사용하는 serializer에 주의하자
profile
backend developer

0개의 댓글