스레드와_코루틴을_함께_사용

매일 공부(ML)·2022년 8월 15일
0

이어드림

목록 보기
119/146

asyncio로 쉽게 옮겨 갈 수 있도록 스레드와 코루틴을 함께 사용하라

TDD를 가능하게 하려면, 코드베이스에서 블로킹 I/O에스레드를 사용하는 부분과 비동기 I/O에 코루틴을 사용하는 부분이 서로 호환되고 공존해야한다.

#로그 파일을 한 출력 스트림으로 병합 후 디버깅 돕는

#파일 핸들의 tell 메서드를 사용 시 현재 읽기 중인 위치가 파일의 길이와 일치하는지 알아내기

# 새로운 데이터가 없으면 예외  발생

class NoNewData(Exception):
    pass

def readline(handle):
    offset = handle.tell()
    handle.seek(0,2)
    length = handle.tell()

    if length == offset:
        raise NoNewData


    handle.seek(offset, 0)
    return handle.readline()
#while 루프를 사용하여 작업자 스레드
# 콜백 함수로 그 줄을 전달하여 출력 로그에 데이터 기록
#데이터 없을 시 스레드 일시 정지 상태
#입력 파일 핸들이 닫히면 스레드도 종료

import time

def tail_file(handle, interval, write_func):
    while not handle.closed:
        try:
            line = readline(handle)
        except NoNewData:
            time.sleep(interval)
        else:
            write_func(line)
#각 스레드의 출력을 한 출력 파일로 모으기
# write 도우미 함수인 Lock인스턴스를 사용해서 출력 스트림에 데이터를 쓰는 순서 직렬화
# 중간에 출동해서 서로 뒤섞이는 일 없게 한다.

from threading import Lock, Thread

def run_threads(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        lock = Lock()
        def write(data):
            with lock:
                output.write(data)

        threads = []
        for handle in handles:
            args = (handle, interval, write)
            thread = Thread(target = tail_file, args=args)
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()

스래드 기반 구현으로부터 점진적으로 asyncio 와 코루틴 기반을 바꾸기

하향식(top-down)

  • main 진입점처럼 코드베이스에서 가장 높은 구성 요솝터 시작하여 개별 함수와 클래스로 내려가면서 작업
  • 진입점부터 차례로 포팅해나가므로, 공통 모듈 포팅이 끝나면 모든 곳에서 코루틴 사용

순서

  1. 최상위 함수가 def 대신 async def를 사용하게 변경
  2. 최상위 함수가 I/O를 호출하는 부분을 asyncio.run_in_executor로 감싸라
  3. run_in_executor 호출이 사용하는 자원이나 콜백이 제대로 동기화 됐는지 확인
    4.호출 계층의 잎 쪽으로 내려가면서 중간에 있는 함수와 메서드를 코루틴으로 변환하며 get_event_loop와 run_in_executor호출 없애려고 시도
#run_thread 함수에 대해서 위의 1~3단계 따르기

import asyncio

async def run_tasks_mixed(handles, interval, output_path):
    loop = asyncio.get_event_loop()
    
    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)
        
        def write(data):
            coro = write_async(data)
            future = asyncio.run_coroutine_threadsafe(coro, loop) #Lock인스턴스와 writer 도우미 함수 필요 없다
            future.result()

        tasks = []
        for handle in handles:
            task = loop.run_in_executor(
                None, tail_file, handle, interval, write)
            tasks.append(task)

        await asyncio.gather(*tasks) #데이터 기록 완료
#호출 계층으로 한 단계 더 들어가서 run_tasks_mixed함수에 4단계 적용하기

# 블로킹 I/O 대신 비동기 코루틴 사용하게 변경

async def tail_async(handle, interval, write_func):
    loop = asyncio.get_event_loop()

    while not handle.closed:
        try:
            line = await loop.run_in_executor(None, readline, handle)
        except NoNewData:
            await asyncio.sleep(interval)
        else:
            await write_func(line)

상향식 접근 방법

하향식과 접근 방법이 비슷하지만 변환 과정에서 호출 계층을 반대 방향으로 옮겨간다는 점이 다르다

  1. 프로그램에서 잎 부분에 있는, 포팅하려는 함수의 비동기 코루틴 버전을 새로 만들기

  2. 기존 동기 함수를 변경해서 코루틴 버전을 호출하고 실제 동작을 구현하는 이벤트 루프를 실행하라

  3. 호출 계층을 한 단계 올려서 다른 코루틴 계층을 만들고, 기존에 동기적 함수를 호출하던 부분을 1단계에서 정의한 코루틴 호출 바꾸기

  4. 비동기 부분을 결합하기 위해서 2단계에서 만든 동기적인 래퍼가 더 이상 필요하지 않고 삭제해라

def tail_file(handle, interval, write_func):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def write_async(data):
        write_func(data)

    coro = tail_async(handle, interval, write_async)
    loop.run_until_complete(coro)

Summary

  • asyncio 이벤트 루프의 run_in_excutor 메서드(이 메서드에 대해서 await를 사용해 완료를 기다릴 수 있다)를 사용하면 코루틴이 ThreadPoolExecutor 스레드 풀을 사용해서 동기적인 함수 호출하고, 이 기능을 활용하면 코드를 하향식으로 asyncio로 마이크레이션할 수 있다.

  • asyncio 이벤트 루프의 run_until_complete 메서드를 사용하면 동기적인 코드가 코루틴을 호출하고 완료를 기다릴 수 있다. asyncio.run_coroutine_threadsafe도 같은 기능을 제공하지만 스레드 경계에서도 안전하게 작동한다. 이 두 메서드를 활용하면 코드를 상향식으로 asyncio로 마이그레이션할 때 도움이 된다.

profile
성장을 도울 아카이빙 블로그

0개의 댓글