다루는 내용
- asyncio 언제 써야해? 장점?
- 기초 예시, 기초 예시 코드
- 기초 개념
- awatiable: coroutine, Future , Task
- 다중 작업을 동시에 실행시키는 방법?
언제 써야해? 장점?
- 한줄요약: 1개의 주방 / 1개의 chef / 10개의 해야할 요리
- 코루틴은 스레드와 달리 아래의 것들이 필요 없다.
- 메모리 부가 비용
- 스레드 시작 비용
- 컨텍스트 전환 비용
- 복잡한 lock(동기화 코드) 이용
- 시스템 콜(블록킹 I/O와 스레드 시작도 포함해)을 코루틴으로 만들면, 프로그램의 응답성이 좋아지고 사용자가 느끼는 지연 시간을 줄일 수 있다.
- 시스템 콜
- 운영 체제의 커널이 제공하는 서비스에 대해, 응용 프로그램의 요청에 따라 커널에 접근하기 위한 인터페이스이다.
- 언어로 작성된 프로그램들은 직접 시스템 호출을 사용할 수 없기 때문에, 고급 API를 통해 시스템 호출에 접근하게 하는 방법이다.
- 작업의 진행 상황을 반복적으로 측정하는 event loop 가 존재하여, 각 작업들을 효율적으로 관리해주므로 → I/O 대기 시간이 효율적으로 관리됩니다.
- 다른 멀티 스레딩 패키지들은 "non-blocking APIs" 라는 점에서 비동기이지만, 여전히 thread/process pools가 알아서 하는 것에 의존하고 있다.
- request 요청 시점과 - 결과 반환 시점을 알 수 있다면 → asyncio를 써서 보다 효율적으로 다른 작업으로 전환할 수 있다.
- (확인 필요)코루틴이 여러개 수행되더라도, 단일 스레드에서 수행하기 때문에 lock을 사용할 필요가 없다. (I/O 는 asyncio가 제공하는 이벤트 루프의 일부분으로 병렬화된다.)
- blocking call이 전혀 없다. asyncio.run() entry point가 유일한 blocking part이다.
- asyncio는 multiple servers와 연결된 servers와 clients 의 선택 도구로 사용되야 한다.
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
return
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
기초 예시
코루틴?
-
코루틴은 비동기 프로그래밍을 위해 사용되는 함수
-
실행을 중지하고 재개할 수 있음
-
async와 await 키워드를 사용하여 정의되며, I/O 바운드 작업을 효율적으로 처리하기 위해 사용
-
코루틴은 비동기 작업을 순차적으로 작성할 수 있게 해주어, 복잡한 비동기 코드를 더 쉽게 관리할 수 있게 해줍니다.
-
coroutine
- 실행을 일시적으로 중단하고 다시 시작할 수 있도록 함
- 코루틴은 동시성은 제공하지만, 병렬성은 제공하지 않습니다.
- 외부에서 제어할 수 없는 비선점 멀티태스킹을 지원
- 비선점 멀티태스킹: CPU를 차지하고 있는 스레드가, 자신이 이제 CPU 연산이 필요 없음을 나타냈을 때에만 OS가 이를 회수할 수 있는 경우
- 선점형 멀티태스킹: 특정 스레드가 CPU를 차지해서 사용하더라도, OS가 타이머나 여타 트리거를 통해 개입하여 강제로 CPU 사용을 빼았아 올 수 있는 경우
async def abc():
코드
ABC = abc()
await ABC
- abc 함수가 코루틴이 된다.
- 코루틴은 비동기로 실행된다.
- ABC = abc()
- 코드가 실행되지 않고, 코루틴 객체(인스턴스)를 리턴할 뿐이다.
- "await 코루틴 인스턴스를 해야만"!!
- 팬 아웃 수행
기초 예시 코드
- asyncio는 coroutines를 이용하여 쓰인다.
- coroutines는 await를 이용하여 흥미로운 일이 일어날 때까지 parallel하게 기다린다(suspend).
- suspending은 blocking이랑 다르다. suspend는 event loop thread를 허용한다. → 다른 것들을 계속하기 위해
- (확인 필요)코루틴 안에서 다른 코루틴을 실행할 때는 await을 이용합니다.
import asyncio
async def add(a, b):
print('add: {0} + {1}'.format(a, b))
await asyncio.sleep(1.0)
return a + b
async def print_add(a, b):
result = await add(a, b)
print('print_add: {0} + {1} = {2}'.format(a, b, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_add(1, 2))
loop.close()
- loop = asyncio.get_event_loop()
- 현재의 이벤트 루프를 기져옵니다.
- 코루틴 또는 콜백에서 호출될 때, 이 함수는 항상 실행 중인 이벤트 루프를 반환합니다.
- 이 함수는 다소 복잡한 동작을 하므로, coroutine과 callback에서 get_event_loop 보다 get_running_loop 함수를 사용하는 것이 좋습니다.
- loop.run_until_complete(future)
- future 는 코루틴 객체 혹은 future 객체
- future(Future의 인스턴스)가 완료할 때까지 이벤트 루프를 실행합니다.
- 코루틴이 이벤트루프에서 실행되도록 예약하고, 해당 코루틴이 끝날 때까지 기다립니다.
- 인자가 코루틴 객체면, asyncio.Task로 실행되도록 묵시적으로 예약 됩니다.
- 퓨처의 결과를 반환하거나 퓨처의 예외를 일으킵니다.
- loop.close()
- 이벤트 루프를 닫습니다.
- 이 함수를 호출할 때 루프는 반드시 실행 중이지 않아야 합니다. 계류 중인 모든 콜백을 버립니다.
- 이 메서드는 모든 큐를 비우고 실행기를 종료하지만, 실행기가 완료할 때까지 기다리지 않습니다.
- 이 메서드는 되돌릴 수 없습니다. 이벤트 루프가 닫힌 후에 다른 메서드를 호출해서는 안 됩니다.
기초 개념
- single process / single thread
- 활성화된 코루틴은 종료될 때까지 1KB 미만의 메모리를 사용한다.
- 스레드와 마찬가지로, 코루틴도 환경으로부터 입력을 소비하고 결과를 출력할 수 있는 독립적인 함수다.
- 코루틴은 매 await 식에서 일시 중단되고, 일시 중단된 대기 가능성이 해결된 다음에, async 함수로부터 실행을 재개한다는 차이점이 있다.
- 외부 환경에 대한 명령(예: I/O)와, 원하는 명령을 수행하는 방법을 구현하는 것(예: 이벤트 루프)를 코드에서 분리해준다.
- asyncio 내장 모듈을 사용하면, thread와 블로킹 I/O를 사용하는 기존 코드를 → 코루틴과 비동기 I/O를 사용하는 코드로 쉽게 포팅할 수 있다.
Awaitable: coroutine , Future , Task
- awaitable
- await을 할 수 있는 객체
- coroutines, Tasks, Futures가 해당
- coroutines
- 코루틴 인스턴스를 await 호출했을 때, 비로소 수행된다.
- 코루틴은 동시성은 제공하지만, 병렬성은 제공하지 않습니다.
- asyncio.Future
- 미래에 할 일을 표현하는 클래스인데 할 일을 취소하거나 상태 확인, 완료 및 결과 설정에 사용합니다.
- Future는 coroutine을 예외 처리들을 위해 감싼 것
- asyncio.Task
- Future을 상속해서 만든 클래스
- asyncio.Future의 기능 + 실행할 코루틴의 객체를 포함하고 있습니다.
- 수행할 코루틴을 이벤트 루프에 전달합니다.
- 코루틴이 future을 기다리면, task는 코루틴의 수행을 중단하고 future과 완료되기를 기다립니다.
- future이 완료되면 코루틴의 수행이 재개됩니다.
- 태스크는 코루틴의 실행을 취소하거나 상태 확인, 완료 및 결과 설정에 사용합니다.
- coroutine과 차이점
- 태스크를 만든 시점에 코루틴을 수행하기 시작합니다.
- (논란) 태스크는 병렬성도 제공하나?
- Future와의 차이점: Task는 Futures에 event loop와 같이 연계한 것이라고 보면 된다.
future
async def set_after(fut, delay, value):
await asyncio.sleep(delay)
fut.set_result(value)
async def main():
loop = asyncio.get_running_loop()
fut = loop.create_future()
loop.create_task(
set_after(fut, 1, '... world'))
print('hello ...')
print(await fut)
asyncio.run(main())
future = asyncio.Future(*, loop=None)
- Future는 비동기 연산의 최종 결과를 나타냅니다.
- 스레드 안전하지 않습니다.
- awaitable object 이다.
- 코루틴은 그들이 "result나 예외를 내거나, 혹은 취소가 될 때"까지 Future objects를 기다릴 수 있습니다.
- 간단한 규칙은 사용자가 만나는 API에서 Future 객체를 절대 노출하지 않는 것이며, Future 객체를 만드는 권장 방법은 loop.create_future()를 호출하는 것입니다.
future.result()
- Future의 결과를 반환합니다.
- Future가 완료(done)했고 set_result() 메서드로 결과가 설정되었으면, 결괏값이 반환됩니다.
- Future가 완료(done)했고 set_exception() 메서드로 예외가 설정되었으면, 이 메서드는 예외를 발생시킵니다.
- Future가 취소(cancelled)되었으면, 이 메서드는 CancelledError 예외를 발생시킵니다.
- Future의 결과를 아직 사용할 수 없으면, 이 메서드는 InvalidStateError 예외를 발생시킵니다.
future.set_result(result)
- Future를 완료(done)로 표시하고, 그 결과를 설정합니다.
- Future가 이미 완료(done)했으면, InvalidStateError 에러를 발생시킵니다.
future.set_exception(exception)
- Future를 완료(done)로 표시하고, 예외를 설정합니다.
- Future가 이미 완료(done)했으면, InvalidStateError 에러를 발생시킵니다.
future.done()
- Future가 완료(done)했으면 True를 반환합니다.
- Future는 취소(cancelled)되었거나 set_result() 나 set_exception() 호출로 결과나 예외가 설정되면 완료(done)됩니다.
future.cancelled()
- Future가 최소(cancelled)되었으면, True를 반환합니다.
- 이 메서드는 대개 결과나 예외를 설정하기 전에 Future가 취소(cancelled)되었는지 확인하는 데 사용됩니다:
if not fut.cancelled():
fut.set_result(42)
future.add_done_callback(callback, *, context=None)
- Future가 완료(done)될 때 실행할 콜백을 추가합니다.
- callback는 유일한 인자인 Future 객체로 호출됩니다.
fut.add_done_callback(
functools.partial(print, "Future:"))
future.cancel(msg=None)
- Future를 취소하고 콜백을 예약합니다.
- Future가 이미 완료(done)했거나 취소(cancelled)되었으면, False를 반환합니다.
- 그렇지 않으면 Future의 상태를 취소(cancelled)로 변경하고, 콜백을 예약한 다음 True를 반환합니다.
future.exception()
- 이 Future에 설정된 예외를 반환합니다.
- Future가 완료(done)했을 때만 예외(또는 예외가 설정되지 않았으면 None)가 반환됩니다.
- Future가 취소(cancelled)되었으면, 이 메서드는 CancelledError 예외를 발생시킵니다.
- Future가 아직 완료(done)하지 않았으면, 이 메서드는 InvalidStateError 예외를 발생시킵니다.
future.get_loop()
- Future 객체가 연결된 이벤트 루프를 반환합니다.
Task
async def cancel_me():
print('cancel_me(): before sleep')
try:
await asyncio.sleep(3600)
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')
async def main():
task = asyncio.create_task(cancel_me())
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("main(): cancel_me is cancelled now")
asyncio.run(main())
task = asyncio.Task(coro, *, loop=None, name=None)
- 파이썬 코루틴을 실행하는 퓨처류 객체입니다. 스레드 안전하지 않습니다.
- 태스크는 이벤트 루프에서 코루틴을 실행하는 데 사용됩니다.
- 만약 코루틴이 Future를 기다리고 있다면, 태스크는 코루틴의 실행을 일시 중지하고 Future의 완료를 기다립니다.
- 퓨처가 완료되면, 감싸진 코루틴의 실행이 다시 시작됩니다.
- 이벤트 루프는 협업 스케줄링을 사용합니다:
- 이벤트 루프는 한 번에 하나의 Task를 실행합니다.
- Task가 Future의 완료를 기다리는 동안, 이벤트 루프는 다른 태스크, 콜백을 실행하거나 IO 연산을 수행합니다.
- 테스크를 만들려면 고수준 asyncio.create_task() 함수를 사용하거나, 저수준 loop.create_task() 나 ensure_future() 함수를 사용하십시오.
- 태스크의 인스턴스를 직접 만드는 것은 권장되지 않습니다.
- 실행 중인 Task를 취소하려면 cancel() 메서드를 사용하십시오.
- 이를 호출하면 태스크가 감싼 코루틴으로 CancelledError 예외를 던집니다.
- 코루틴이 취소 중에 Future 객체를 기다리고 있으면, Future 객체가 취소됩니다.
- cancelled()는 태스크가 취소되었는지 확인하는 데 사용할 수 있습니다.
- 이 메서드는 감싼 코루틴이 CancelledError 예외를 억제하지 않고 실제로 취소되었으면 True를 반환합니다.
- asyncio.Task는 Future.set_result()와 Future.set_exception()을 제외한 모든 API를 Future에서 상속받습니다.
task = asyncio.create_task(coro, *, name=None, context=None)
- coro 코루틴을 Task로 감싸고 실행을 예약합니다(스케줄링 합니다). Task 객체를 반환합니다.
- 큐에 넣은 후 이벤트 루프에서 실행한다.
- 팬 아웃 접근 방법
- name이 None이 아니면, Task.set_name()을 사용하여 태스크의 이름으로 설정됩니다.
- get_running_loop()에 의해 반환된 루프에서 태스크가 실행되고, 현재 스레드에 실행 중인 루프가 없으면 RuntimeError가 발생합니다.
다중 작업을 동시에 실행시키는 방법?
async def simulate(grid):
next_grid = Grid(grid.height, grid.width)
coroutine_instances = []
for y in range(grid.height):
for x in range(grid.width):
task = step_cell(
y, x, grid.get, next_grid.set)
coroutine_instances.append(task)
await asyncio.gather(*coroutine_instances)
return next_grid
grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
columns = ColumnPrinter()
for i in range(5):
columns.append(str(grid))
grid = asyncio.run(simulate(grid))
print(columns)
- awaitable asyncio.gather(*aws, return_exceptions=False)
- aws 시퀀스에 있는 어웨이터블 객체를 동시에 실행합니다.
- aws에 있는 어웨이터블이 코루틴이면 자동으로 태스크로 예약됩니다.
- 모든 어웨이터블이 성공적으로 완료되면, 결과는 반환된 값들이 합쳐진 리스트입니다.
- await을 앞에 붙이면, 성공될 때까지 기다립니다. fan-in / join의 역할을 합니다.
- 결괏값의 순서는 aws에 있는 어웨이터블의 순서와 일치합니다.
- return_exceptions가 False(기본값)면, 첫 번째 발생한 예외가 gather()를 기다리는 태스크로 즉시 전파됩니다.
- aws 시퀀스의 다른 어웨이터블은 취소되지 않고 계속 실행됩니다.
- return_exceptions가 True면, 예외는 성공적인 결과처럼 처리되고, 결과 리스트에 집계됩니다.
- gather()가 취소되면, 모든 제출된 (아직 완료되지 않은) 어웨이터블도 취소됩니다.
- aws 시퀀스의 Task나 Future가 취소되면, 그것이 CancelledError를 일으킨 것처럼 처리됩니다 – 이때 gather() 호출은 취소되지 않습니다.
- 이것은 제출된 태스크/퓨처 하나를 취소하는 것이 다른 태스크/퓨처를 취소하게 되는 것을 막기 위한 것입니다.