[Akka] Event Adapter

smlee·2023년 10월 24일
0

Akka

목록 보기
42/50
post-thumbnail

Event Adapter는 Schema가 변경되었을 때 자동으로 변환시켜주는 어댑터이다.

Event Adapter는 다음과 같은 순서로 동작한다

actor -> WriteEventAdapter -> Serializer -> journal -> ReadEventAdapter -> actor로 기존 스키마를 바뀐 스키마 형태로 바꾼다.

예시를 보며 Event Adapter의 용례를 알아볼 예정이다.


어쿠스틱 기타 재고를 관리하는 InventoryManager Actor가 있다고 생각하자. Guitar는 기타의 데이터를 가지고 있으며, AddGuitar라는 command를 통해 메시지를 받아들이고, GuitarAdded라는 레코드를 사용하여 복구한다.

object InventoryManager {

  case class Guitar(id: String, model: String, make: String)

  case class AddGuitar(guitar: Guitar, quantity: Int)

  case class GuitarAdded(guitarId: String, guitarModel: String, guitarMake: String, quantity: Int)
  
}

class InventoryManager extends PersistentActor with ActorLogging {
	import InventoryManager._
    
    val inventory: mutable.Map[Guitar, Int] = new mutable.HashMap[Guitar, Int]()
    
    override def persistenceId: String = "guitar-inventory-manager"
    
    override def receiveCommand: Receive = {
    	case AddGuitar(guitar @ Guitar(id, model, make), quantity) =>
        	val record = GuitarAdded(id, model, make, quantity)
            
            persist(record){ _ =>
        		addGuitar(guitar, quantity)

        		log.info(s"Added $quantity x $guitar to Inventory")
      		}

    	case "print" =>
      		log.info(s"Current inventory is: $inventory")
    }
    
    private def addGuitar(guitar: Guitar, quantity:Int) = {
      val existingQuantity = getGuitarQuantity(guitar)

      inventory.put(guitar, quantity + existingQuantity)
    }
    
    private def getGuitarQuantity(guitar: Guitar): Int = inventory.getOrElse(guitar, 0)
    
    override def receiveRecover: Receive = {
    	case event @ GuitarAdded(id, model, make, quantity) =>
        	log.info(s"Recovered: $event")

      		val currentGuitar = Guitar(guitarId, guitarModel, guitarMake)
      		addGuitar(currentGuitar, quantity)
    }
}

위와 같이 간단한 PersistentActor를 작성한다. 어쿠스틱 기타 재고가 들어왔다는 command가 들어오면 재고 수량을 올리고, 이를 이벤트로 저장하는 간단한 액터이다. 이 액터를 테스트하기 위해 다음과 같은 코드를 작성하고 실행해본다.

val system = ActorSystem("guitarInventorySystem")
val actor = system.actorOf(Props[InventoryManager], "simpleGuitarInventoryManager")

val guitars = (1 to 10).map(num => Guitar(s"guitar$num", s"scala $num", "StudyingAkka"))

guitars.foreach(guitar => actor ! AddGuitar(guitar, 5))

위와 같이 10개의 기타 종류를 선언하고 각 종류마다 5개의 재고를 추가하는 코드를 작성하면 결과는 다음과 같다.

어쿠스틱 기타가 종류별로 5개 씩 정상적으로 추가된 것이 확인된다. 이 데이터들이 제대로 복구되는지를 확인하기 위하여 foreach 문을 주석처리하고 재실행한다.

제대로 복구가 되는 것을 알 수 있다.

만약 해당 프로젝트가 성공적이어서 어쿠스틱 기타 뿐만 아니라 일렉트릭 기타 역시 취급하게 되었다고 가정해보자. 이때, 기존에 쌓여있던 데이터들은 유지한 채, 데이터 타입에 기타 종류만 포함시켜야 한다. 따라서 추가된 데이터 타입을 위해 Guitar에 타입란을 추가하고, Event는 새로운 객체를 만들어 GuitarAddedV2를 만든다.

변경된 부분만 보면 다음과 같다.

object InventoryManager {
  val ACOUSTIC = "acoustic"
  val ELECTRIC = "electric"

  case class Guitar(id: String, model: String, make: String, guitarType: String = ACOUSTIC)
  
  case class GuitarAddedV2(guitarId: String, guitarModel: String, guitarMake: String, quantity: Int, guitarType: String = ACOUSTIC)
}

class InventoryManager extends PersistentActor with ActorLogging{
	 override def receiveRecover: Receive = {
        case event @ GuitarAddedV2(guitarId, guitarModel, guitarMake, quantity, guitarType) =>
          log.info(s"Recovered: $event")

          val currentGuitar = Guitar(guitarId, guitarModel, guitarMake, guitarType)
          addGuitar(currentGuitar, quantity)

  }

  override def receiveCommand: Receive = {
    case AddGuitar(guitar @ Guitar(id, model, make, guitarType), quantity) =>
      val record = GuitarAddedV2(id, model, make, quantity, guitarType)

      persist(record){ _ =>
        addGuitar(guitar, quantity)

        log.info(s"Added $quantity x $guitar to Inventory")
      }
    }
}

위처럼 Actor가 변경하였다. 그리고 main 역시 일부만 변경한다.

  val guitars = (1 to 10).map(num =>
    if(num%2 == 0)
      Guitar(s"guitar$num", s"scala $num", "StudyingAkka", ELECTRIC)
    else
      Guitar(s"guitar$num", s"scala $num", "StudyingAkka", ACOUSTIC)
  )

위와 같이 변경하였으니, 실행하면 dead letter가 생긴다.

그 이유는 스키마가 변했으므로 해당 스키마를 처리할 수 없는 것이다. 이를 위해 EventAdapter를 사용한다.

1. application.conf

EventAdapter를 사용하기 위해서는 config에 어떤 어댑터를 사용할지 등록해준다.

eventAdapters{
    akka.persistence.journal.plugin = "cassandra-journal"
    akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"

    cassandra-journal {
            event-adapters {
                guitar-inventory-enhancer = "part4_practices.GuitarReadEventAdapter"
            }
        }

        event-adapter-bindings {
            "part4_practices.GuitarReadEventAdapter" = guitar-inventory-enhancer
        }
}

cassandra-journalevent-adapters를 등록하며, event-adapter-bindings에 어댑터명과 path를 바꾸어서 작성해준다.

2. eventAdapter 작성

class GuitarReadEventAdapter extends ReadEventAdapter{
  import part4_practices.actors.InventoryManager._

  override def fromJournal(event: Any, manifest: String): EventSeq = event match {
    case GuitarAdded(guitarId, guitarModel, guitarMake, quantity) =>
      EventSeq.single(GuitarAddedV2(guitarId, guitarModel, guitarMake, quantity, ACOUSTIC))

    case other => EventSeq.single(other)

  }
}

ReadEventAdapter trait를 상속하여 fromJournal을 통해 바꿀 형태로 변환한 다음 EventSeq.single(바꿀 타입)을 리턴하도록 한다.

3. config 적용

main의 ActorSystem에서 해당 config를 가져온다.

val system = ActorSystem("guitarInventorySystem", ConfigFactory.load().getConfig("eventAdapters"))

0개의 댓글