OpenAPI
"개방된 API", 누구나 사용될 수 있도록 API의 엔드포인트가 개방
OpenAPI / OpenAPI Specification (OAS)
RESTful API를 기 정의된 규칙에 API spec을 json이나 yaml로 표현하는 방식
RESTful API 디자인에 대항 정의 표준
예전에는 Swagger 2.0와 같은 이름으로 불렸다가, 3.0 버전부터 OpenAPI 3.0 Specification으로 지칭
OpenAPI: 이전에 Swagger Specification으로 알려진 Specification 자체
Swagger: OpenAPI를 Implement하기 위한 도구
FastAPI는 API를 정의하기 위해 OpenAPI 표준을 사용하여 모든 API로 "스키마"를 생성
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
경로 매개변수
@app.get("/items/{item_id}")
async def read_item(item_id):
@app.get("/items/{item_id}")
async def read_item(item_id: int):
쿼리 매개변수
http://127.0.0.1:8000/items/?skip=0&limit=10
@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
return fake_items_db[skip : skip + limit]
요청 본문
from pydantic import BaseModel
class Item(BaseModel):
name: str
description: Union[str, None] = None
price: float
tax: Union[float, None] = None
@app.pst("/items/")
async def create_item(item: Item):
return item
기본설정은 위와 같이 구성하고 VPC 설정과 보안그룹 설정을 해주었다.
MySQl Workbench를 이용하여 엔드포인트를 통해 RDS에 연결하였다.
SHOW DATABASES;
CREATE DATABASE my_msa_db;
USE my_msa_db;
CREATE TABLE tb _product ( product_ id INT PRIMARY KEY AUTO_ INCREMENT, product_img
VARCHAR(50), product_ name VARCHAR(50), product_ desc VARCHAR(50), price INT, delivery_fee
INT, uploaded DATETIME, seller INT);
종속성 설치
pip install sqlalchemy
pip install pymysql
Requirement.txt 업데이트
pydantic==1.10.4
sqlalchemy==1.4.46
pymysql==1.0.2
Docker 빌드
docker build -t order-app --platform linux/amd64 .
app 디렉토리에 database.py 파일 생성 후, 아래 코드 작성
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative base
from salalchemy.orm import sessionmaker
DATABASE_URL = 'mysq|+pymysq|://<db-user>:<db-password>@<db-cluster-endpoint>:3306/my_msa_db'
engine = create engine(
DATABASE URL
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative base()
➡️ 'engine = create_engine(DATABASE_URL)': 데이터베이스 연결 URL을 사용하여
SQLAlchemy 엔진을 생성한다.
이 엔진은 데이터베이스와의 연결을 나타내며 모든 데이터베이스 작업을 처리한다.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine):
sessionmaker 클래스를 사용하여 세션 생성기를 만든다.
이 세션 생성기는 데이터베이스와의 세션을 생성할 때 사용된다.
autocommit 및 autoflush 매개 변수는 세션 동작을 제어하는 데 사용되고, bind 매개 변수는
이 세션을 어떤 엔진과 연결할 것인지 지정한다. 여기서는 위에서 생성한 engine을 사용한다.
Base = declarative_base(): SQLAlchemy의 declarative_base 클래스를 사용하여
데이터베이스 모델을 정의할 기본 클래스를 생성한다.
이 클래스를 상속하여 데이터베이스 테이블을 정의할 수 있습니다.
from sqlalchemy import Column, TEXT, INT, DATETIME
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class TbProduct(Base):
__tablename__ = "tb_product"
product_id = Column(INT, nullable=False, autoincrement=True, primary_key=True)
product_img = Column(TEXT, nullable=True)
product_name = Column(TEXT, nullable=True)
product_desc = Column(TEXT, nullable=True)
price = Column(INT, nullable=True)
delivery_fee = Column(INT, nullable=True)
uploaded = Column(DATETIME, nullable=True)
seller = Column(INT, nullable=True)
from typing import Optional
from pydantic import BaseModel
from datetime import datetime
class Product(BaseModel):
product_id : int
product_img : str
product_name : str
product_desc : Optional[str] = None
price : int
delivery_fee : int
uploaded : datetime
seller : int
class Config:
orm_mode = True
from sqlalchemy.orm import Session
from app.models.tb_product import TbProduct
def read_products(db: Session):
return db.query(TbProduct).all()
import uuid
from datetime import datetime
from sqlalchemy.orm import Session
from app.models.tb_order import TbOrder
from app.schemas.order import OrderCreate
def create_order(order: OrderCreate, db):
order.order_id = str(uuid.uuid1())
created = datetime.now()
order.created = created.strftime("%Y-%m-%d %H:%M:%s")
db_order = TbOrder(**order.dict())
db.add(db_order)
db.commit()
db.refresh(db_order)
return db_order
from sqlalchemy import Column, TEXT, INT, DATETIME
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class TbOrder(Base):
__tablename__ = "tb_order"
order_id = Column(TEXT, nullable=False, primary_key=True)
product_id = Column(INT, nullable=False)
user_id = Column(INT, nullable=False)
created = Column(DATETIME, nullable=True)%
import uuid
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
class Order(BaseModel):
order_id : Optional[str] = str(uuid.uuid1())
product_id : int
user_id : int
created : Optional[str] = datetime.now().strftime("%Y-%m-%d %H:%M:%s")
class Config:
orm_mode = True
class OrderCreate(Order):
pass
MQ란?: Amazon MQ
브로커 엔진은 RabbitMQ로 생성하였다.
배포 모드는 단일 인스턴스 브로커로 설정하였고
위와 같이 구성하였다.
AMQP 프로토콜을 사용해서 Message Queue와 통신할 계획이다.
AMQP: 클라이언트가 서버에 요청을 보낼 수 있고 서버가 클라이언트에 요청을 보낼 수 있는
양방향 RPC 프로토콜
또, RabbitMQ를 연동하기위해 Pika 라이브러리를 사용할 것이다.
Python RPC 클라이언트
Python에서 AMQP 0-9-1 프로토콜을 사용하게 해주는 라이브러리
Pika는 각 비동기 연결 어댑터에서 IO루프를 구현하거나 확장
pip install pika
pika==1.3.1
docker build -t order-app --platform linux/amd64 .
mkdir conn
import pika
import json
from app.schemas.order import Order, OrderCreate
from fastapi.encoders import jsonable_encoder
rabbitmq_user = '<user>'
rabbitmq_password = '<password>'
rabbitmq_broker_id = '<broker-id>'
region = 'ap-northeast-2'
MQ_URL = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671"
params = pika.URLParameters(MQ_URL)
class PikaClient:
def __init__(self):
self.connection = pika.BlockingConnection(params)
self.channel = self.connection.channel()
def is_open(self):
return self.channel.is_open
➡️ 'user', 'password', 'broker-id'를 입력한다.
from .conn.pika_client import PikaClient
...
mq_client = PikaClient()
...
@app.get("/is_open_channel")
def is_open_channel():
return mq_client.is_open()
주문 페이지에서 REST Call을 통해서 주문이 발생하게 되면 order-app(back-end)에서는
pika client를 통해서 Amazon MQ (Rabbit MQ)에 메시지를 동기적으로 전달.
그러면 관리자 페이지에서 주문 내역을 실시간으로 업데이트 하기 위해 WebSocket을 통해서
Front-end / Backe-end 간 양방향 통신을 지원.
def pub_order(self, orderCreare):
queue = 'orders'
self.channel.queue_declare(queue)
messgage = json.dumps(jsonable_encoder(orderCreare))
self.channel.basic_publish(
exchange='',
routing_key=queue,
body=messgage
)
return messgage
➡️ 채널을 통해 연결할 Queue를 선언, Publisher 함수를 통해서 지정한 큐에 주문 내역을
Json String으로 인코딩해서 이벤트 발행.
main.py
@app.post("/order")
def create_order(order: order.OrderCreate, db: Session = Depends(get_db)):
order_crud.create_order(order, db)
order.order_id = str(uuid.uuid1())
order.created = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
new_order = order_crud.get_order(order, db)
return mq_client.pub_order(new_order)
pip install asyncio
pip install aio-pika
pip install websockets
asyncio==3.4.3
aio-pika==8.3.0
websockets==10.4
docker build -t admin-app
@app.websocket("/admin/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
connection = await connect(MQ_URL)
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue('orders')
async def on_message(message: IncomingMessage):
async with message.process():
await websocket.send_text(f"{message.body}")
await queue.consume(on_message)
while True:
await queue.consume(on_message)
await asyncio.sleep(1)
➡️ Fast API는 다음과 같은 단순한 형태로 WebSocket을 지원한다.
기존의 WebSocket 어노테이션을 사용하고 함수의 인자로 WebSocket을 선언하면
웹소켓을 위한 설정이 끝난다.
aio-pika 라이브러리로 주문 페이지와 동일하게 커넥션을 만들고 채널을 가져온다.
Queue를 선언해주고 on-message 함수를 통해서 메세지가 컨슘될 때 콜백을 처리할 수 있다.
기존에 사용하지 않았던 지시자들이 2개가 사용되는데 async, awiat 이다.
async는 해당함수를 비동기로 처리하고 await는 해당 구문이 처리될 때 까지
해당 함수의 실행을 기다린다.