TDD를 가능하게 하려면, 코드베이스에서 블로킹 I/O에스레드를 사용하는 부분과 비동기 I/O에 코루틴을 사용하는 부분이 서로 호환되고 공존해야한다.
#로그 파일을 한 출력 스트림으로 병합 후 디버깅 돕는
#파일 핸들의 tell 메서드를 사용 시 현재 읽기 중인 위치가 파일의 길이와 일치하는지 알아내기
# 새로운 데이터가 없으면 예외 발생
class NoNewData(Exception):
pass
def readline(handle):
offset = handle.tell()
handle.seek(0,2)
length = handle.tell()
if length == offset:
raise NoNewData
handle.seek(offset, 0)
return handle.readline()
#while 루프를 사용하여 작업자 스레드
# 콜백 함수로 그 줄을 전달하여 출력 로그에 데이터 기록
#데이터 없을 시 스레드 일시 정지 상태
#입력 파일 핸들이 닫히면 스레드도 종료
import time
def tail_file(handle, interval, write_func):
while not handle.closed:
try:
line = readline(handle)
except NoNewData:
time.sleep(interval)
else:
write_func(line)
#각 스레드의 출력을 한 출력 파일로 모으기
# write 도우미 함수인 Lock인스턴스를 사용해서 출력 스트림에 데이터를 쓰는 순서 직렬화
# 중간에 출동해서 서로 뒤섞이는 일 없게 한다.
from threading import Lock, Thread
def run_threads(handles, interval, output_path):
with open(output_path, 'wb') as output:
lock = Lock()
def write(data):
with lock:
output.write(data)
threads = []
for handle in handles:
args = (handle, interval, write)
thread = Thread(target = tail_file, args=args)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
스래드 기반 구현으로부터 점진적으로 asyncio 와 코루틴 기반을 바꾸기
#run_thread 함수에 대해서 위의 1~3단계 따르기
import asyncio
async def run_tasks_mixed(handles, interval, output_path):
loop = asyncio.get_event_loop()
with open(output_path, 'wb') as output:
async def write_async(data):
output.write(data)
def write(data):
coro = write_async(data)
future = asyncio.run_coroutine_threadsafe(coro, loop) #Lock인스턴스와 writer 도우미 함수 필요 없다
future.result()
tasks = []
for handle in handles:
task = loop.run_in_executor(
None, tail_file, handle, interval, write)
tasks.append(task)
await asyncio.gather(*tasks) #데이터 기록 완료
#호출 계층으로 한 단계 더 들어가서 run_tasks_mixed함수에 4단계 적용하기
# 블로킹 I/O 대신 비동기 코루틴 사용하게 변경
async def tail_async(handle, interval, write_func):
loop = asyncio.get_event_loop()
while not handle.closed:
try:
line = await loop.run_in_executor(None, readline, handle)
except NoNewData:
await asyncio.sleep(interval)
else:
await write_func(line)
하향식과 접근 방법이 비슷하지만 변환 과정에서 호출 계층을 반대 방향으로 옮겨간다는 점이 다르다
프로그램에서 잎 부분에 있는, 포팅하려는 함수의 비동기 코루틴 버전을 새로 만들기
기존 동기 함수를 변경해서 코루틴 버전을 호출하고 실제 동작을 구현하는 이벤트 루프를 실행하라
호출 계층을 한 단계 올려서 다른 코루틴 계층을 만들고, 기존에 동기적 함수를 호출하던 부분을 1단계에서 정의한 코루틴 호출 바꾸기
비동기 부분을 결합하기 위해서 2단계에서 만든 동기적인 래퍼가 더 이상 필요하지 않고 삭제해라
def tail_file(handle, interval, write_func):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def write_async(data):
write_func(data)
coro = tail_async(handle, interval, write_async)
loop.run_until_complete(coro)
asyncio 이벤트 루프의 run_in_excutor 메서드(이 메서드에 대해서 await를 사용해 완료를 기다릴 수 있다)를 사용하면 코루틴이 ThreadPoolExecutor 스레드 풀을 사용해서 동기적인 함수 호출하고, 이 기능을 활용하면 코드를 하향식으로 asyncio로 마이크레이션할 수 있다.
asyncio 이벤트 루프의 run_until_complete 메서드를 사용하면 동기적인 코드가 코루틴을 호출하고 완료를 기다릴 수 있다. asyncio.run_coroutine_threadsafe도 같은 기능을 제공하지만 스레드 경계에서도 안전하게 작동한다. 이 두 메서드를 활용하면 코드를 상향식으로 asyncio로 마이그레이션할 때 도움이 된다.