Asyncio를 활용한 Simple Tensorflow Inference 비동기 서버 구축

jh.cin·2020년 10월 18일
0
  1. 클라이언트가 100차원 벡터의 입력 데이터를 서버에 보내고, 서버는 데이터를 수신받고 모델에 입력하여 에측 결과를 다시 클라이언트에게 보내는 예제
  2. 예측 결과 값의 범위는 0~21
  3. 비동기 방식으로 데이터 송/수신 처리
import asyncio
import requests
import os
from socket import *

from inference import inference
from timer import Timer
import numpy as np

#현재는 Tensorflow Inference 함수(Tensorflow, Pytorch 등 인퍼런스 양식에 맞춰서 활용 가능)
def infer(predictor,x):
	train_timer=Timer()
	train_timer.tic()
	result=predictor.infer(x)
	train_timer.toc()
	print("result:%d/time:%.9f"%(result,train_timer.total_time))
	return result


class Server:
  
	def __init__(self,predictor):
		self.loop = asyncio.get_event_loop()
		self.clients = []
		self.predictor=predictor
		self.delimiter='\n'

	#서버 실행
	def run(self):
		self.loop.create_task(self.echo_server(('', 9000)))
		print("serving on\n")
		self.loop.run_forever()
    
	#서버 종료
	def connection_lost(self,client,addr):
		self.clients.remove(client)
		client.close()
		print('disconnection from {}'.format(str(addr)))
        
	#잔여 수신 데이터 처리  
	async def receive_check(self,msg,client):
		if msg[-1] != self.delimiter:
			total_msg=''
			total_msg+=msg
			while(True):
				_msg = await self.loop.sock_recv(client, 16384)
				if _msg:
					_msg = _msg.decode()
					total_msg+=_msg
					if total_msg[-1] == self.delimiter:
						return total_msg
		else:
			return msg
			
			
	#수신한 데이터 적합한 포맷 양식으로 디코딩(100차원의 벡터가 입력 데이터)
	async def decode_data(self,msg):
		data=msg.split(self.delimiter)
		data_list=[]
		for d in data:
			x=[]
			x_str_list=d.split(' ')
			for x_str in x_str_list:
				if x_str!='':
					x.append(float(x_str))
			if len(x)!=0:
				data_list.append(x)
		return data_list
		
	#비동기 서버 생성 / 클라이언트 접속 요청 처리 
	async def echo_server(self, address):
		sock = socket(AF_INET, SOCK_STREAM)
		sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
		sock.bind(address)
		sock.listen(5)
		sock.setblocking(False)
		while True:
			client, addr = await self.loop.sock_accept(sock)
			self.clients.append(client)
			print('Connection from {}'.format(str(addr)))
			self.loop.create_task(self.echo_handler(client,addr))
	
	#클라이언트로 부터 수신 받은 데이터 처리
	async def echo_handler(self, client,addr):
		with client:
			while True:
				try:
					msg = await self.loop.sock_recv(client, 16384)
					if msg:
						msg = msg.decode()						
						msg = await self.receive_check(msg,client)
						data_list= await self.decode_data(msg)
						for x in data_list:
							result = await self.loop.run_in_executor(None,infer,self.predictor,x)
							result = (str(result)+self.delimiter).encode()
							self.loop.sock_sendall(client, result)
						
				except ConnectionResetError:
					self.connection_lost(client,addr)
					break
	
			   
			print('Connection closed.')
			
			
num_label=21
num_dim=100
weights_file='./weight/w'
predictor=inference(num_label,num_dim,weights_file)
server = Server(predictor)
server.run()
profile
그냥 프로그래머

0개의 댓글