오픈소스 Armeria에 기여해보기(2) - StreamMessage 분석

WooSeong·2022년 3월 5일
1
post-thumbnail

이슈 선정 🐦

까치밥을 먹어보자


저번글에 이어 기여할만한 이슈를 선정해보겠습니다.

Armeria 에서는 저같은 초보 기여자들을 위해 감사하게도 good first issue 라벨이 붙은 쉬운 이슈들을 올려 주시곤 합니다. 말하자면 초보 기여자들을 위한 까치밥 같은것 이지요

good first issue는 초보기여자들이 접근하기 쉬운 이슈이기 때문에 저처럼 기여를 해보고 싶어 하시는 분들께서 금방 낚아 채가시곤 합니다.
저도 사실 이슈를 살펴보고 있었는데 2일째 보고있던중에 다른분께서 글을 먼저 다셔서 그 이슈를 포기하고 다른이슈로 갈아탔었습니다😂 이 글을 보는 초보 기여자 여러분들은 그런일 없게 빠르게 이슈를 파악해서 코멘트를 다시는걸 추천드립니다

제가 선정한 이슈입니다.

Provide a general way to write an StreamMessage to a file

StreamMessage를 파일로 쓸 때의 일반적인 방법을 writeTo 메서드를 만들어 제공하라는 이슈입니다.

StreamMessage기능을 자세히 알지 못하더라도 writeTo 메서드를 만드는데 어려움은 없겠지만 오픈소스를 통한 성장이 목적이기에 StreamMessage 에대해 자세히 파헤쳐 보겠습니다.

StreamMessge 파헤쳐보기 ⛏⛏

우선 StreamMessageAPI 문서를 살펴보겠습니다.

A variant of Reactive Streams Publisher, which allows only one Subscriber. Unlike a usual Publisher, a StreamMessage can stream itself only once

하나의 Subscriber만 가지는 Reactive Streams의Publisher이고 일반적인 Publisher와 다르게 자신을 한번만 스트리밍 할 수 있다고 합니다.

Publisher? Subscriber? Reactive Streams는 또 무엇일까요..?
어떤일을 하는지 알아 보도록 하겠습니다.

Reactive Streams공식 홈페이지에서는 다음과 같이 정의 하고있습니다.

Reactive Streams is a standard for asynchronous data processing in a streaming fashion with non-blocking back pressure.

논블로킹(Non-blocking) 백 프레셔(back pressure)의 스트리밍(Streaming)을 이용한 비동기(Asynchronous) 데이터 처리의 표준이라고 합니다. 😨

모르는 말이 엄청 나오는것 같습니다.. 하나하나 알아보겠습니다.
대부분의 용어의 정의는 리액티브 프로그래밍에 대해 정의한 리액티브선언문에서 찾아 볼 수 있습니다.

Reactive Streams의 정의


비동기(Asynchronous) 처리 ❓

동기처리방식과 비교하여 그림으로 살펴보겠습니다.

동기 처리 방식에선 클라이언트가 서버에 요청(request)를 보내면 서버에서 응답(response)이 올때 현재 쓰레드(thread)가 다른 일을 하지 못하고 기다리게 됩니다. 이것을 블로킹(blocking)이라고 합니다. 이것 때문에 먼저 보낸 서버의 응답이 끝나고 나서야 다른서버에 요청을 보낼 수 있습니다.

다음은 비동기 처리 방식입니다. 비동기 방식은 동기 방식과 다르게 현재 쓰레드가 블로킹 되지 않기 때문에 다른 서버에 요청을 보낸 뒤 다른 일을 처리 할 수도 있고 다른 서버에 요청을 보낼 수도 있습니다.

한마디로 하나씩 일을 하는 동기 방식과 다르게 일을 동시에 처리 할 수 있다는것 입니다.

스트리밍 처리 ❓

전통적인 데이터 처리 방식과 비교하여 그림으로 살펴보겠습니다

전통적인 데이터 처리 방식에서는, 데이터 처리 요청이 오면 처리에 필요한 데이터를 모두 애플리케이션의 메모리에 저장한 다음에 처리를 해야합니다. 이 방식은 만약 필요한 데이터의 크기가 메모리 용량 보다 크다면 'out of memory' 에러가 발생하는 단점과 많은 요청이 몰리면 다량의 가비지 컬렉션(Garbage Collection)에 의해 서버가 응답하지 못하는 일이 생길수 있다는 단점이 있습니다.

스트림 처리방식은 이름처럼 입력 데이터에 대한 파이프 라인을 만들어 데이터가 들어오는 대로 물 흐르듯이 구독(subscribe)하고, 데이터를 하나씩 처리 한 뒤 발행(publish)까지 한번에 연결하여 처리하는 방식입니다. 이를 적용하면 크기가 작은 시스템 메모리로도 많은 양의 데이터를 처리 할 수 있다고 합니다.

전통적인 데이터는 얼음이고 스트림데이터는 물이라고 생각하면 더 와닿는것 같습니다.

백프레셔(back pressure) ❓

1초에 100개의 메시지를 전송하는 Publisher
1초에 10개의 메시지를 처리하는 Subscriber 가 있다고 가정해봅시다.

이 Publisher가 초당 100개의 메시지를 그대로 전송하게되면 Subscriber는 이를 다 처리 하지못하고 에러를 발생 시키거나 재요청하며 리소스를 낭비하게 될것입니다. 이런식으로 Subscriber의 상태를 고려하지 않고 데이터를 밀어 넣는 방식을 푸시(push)방식이라고 합니다.

푸시방식의 문제점은 Subscriber가 필요한 만큼의 데이터만 Publisher에게 요청 함으로써 해결 할 수 있습니다. 이러한 방식을 풀(pull)방식 이라고합니다.

Subscriber 가 이미 9개의 일을 처리하고 있다면 추가로 1개만 더 요청하여 탄력적으로 일을 처리 할 수도 있습니다. 이러한 풀 방식이 백프레셔 입니다.

Reactive Streams API 🌊

ReactiveStreams를 정의하는 용어들을 알아보니 무엇인지 감이 잡히는것 같습니다.
이번엔 내부가 어떻게 구성되어 있는지 살펴보겠습니다.
자세한 내부구성은 ReactivStreams 깃허브 에서 보실 수 있습니다.


public interface Publisher<T> {
   public void subscribe(Subscriber<? super T> s); //Subscriber를 구독
}
 
public interface Subscriber<T> {
   public void onSubscribe(Subscription s); // Supscription 전달
   public void onNext(T t); // 데이터 처리
   public void onError(Throwable t); // 에러처리
   public void onComplete(); // 작업완료
}
 
public interface Subscription {
   public void request(long n); // n개의 데이터 요청
   public void cancel(); // 구독 취소
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {} // 처리단계를 나타냄

기능은 코드에 보시는바와 같습니다.

다음은 Reactive Streams의 사용흐름을 알아보겠습니다.


1. subscriber가 subscribe함수를 사용해 Publisher에게 구독을 요청합니다.


2. Publisher는 onSubscribe 함수를 사용해 Subscriber에게 Subscription을 전달합니다.

3. 이제 SubscriptionSubscriberPublisher 간 통신의 매개체가 됩니다. SubscriberPublisher에게 직접 데이터 요청을 하지 않습니다. Subscriptionrequest 함수를 통해 Publisher에게 전달합니다.


4. PublisherSubscription을 통해 SubscriberonNext에 데이터를 전달하고, 작업이 완료되면 onComplete, 에러가 발생하면 onError 시그널을 전달합니다.


5. SubscriberPublisher, Subscription이 서로 유기적으로 연결되어 통신을 주고받으면서 subscribe부터 onComplete까지 연결되고, 이를 통해 백 프레셔가 완성됩니다.

(출처 - LINE 기술블로그 Ikhun Um님의 "Armeria로 Reactive Streams와 놀자! – 1" )

인터페이스로만 구성되어 있는 것을 볼 수 있는데요. 그렇다면 구현체는 어디있는 것일까요?

Reactive Streams 는 공식문서에서 주어지는 명세서에 맞춰 구현하여 사용할 수 있습니다. 하지만 매우 어려운일이라 직접 만들기보단 이미 만들어져있는 구현체를 사용하면 된다고 합니다. 용도에 따라 다양한 구현체가 있지만 모두 Reactive Streams를 사용해서 통신을 할 수가 있습니다.

Armeria에서는 어떻게 사용하고 있을까요?

앞서 본 StreamMessage의 정의에서도 확인 할 수 있듯이, 지금까지 알아봐온 StreamMessage가 바로 Reative Streams의 Publisher를 상속받아 기능을 확장한 함수형 인터페이스 였습니다.

이렇게해서 StreamMessage의 정의에 대해서 알게되었습니다.

계속해서 StreamMessage가 어떤기능을 하는지 API문서를 계속 읽어보겠습니다.

StreamMessage API문서 읽어내리기 📗👀


When is a StreamMessage fully consumed?

A StreamMessage is complete (or 'fully consumed') when:

  • the Subscriber consumes all elements and Subscriber.onComplete() is invoked,
  • an error occurred and Subscriber.onError(Throwable) is invoked,
  • the Subscription has been cancelled or
  • abort() has been requested.


    When fully consumed, the CompletableFuture returned by whenComplete() will complete, which you may find useful because Subscriber does not notify you when a stream is cancelled.

Reactive Streams에대해 알아봤기 때문에 작업완료조건들과 작업완료후에 반환되는것을 나타내는 것이라는것을 쉽게 알 수 있습니다. 😏

다음줄도 봐보겠습니다.

Publication and Consumption of pooled HttpData objects

StreamMessage will discard the publication request of a pooled HttpData silently and release it automatically when the publication is attempted after the stream is closed.

For pooled HttpData, StreamMessage will convert them into its unpooled version that never leak, so that the Subscriber does not need to worry about leaks.

처음보는 내용이 많으니 해석해보겠습니다.

pooled HttpData 객체를 발행하고 소모.

StreamMessage는 pooled HttData의 게시요청을 자동으로 삭제하고 스트림이 닫힌 후 게시가 시도될 때 자동으로 해제합니다.

pooled HttpData의 경우 StreamMessage는 누출되지 않는 unpooled 버전으로 변환하므로 구독자는 누출에 대해 걱정할 필요가 없습니다.

StreamMessagepooled HttpData를 사용하는 것을 알 수 있었습니다.
근데 pooled HttpData가 뭔지 모르겠네요..! 🤪 한번 알아 보도록하겠습니다

pooled HttpData가 뭘까?

우선 HttpData가 무엇인지 알아보겠습니다. Armeria의 HttpData 클래스에 대한 문서를 통해 알아보겠습니다

HTTP/2 data that contains a chunk of bytes.

바이트청크를 포함하는 HTTP/2 데이터 라고 한 줄로 간단히 설명되어 있습니다. 그럼 pooled HttpData은 무엇일까요?
PooledObject 클래스 문서 에서는 다음과 같이 정의하고 있습니다.

What is a pooled HttpData?

A pooled HttpData is a special variant of HttpData whose HttpData.isPooled() returns true.It's usually created via HttpData.wrap(ByteBuf) by wrapping an existing ByteBuf

HttpData의 특수변형으로 HttpData.isPooled()를 사용했을때 true를 반환 한다고 합니다. HttpData.wrap(ByteBuf)를 사용해서 생성할 수 있다고 하네요.

여기서 ByteBuf는 java표준인 NIO의 ByteBuffers를 편하게 사용하기위해 Netty에서 제공하는 기능으로 바이트 데이터를 저장하고 읽는 저장소의 역할을 합니다.

간단한 그림으로 표현하면 다음과 같습니다.

StreamMessage API 기타 내용

나머지 내용을 읽어보겠습니다. 바로 해석해서 보겠습니다.

Subscriber가 StreamMessage pooled HttpData의 복사본을 만드는 것을 원하지 않는 경우 subscribe할때 SubscriptionOption으로 WITH_POOLED_OBJECTS를 지정합니다. Subscriber는 Subscriber.onNext(Object)로 제공된 객체를 해제할 책임이 있습니다.

Subscriber.onError(Throwable)는 Subscription.cancel()에 의해 발생하는 CancelledSubscriptionException을 제외한 모든 예외가 발생할 때 호출됩니다.Subscription.cancel()이 호출될 때 Subscriber가 Subscriber.onError(Throwable)에 의해 알림을 받도록 하려면 subscribe할 때 SubscriptionOption.NOTIFY_CANCELLATION을 지정합니다.

특정상황에서 사용시의 안내사항을 볼 수 있습니다.

StreamMessage 정리 😀

지금까지 StreamMessage에 대해 알아본것을 정리해보겠습니다.

StreamMessage는 Reactive Streams 의 Publisher를 구현한 것이고 하나의 Subscriber를 갖습니다.

StreamMessagepooled HttpData를 발행하며 소비 합니다.

도식으로 단순화 하면 다음과 같습니다.

지금까지 이슈해결을 위해 Armeria의 함수형 인터페이스 StreamMessage 에 대하여 알아보았습니다.

다음포스트부터는 본격적으로 이슈를 해결해보겠습니다.

읽어주셔서 감사합니다.

피드백주시면 감사히 받겠습니다😊

- 참고자료

LINE 기술블로그 Ikhun Um님의 "Armeria로 Reactive Streams와 놀자! – 1"
ReactivStreams 깃허브
리액티브선언문
Armeria API 문서
Netty API 문서

profile
개발자로 성장중입니다

0개의 댓글