Line/Armeria - 오픈 소스 기여기 02

JeongYong·2024년 7월 15일
1

오픈 소스

목록 보기
2/3

이전 포스트

타임아웃이란? (StreamMessage.timeout())

애플리케이션 레벨의 실질적인 유휴 스트림을 감지하기 위해 데이터 청크 간의 제한 시간을 설정하는 기능이다. (실질적인 스트림 자체의 타임아웃 기능을 설정)

예를 들어 이전 데이터 청크가 도착하고, 다음 데이터 청크가 도착하기까지 제한 시간을 설정한다. (Flux.timeout()과 유사한 기능이다.)

즉, 이 기능은 커넥션을 유지하고 있지만 실제로 메시지를 주고 받지 않는 유휴 스트림을 감지하는 기능이다.

이 기능으로 인한 이점은 다음과 같다.

  • 서버 자원 낭비 방지
    • 커넥션 유지 비용 절감 -> 메시지가 오지 않는 유휴 스트림 정리
    • Backpressure 부담 완화 -> 지연이 생긴 스트림을 정리하므로 전체 시스템의 처리량 유지에 도움.
  • 사용자 경험(UX) 향상
    • 빠른 실패(Fail Fast)
    • 불필요한 대기 방지
  • 운영 및 장애 대응 효율화
    • 장애 파악 속도 향상 -> 클라이언트가 느리게 응답하는지, 네트워크 지연이 있는지 등을 로그나 알람으로 빠르게 인지 가능

기여하기

나는 이 기능의 유용성에 대해서 메인테이너분에게 말씀드렸고, 유용할 것 같다며 한번 구현해 보라고 하셨다.

그래서 어떻게 설계할지 고민하며 시간을 보내는 와중에 실제로 이 기능이 필요해서 이슈 - 5744가 등록되었고, 이를 본격적으로 맡게 되었다.

StreamTimeoutMode

타임아웃 모드는 3가지 존재한다.

  1. UNTIL_FIRST
    구독 후 첫 데이터 청크가 도착할 때까지만 제한 시간을 설정한다.

  2. UNTIL_NEXT
    이전 데이터 청크가 도착하고, 다음 데이터 청크가 도착할 때까지 제한 시간을 설정한다.

  3. UNTIL_EOS
    구독 후 마지막 데이터 청크가 도착할 때까지 제한 시간을 설정한다.

class TimeoutStreamMessage

TimeoutStreamMessage는 StreamMessage를 implements하고, StreamMessage 타입 객체의 타임아웃 기능을 제공한다.

기존 객체(StreamMessage 타입 객체)의 기능을 유지하면서, 기능을 추가하기 위해서 데코레이터 패턴을 사용했다.
ex) new TimeoutStreamMessage(기존 객체, timeoutDuration, timeoutMode);

내부에서는 이 기존 객체를 delegate로써 사용해 기능을 유지하는 메서드는 이 객체의 메서드를 호출해 줬다.

타임아웃 기능을 위해 재정의할 메서드는 subscribe()이다.

method subscribe(Subscriber<? super T> subscriber, EventExecutor executor, SubscriptionOption... options)

@Override
public void subscribe(Subscriber < ? super T > subscriber, EventExecutor executor,
    SubscriptionOption...options) {
    delegate.subscribe(new TimeoutSubscriber < > (subscriber, executor, timeoutDuration, timeoutMode),
        executor, options);
}

StreamMessage의 subscribe는 오버로드되어서 여러 개 존재한다. 하지만 최종적으로 실행되는 것은 이 메서드이며, 기존 subscriber 기능의 타임아웃 기능을 추가하기 위해서 데코레이터 패턴을 사용했다.

class TimeoutSubscriber에서는 타임아웃을 스케줄하기 위해서 subscribe()의 인자로 전달된 Netty의 EventExecutor를 그대로 사용한다.

여기서 Neety의 EventExecutor는 Netty에서 비동기 I/O 및 테스크 실행을 담당하는 이벤트 루프의 핵심 인터페이스다.

EventExecutor의 특징은 다음과 같다.

  • 단일 스레드 모델: 각 이벤트 루프는 단일 스레드에서 동작하여, 여러 테스크가 순차적으로 실행된다. 이 덕분에 동시성 문제가 줄어들고, 별도의 lock 없이 작업을 처리할 수 있다.

  • 스케줄링 기능: EventExecutor는 ScheduledExecutorService의 기능을 제공하므로, 지연(delay) 작업이나 주기적 작업을 쉽게 예약할 수 있다. 예를 들어 eventExecutor.schedule(task, delay, timeUnit)을 사용하면, 지정된 지연 시간 후에 태스크가 실행된다.

  • 낮은 오버헤드: 이벤트 루프 내에서 I/O와 태스크를 처리하기 때문에, 별도의 스레드 전환 없이 효율적인 비동기 처리가 가능하다.

EventExecutor가 타이머 작업과 같은 비동기 작업에 적합한 이유

  1. 특징에서 볼 수 있듯이 낮은 오버헤드 및 높은 성능을 가지고 있다. (I/O 이벤트와 태스크 실행을 같은 이벤트 루프 내에서 치리하므로, 별도의 스레드 간 전환 없이 빠른 태스크 실행이 가능하다. 그래서 동기화 오버헤드를 줄이고, 예측 가능한 실행 순서를 보장함.)

  2. 지연 시간이나 주기적 실행을 쉽게 관리할 수 있다. 그래서 특정 시간 견격 후에 실행되어야 하는 작업에 매우 적합하다.

  3. 비동기 및 논블로킹

EventExecutor의 역할

  1. 이벤트 루프 관리
    Netty에서는 이벤트 루프 모델을 사용하는데, 이벤트 루프는 네트워크 이벤트를 감지하고 해당 이벤트에 대한 콜백을 실행하는 무한 루프다.

  2. 작업 큐 관리
    EventExecutor는 비동기 작업을 큐에 넣고 이를 처리한다. 작업 큐에 들어간 작업은 이벤트 루프의 실행 컨텍스트에서 비동기적으로 실행된다.

위에서 볼 수 있듯이 EventExecutor는 비동기 작업을 처리하는 데 최적화되어 있다. 그래서 타이머 작업과 같은 비동기 작업을 수행하기에 적합하다.

또한 Netty의 ScheduledFutre 인터페이스를 통해 EventExecutor로 스케줄링된 작업을 쉽게 관리할 수 있다.

여기서 ScheduledFuture는 예약된 태스크의 실행 상태를 관리하는 Future 객체로, 지연 실행 및 주기적 실행을 지원한다.

class TimeoutSubscriber

TimeoutSubscribe 클래스는 타임 아웃 기능이 추가된 Subscriber다.
이를 위해 필요한 Subscriber, Runnable, Subscription을 구현하고 있다.

핵심적인 구현 부분을 살펴보겠다.

1. method onSubscribe 구현

@Override
public void onSubscribe(Subscription s) {
    subscription = s;
    delegate.onSubscribe(this);
    if (completed || canceled) {
        return;
    }
    lastEventTimeNanos = System.nanoTime();
    timeoutFuture = scheduleTimeout(timeoutNanos);
}

onSubscribe()는 구독자가 발행자를 구독했을 때 가장 먼저 호출되는 함수다.
그리고 인자로 Subscription 타입 객체가 전달된다.

이 객체는 request()로 받고자 하는 데이터양을 요청할 수 있고, cancel()로 구독 관계를 취소할 수 있다.

내가 정의한 onSubscribe()에서는 먼저 다운스트림(delegate)의 onSubscribe()를 호출해서 세팅을 먼저 완료한다.

그리고 구독 시점을 기준으로 시간을 기록하고, 타이머를 설정한다.

구현 과정에서 이슈
1) 최초의 타임아웃 스케줄 설정이 구독(subscribe) 시점인가 데이터 요청(request) 시점인가?

데이터 요청 시점으로 설정한다면, 구독 후 데이터 요청이 이루어지지 않는 상황에서 유휴 스트림을 감지할 수 없게 된다. 그래서 서버의 리소스를 계속 점유하게 된다.
또한 구독자에게 문제가 발생했을 때 데이터 요청을 하지 않을 가능성이 높다.

그래서 구독 시점으로 설정하는 것이 맞다고 생각했고, 구독 시점 설정으로 구현해 줬다.

2) 다운스트림의 onSubscribe()를 호출하고, 전달된 subscription으로 cancel()를 호출한다면, 설정된 타이머는 취소되지 않는다.

-> 그래서 Timeoutsubscriber 클래스는 Subscription 구현하고, cancel()에서 설정된 타이머를 취소하게끔 재정의했다. 그리고 이를 다운스트림의 onSubscirbe(this)로 전달해 줬다.

@Override
public void cancel() {
    canceled = true;
    cancelSchedule();
    subscription.cancel();
}

3) 다운스트림의 onSubscribe()를 호출했을 때, 구독 관계가 취소되거나 스트림의 작업이 완료될 수 있는데 이때 타이머가 설정되는 문제가 있었다.

-> 그래서 completed와 canceled flag를 둬서 false일 때만 설정되게끔 해줬다.

2. EventExcutor로 타이머 설정

private ScheduledFuture < ? > scheduleTimeout(long delay) {
    return executor.schedule(this, delay, TimeUnit.NANOSECONDS);
}

EventExecutor에는 스케줄을 설정할 수 있는 schedule()이 있다.
schedule()에는 첫 번째 인자로 Runnable 객체가 전달되고, Runnable의 run()이 실행된다. 두 번째, 세 번째 인자는 스케줄을 설정한 시점부터 얼마 후에 실행할지를 결정한다.

즉, 설정된 시간을 초과하면 run()이 실행된다.

이 run()에서는 시간 지연이 발생했는지를 판단한다. 만약 발생했다면 StreamTimeoutException을 다운스트림의 onError()로 전달하고, 스트림을 종료시킨다.

@Override
public void run() {
    if (timeoutMode == StreamTimeoutMode.UNTIL_NEXT) {
        final long currentTimeNanos = System.nanoTime();
        final long elapsedNanos = currentTimeNanos - lastEventTimeNanos;

        if (elapsedNanos < timeoutNanos) {
            final long delayNanos = timeoutNanos - elapsedNanos;
            timeoutFuture = scheduleTimeout(delayNanos);
            return;
        }
    }
    completed = true;
    delegate.onError(new StreamTimeoutException(
        String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode)));
    subscription.cancel();
}

구현 과정에서 이슈

1) 초기 구현에서는 NEXT 모드일 때 데이터를 받으면, 스케줄을 취소하고, 다시 설정하는 방식으로 구현했고, 이는 잦은 취소와 설정으로 비효율적이었다.

-> 그래서 onSubscribe()에서 최초로 타이머를 설정하고, 데이터를 받을 때마다 lastEventTimeNanos의 시간을 기록했다. 타이머 설정된 이후에는 스케줄을 취소하지 않아 실행을 유지해 줬고, 스케줄이 실행되었을 때 시간 지연을 체크했다.

시간 지연 검사는 구체적으로 run()이 실행된 시점(currentTimeNanos)과 lastEventTimeNanos를 빼서, 그 시간이 설정된 시간보다 크다면 시간 지연이 발생된 것으로 판단했다.

그렇지 않은 경우에는 타이머를 재설정해 줬는데, 다음번 run()을 실행할 시간은 timeoutNanos(설정된 시간) - (currentTimeNanos - lastEventTimeNanos)로 설정해 줌으로써, 정확히 필요한 시점에만 스케줄이 실행되게끔 해줬다.

이렇게 타이머 취소 작업을 없애고, 타이머 설정은 최소화함으로써 최적화할 수 있었다.

2) 전파 지연 문제
전파 지연이란, 청크 데이터 사이의 지연이 발생해 스트림이 유휴 상태로 판단되어 종료된 이후에도, 추가적인 신호(onNext)가 처리되는 문제를 말한다.

이러한 현상이 발생하는 이유는 타임아웃에 따른 에러 신호가 발행된 이후에도 해당 신호가 구독자에게 도달하기까지 시간차가 존재하고(전파 지연), 그 사이에 퍼블리셔측에서 추가 데이터를 emit할 수 있기 때문이다.

이를 방지해 주지 않으면 스트림이 종료된 후에도 데이터가 전달되는 상황이 발생한다.

-> 타임아웃이 발생한 시점에 즉시 boolean completed를 true로 바꿔 해결해 줄 수 있다. 여기서 completed를 volatile 정의하지 않아도 되는 이유는 이벤트 루프가 싱글 스레드여서 여러 스레드가 동시에 접근하는 경우가 없기 때문이다.

이와 관련된 내용은 Reactive-Streams에 Subscriber 규칙6에 있다.

규칙 6: "If a Publisher signals either onError or onComplete on a Subscriber, that Subscriber’s Subscription MUST be considered cancelled."
→ 이는 onError()나 onComplete()가 호출되면, 구독 관계가 종료되어 추가적인 데이터 전송이나 요청은 없어야 함을 의미한다.

onError()가 호출되고, 어떠한 동작도 있으면 안된다고, 명시되어 있다. 그래서 구독 관계를 끊는 코드(subscription.cancel())도 반드시 포함되어야 한다.

3. method onNext 구현

 @Override
 public void onNext(T t) {
         if (completed || canceled) {
             PooledObjects.close(t);
             return;
         }
         switch (timeoutMode) {
             case UNTIL_NEXT:
                 lastEventTimeNanos = System.nanoTime();
                 break;
             case UNTIL_FIRST:
                 cancelSchedule();
                 timeoutFuture = null;
                 break;
             case UNTIL_EOS:
                 break;
         }
         delegate.onNext(t);
}

onNext()는 발행자가 발행한 데이터가 전달되는 메서드다.
여기서는 timeoutMode에 따라서 다르게 동작해야 된다.

  • UNTIL_EOS 모드는 아무런 작업도 할 필요가 없다.

  • UNTIL_FIRST 모드는 타임아웃 스케줄을 취소하면 된다.
    최초의 타이머가 설정된 후 run()이 실행되지 않고 onNext()가 호출되었다는 것은 타임아웃이 발생하지 않았음을 보장한다.

  • UNTIL_NEXT 모드는 데이터를 받을 때마다 lastEventTimeNanos를 업데이트해 준다.
    마지막으로 언제 받았는지를 알면 타임아웃이 발생했는지를 판단할 수 있다.

타임아웃 관련 작업을 하고, delegate.onNext()로 받은 데이터를 전달한다. (다운스트림으로 전달)

만약 completed나 canceled가 true라면 받은 데이터를 해제하고 리턴한다.

구현 과정에서 이슈
1) UNTIL_FIRST에서 cancelSchedule()을 호출하는데 이후에 스케줄이 돌아가지 않아도 cancelSchedule()이 호출된다. 그래서 휘발성 필드에 접근하는 timeoutFuture.isCancelled()를 반복적으로 실행하는 문제가 있다.

void cancelSchedule() {
    if (timeoutFuture != null && !timeoutFuture.isCancelled()) {
        timeoutFuture.cancel(false);
    }
}

-> 해결은 간단하다. cancelSchedule() 후에 timeoutFuture = null; 해주면 .isCancelled()는 실행되지 않는다.

불필요한 휘발성 필드 접근을 줄여야 하는 이유

  1. 성능 저하 (Memory Barrier):
  • 메모리 배리어란?
    • CPU는 성능 향상을 위해 명령어를 재정렬할 수 있는데, 이 경우 여러 스레드 간에 연산 순서가 뒤바뀔 위험이 있다.
    • 메모리 배리어(Memory Barrier)는 “이전의 모든 메모리 연산(예: 변수에 값을 쓰거나 읽는 작업)이 반드시 완료된 후, 그 이후의 연산이 실행되도록 강제”한다. (CPU의 성능 향상을 위한 재정렬이 일어나지 않게 됨)
    • volatile 필드에 접근할 때마다 JVM과 CPU는 이런 메모리 배리어를 삽입한다.
  1. 캐시 무효화 및 캐시 바운스 (Cache Invalidation & Cache Bounce):
  • 캐시 무효화

    • volatile 필드는 모든 스레드가 최신 값을 보도록 보장하기 위해, CPU 캐시에 저장된 값을 사용하지 않고 메인 메모리의 값을 읽거나, 캐시를 동기화(업데이트)하도록 만든다.
    • 이 과정은 캐시의 이점을 제대로 활용하지 못하게 만들어, 성능 저하로 이어진다.
  • 캐시 바운스란?

    • 여러 스레드가 같은 volatile 필드에 빈번하게 접근하면, 한 코어에서 값이 수정될 때마다 다른 코어의 캐시도 해당 값을 최신 상태로 갱신해야 한다.
    • 이로 인해 캐시 라인이 여러 코어 사이에서 계속 이동(바운스)하게 되고, 그 결과로 시스템 전체의 성능이 떨어지게 된다.
  1. 컨텍스트 스위칭 및 시스템 부하 증가(간접적 효과):
  • volatile 변수에 대한 빈번한 접근은 캐시 동기화 문제(캐시 바운스)로 인해 CPU 간의 협업이 복잡해지고,
    그 결과로 시스템 자원 사용과 간접적인 스레드 스케줄링(컨텍스트 스위칭) 오버헤드가 증가할 수 있다.

  • 이러한 부하 증가는 전체 시스템의 응답 속도를 늦추고, 성능 저하로 이어질 수 있다.

method timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode)

timeout() 메서드는 StreamMessage의 default 메서드로 정의한다.

1) timeout(Duration timeoutDuration)

default StreamMessage < T > timeout(Duration timeoutDuration) {
    return timeout(timeoutDuration, StreamTimeoutMode.UNTIL_NEXT);
}

이 메서드는 모드를 정하지 않고, 타임아웃 기간만 정한다. 디폴트 모드는 UNTIL_NEXT이다.
결과적으로 timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode)을 호출한다.

2) timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode)

default StreamMessage < T > timeout(Duration timeoutDuration, StreamTimeoutMode timeoutMode) {
    requireNonNull(timeoutDuration, "timeoutDuration");
    requireNonNull(timeoutMode, "timeoutMode");
    return new TimeoutStreamMessage < > (this, timeoutDuration, timeoutMode);
}

이는 기존 StreamMessage 타입의 객체의 타임아웃 기능을 추가한 TimeoutStreamMessage를 리턴한다.

timeout() 메서드는 StreamMessage를 구현 하는 HttpResponse, HttpRequest, WebSocket에서도 재정의가 필요하다. 왜냐하면 timeout()을 실행하는 타입에 맞게 반환해 줘야 되기 때문이다.

0개의 댓글