파이썬 asyncio 코루틴과 태스크

개발자 강세영·2022년 11월 14일
0

TIL

목록 보기
49/70
post-thumbnail

asyncio

  • asyncio는 async/await 구문을 사용하여 동시성 코드를 작성하는 파이썬의 기본 라이브러리이다.

  • asyncio는 고성능 네트워크 및 웹 서버, 데이터베이스 연결 라이브러리, 분산 작업 큐 등을 제공하는 여러 파이썬 비동기 프레임워크의 기반으로 사용된다.

  • asyncio는 I/O 바운드이면서 고수준의 구조화된 네트워크 코드에 적합하다.

async/await

  • async/await 문법으로 선언되는 코루틴은 asyncio 애플리케이션을 만드는 데 있어서 가장 일반적인 방법이다.
import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

asyncio.run(main()) # hello 출력 -> 1초 대기 -> world 출력
  • 단지 코루틴을 호출하는 것으로는 실행되지 않는다.
main()
<coroutine object main at 0x1053bb7c8>
  • asyncio는 코루틴을 실제로 실행하기 위한 세 가지 주요 방법을 제공한다.
  1. 최상위 진입점 main() 함수를 실행하는 asyncio.run() 함수 (위 예시 코드의 main)

  2. async def 함수 안에서 await 코루틴 사용.

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    
# 다음 코드는 1초를 기다린 후 “hello”를 프린트한 다음 또 2초를 기다린 후 “world”를 프린트한다.
async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
예상 출력:
started at 17:13:52
hello
world
finished at 17:13:55
  1. 코루틴을 asyncio 태스크로 동시에 실행하는 asyncio.create_task() 함수 사용.
  • 위의 예를 수정해서 두 개의 say_after 코루틴을 동시에 실행하는 코드
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")
  • 예상 출력은 이제 이 코드 조각이 이전의 것보다 1초 빠르게 실행되었음을 보여준다:
started at 17:14:32
hello
world
finished at 17:14:34

awaitable(어웨이터블)

  • 객체가 await 표현식에서 사용될 수 있으면 어웨이터블(awaitable) 객체라고 한다.

  • 많은 asyncio API는 어웨이터블을 받아들이도록 설계되었다.

  • 어웨이터블 객체에는 코루틴, 태스크, 퓨처의 세 가지 유형이 있다.

코루틴(Coroutine)

  • 파이썬 코루틴은 어웨이터블이므로 다른 코루틴에서 await 될 수 있다.
import asyncio

async def nested():
    return 42

async def main():
 
    # nested() 
    # 이런식으로 호출하면 아무 일도 일어나지 않는다.
	# await 되지 않았기 때문이다.
    
    print(await nested())  # 42 출력

asyncio.run(main())
  • 코루틴 이라는 용어는 두 가지의 밀접한 개념에 관련이 있다.
  • 코루틴 함수: async def 함수명
  • 코루틴 객체: 코루틴 함수를 호출하여 반환된 객체
  • asyncio는 기존 제너레이터 기반 코루틴도 지원한다.
    단 현재 파이썬에서 제너레이터 기반 코루틴에 대한 지원이 중단되었으며, 3.10 버전에서 삭제될 예정이다.

태스크(Task)

  • 태스크는 코루틴을 동시에 예약하는 데 사용된다.

  • 코루틴을 asyncio.create_task()와 같은 함수를 사용하여 태스크로 넣으면 코루틴이 곧 실행되도록 자동으로 예약된다.

import asyncio

async def nested():
    return 42

async def main():
	# nested()를 main()에서 동시적으로 실행하도록 해준다.
    task = asyncio.create_task(nested())

    # 이제 task를 통해 nested()를 취소하거나 nested()가 완료될 때까지 기다릴 수 있다.
    await task

asyncio.run(main())

퓨처(Future)

  • 퓨처는 비동기 연산의 최종 결과를 나타내는 특별한 저수준 어웨이터블 객체이다.

  • 퓨처 객체를 기다릴 때, 그것은 코루틴이 퓨처가 다른 곳에서 해결될 때까지 기다릴 것을 뜻한다.

  • 콜백 기반 코드를 async/await와 함께 사용하려면 asyncio의 퓨처 객체가 필요하다.

  • 일반적으로 응용 프로그램 수준 코드에서 퓨처 객체를 만들 필요는 없다.

  • 때때로 라이브러리와 일부 asyncio API에 의해 노출되는 퓨처 객체를 기다릴 수 있다:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )
  • 퓨처 객체를 반환하는 저수준 함수의 좋은 예는 loop.run_in_executor()이다.

asyncio 프로그램 실행하기

asyncio.run(coro, *, debug=False)

  • 코루틴 coro를 실행하고 결과를 반환한다.

  • 이 함수는 전달된 코루틴을 실행하고 asyncio 이벤트 루프와 비동기 제너레이터의 파이널리제이션을 관리한다.

  • 다른 asyncio 이벤트 루프가 같은 스레드에서 실행 중이면 이 함수를 호출할 수 없다.

  • debug=True 이면 이벤트 루프가 디버그 모드로 실행된다.

  • 이 함수는 항상 새 이벤트 루프를 만들고 끝에 이벤트 루프를 닫는다. asyncio 프로그램의 메인 진입 지점으로 사용해야 하고, 이상적으로는 한 번만 호출해야 한다.

async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

태스크 만들기

asyncio.create_task(coro, *, name=None)

  • coro 코루틴을 Task로 감싸고 실행을 예약한다. Task 객체를 반환한다.

  • name이 None이 아니면, Task.set_name()을 사용하여 태스크의 이름으로 설정된다.

  • get_running_loop()에 의해 반환된 루프에서 태스크가 실행되고, 현재 스레드에 실행 중인 루프가 없으면 RuntimeError가 발생한다.

  • 이 함수는 파이썬 3.7에서 추가되었다. 파이썬 3.7 이전 버전에서는, 대신 저수준의 asyncio.ensure_future() 함수를 사용할 수 있다.

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

대기하기(sleep)

coroutine asyncio.sleep(delay, result=None, *, loop=None)

  • delay(초) 동안 블록한다.

  • 파이썬 time 모듈의 time.sleep()과 비슷한데, time.sleep()은 어웨이터블 객체가 아니라서 await로 쓸 수 없다.

  • result 인수가 있으면 코루틴이 완료될 때 호출자에게 result를 반환한다.

  • sleep()은 항상 현재 태스크를 일시 중단해서 다른 태스크를 실행할 수 있도록 한다.

  • loop 매개변수는 파이썬 3.8 부터 지원이 중단되며 3.10부터는 삭제될 예정이다.

  • 5초 동안 현재시간을 매초 표시하는 코루틴의 예:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

동시에 태스크 실행하기

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

  • aws 시퀀스에 있는 어웨이터블 객체를 동시에 실행한다.

  • aws에 있는 어웨이터블이 코루틴이면 자동으로 태스크로 예약된다.

  • 모든 어웨이터블이 성공적으로 완료되면, 반환된 값들이 합쳐진 리스트가 반환된다. 결괏값의 순서는 aws에 있는 어웨이터블의 순서와 일치한다.

  • return_exceptions가 False(기본값)면, 첫 번째 발생한 예외가 gather()를 기다리는 태스크로 즉시 전파된다. aws 시퀀스의 다른 어웨이터블은 취소되지 않고 계속 실행된다.

  • return_exceptions가 True면, 예외는 성공적인 결과처럼 처리되고, 결과 리스트에 집계된다.

  • gather()가 취소되면, 모든 제출된 (아직 완료되지 않은) 어웨이터블도 취소된다.

  • aws 시퀀스의 Task나 Future가 취소되면, 그것이 CancelledError를 일으킨 것처럼 처리된다 – 이때 gather() 호출은 취소되지 않는다. 이것은 제출된 태스크/퓨처 하나를 취소하는 것이 다른 태스크/퓨처를 취소하게 되는 것을 막기 위한 것이다.

  • loop 매개변수는 파이썬 3.8 부터 지원이 중단되며 3.10부터는 삭제될 예정이다.

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

취소로부터 보호하기

awaitable asyncio.shield(aw)

  • 어웨이터블 객체를 취소로부터 보호한다. aw가 코루틴이면 자동으로 Task로 예약된다.
task = asyncio.create_task(something())
res = await shield(task)

위 코드는 다음과 같다.

res = await something()
  • 단, 해당 코루틴이 포함된 코루틴이 취소된 경우 something()에서 실행 중인 Task는 취소되지 않는다. something()의 관점에서 취소는 발생하지 않는다. 호출자가 취소된 경우에도 await 표현식을 사용하면 CancelledError가 발생한다.

  • something()이 다른 수단(즉, 자기 자신에 의해)에 의해 취소되면, shield()도 취소된다.

  • 취소를 완전히 무시하려면(권장하지 않음) 다음과 같이 shield() 함수를 try/except 절과 결합해야 한다.

task = asyncio.create_task(something())
try:
	res = await shield(task)
except CancelledError:
	res = None
  • 중요: 작업이 실행 중에 사라지는 것을 방지하려면 이 함수에 전달된 작업에 대한 참조를 저장하라. 이벤트 루프는 작업에 대한 약한 참조만 유지한다. 다른 곳에서 참조되지 않는 작업은 작업이 완료되기도 전에 언제든지 가비지 컬렉션 될 수 있다.
  • 파이썬 3.10 변경: loop 매개변수가 제거됨.
  • 파이썬 3.10 이후 지원 중단: aw가 Future와 유사한 객체가 아니고 실행 중인 이벤트 루프가 없으면 지원 중단(Deprecation) 경고가 발생한다.

시간 초과

asyncio.timeout(delay)

  • 뭔가를 기다리는 데 소요되는 시간을 제한하기 위해 사용할 수 있는 비동기 컨텍스트 관리자를 반환한다.

  • delay는 None 또는 float/int 초의 대기 시간이 될 수 있다.

  • delay가 None이면 시간 제한이 적용되지 않는다. 이는 컨텍스트 관리자가 생성될 때 delay를 알 수 없는 경우에 유용할 수 있다.

  • None 또는 None이 아닌 경우 모두 밑에서 설명할 Timeout.reschedule()을 사용하여 이미 생성된 컨텍스트 관리자를 재설정할 수 있다.

async def main():
    async with asyncio.timeout(10):
        await long_running_task()
  • 위의 예시 코드에서 long_running_task가 완료되는 데 10초 이상 걸리면, 컨텍스트 관리자는 현재 작업을 취소하고 내부적으로 asyncio.CancelledError를 처리하여 포착하고 처리할 수 있는 TimeoutError로 변환한다.
  • asyncio.timeout() 컨텍스트 관리자는 asyncio.CanceledErrorTimeoutError로 변환하는데, 이는 TimeoutError는 컨텍스트 관리자의 외부에서만 포착될 수 있음을 의미한다.
  • TimeoutError을 잡아내는 예시 코드
async def main():
    try:
        async with asyncio.timeout(10):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")
  • asyncio.timeout()에 의해 생성된 컨텍스트 관리자는 다른 기한으로 재설정되고 검사될 수 있다

class asyncio.Timeout(when)

  • 기한이 지난 코루틴을 취소하기 위한 비동기 컨텍스트 관리자이다.
  • when은 컨텍스트가 시간 초과되어야 하는 절대 시간이어야 한다. 이 시간은 이벤트 루프의 시계로 측정된다.
  • when이 None이면 시간 초과가 트리거 되지 않는다.
  • when < loop.time()인 경우 이벤트 루프의 다음 이터레이션에서 시간 초과가 트리거된다.
  • 메서드
    • when() → float | None
      현재 기한을 반환하거나, 현재 기한이 설정되지 않은 경우 None을 반환한다.
    • reschedule(when: float | None)
      시간초과를 재설정 한다.
    • expired() → bool
      컨텍스트 관리자가 기한을 초과했는지 여부를 반환한다.
  • 예시 코드
async def main():
    try:
        # We do not know the timeout when starting, so we pass ``None``.
        async with asyncio.timeout(None) as cm:
            # We know the timeout now, so we reschedule it.
            new_deadline = get_running_loop().time() + 10
            cm.reschedule(new_deadline)

            await long_running_task()
    except TimeoutError:
        pass

    if cm.expired():
        print("Looks like we haven't finished on time.")
  • 시간 초과 컨텍스트 관리자는 안전하게 중첩될 수 있고, 파이썬 3.11 버전에서 추가되었다.

asyncio.timeout_at(when)

  • asyncio.timeout()과 유사하지만 when이 대기를 중지할 절대 시간이거나 None이라는 점이 다르다.
  • 파이썬 3.11 버전에서 추가되었다.
  • 예시 코드
async def main():
    loop = get_running_loop()
    deadline = loop.time() + 20
    try:
        async with asyncio.timeout_at(deadline):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

coroutine asyncio.wait_for(aw, timeout)

  • 어웨이터블 객체 aw가 대기 시간이 초과되어 완료될 때까지 기다린다.
  • aw가 코루틴이면 aw는 자동으로 태스크(Task)로 예약된다.
  • timeout은 None 또는 float/int 초의 대기 시간이 될 수 있다.
  • timeout이 None이면 미래에 수행될 작업이 완료될 때까지 블록한다.
  • 시간 초과가 발생하면 작업을 취소하고 TimeoutError를 발생시킨다.
  • 작업 취소를 방지하려면 asyncio.shield()로 감싸면 된다.
  • 이 함수는 미래에 수행될 작업이 실제로 취소될 때까지 기다리므로 총 대기 시간이 제한 시간을 초과할 수 있다. 만약 취소 중에 예외가 발생하면 전파된다.
  • 만약 대기가 취소된다면 미래에 수행될 aw도 취소된다.
  • 파이썬 3.7 변경: 시간 초과로 인해 aw가 취소되면 wait_for 함수는 aw가 취소될 때까지 기다린다. 이전에는 TimeoutError가 즉시 발생했다.
  • 파이썬 3.10 변경: loop 매개변수가 제거됨.
  • 예시 코드
async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

출처

0개의 댓글