Asyncio를 활용한 Simple Tensorflow Inference 비동기 서버 구축

jh.cin·2020년 10월 18일
  1. 클라이언트가 100차원 벡터의 입력 데이터를 서버에 보내고, 서버는 데이터를 수신받고 모델에 입력하여 에측 결과를 다시 클라이언트에게 보내는 예제
  2. 예측 결과 값의 범위는 0~21
  3. 비동기 방식으로 데이터 송/수신 처리
import asyncio
import requests
import os
from socket import *

from inference import inference
from timer import Timer
import numpy as np

#현재는 Tensorflow Inference 함수(Tensorflow, Pytorch 등 인퍼런스 양식에 맞춰서 활용 가능)
def infer(predictor,x):
	train_timer=Timer()
	train_timer.tic()
	result=predictor.infer(x)
	train_timer.toc()
	print("result:%d/time:%.9f"%(result,train_timer.total_time))
	return result


class Server:
  
	def __init__(self,predictor):
		self.loop = asyncio.get_event_loop()
		self.clients = []
		self.predictor=predictor
		self.delimiter='\n'

	#서버 실행
	def run(self):
		self.loop.create_task(self.echo_server(('', 9000)))
		print("serving on\n")
		self.loop.run_forever()
    
	#서버 종료
	def connection_lost(self,client,addr):
		self.clients.remove(client)
		client.close()
		print('disconnection from {}'.format(str(addr)))
        
	#잔여 수신 데이터 처리  
	async def receive_check(self,msg,client):
		if msg[-1] != self.delimiter:
			total_msg=''
			total_msg+=msg
			while(True):
				_msg = await self.loop.sock_recv(client, 16384)
				if _msg:
					_msg = _msg.decode()
					total_msg+=_msg
					if total_msg[-1] == self.delimiter:
						return total_msg
		else:
			return msg
			
			
	#수신한 데이터 적합한 포맷 양식으로 디코딩(100차원의 벡터가 입력 데이터)
	async def decode_data(self,msg):
		data=msg.split(self.delimiter)
		data_list=[]
		for d in data:
			x=[]
			x_str_list=d.split(' ')
			for x_str in x_str_list:
				if x_str!='':
					x.append(float(x_str))
			if len(x)!=0:
				data_list.append(x)
		return data_list
		
	#비동기 서버 생성 / 클라이언트 접속 요청 처리 
	async def echo_server(self, address):
		sock = socket(AF_INET, SOCK_STREAM)
		sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
		sock.bind(address)
		sock.listen(5)
		sock.setblocking(False)
		while True:
			client, addr = await self.loop.sock_accept(sock)
			self.clients.append(client)
			print('Connection from {}'.format(str(addr)))
			self.loop.create_task(self.echo_handler(client,addr))
	
	#클라이언트로 부터 수신 받은 데이터 처리
	async def echo_handler(self, client,addr):
		with client:
			while True:
				try:
					msg = await self.loop.sock_recv(client, 16384)
					if msg:
						msg = msg.decode()						
						msg = await self.receive_check(msg,client)
						data_list= await self.decode_data(msg)
						for x in data_list:
							result = await self.loop.run_in_executor(None,infer,self.predictor,x)
							result = (str(result)+self.delimiter).encode()
							self.loop.sock_sendall(client, result)
						
				except ConnectionResetError:
					self.connection_lost(client,addr)
					break
	
			   
			print('Connection closed.')
			
			
num_label=21
num_dim=100
weights_file='./weight/w'
predictor=inference(num_label,num_dim,weights_file)
server = Server(predictor)
server.run()
profile
그냥 프로그래머

0개의 댓글