우리는 비동기적으로 실행되며 많은 양의 데이터를 처리할 수 있는 시스템을 가지고 싶다. 이를 위해서는 Akka Stream
을 사용해야 한다. Akka stream은 reactive distributed system
이다. 본격적으로 Akka stream에 대해 공부하기 전에 관련 규칙들을 정리할 예정이다.
reactive stream 사이트에도 잘 정리되어 있지만, 별개로 정리해 볼 예정이다.
Publisher는 reactive stream에서 요소들을 제공하는 역할을 한다. (emits the elements) Akka Stream에서는 Publisher
를 Source
로 부른다.
Subscriber는 Publisher가 제공하는 요소들을 받는 역할을 한다. (receives the elements that publisher emits) Subscriber는 Akka Stream에서는 Subscriber
를 Sink
라고 부른다.
Processor는 Publisher가 제공해주는 요소(데이터)들을 가공해주는 역할을 한다. (transform the element along the way) 이 작업 역시 비동기적으로 작동한다.
그림으로 나타내면 위와 같이 Publisher
와 Subscriber
사이 Processor
가 중간 역할을 하는 것이다. 이때, 여러 개의 Processor가 중첩될 수도 있다. 즉, Publisher -> processor -> processor -> ... -> processor -> Subscriber
와 같은 플로우가 가능하다.
Reactive Streams는 SPI(Service Provider Interface)
이다. SPI는 어떻게 작업을 할 지 정의할 수 있으며, 컴포넌트 사이의 프로토콜 역시 정의할 수 있다. 이때 유의해야할 점은 API가 아닌 점이다. Stream을 구현하기 위해 여러 API를 사용할 수 있다. 우리는 여기서 Akka Streams API를 사용하여 SPI를 구현할 것이다.
우리는 Akka Stream을 위해 뼈대가 있어야 한다. Akka Stream은 다음과 같은 환경에서 작성된다.
val system = ActorSystem("AkkaStream")
val materializer = Materializer(system)
우리는 그 동안의 챕터에서 ActorSystem
타입의 system
은 많이 사용했었다. 하지만, Akka Streams에서는 Materializer
타입의 상수 역시 선언할 것이다. Materializer
는 stream들을 Run시킬 수 있도록 한다. 이때, ActorSystem
이 implicit argument
로 들어간다.
(캡쳐 이미지는 ActorMaterializer
이지만, 현재는 Materializer
를 사용해야 한다. Materializer
가 요구하 implicit 인수는 ActorMaterializer
와 같다.)
따라서, 위의 코드처럼 system을 직접 넣어주어도 되지만, ActorSystem을 Implicit로 선언해주면 된다. 따라서 밑과 같은 코드의 형태가 된다.
implicit val system = ActorSystem("AkkaStream")
val materializer = Materializer
이때, materializer는 stream들을 실행시킬 수 있는 역할인데, 이 역시 implicit로 들어간다. 밑은 stream을 실행시키기 위핸 run()
메서드이다.
Materializer
를 implicit로 요구하므로 위쪽의 Materializer
타입의 상수를 implicit
키워드를 붙여준다. 따라서 다음과 같은 코드를 기본으로 작성해야 한다.
implicit val system = ActorSystem("AkkaStream")
implicit val materializer = Materializer
와 같이 코드를 작성해야 한다.
Source
는 reactive streams의 Publisher에 해당하며, Sink
는 Subscriber, Flow
는 Processor에 해당 한다. 위의 ActorSystem
과 Materializer
는 이 3개의 요소를 사용하기 위한 환경 작업이었고, 이제 실제로 코드를 작성하려고 한다.
이 때, 유의해야 할 점은 Source, Sink, Flow 모두 akka.stream.scaladsl
패키지에 있는 class인 것이다. import시킬 때 제대로 확인하고 Import하자.
val integerSources = Source(1 to 10)
val nameSource = Source.single[String]("leesomyoung")
위와 같이 integerSources
를 보면 1부터 10까지의 정수를 보내는 source인 것을 알 수 있으며, 1개의 데이터만 가질 경우 Source.single[데이터타입]( 값 )
을 사용하는 것을 알 수 있다. 이때 주의해야 할 점은, Source는 serializable
하며, immutable
한 객체들을 가질 수 있다는 점이다.
다음은 Source
에서 배출한 요소들의 데이터를 처리하는 Processor 역할의 Flow
를 선언해주어야 한다.
val doubleIntegerFlow = Flow[Int].map(_ * 2)
val integerTakeThreeFlow = Flow[Int].take(3)
val nameDropFlow = Flow[String].map(_.substring(3))
위의 예시 코드들을 보면 알겠지만, Flow[데이터타입].HOF
형태로 작성을 한다.(HOF : High Order Function) 이때, Flow 역시 akka.stream.scaladsl
패키지 내 원소임을 유의하자.
예시 코드에서 doubleIntegerFlow
는 모든 원소들을 2배 해주는 데이터 프로세스이며, integerTakeThreeFlow
는 리스트의 여러 개 원소 중 앞의 3개를 가져오는 프로세스이다. 그리고, nameDropFlow는 가장 앞의 3글자를 제외한 모든 글자를 가져오는 프로세스이다.
Sink
는 Source에서 제공하는 데이터들을 Flow를 통해 프로세싱된 데이터를 받는 곳이다. Sink 역시 akka.stream.scaladsl
패키지의 원소이며, Sink.HOF[데이터타입]( 작업 )
와 같은 형태로 정의된다.
val integerSink = Sink.foreach[Int](println)
val nameSink = Sink.foreach[String](println)
예시를 위한 코드이므로 간단하게 Sink에서 받아온 모든 데이터들을 출력하도록 하였다.
Source
, Flow
, Sink
를 연결하기 위해서는 via
와 to
메서드를 사용해야 한다. 3개를 동시에 연결하기 위해서는 Source명.via(flow명).to(Sink명)
과 같이 작성해야 한다.
이때, Flow
는 여러 개가 chaining될 수 있는 것에 유의한다. 여러 개를 연결하기 위해서는 Source명.via(flow명).via(flow명).via(flow명).via(flow명).to(Sink명)
과 같이 작성해주면 된다.
val nameWork = nameSource.via(nameDropFlow).to(nameSink)
val integerWork = integerSources.via(integerTakeThreeFlow).via(doubleIntegerFlow).to(integerSink)
예시 코드들은 위와 같이 작성하면 된다. integerWork
는 앞의 3개의 원소만 가져오는 작업과 모든 원소를 2배 하는 작업을 연결한 코드이다.
실행하기 위해서는 run
메서드를 실행해야 한다.
integerWork.run()
nameWork.run()
과 같이 실행하면 된다.
그러면 실행 결과는 위와 같이 된다. 우리는 여기서 비동기적으로 실행되므로 integerWork가 실행되는 도중 먼저 작업이 끝난 nameWork가 출력되는 것을 알 수 있다.