[ MindDiary ] 이슈 7. Redis에 여러 개의 데이터 추가 시 네트워크 문제 해결하기 : Redis PipeLining

Dayeon myeong·2021년 10월 14일
3

Mind Diary

목록 보기
7/10

스트레스 검사나 우울 검사와 같은 자가 진단 검사 데이터를 모두 레디스에 추가할 때 발생한 문제입니다. 하나의 검사 데이터를 추가할 때마다 redis의 데이터 추가 명령어가 발생하면서 네트워크의 병목 현상이 발생할 수 있습니다.

레디스는 이벤트 루프와 단일 스레드 방식을 사용하며 동시에 많은 사용자들에게 응답이 가능합니다. 즉, 동시성을 보장할 수 있고 고성능입니다.

하지만 기본적으로 Redis 는 TCP를 사용하며 클라이언트 - 서버 모델과 Request - Reponse 방식을 사용합니다. 클라이언트가 매번 서버에 요청할 때마다 응답을 받아야만 다음 요청이 가능한 구조인 것입니다.

따라서 수십만개의 요청을 보낸다면 응답값을 매번 받아야 하며 네트워크에 부하가 발생합니다. 이는 곧 클라이언트에서 서버로 그리고 다시 서버에서 클라이언트로 왕복하는 RTT 비용이 수십만번 발생하여 네트워크에 부하가 발생하는 것을 의미합니다.

Redis 문서에서는 이러한 연속적으로 많은 요청을 수행해야할 경우의 문제를 해결하기 위해서 레디스 파이프라이닝 기능을 제공합니다. 매번 응답을 받지 않아도 레디스 서버에 여러 명령어를 한번에 요청할 수 있고, 마지막으로 한번에 응답을 읽을 수 있게 하는 기능입니다.

이러한 레디스 파이프라이닝 방식을 사용하는 방식을 통해 RTT 비용이 수십만번 발생하지 않고, 단 한번만 RTT 비용이 발생하게 됩니다.

//한번에 3개의 명령어를 요청
client : incr x
client : incr x
client : incr x

//한번에 응답을 받음
server : 1
server : 2
server : 3

예를 들어 키를 1씩 증가하는 3개의 연산이 레디스 파이프라이닝으로 이루어질 경우 매 요청시마다 RTT 비용이 발생하지 않고, 세개의 incr 명령어에 대해 단 한번만 RTT비용이 발생하게 됩니다.

또한, Redis 문서에 따르면 RTT만 해결해주는 것이 아니라 실제로 Redis 서버에서 초당 수행할 수 있는 작업의 수를 향상시킬 수도 있다고 합니다.

레디스 파이프라이닝 기능이 없다면 각 명령어에 대해 처리할 때 사용자 영역에서 커널 영역으로 이동하는 read(), write() 같은 시스템 콜이 일어납니다. 그리고 이러한 I/O 작업 시스템 콜에 대해 context swtich가 발생하게 되며 이는 엄청난 속도 저하를 일으킬 수 있습니다.

반면 파이프라이닝을 사용하면 일반적으로 하나의 read()로 여러 명령을 읽고, 하나의 write()로 여러 응답을 한번에 전달하기 때문에 context switch 수가 더 적게 일어나고, 그 덕분에 초당 수행할 수 있는 작업의 수가 늘어난다고 합니다.

Redis 파이프라이닝을 사용하면 이렇듯 여러 장점이 있기 때문에 Redis 파이프라이닝을 사용했습니다.

( 이러한 파이프라이닝의 단점으로는 클라이언트가 파이프라이닝을 사용하여 명령을 보내는 동안 서버는 메모리를 사용하여 응답을 queue에 넣는다는 것입니다. 따라서 파이프라이닝으로 많은 명령을 보내야할 경우에는 메모리에 부하가 일어날 수 있습니다. 따라서 적당한 수의 batch 방식으로 처리하는 것이 좋다고 합니다.)

( 또다른 단점을 생각해본다면, HTTP 1.1의 파이프라이닝의 단점과 비슷하게 순서대로 응답이 꼭 와야만 된다는 것을 들 수 있을 것 같습니다. 만약 먼저 보낸 요청의 응답이 지연될 경우 뒤의 응답은 대기하게 되는 HOB 문제 문제가 있을 수 있을 것 같습니다. )

Redis 설정하기

우선 Redis를 사용하기 위해 설정을 해줘야 합니다.

//build.gradle
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-data-redis', version: '2.5.3'

Spring 에서는 Redis를 사용하기 위해 Spring data redis를 제공합니다. 따라서 dependency에 이를 추가해야 합니다.

//RedisConfiguration.class
@Configuration
public class RedisConfiguration {

  @Value("${spring.redis.host}")
  private String host;

  @Value("${spring.redis.port}")
  private int port;


/*
Redis 통신을 위해서는 Redis Connection Factory에서 Redis connection을 생성해야 합니다. 그렇기 때문에 Redis Connection Factory 설정이 필요합니다.

RedisConnectionFactory 인터페이스의 구현체로는 Lettuce를 사용했습니다.

Lettuce는 여러 장점이 있기 때문입니다.
비동기로 요청하기 때문에 성능상 이점이 있고,
하나의 커넥션으로 여러 스레드에서 공유되어 사용 가능하도록 설계되어 있어서 thread-safe합니다.
*/
  @Bean
  public RedisConnectionFactory redisConnectionFactory() {
    return new LettuceConnectionFactory(host, port);
  }

/*
objectMapper는 Json -> Ojbect(역직렬화) / Object -> Json (직렬화) 같은 기능을 담당한다고 합니다.
LocalDateTime을 매핑하기 위해 아래와 같이 설정해줬습니다.
*/

  @Bean
  public ObjectMapper objectMapper() {
    ObjectMapper mapper = new ObjectMapper();
    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    mapper.registerModules(new JavaTimeModule(), new Jdk8Module());
    return mapper;
  }


/*
Redis를 사용할 때 Key는 전부 String으로, value는 String이나 Object 객체를 사용할 것입니다.

그래서 Key는 StringRedisSerializer를 사용했습니다.

또한, Object나 Json 형식으로 redis에 저장하기 위해서는 Jackson 관련 Serializer를 사용해야 합니다.

그 중에서, 제가 사용한 GenericJackson2JsonRedisSerializer는 별도의 class type을 지정해줄 필요 없이 자동으로 Object를 Json 으로 직렬화해줍니다.
단점으로는 객체의 Class Type을 함께 레디스에 넣기 때문에 그 클래스 타입으로만 가져올 수 있다는 단점이 있습니다.
*/
  @Bean
  public RedisTemplate<String, Object> redisTemplate() {

    GenericJackson2JsonRedisSerializer serializer =
        new GenericJackson2JsonRedisSerializer(objectMapper());

    RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(redisConnectionFactory());
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    redisTemplate.setValueSerializer(serializer);
    redisTemplate.setHashKeySerializer(new StringRedisSerializer());
    redisTemplate.setHashValueSerializer(serializer);
    return redisTemplate;
  }


/*
스프링 Data Redis에서는 Redis CacheManer와 @EnableCaching이라는 어노테이션을 사용하여 캐시 추상화를 제공합니다. 캐시 추상화란 AOP를 사용하여 애플리케이션의 코드를 수정하지 않고도 캐시 기능을 메서드에 적용할 수 있게 해줍니다. 이를 통해 메서드에 캐싱이 적용됨으로써 캐시에 저장된 데이터로 메서드의 실행 횟수를 줄여줍니다.

또한, Redis CacheManer를 통해 캐시의 직렬화, 역직렬화, 만료시간을 관리할 수 있습니다.
*/
  @Bean
  public RedisCacheManager redisCacheManager() {

    RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration
        .defaultCacheConfig()
        .serializeKeysWith(
            RedisSerializationContext.SerializationPair
                .fromSerializer(new StringRedisSerializer()))
        .serializeValuesWith(
            RedisSerializationContext.SerializationPair
                .fromSerializer(new GenericJackson2JsonRedisSerializer(objectMapper()))
        )
        .entryTtl(Duration.ofDays(1L));

    return RedisCacheManager.RedisCacheManagerBuilder
        .fromConnectionFactory(redisConnectionFactory())
        .cacheDefaults(redisCacheConfiguration)
        .build();
  }

}
//MindDiaryApplication.class

/*
@EnableCaching은 메서드에 캐싱 관련 어노테이션이 있는지 모든 스프링 빈을 post processor를 통해 스캔하는 역할을 합니다. 만약 캐싱 관련 어노테이션이 발견된다면 프록시가 자동생성되어 메소드 호출을 가로채 캐싱 처리합니다.
*/
@SpringBootApplication
@EnableCaching
public class MindDiaryApplication {

	public static void main(String[] args) {
		SpringApplication.run(MindDiaryApplication.class, args);
	}

}

Redis PipeLining 사용하기

Spring data redis 문서에 따르면 executePipelined 메소드와 RedisCallback을 사용하여 파이프라이닝을 사용할 수 있습니다.
저는 자가진단 데이터를 redis의 list 자료구조로 넣을 것이기 때문에 redis connection의 listCommands 메서드를 사용하여 키와 값을 push해줍니다.
이때, rpush 등 이러한 커맨드들은 파라미터를 byte[]로 넘겨줘야 하기 때문에

기존에 설정했던 key(String)와 value(json)의 serializer를 사용하여 byte[] 배열로 변화하여 값을 넣어줬습니다.

(redis는 기본적으로 byte 배열을 저장하는 형식입니다. 따라서, 저장하려는 데이터는 직렬화 과정을 거쳐 byte 형태가 되어야 redis에 정상적으로 저장될 수 있습니다.)

또한 추가적으로 반드시 null을 리턴해야 합니다. 리턴한 값은 파이프라인에 의해 결과가 덮여씌어지기 때문에 필요하지 않다고 합니다.

 public void saveAll(List<Question> questions) {

    RedisSerializer keySerializer = redisTemplate.getStringSerializer();
    RedisSerializer valueSerializer = redisTemplate.getValueSerializer();

    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
      questions.forEach(question -> {
        String key = getKey(question.getDiagnosisId());
        connection.listCommands().rPush(keySerializer.serialize(key),
            valueSerializer.serialize(question));
      });
      return null;
    });

  }

위 코드를 통해 레디스에 자가진단 데이터를 넣는 여러 명령어를 한 번에 요청할 수 있게 되었고, 네트워크 병목 문제를 해결할 수 있게 되었습니다.

참고 문헌

Spring Data Redis

Using pipelining to speedup Redis queries - Redis

Nodejs, Redis 가 단일 쓰레드인 이유는?

Redis의 동시성(Concurrency)개념과 고립성(Isolation)을 위한 Transaction 처리

@EnableCaching Annotation in Spring

RedisTemplate (Spring Data Redis 2.5.5 API)

RTT (Round Trip Time, 왕복 시간)란? 개념정리

profile
부족함을 당당히 마주하는 용기

0개의 댓글