educative - kotlin - 14

Sung Jun Jin·2021년 5월 15일
1
post-custom-banner

Asynchronous Programming (비동기 프로그래밍)

현대 어플리케이션에서는 원격 서비스의 호출, DB 업데이트, 검색 등 많은 작업들이 내부적으로 계속 돌아간다. 프로그램의 효율화를 위해서는 각각의 작업들을 비동기, non-blocking(I/O 작업이 완료될 때 까지 기다리지 않고 다른 작업을 수행하는 개념) 방식으로 처리해야한다.

Starting sequentially

특정 공항의 온도와 연착여부를 알아보기 위해 기상정보를 수집하는 코드를 짜보자. 사용하는 API와 JSON parser는 FAA(미국 연방항공국)에서 제공해준다.

import java.net.URL
import com.beust.klaxon.*

class Weather(@Json(name = "Temp") val temperature: Array<String>)

class Airport(
  @Json(name = "IATA") val code: String,
  @Json(name = "Name") val name: String,
  @Json(name = "Delay") val delay: Boolean,
  @Json(name = "Weather") val weather: Weather) {

  companion object {
    fun getAirportData(code: String): Airport? {
      val url = "https://soa.smext.faa.gov/asws/api/airport/status/$code"
      return Klaxon().parse<Airport>(URL(url).readText())
    }
  }
}
  • Weather, Airport 클래스에선 전부 @Json annotation을 통해 JSON response를 매핑해준다.
  • getAirportdata() 메소드에서는 API 응답을 파싱해서 Airport 인스턴스를 생성한다.

메인 메소드에서는 우선 순차적으로 데이터를 가져오도록 하고 실행시간을 측정해주는 measureTimeMillis() 함수도 짜준다.

fun main() {                  
  val format = "%-10s%-20s%-10s"
  println(String.format(format, "Code", "Temperature", "Delay"))
  
  val time = measureTimeMillis {
    val airportCodes = listOf("LAX", "SFO", "PDX", "SEA")

    val airportData: List<Airport> = 
      airportCodes.mapNotNull { anAirportCode ->
        Airport.getAirportData(anAirportCode)
      }         
    
    airportData.forEach { anAirport -> 
      println(String.format(format, anAirport.code,
        anAirport.weather.temperature.get(0), anAirport.delay))
    }
  }
  
  println("Time taken $time ms")
}

실행결과 네트워크 상태에 따라 다르겠지만 대략 2초의 시간이 걸렸다.

Code Temperature       Delay
LAX  68.0 F (20.0 C)   false
SFO  50.0 F (10.0 C)   true
PDX  56.0 F (13.3 C)   false
SEA  55.0 F (12.8 C)   true 
Time taken 2112 ms

Making asynchronous

여태까지 구현한 getAirportData() 함수는 모두 blocking calls 방식이다. 즉, 요청이 발생하고 완료될 때까지 모든 일을 중단한다. 요약하면 다음과 같다

  1. LAX 공항 정보에 대한 요청을 보냄
  2. 그 다음 순서인 SFO 공항은 위에서 보낸 요청이 처리되기 전까지 대기

이 방식을 이전 공항에 대한 요청이 처리될때까지 기다리지 않고, 동시에 다른 요청을 같이 처리할 수 있는 non blocking calls 방식으로 바꿔보자.

import kotlin.system.*
import kotlinx.coroutines.*

fun main() = runBlocking {                  
  val format = "%-10s%-20s%-10s"
  println(String.format(format, "Code", "Temperature", "Delay"))
  
  val time = measureTimeMillis {
    val airportCodes = listOf("LAX", "SFO", "PDX", "SEA")

    val airportData: List<Deferred<Airport?>> = 
      airportCodes.map { anAirportCode ->
        async(Dispatchers.IO) { 
          Airport.getAirportData(anAirportCode)
        }
      }         

    airportData
      .mapNotNull { anAirportData -> anAirportData.await() }
      .forEach { anAirport -> 
        println(String.format(format, anAirport.code,
        anAirport.weather.temperature.get(0), anAirport.delay))
    }
  }
  
  println("Time taken $time ms")
}

비동기 처리의 떠한 코드 블록 내 작업이 완료 되기를 기다리는 runBlocking() 함수를 활용해. 메인 함수의 부분을 runBlocking()으로 감싸줬다.

여기서는 코루틴 블록의 결과를 가지고 와야하기 때문에 launch() 보다는 async() 함수를 사용한다. async() 함수를 사용해 CouroutineContext를 인자로 넘겨줘서 각자의 thread pool에서 getAirportData() 메소드를 실행시키도록 할 수 있다.

여기서 airportData의 타입이 List<Deferred<Airport?>> 인 이유는 async() 함수로 시작된 코루틴 블록은 Deferred 객체를 반환하기 때문이다.

맵을 돌면서 getAirportData()를 호출하는 첫번째 iteration에서 async() 함수는 람다 표현식으로 받은 인자를 실행시키고 Deferred<Airport?> 객체를 반환하여 리스트에 저장한다.

두번째 iteration부터는 getAirportData() 실행된 결과를 가지고 오기 위해 await() 메소드를 호출하고 가공한다.

위 작업들이 async() 함수를 사용했으므로 각자 메인 스레드가 아닌 다른 스레드에서 이루어진다. 실행결과를 보자, 실행시간은 1.6초로 전에 비해서 조금 줄었다. 물론 네트워크 컨디션에 따라서 실행시간은 달라질 수 있다

Code Temperature       Delay
LAX  68.0 F (20.0 C)   false
SFO  50.0 F (10.0 C)   true
PDX  56.0 F (13.3 C)   false
SEA  55.0 F (12.8 C)   true 
Time taken 1676 ms

Exception Handling

launch()async()를 사용해 코루틴을 실행하면 예외처리도 깔끔하게 해줘야 한다.

Be Mindful of Structured Concurrency

명시적으로 스코프를 지정해주지 않는 이상, 코루틴은 부모 코루틴의 context에서 돌아가는 계층적 구조를 가지고 있다. 이를 structured concurrency(구조화 된 동시성)이라고 한다. 이 계층 구조를 활용하면 부모 코루틴의 자동으로 자식 코루틴의 수명주기를 제어할 수 있다. 반대로 자식 코루틴에서 예외가 발생하면 부모 코루틴에서도 예외가 발생할 수 있다.

여기서는 자식 코루틴에서 문제가 생겼을때 부모 코루틴의 실행을 취소하지 않는 방법을 알아보자.

launch and exceptions

launch() 함수를 사용하다가 발생한 예외는 호출한 쪽에서 받아볼 수 없다. 이런 예외를 핸들링 하기 위해 전에 있던 공항 예제를 다시 가져와보자.

import kotlinx.coroutines.*

fun main() = runBlocking {
  try {
    val airportCodes = listOf("LAX", "SF-", "PD-", "SEA")

    val jobs: List<Job> = airportCodes.map { anAirportCode ->
      launch(Dispatchers.IO + SupervisorJob()) {
        val airport = Airport.getAirportData(anAirportCode)
        println("${airport?.code} delay: ${airport?.delay}")
      }
    }

    jobs.forEach { it.join() }
    jobs.forEach { println("Cancelled: ${it.isCancelled}") }
  } catch(ex: Exception) {
    println("ERROR: ${ex.message}")
  }
}

우선 첫번째로 메인 함수를 try-catch 블록으로 한번 감싸주자

코루틴 예외 처리 과정을 보기 위해 여기서 async() 함수 대신 launch() 함수로 교체했다. launch() 함수를 사용하면 코루틴의 흐름을 제어하는 Job 객체를 반환한다. Job 객체의 isCancelled 프로퍼티를 사용하면 코루틴의 성공여부를 확인해볼 수 있다.

코드를 실행시켜보자

LAX delay: false
SEA delay: true
Exception in thread "DefaultDispatcher-worker-1" Exception in ...

메인 함수에서 try-catch문로 예외 핸들링을 했음에도 불구하고 코루틴에서 발생한 예외를 잡지 못했다. 이유는 launch() 함수로 실행된 코루틴은 예외를 호출부로 던지지 않는 특성 때문이다.

따라서 launch() 함수를 사용한다면 CoroutineExceptionHandler를 사용해 코루틴에서 발생한 예외를 처리해줘야 한다. 그리고 launch() 함수에 등록을 시켜줘야 한다.

fun main() = runBlocking {
  // 핸들러 정의 
  val handler = CoroutineExceptionHandler { context, ex ->
    println(
      "Caught: ${context[CoroutineName]} ${ex.message?.substring(0..28)}")
  }

  try {
    val airportCodes = listOf("LAX", "SF-", "PD-", "SEA")

    val jobs: List<Job> = airportCodes.map { anAirportCode ->
      // 코루틴 네임 할당, handler 등록
      launch(Dispatchers.IO + CoroutineName(anAirportCode) +
        handler + SupervisorJob()) {
        val airport = Airport.getAirportData(anAirportCode)
        println("${airport?.code} delay: ${airport?.delay}")
      }
    }
    
    jobs.forEach { it.join() }
    jobs.forEach { println("Cancelled: ${it.isCancelled}") }
  } catch(ex: Exception) {
    println("ERROR: ${ex.message}")
  }
}

실행결과

Caught: CoroutineName(PD-) Unable to instantiate Airport 
Caught: CoroutineName(SF-) Unable to instantiate Airport 
SEA delay: true
LAX delay: false
Cancelled: false
Cancelled: true
Cancelled: true
Cancelled: false

async and exceptions

async() 함수는 launch()와 다르게 Deferred<T> 객체를 반환한다. 이 객체는 코루틴에서 발생하는 예외를 호출부로 던져주는 특성이 있다. 따라서 launch() 처럼 따로 코루틴 예외 핸들러를 만들어서 넘겨줘도 무시한다. 결론은 async() 함수를 사용하면 간단하게 try-catch문으로 감싸주면 핸들링이 된다.

import kotlinx.coroutines.*

fun main() = runBlocking {
  val airportCodes = listOf("LAX", "SF-", "PD-", "SEA")
  
  val airportData = airportCodes.map { anAirportCode ->
    async(Dispatchers.IO + SupervisorJob()) {
      Airport.getAirportData(anAirportCode)
    }
  }
  
  for (anAirportData in airportData) {
    try {
      val airport = anAirportData.await()

      println("${airport?.code} ${airport?.delay}")
    } catch(ex: Exception) {
      println("Error: ${ex.message?.substring(0..28)}")
    }
  }
}

실행결과

LAX false
Error: Unable to instantiate Airport
Error: Unable to instantiate Airport
SEA true
profile
주니어 개발쟈🤦‍♂️
post-custom-banner

1개의 댓글

comment-user-thumbnail
2021년 6월 6일

멋저요 자기

답글 달기