Python asyncio

김지환·2022년 8월 8일
0

Asyncio

Python 3.5부터 내장된 비동기 프로그래밍을 위한 라이브러리이다.
해당 라이브러리를 사용하게된 이유는 다량의 request 작업을 해야할 일이 있었는데 이 때 sync 방식으로 request를 하게 되면 너무 많은 시간을 잡아먹게 되어서 asyncio를 도입하게됐다.

급하게 도입을 하고 나서 다시 asyncio에 대해 정리하다보니 불필요하게 중복 사용되는 부분도 개선할 수 있었다.

Asyncio를 이해하기위해 필요한 용어를 정리하고 내가 사용한 asyncio 의 실행흐름을 간단하게 정리해본다.

서브루틴

  • 함수를 호출하는 쪽을 루틴. 호출된 함수를 서브루틴이라고 한다

코루틴

  • 코루틴(Coroutine) 이란 특정 시점에 자신의 실행과 관련된 상태를 어딘가에 저장한 뒤 실행을 중단하고, 나중에 그 상태를 복원하여 실행을 재개할 수 있는 서브 루틴을 의미.
  • Python 은 제너레이터를 이용하여 코루틴을 구현함.
  • 코루틴의 정의는 def 키워드에 async 키워드를 더해주면 된다.
    • async def foo():

await 키워드

  • async 키워드와 마찬가지로 yield from 구문을 조금 더 쉽게 작성할 수 있도록 만든 것이라 보면 됨.
  • 코루틴 객체 뿐만 아니라 __await__() 메소드가 구현된 Awaitable 객체라면 무엇이든지 사용 가능
  • __await__() 메소드를 호출하여 제너레이터 객체를 얻고 이를 통해 해당 제너레이터를 실행하는 방식으로 동작.

퓨처 객체와 테스크 객체

퓨처 객체 ( Future Object )

  • 작업의 실행 상태 ( PENDING, CANCELLED, FINISHED ) 및 결과를 저장.
  • 예외가 발생한 경우에도 FINISHED상태가 된다.
  • 해당 퓨처 객체가 완료 ( Done ) 될 때 호출될 함수를 등록 하는 기능add_done_callback()
  • 작업을 직접 실행 하는 역할을 하지는 않는다.

테스크 객체 ( Task Object )

  • 퓨처를 상속하는 클래스.
  • 작업의 실행을 개시하는 역할도 가지고 있다.
  • 테스크 객체는 생성될 때 코루틴 객체를 넘겨받아 _coro 필드에 저장
  • 태스크 객체를 생성하려면 asyncio.run() or asyncio.create_task() 함수를 호출할 때 인자로 코루틴 객체를 넘겨줘야 함.
  • 테스크 객체의 __step() 메소드가 호출되면 코루틴의 실행이 게시된다.
  • I/O 관련 코루틴을 하는 코드를 만나면 제어를 이벤트 루프에 넘긴다.

이벤트 루프의 실행흐름

python 버전은 3.6 버전을 기반으로 작성하여 asyncio.run() 함수와 asyncio.create_task() 함수를 사용하지 않았다. 추후 상위 버전 python을 쓰게되면 개선할 수 있을 듯!

async def bound_fetch(sem, url, session):
    # Getter function with semaphore.
    async with sem:
        await fetch(url, session)

async def async_request(url_list):
    """
    tasks = []
    # create instance of Semaphore
    sem = asyncio.Semaphore(MAX_THREAD)
    conn = TCPConnector(limit=MAX_THREAD)
    # Create client session that will ensure we dont open new connection
    # per each request.
    async with ClientSession(connector=conn, trust_env=True) as session:
		tasks = [asyncio.ensure_future(bound_fetch(sem, DATAPACK_API_CONFIG_LIST["HOST"] + "/" + _url, session)) for _url in url_list]  # Make Task(Future) object
        responses = await asyncio.gather(*tasks)  # Get data all at once
        return responses

loop = asyncio.get_event_loop()
loop.run_until_complete(async_request(endpoint_list))
loop.close()
  1. loop.run_until_complete() 함수에 의해 main_task 가 실행되고, 이로 인해 async_request() 코루틴이 실행된다.
  2. asyncio 에서 제공하는 Semaphore 를 이용하여 생성되는 request 를 제한하도록 한다. ( 서버가 감당 할 수 있을 양으로 컨트롤하기 위해 )
  3. aiohttp 라이브러리의 ClientSession을 활용하여 코루틴에서 session을 사용할 수 있다. async with 를 활용하여 비동기 컨텍스트 메니저를 사용할 수 있다. 아래 코드에서 보듯이 또 다른 코루틴을 만듦을 볼 수 있다.
class AsyncContextManager:
    async def __aenter__(self):
        await log('entering context')

    async def __aexit__(self, exc_type, exc, tb):
        await log('exiting context')
  1. async_request() 코루틴은 asyncio.ensure_future() 함수를 통해 복수의 task객체들을 생성하고 실행을 예약한다.
  2. asyncio.gather() 코루틴은 등록된 task 가 모두 완료될 때까지 이벤트 루프와 제어권을 주고 받으면서 확인한다.
    1. bound_fetch(sem,url,session) 코루틴은 Future 객체를 만들고, Future 객체의 결과 값이 갱신되도록 이벤트 루프에 예약을 건 뒤, Future 객체를 await 한다.
    2. 제어권을 이벤트 루프에게 넘기게 되고 다음 Task가 일련의 과정을 똑같이 반복한다.
    3. asyncio.gather() 는 첫 테스크 등록때 그 테스크가 완료 상태가 될 때까지 await 중이 되는데 완료가 된다면 main_task 가 예약된다.
    4. main_task 재시작시 실행 중단 지점부터 다시 돌게되고 첫 task 이외에도 다른 task 들의 객체의 결과 값을 저장하고 완료되지 않은 다른 task를 await 하게된다. ( 이벤트 루프에게 제어를 넘김 )

좀 더 직관적으로 변경하면 아래와 같다.

import asyncio
import random


async def fetch(url):
    print(f"Fetch: {url} started")
    await asyncio.sleep(random.randint(1, 5))
    print(f"Fetch: {url} ended")
    # time.sleep(2)


async def bound_fetch(url):
    # Getter function with semaphore.
    print(f"Bound fetch {url} started")
    await fetch(url)
    print(f"Bound fetch {url} ended")


async def async_request(url_list):
    tasks = []
    tasks = [asyncio.ensure_future(bound_fetch(_url)) for _url in url_list]  # Make Task(Future) object
    responses = await asyncio.gather(*tasks)  # Get data all at once
    return responses

loop = asyncio.get_event_loop()
loop.run_until_complete(async_request(["https://naver.com", "https://yahoo.co.kr", "https://google.com"]))
loop.close()

순차적으로 제어권을 주고받으면서 작업이 진행되다 긴 시간의 I/O or Sleep 작업이 발생하고 event loop에서 누구에게도 할당할 수 없는 상황이 오게되고 그 때 event loop는 작업이 완료된 future객체를 기다리고 작업이 완료됐다면 가져와서 실행하게 되는 일련의 과정을 거치게 된다.

Reference

https://it-eldorado.tistory.com/159

profile
Developer

0개의 댓글