CPU Bound 작업은 주로 CPU의 처리 능력에 의해 성능이 제한되는 작업을 의미한다. 이러한 작업은 CPU가 많은 계산을 수행해야 하며, CPU의 처리 속도가 전체 작업의 성능을 결정한다.
I/O Bound 작업은 주로 I/O 자원(디스크, 네트워크, 키보드, 마우스 등)의 속도에 의해 성능이 제한되는 작업을 의미한다. 이러한 작업은 CPU보다 I/O 장치의 처리 속도가 더 느리기 때문에, I/O 작업이 완료될 때까지 CPU가 대기하는 경우가 많다.
우리가 멀티스레딩을 하는 목적은 작업을 동시에 처리하고 싶어서이다. 우리가 원하는 작업이 CPU Bound라면 애초에 CPU 연산을 활용하는 작업이기 때문에 크게 문제가 되지 않지만 중간중간에 끼어있는 I/O Bound 작업이 존재하게 된다.
문제는 이러한 data의 I/O를 기다리는 과정에서 CPU는 아무런 작업을 하지 않고 대기하고 있다는 점이다.
(I/O Bound 작업의 예로는 DB와의 인터렉션, HTTP 요청 등이 있다)
해결법: CPU에게 쉬지 않고 계속 작업을 부여할 수 있도록 동시성을 적용한 코드를 구현함으로 문제를 해결할 수 있다.
비동기 작업이 완료되면 실행할 콜백 함수들이 저장되는 큐이다. 예를 들어, 네트워크 요청이 완료되면 해당 요청에 대한 콜백이 이 큐에 추가된다.
+) 작업이 완료되기 전까지 프로그램의 흐름을 차단하지 않기 위해, 작업이 완료되었을 때 실행할 코드를 미리 정의해 두어야 하고. 이때 사용되는 것이 콜백 함수.
이벤트 루프는 메인 스레드에서 실행되며, 콜백 큐에서 작업을 꺼내어 실행하는 역할을 한다.
비동기 함수 호출: 비동기 함수를 호출하면, 해당 함수는 즉시 반환되고, 실제 작업은 백그라운드에서 진행. 이때, 함수는 콜백 큐에 작업을 추가한다.
작업 완료: 비동기 작업이 완료되면, 해당 작업에 대한 콜백이 콜백 큐에 추가된다.
이벤트 루프 실행: 이벤트 루프는 주기적으로 콜백 큐를 확인하고, 대기 중인 작업이 있다면 이를 실행한다. 이 과정은 프로그램이 종료될 때까지 계속된다.
Async IO는 코드가 비동기적으로 동작할 수 있도록 해주는 기법이다.
(A가 완료되지 않았음에도 B를 수행시키는 것)
만약 A, B, C 작업이 모두 CPU-bound 작업이라면 이렇게 비동기적으로(혹은 동시에) 실행시키더라도 큰 의미가 없다. 왜냐하면 멀티코어 환경이라면 다를 수도 있지만, 싱글 코어라면 어차피 프로세서 1개가 동시에 할 수 있는 작업은 1개이기 때문이다.
하지만 중간중간에 IO Bound 작업이 섞여있다면, 해당 작업으로 인한 대기시간 동안 다른 작업의 CPU-bound를 처리할 수 있어 CPU는 계속해서 쉬지 않고 일을 할당받을 수 있게 된다.
I/O 요청: 프로그램이 I/O 작업을 요청합니다. 이 요청은 비동기 방식으로 처리됩니다.
작업 수행: I/O 작업이 백그라운드에서 수행되는 동안, 프로그램은 다른 작업을 계속 진행합니다.
완료 통지: I/O 작업이 완료되면, 시스템은 프로그램에 알리거나 콜백 함수를 호출하여 결과를 전달합니다.
결과 처리: 프로그램은 I/O 작업의 결과를 처리합니다.
동시성을 구현하는 방법 중, 멀티프로세싱과는 다르게 하나의 프로세스 내에서 동작하는 멀티스레드와 비동기 IO는 곧잘 서로 비교되곤 한다.
두개의 핵심적인 차이는 전체 작업 과정이 이루어질 동안, 스레드가 계속 살아있냐/필요할때만 살아있냐의 차이이다. 비동기 IO에서는 스레드가 필요할 때만 활성화되기 때문에 멀티프로세싱/스레딩에 비해 메모리나 리소스 측면에서도 더욱 경제적이다. (물론 I/O Bound 한정)
import asyncio
async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')
asyncio.run(main())
/*
Hello ...
... World!
*/
파이썬에서는 async를 메서드 앞에 붙여주어서 비동기 함수를 선언하는데, 이를 coroutine이라고 부른다.
보통 동기 함수들은 해당 함수를 호출하면 바로 함수가 동기적으로 실행되지만 이러한 비동기 함수들은 실행을 할 경우, 바로 실행이 되지 않고, coroutine이라는 이름의 객체를 반환한다.
do_async() # <coroutine object do_async at 0x~~~>
코루틴 객체가 반환되었다는 것이 해당 함수가 실행이 되었다는 걸 의미하는게 아니다. 이러한 코루틴을 실행시키는 것에는 여러가지 방법이 있는데 대표적으로 "await"가 있다.
async def other_async():
await do_async()
하지만 await 키워드는 반드시 다른 비동기 영역에서만 사용할 수 있다.
Python은 본래에 동기로 동작하는 언어이며, 해당 async 영역 안에서만 비동기로 동작하게 만드는 것이기 때문이다. 보다 정확하게 표현하자면 "이벤트 루프"라고 불리는 여러 코루틴들을 비동기적으로 실행시켜 줄 객체가 활성화되어 있어야 한다.
현재 코드가 실행되는 구간에 이벤트 루프가 활성되어 있지 않다면 비동기와 관련된 문법이나 코루틴을 비동기적으로 실행시킬 수 없다.
코루틴이란 cooperative routine의 줄임말로, 서로 협력하는 루틴이라는 의미이다.
메인 루틴, 코루틴과 같은 작업이 동시에 수행을 한다고 가정할 때 각 작업이 대기시간에 들어갈 때, 재빠르게 문맥을 전환(context switching)하여 다른 작업을 하러 가도록 하여 동시적 실행이 가능하도록 하는 기술이다.
import time
def io_bound(work_name):
time.sleep(1)
print("complete:", work_name)
def main() :
io_bound("work_1")
io_bound("work_2")
io_bound("work_3")
start_time = time.time()
main()
print("실행 시간:", time.time() - start_time)
# 실행 시간 = 3.0XXXXXX
async def io_bound(work_name):
time.sleep(1)
print("complete:", work_name)
async def main() :
io_bound("work_1")
io_bound("work_2")
io_bound("work_3")
이렇게 단순하게 바꿔서 진행하려고 하면 error는 아니지만 에러 메시지가 출력이 된다.
async로 선언된 비동기 함수들은 그대로 실행할 경우, 해당 함수가 실행되는 것이 아닌 해당 함수의 코루틴 객체를 반환한다. 해당 객체는 반환만 되어서는 함수를 정상적으로 실행할 수 없다. -> main 함수를 실행하는 파이썬의 동기 영역이다.
비동기 함수를 실행시키기 위해서는 반드시 이벤트 루프라는 객체가 런타임 내에 존재해야 하는데, asyncio.run()을 통해 해당 행동을 쇼트컷으로 실행시킬 수 있다.
import time, asyncio
async def io_bound(work_name):
time.sleep(1)
print("complete:", work_name)
async def main() :
io_bound("work_1")
io_bound("work_2")
io_bound("work_3")
start_time = time.time()
asyncio.run(main()) // check
print("실행 시간:", time.time() - start_time)
이렇게 run을 넣어주면 제대로 작동하게 될까? 아쉽지만 아니다. 왜냐하면 main은 asyncio.run에 의해 동기적으로 작동하지만 마찬가지로 io_bound에 대해서는 여전히 같은 문제를 가지고 있기 때문이다. 그러면 각 함수마다 run을 해주면 되지 않나? 아쉽지만 이것도 아니다. 왜냐하면 run은 동기환경에서 비동기 함수를 실행시키기 위해 이벤트 루프 객체를 생성하는 행동까지만 가지고 있기 때문에 io_bound는 이미 비동기함수 안에 있어 이벤트 루프 객체가 존재한다는걸 의미한다.
다행히 await 키워드를 이용해 간편하게 비동기적으로 처리해줄 수 있지만 await는 해당 비동기 함수의 결과값이 반환될때까지 기다리게 한다.
하지만 컴퓨터가 IO Bound 작업의 타이밍을 모르기 때문에 문제가 발생한다. 위의 그림처럼 프로세서가 알아서 IO Bound를 감지하고 실행구간을 suspend하고 현재 이벤트 루프에 존재하는 다른 task를 수행하러 가야겠지만(만약 없다면 해당 await를 기다림) time.sleep(1)에서 문제가 발생하는 것이다.
해당 함수는 원래 동기 함수기 때문에 자신의 IO Bound에 의해 병목이 걸려도 await등의 다른 루틴에게 제어권을 넘겨주질 못한다.
따라서 시간을 대기하는 함수도 asyncio.sleep(1)로 수정하도록 한다.
import time, asyncio
async def io_bound(work_name):
await asyncio.sleep(1) // Check !
print("complete:", work_name)
async def main() :
await io_bound("work_1")
await io_bound("work_2")
await io_bound("work_3")
start_time = time.time()
asyncio.run(main())
print("실행 시간:", time.time() - start_time)
하지만 놀랍게도 아직도 3.0XXXX의 시간이 걸린다. 그 이유는 천천히 확인해보면 알 수 있다.
"await"키워드는 해당 함수를 실행시키고 결과가 나올때까지 해당 라인에서 기다리되, 이벤트 루프에 다른 task가 있다면 그걸 처리해라라는 의미이다.
main()함수를 확인해보면 모든 구문에 await가 되어있다. 따라서 이전의 io_bound()의 결과가 나오기 전까지는 다음 io_bound()를 실행시키지 않는다. 즉, 비동기처럼 작동하지만 사실은 동기적 작동과 차이점이 없는 것이다.
위 문제를 해결하기 위해서는 2가지의 방법이 있다.
1. io_bound 1,2,3을 한꺼번에 이벤트 루프에 넣고 기다린다.
2. io_bound 1,2,3을 각각 실행만 시키고 맨 마지막에 한꺼번에 기다린다.
이제부터 자세히 알아보도록 하겠다.
asyncio에서는 복수의 코루틴을 한꺼번에 이벤트 루프에 넣어 실행시키고 한꺼번에 await하는 기능이 존재한다. 바로 gather()과 wait()이다.
awaitable asyncio.gather(*aws, return_exceptions=False)
#aws 시퀀스에 있는 어웨이터블 객체들을 동시에 실행한다.
#aws에 있는 어웨이터블이 자동으로 이벤트 루프에 등록 된다.
#모든 어웨이터블이 성공적으로 완료되면, 각 어웨이터블에서 반환된 값들이 합쳐진 리스트를 반환한다.
*aws란, 비동기 함수를 실행할 때 나오는 코루틴 객체들이다. 이를 원하는 만큼 넣어준다.
async def main() :
await asyncio.gather(
io_bound("work_1"),
io_bound("work_2"),
io_bound("work_3")
)
이렇게 작성하면 작동하는데 걸리는 시간이 1초로 줄어든 것을 확인할 수 있다.
async def main() :
await asyncio.wait([
io_bound("work_1"),
io_bound("work_2"),
io_bound("work_3")
])
wait() 함수도 gather()과 비슷하게 사용가능하다. 하나의 list로 만들어서 보내면 된다.
python에서는 await를 사용하지 않고 코루틴 객체를 실행시킬 수 있는 방법이 존재한다. 바로 해당 코루틴 객체를 Task라는 객체로 등록하는 것이다.
asyncio.create_task()로 등록된 코루틴 객체는 곧 실행되도록 이벤트 루프에 자동으로 예약된다.
해당 함수는 Task라는 객체를 반환하는데 해당 객체를 통해 해당 task에 대한 결과 통보를 탐지하거나 받을 수 있다.
async def main() :
task1 = asyncio.create_task(io_bound("work_1"))
task2 = asyncio.create_task(io_bound("work_2"))
task3 = asyncio.create_task(io_bound("work_3"))
await task1, task2, task3
장고 채널스의 카운트다운을 이용해서 예시를 보여주도록 하겠다.
for currTime in range(15, 0, -1):
if self.confirmed:
return
await self.channel_layer.group_send(
self.room_group_name,
{
"type": "send_to_group",
"message": currTime,
},
)
await asyncio.sleep(1)
이렇게 15초 동안 카운트다운을 하는 로직이 있다고 하면 어떻게 작동하게 될까?
우리가 원하는 것은 매초 시간이 지날때마다 클라이언트에게 남은 시간을 보내는 것이지만, 실질적으로는 group_send와 같은 비동기 함수가 호출되면 바로 실행되지 않고 이벤트 루프의 작업큐에 들어가게 된다. 하지만 sleep()을 하면 그 코루틴의 동작을 멈추는 것이기 때문에 이벤트 루프는 이를 바로 처리하지 못하고 같이 대기하게 된다. 이런 상황에서 반복문을 통해 계속 대기하게 되면, 결국 대기만 하다가 반복문이 끝나서 코루틴이 활성화 되었을때, 한꺼번에 작동하는 것처럼 보이게 된다.
async def send_timer():
for currTime in range(15, 0, -1):
if self.confirmed:
return
await self.channel_layer.group_send(
self.room_group_name,
{
"type": "send_to_group",
"message": currTime,
},
)
await asyncio.sleep(1)
asyncio.create_task(send_timer())
따라서 이렇게 task형식으로 바꿔준다면, await가 되어도 이벤트 루프는 멈추지 않고 다른 비동기 작업을 처리하게 된다. 따라서 반복문을 기다리는 동안에도 group_send작업을 처리하게 되는 것이다.
참고 자료
비동기(asyncio)에 대해 알아보자(1,2,3) - IML
멀티쓰레드와 비동기 IO에 대해
코루틴 설명 공식문서
본 포스팅은 상업적 목적을 포함하지 않습니다.