socket을 통한 가격 정보 조회는 async로 접근해야 하고, 기존 로직은 thread를 사용해 concurrency를 구현했기 때문에 새로운 thread에서 async 함수를 호출하는 형태의 코드를 구성했다.
flutter를 사용할 때 async를 사용해 봤기에 쉽게 구현할 줄 알았지만, 알 수 없는 에러들이 발생했고 해결하는 데 생각보다 많은 시간이 들어서 정리해보려고 한다.
python document에는 사용자가 루프 객체를 사용할 필요가 거의 없다고 나와 있다. 하지만 asyncio를 thread와 같이 사용하려면 event loop 없이는 해결이 불가능하다.
coroutine을 asyncio.run()
의 인자로 넘기면 asyncio
가 새 loop를 생성해 그 루프에서 loop.run_until_complete
를 이용해 coroutine들을 실행한다. loop.run_until_complete
를 실행한 라인은 완료될 때까지 블록되고, 루프에 등록된 객체들을 실행시킨다. 이미 실행중인 event loop안에서는 새로운 event loop를 실행할 수 없다. call_soon
, call_later
, call_at
등으로 루프에 콜백을 등록할 수도 있다.
쓰레드마다 하나의 event loop가 있을 수 있다. 메인 쓰레드에서는 asyncio.get_event_loop()
를 실행했을 때 event loop가 존재하지 않는다면 새로운 event loop를 생성해 반환하지만, 다른 쓰레드에서는 RuntimeError
를 발생시킨다. 이는 event loop policy에 따라 달라질 수 있다.
여러 쓰레드가 하나의 event loop를 공유할 수도 있는데, 이 경우 asyncio.run_coroutine_threadsafe
를 호출해 해당 루프에 coroutine을 등록할 수 있다.
def start_update(self):
"""
Create thread which updates price info
"""
loop = asyncio.new_event_loop()
bm = BinanceSocketManager(self.async_client, loop)
socket = bm.symbol_ticker_socket(self.coin_name + self.cash_name)
def update():
async def update_async():
while True:
async with socket:
while True:
data = await asyncio.wait_for(socket.recv(), timeout=self.SOCKET_TIMEOUT)
if not data:
print("No data received. Reconnecting..")
break
self.time = datetime.fromtimestamp(data['E'] / 1000)
self.price = float(data['c'])
asyncio.run(update_async())
threading.Thread(target=update, daemon=True).start()
Future ~ attached to a different loop
에러가 발생했다. BinanceSocketManager
는 생성한 loop를 인자로 잘 사용하니 문제가 없고, async_client를 메인 스레드에서 생성할 때 메인 스레드의 loop가 attach되었을 것이라 생각해 self.async_client.loop = loop
를 추가했지만 그대로였다.
문제는 asyncio.run
에 있었다. 이 함수는 새 loop을 생성해 거기서 async 함수들을 실행한다. BinanceSocketManager
에 등록된 loop와 다르기 때문에 발생한 문제였다.
한 github 코드에서 해결책을 찾았다.
loop.run_until_complete(update_async())
loop.close()
위와 같이 수정했다.
수정 후 coroutine 'update_async' was never awaited
오류가 발생했다. 이 경우 await문은 없지만 loop에 등록되었기에 발생할 수 없는 오류다. 구글링을 해 봐도 await를 실제로 안 해서 생긴 오류밖에 나오지 않았다. 한참을 찾고 이것저것 시도해 보다 loop.close()
문을 지우고 나니 문제가 해결됐다. 분명 loop.run_until_complete
문에서 블록되어 loop.close
문은 실행되지 않을 텐데 어떻게 차이가 발생한 건지 모르겠다. 참고했던 코드와 다르게 무한 루프가 있는 게 원인이 아닌가 추측해본다.
def start_update(self):
"""
Create thread which updates price info
"""
def update():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.async_client.loop = loop
bm = BinanceSocketManager(self.async_client, loop)
socket = bm.individual_symbol_ticker_futures_socket(self.coin_name + self.cash_name)
async def update_async():
while True:
async with socket:
while True:
data = await asyncio.wait_for(socket.recv(), timeout=self.SOCKET_TIMEOUT)
if not data:
print("No data received. Reconnecting..")
break
self.time = datetime.fromtimestamp(data['E'] / 1000)
self.price = float(data['c'])
loop.run_until_complete(update_async()) # runs forever
threading.Thread(target=update, daemon=True).start()
python document -
https://docs.python.org/ko/3/library/asyncio-task.html
https://docs.python.org/3/library/asyncio-eventloop.html
asyncio+multithreading (github gist)-
https://gist.github.com/lars-tiede/01e5f5a551f29a5f300e