[Akka] Graph와 GraphDSL

smlee·2023년 11월 1일
0

Akka

목록 보기
47/50
post-thumbnail

이전 Akka 포스트들에서 Source, Flow, Sink를 엮어서 graph를 만들었다. 하지만 그전에는 input과 output이 1:1인 그래프만 작성했었다. GraphDSL을 사용하여 1:N 관계가 포함된 복잡한 그래프 역시 작성해 볼 예정이다.


  1. fan-in
    fan-in은 input이 여러 개이며 output은 1개인 것을 뜻한다.
  • Zip[A,B] - 2개의 Input과 1개의 output을 뱉는 것으로, A와 B 2개 요소를 묶어 (A, B) 튜플로 리턴한다.
  • Concat[A] - 2개의 Input과 1개의 output을 뱉는 것으로, 두 개의 stream을 연결한다.
  • ZipWith[A, B, ... , Out] - N개의 Input과 1개의 Output을 뱉는 것으로, N개의 Input을 1개의 output으로 리턴한다.
  • Merge[In] - N개의 input과 1개의 output을 가진다. input 데이터들에서 랜덤으로 픽하여 하나 씩 output으로 뱉는 것을 뜻한다.
  1. fan-out
    fan-out은 input이 1개이며 output은 여러 개인 것을 뜻한다.
  • Broadcast[T] - 1개의 Input이 있으며, N개의 Output이 있다. input 요소가 각각 output으로 나온다.
  • Balance[T] - 1개의 Input이 있으며, N개의 Output이 있다. input 요소가 output 포트 중 하나로 방출된다.
  • Unzip[A, B] - 1개의 Input에 2개의 output이 있다. (A, B)와 같은 형태의 튜플 요소가 방출된다.
  • UnzipWith[In, A, B, ...] - 1개의 Input에 N개의 Output이 있다. N이 20 이하일 때 요소들을 방출한다.

복잡한 graph는 다음과 같이 코드가 작성된다.

import scala.concurrent.duration._

val numberSource = Source(1 to 1000)
val fastSource = numberSource.throttle(10, 1 second)
val slowSource = numberSource.throttle(2, 1 second)

val incrementer = Flow[Int].map(_ + 1)
val multiplier = Flow[Int].map(_ * 10)

val firstSink = Sink.foreach[Int](num => println(s"Sink 1: $num"))
val secondSink = Sink.foreach[Int](num => println(s"Sink 2: $num"))

val complexGraph = RunnableGraph.fromGraph(
	GraphDSL.create(){ implicit builder: GraphDSL.Builder[NotUsed] =>
    	
        import GraphDSL.Implicits._
        
       val merge = builder.add(Merge[Int](2))
       val balance = builder.add(Balance[Int](2))

       fastSource ~>  incrementer ~> merge.in(0)
       slowSource ~>  multiplier ~> merge.in(1)

       merge.out ~> balance

       balance.out(0) ~> firstSink
       balance.out(1) ~> secondSink

       ClosedShape
        
    }
)

complexGraph.run()

위의 코드를 실행하면 어떻게 될까? 위의 코드는 2개의 Source에서 2개의 flow를 연결한 후 flow의 결과값들을 각각 2개의 Sink에 연결한 것이다.

굳이 복잡한 과정을 거치지 않고도 위와 같이 2개의 Source가 잘 처리되는 것을 알 수 있다.

  val input = Source(1 to 1000)

  val incrementer = Flow[Int].map(_ + 1) 
  val multiplier = Flow[Int].map(_ * 10) 
  val output = Sink.foreach[(Int, Int)](println) 

  val graph = RunnableGraph.fromGraph( 
    GraphDSL.create() { implicit builder:GraphDSL.Builder[NotUsed] => 

      import GraphDSL.Implicits._ 

      val broadcast = builder.add(Broadcast[Int](2)) 
      val zip = builder.add(Zip[Int, Int]) 

      input ~> broadcast
      broadcast.out(0) ~> incrementer ~> zip.in0
      broadcast.out(1) ~> multiplier ~> zip.in1

      zip.out ~> output

      ClosedShape 
    } 
  ) 

   graph.run() 

위의 코드는 2개의 flow에서 계산된 결과를 엮어 하나의 튜플을 출력하는 코드이다. Source -> 1:2 -> broadcast -> 2:1 -> zip -> output의 수순을 거친다.

Reference

0개의 댓글