다른 thread에서 asyncio 사용하기

socket을 통한 가격 정보 조회는 async로 접근해야 하고, 기존 로직은 thread를 사용해 concurrency를 구현했기 때문에 새로운 thread에서 async 함수를 호출하는 형태의 코드를 구성했다.
flutter를 사용할 때 async를 사용해 봤기에 쉽게 구현할 줄 알았지만, 알 수 없는 에러들이 발생했고 해결하는 데 생각보다 많은 시간이 들어서 정리해보려고 한다.

Event loop

python document에는 사용자가 루프 객체를 사용할 필요가 거의 없다고 나와 있다. 하지만 asyncio를 thread와 같이 사용하려면 event loop 없이는 해결이 불가능하다.
coroutine을 asyncio.run()의 인자로 넘기면 asyncio가 새 loop를 생성해 그 루프에서 loop.run_until_complete를 이용해 coroutine들을 실행한다. loop.run_until_complete를 실행한 라인은 완료될 때까지 블록되고, 루프에 등록된 객체들을 실행시킨다. 이미 실행중인 event loop안에서는 새로운 event loop를 실행할 수 없다. call_soon, call_later, call_at등으로 루프에 콜백을 등록할 수도 있다.
쓰레드마다 하나의 event loop가 있을 수 있다. 메인 쓰레드에서는 asyncio.get_event_loop()를 실행했을 때 event loop가 존재하지 않는다면 새로운 event loop를 생성해 반환하지만, 다른 쓰레드에서는 RuntimeError를 발생시킨다. 이는 event loop policy에 따라 달라질 수 있다.
여러 쓰레드가 하나의 event loop를 공유할 수도 있는데, 이 경우 asyncio.run_coroutine_threadsafe를 호출해 해당 루프에 coroutine을 등록할 수 있다.

기존 코드

    def start_update(self):
        """
        Create thread which updates price info
        """
        loop = asyncio.new_event_loop()

        bm = BinanceSocketManager(self.async_client, loop)
        socket = bm.symbol_ticker_socket(self.coin_name + self.cash_name)

        def update():
            async def update_async():
                while True:
                    async with socket:
                        while True:
                            data = await asyncio.wait_for(socket.recv(), timeout=self.SOCKET_TIMEOUT)
                            if not data:
                                print("No data received. Reconnecting..")
                                break
                            self.time = datetime.fromtimestamp(data['E'] / 1000)
                            self.price = float(data['c'])
            asyncio.run(update_async())

        threading.Thread(target=update, daemon=True).start()

Attached to a different loop

Future ~ attached to a different loop 에러가 발생했다. BinanceSocketManager는 생성한 loop를 인자로 잘 사용하니 문제가 없고, async_client를 메인 스레드에서 생성할 때 메인 스레드의 loop가 attach되었을 것이라 생각해 self.async_client.loop = loop를 추가했지만 그대로였다.
문제는 asyncio.run에 있었다. 이 함수는 새 loop을 생성해 거기서 async 함수들을 실행한다. BinanceSocketManager에 등록된 loop와 다르기 때문에 발생한 문제였다.
한 github 코드에서 해결책을 찾았다.

loop.run_until_complete(update_async())
loop.close()

위와 같이 수정했다.

Coroutine was never awaited

수정 후 coroutine 'update_async' was never awaited 오류가 발생했다. 이 경우 await문은 없지만 loop에 등록되었기에 발생할 수 없는 오류다. 구글링을 해 봐도 await를 실제로 안 해서 생긴 오류밖에 나오지 않았다. 한참을 찾고 이것저것 시도해 보다 loop.close()문을 지우고 나니 문제가 해결됐다. 분명 loop.run_until_complete문에서 블록되어 loop.close문은 실행되지 않을 텐데 어떻게 차이가 발생한 건지 모르겠다. 참고했던 코드와 다르게 무한 루프가 있는 게 원인이 아닌가 추측해본다.

수정 후 코드

    def start_update(self):
        """
        Create thread which updates price info
        """
        def update():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            self.async_client.loop = loop
            bm = BinanceSocketManager(self.async_client, loop)
            socket = bm.individual_symbol_ticker_futures_socket(self.coin_name + self.cash_name)

            async def update_async():
                while True:
                    async with socket:
                        while True:
                            data = await asyncio.wait_for(socket.recv(), timeout=self.SOCKET_TIMEOUT)
                            if not data:
                                print("No data received. Reconnecting..")
                                break
                            self.time = datetime.fromtimestamp(data['E'] / 1000)
                            self.price = float(data['c'])

            loop.run_until_complete(update_async())  # runs forever

        threading.Thread(target=update, daemon=True).start()

Reference

python document -
https://docs.python.org/ko/3/library/asyncio-task.html
https://docs.python.org/3/library/asyncio-eventloop.html
asyncio+multithreading (github gist)-
https://gist.github.com/lars-tiede/01e5f5a551f29a5f300e

0개의 댓글