[FastAPI] 데이터베이스 연결

귤티·2024년 5월 9일
0

FastAPI

목록 보기
6/10
git init
git add database models routes main.py
git commit -m "데이터베이스가 없는 기본 애플리케이션 커밋"
git checkout -b planner-sql
source venv/bin/activate
pip install sqlmodel

SQLModel 구조와 기능

테이블

데이터베이스에 저장된 데이터를 가지고 있는 객체다.

SQLModel을 사용해서 테이블을 생성하려면 테이블 모델 클래스를 먼저 정의해야 한다.
pydantic 모델처럼 테이블을 정의하지만 이번에는 SQLModel의 서브 클래스로 정의한다.
클래스 정의는 table이라는 설정 변수를 갖는다. 이 변수를 통해 해당 클래스가 SQLModel 테이블이라는 것을 인식한다.

모델 클래스 안에 정의된 변수는 따로 지정하지 않으면 기본 필드로 설정된다.
만약 필드의 특성을 지정하고 싶다면 Field() 함수를 사용하면 된다.

class Event(SQLModel, table=True):
	id: Optional[int] = Field(default=None, primary_key=True)
    title: str,
    image: str,
    description: str,
    location: str,
    tags: List[str]

id만 필드로 정의되고 나머지는 컬럼을 정의된다. id 필드는 테이블의 기본키로도 사용된다.

로우

데이터베이스 테이블로 전달된 데이터는 특정 컬럼 아래에 있는 로우에 저장된다.
로우에 데이터를 추가하고 저장하려면 테이블의 인스턴스를 만든 후 인스턴스의 변수에 원하는 데이터를 할당하면 된다. 예를 들어 하나의 이벤트 데이터를 이벤트 테이블에 추가하려면 다음과 같이 이벤트 모델의 인스턴스를 만들어야 한다.

new_event = Event(title="Book Launch",
				  image="src/fastapi.png",
                  description="The book launch event will be held at Packt HQ, Packt city",
                  location="Google Meet"
                  tags=["packt", "book"]

그 다음 Session 클래스를 사용해서 데이터베이스 트랜잭션을 만든다.

with Session(engine) as session:
	session.add(new_event)
    session.commit()

세션

세션 객체는 코드와 데이터베이스 사이에서 이루어지는 처리를 관리하며 주로 특정 처리를 데이터베이스에 적용하기 위해 사용된다.

Session 클래스는 SQL 엔진의 인스턴스를 인수로 사용한다.

Session 클래스의 메소드:

  • add(): 처리 대기 중인 데이터베이스 객체를 메모리에 추가.
  • commit(): 현재 세션에 있는 트랜잭션을 모두 정리
  • get(): 데이터베이스에서 단일 로우를 추출한다. 모델과 문서 ID를 인수로 사용

데이터베이스 생성

SQLModel에서는 SQLAlchemy 엔진을 사용해서 데이터베이스를 연결한다.
SQLAlchemy 엔진은 create_engine() 메소드를 사용해서 만들며 SQLModel 라이브러리에서 import한다.

create_engine() 메소드는 데이터베이스 URL을 인수로 사용한다. sqlite:///database.db 또는 sqlite:///database.sqlite와 같은 형식이다.
create_engine()은 echo를 선택적 인수로 지정할 수 있다. True로 설정하면 실행된 SQL 명령을 출력한다.

create_engine() 메소드만으로는 데이터베이스 파일을 만들 수 없다. SQLModel.metadata.crreate_all(engine) 메소드를 사용해서 create_engine() 메소드의 인스턴스를 호출해야 한다.

database_file = "database.db"
engine = create_engine(database_file, echo=True)
SQLModel.metadata.create_all(engine)

create_all() 메소드는 데이터베이스 뿐만 아니라 테이블도 생성한다.
중요한 점: 데이터베이스 연결 파일(connection.py)에서 테이블 파일을 import 해야 한다는 것이다.

database 폴더에 connection.py 파일 생성 - 이 파일에 데이터베이스 연결을 위한 데이터를 설정

애플리케이션 데이터베이스 연동
1. models/events.py에 Event 모델 클래스 변경:

from typing import List, Optional
from sqlmodel import SQLModel, JSON, Field, Column


class Event(SQLModel, tabe=True):
    id: int = Field(default=None, primary_key=True)
    title: str
    image: str
    description: str
    tags: List[str] = Field(sa_column=Column(JSON))
    location: str

    class Config:
        arbitrary_types_allowed = True
        schema_extra = {
            "example": {
                "title": "FastAPI Book Launch",
                "image": "https://linktomyimage.com/image.png",
                "description": "We will be discussing the contents of the FastAPI book in this event. Ensure to come with your own copy to win gifts!",
                "tags": ["python", "fastapi", "book", "launch"],
                "location": "Google Meet"
            }
        }
  1. UPDATE 처리의 바디 유형으로 사용할 SQLModel 클래스 추가:
class EventUpdate(SQLModel):
    title: Optional[str]
    image: Optional[str]
    description: Optional[str]
    tags: Optional[List[str]]
    location: Optional[str]

    class Config:
        schema_extra = {
            "example": {
                "title": "FastAPI Book Launch",
                "image": "https://linktomyimage.com/image.png",
                "description": "We will be discussing the contents of the FastAPI book in this event. Ensure to come with your own copy to win gifts!",
                "tags": ["python", "fastapi", "book", "launch"],
                "location": "Google Meet"
            }
        }
  1. connection.py에 데이터베이스 및 테이블 생성을 위한 설정 작성:
from sqlmodel import SQLModel, Session, create_engine
from models.events import Event


database_file = "planner.db"
database_connection_string = f"sqlite:///{database_file}"
connect_args = {"check_same_thread": False}
engine_url = create_engine(database_connection_string, echo=True, connect_args=connect_args)

def conn():
    SQLModel.metadata.create_all(engine_url)

def get_session():
    with Session(engine_url) as session:
        yield session
        

데이터베이스 파일의 위치(없는 경우 생성된다.), 연결 문자열, 생성된 SQL 데이터베이스의 인스턴스를 변수에 저장한다.
conn() 함수는 SQLModel을 사용해서 데이터베이스와 테이블을 생성하고 get_session()을 사용해서 데이터베이스 세션을 애플리케이션 내에서 유지한다.

  1. main.py 변경
from fastapi import FastAPI
from fastapi.responses import RedirectResponse
from typing import List
from database.connection import conn

from routes.users import user_router
from routes.events import event_router

import uvicorn

app = FastAPI()

app.include_router(user_router, prefix="/user")
app.include_router(event_router, prefix="/event")

@app.on_event("startup")
def on_startup():
    conn()

@app.get("/")
async def home():
    return RedirectResponse(url="/event/")

if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)

애플리케이션을 실행하면 데이터베이스가 생성된다. 시작 시 conn() 함수를 호출해서 데이터베이스를 생성하기 때문이다.
데이터베이스 엔진을 만들 때 echo를 True로 설정 - 명령 실행 시 SQL 명령이 화면에 출력된다.

데이터베이스를 사용하도록 CRUD 처리 라우트 변경

이벤트 생성

routes/event.py 변경

from fastapi import APIRouter, Body, HTTPException, status, Depends, Request
from database.connection import get_session
from models.events import Event, EventUpdate

get_session() 함수를 통해 라우트가 세션 객체에 접근할 수 있다.

Depends 클래스는 FastAPI 애플리케이션에서 의존성 주입을 담당한다. 이 클래스는 함수를 인수로 사용하거나 함수 인수를 라우트에 전달할 수 있게 해서 어떤 처리가 실행되는지 필요한 의존성을 확보해준다

신규 이벤트 생성 POST 라우트 변경:

@event_router.post("/new")
async def create_event(new_event: Event,
                       session=Depends(get_session)) -> dict:
    session.add(new_event)
    session.commit()
    session.refresh(new_event)
    return{
        "message": "Event created successfully."
    }

데이터베이스 처리에 필요한 세션 객체가 get_session() 함수에 의존하도록 설정.
함수 내에서는 데이터(이벤트)를 세션에 추가하고 데이터베이스에 등록(커밋)한 후 세션을 업데이트한다.

라우트 변경 확인:

curl -X 'POST' 'http://127.0.0.1:8000/event/new' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ "title": "FastAPI Book Launch", "image": "fastapi-book.jpeg", "description": "We will be discussing the contents of the FastAPI book in this event. Ensure to come with your own copy to win gifts!", "tags": ["python", "fastapi", "book", "launch"], "location": "Google Meet" }'

이벤트 조회

GET 라우트 변경:

from sqlmodel import select


@event_router.get("/", response_model=List[Event])
async def retrieve_all_event(session=Depends(get_session)) -> List[Event]:
    statement = select(Event)
    events = session.exec(statement).all()
    return events
    

@event_router.get("/{id}", response_model=Event)
async def retrieve_event(id: int, session=Depends(get_session)) -> Event:
    event = session.get(Event, id)
    if event:
        return event
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Event with supplied ID does not exist"
    )

테스트

curl -X 'GET' 'http://127.0.0.1:8000/event/' -H 'accept: application/json'
curl -X 'GET' 'http://127.0.0.1:8000/event/1' -H 'accept: application/json'

이벤트 변경

routes/events.py에 UPDATE 라우트 추가:

@event_router.put("/edit/{id}", response_model=Event)
async def update_event(id: int, new_data: EventUpdate, session=Depends(get_session)) -> Event:
    event = session.get(Event, id)
    if event:
        event_data = new_data.dict(exclude_unset=True)
        for key, value in event_data.items():
            setattr(event, key, value)
        session.add(event)
        session.commit()
        session.refresh(event)

        return event
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Event with supplied ID does not exist"
    )

이벤트를 변경하기 전에 해당 이벤트가 존재하는지를 먼저 확인.
이벤트를 변경한 후에는 변경한 데이터를 반환.

테스트:

curl -X 'PUT' 'http://127.0.0.1:8000/event/edit/1' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ "title": "Packts FastAPI book launch 2" }'

이벤트 삭제

events.py 파일에서 기존 delete 라우트 변경:

@event_router.delete("/delete/{id}")
async def delete_event(id: int, session=Depends(get_session)) -> dict:
    event = session.get(Event, id)
    if event:
        session.delete(event)
        session.commit()
        return{
            "message": "Event deleted successfully."
        }
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Event with supplied ID does not exist"
    )

테스트:

curl -X 'DELETE' 'http://127.0.0.1:8000/event/delete/1' -H 'accept: application/json'

몽고DB 설정

FastAPI와 몽고DB를 연결해주는 도구로는 beanie를 사용
beanie는 비동기 객체 문서 매퍼(ODM)로, 데이터베이스 처리를 담당.

pip install beanie==1.13.1

문서

NoSQL 데이터베이스에서는 데이터 저장을 위해 문서를 사용한다. 문서는 데이터베이스 컬렉션에 데이터가 저장되는 형식으로, Pydantic 모델과 동일한 방식으로 정의된다.
SQL과의 차이점: beanie가 제공하는 Document 클래스를 사용한다는 것이다.

문서 정의 예:

from beanie import Document

class Event(Document):
	name: str
    location: str
    
    class Settings:
    	name = "events"

Settings 서브 클래스는 몽고DB 데이터베이스 내에 설정한 이름으로 컬렉션을 생성한다.

CRUD 처리를 위한 메소드:

  • insert(), create(): 문서 인스턴스에 의해 호출되며 데이터베이스 내에 새로운 레코드를 생성한다. 단일 데이터는 insert_one() 메소드를 사용해 추가하고 여러 개의 데이터는 insert_many() 메소드를 사용해 추가한다.
event1 = Event(name="Packt office launch", location="Hybrid")
event2 = Event(name="Hanbit office launch", location="Hybrid")
await event1.create()
await event2.create()
await Event.insert_many([event1, event2])
  • find(), get(): find() 메소드는 인수로 지정한 문서를 문서 목록에서 찾는다. get() 메소드는 지정한 ID와 일치하는 단일 문서를 반환한다. find_one() 메소드는 지정한 조건과 일치하는 단일 문서를 반환한다.
event = await Event.get("74478287284ff")
# 일치하는 아이템의 리스트 반환
event = await Event.find(Event.location == "Hybrid").to_list()
# 단일 이벤트 반환
event = awiat Event.find_one(Event.location == "Hybrid")
  • save(), update(), upsert(): save() 메소드는 데이터를 신규 문서로 저장할 때 사용, update() 메소드는 기존 문서를 변경할 때 사용, upsert() 메소드는 조건에 부합하는 문서가 없으면 신규로 추가할 때 사용된다.
event = await Event.get("74478287284ff")
update_query = {"$set": {"location": "virtual"}}
await event.update(update_query)

이 코드는 변경하고자 하는 쿼리를 추출한 후 해당 문서의 location 필드를 virtual로 변경한다.

  • delete(): 데이터베이스에서 문서를 삭제한다
event = await Event.get("74478287284ff")
await event.delete()

데이터베이스 초기화

  1. database 폴더에 connection.py 파일 생성
    pydantic의 BaseSettings 부모 클래스를 사용해서 설정 변수를 읽을 수 있다.

  2. connection.py에 코드 추가:

from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from typing import Optional
from pydantic import BaseSettings

class Settings(BaseSettings):
    DATABASE_URL: Optional[str] = None

    async def initialize_database(self):
        client = AsyncIOMotorClient(self.DATABASE_URL)
        await init_beanie(database=client.get_default_database(),
                          document_models=[])
        
        class Config:
            env_file = ".env"

Settings 클래스를 정의해서 데이터베이스 URL을 설정한다. 데이터베이스 URL은 Config 서브 클래스에 정의된 환경 파일(env_file)에서 읽어온다. 마지막으로 initialize_database() 메소드를 정의해서 데이터베이스를 초기화한다.

init_beanie() 메소드는 데이터베이스 클라이언트를 설정한다. SQLModel에서 생성한 몽고 엔진 버전과 문서 모델을 인수로 설정한다.

  1. models 폴더의 모델 파일을 변경:
from beanie import Document
from typing import Optional, List

class Event(Document):
    id: int
    title: str
    image: str
    description: str
    tags: List[str]
    location: str

    class Config:
        schema_extra = {
        "example": {
            "title": "Fast API Book Launch",
            "image": "https://linktomyimage.com/image.png",
            "description": "We will be discussing the contents of the FastAPI book in this event. Ensure to come with your own copy to win gifts!",
            "tags": ["python", "fastapi", "book", "launch"],
            "location": "Google Meet"
        }
    }
    class Settings:
        name = "events"
  1. UPDATE 처리를 위한 pydantic 모델을 동일한 파일에 추가:
class EventUpdate(BaseModel):
    title: Optional[str]
    image: Optional[str]
    description: Optional[str]
    tags: Optional[List[str]]
    location: Optional[str]

    class Config:
        schema_extra = {
        "example": {
            "title": "Fast API Book Launch",
            "image": "https://linktomyimage.com/image.png",
            "description": "We will be discussing the contents of the FastAPI book in this event. Ensure to come with your own copy to win gifts!",
            "tags": ["python", "fastapi", "book", "launch"],
            "location": "Google Meet"
        }
    }
  1. models/users.py 변경:
from pydantic import BaseModel, EmailStr
from beanie import Document, Link
from typing import Optional, List
from models.events import Event

class User(Document):
    email: EmailStr
    password: str
    events: Optional[List[Event]]

    class Settings:
        name = "users"
        
    class Config:
        schema_extra={
            "example": {
                "email": "fastapi@packt.com",
                "password": "strong!!!",
                "events": [],
            }
        }

class UserSignIn(BaseModel):
    email: EmailStr
    password: str

    class Config:
        schema_extra={
            "example":{
                "email": "fastapi@packt.com",
                "password": "strong!!!",
            }
        }
  1. connection.py의 document_models 필드 변경:
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from typing import Optional
from pydantic import BaseSettings
// import
from models.users import User
from models.events import Event

class Settings(BaseSettings):
    DATABASE_URL: Optional[str] = None

    async def initialize_database(self):
        client = AsyncIOMotorClient(self.DATABASE_URL)
        await init_beanie(database=client.get_default_database(),
                          document_models=[Event, User]) // 이 부분
        
        class Config:
            env_file = ".env"
  1. 환경 파일(.env)을 생성한 다음 데이터베이스 URL을 추가:
touch .env                  
echo DATABASE_URL=mongodb://localhost:27017/planner >> .env

CRUD 처리

connection.py 파일에 새로운 Database 클래스 추가
이 클래스는 초기화 시 모델을 인수로 받는다:

class Database:
    def __init__(self, model):
        self.model = model

데이터베이스 초기화 중에 사용되는 모델은 Event 또는 User 문서의 모델이다.

생성 처리

Database 클래스 안에 save() 메소드 추가.
이 메소드는 레코드 하나를 데이터베이스 컬렉션에 추가한다.

async def save(self, document) -> None:
        await document.create()
        return

문서를 인수로 받는 save() 메소드 정의. 문서의 인스턴스를 받아서 데이터베이스 인스턴스에 전달

조회 처리

데이터베이스 컬렉션에서 단일 레코드를 불러오거나 전체 레코드를 불러오는 메소드를 작성:

async def get(self, id: PydanticObjectId) -> Any:
    doc = await self.model.get(id)
    if doc:
        return doc
    return False
    
async def get_all(self) -> List[Any]:
    docs = await self.model.find_all().to_list()
    return docs

변경 처리

기존 레코드 변경 메소드 작성:

async def update(self, id: PydanticObjectId, body: BaseModel) -> Any:
    doc_id = id
    des_body = body.dict() // model_dump()로 변경
    des_body = {k:v for k, v in des_body.items() if v is not None}
    update_query = {"$set": {
            field: value for field, value in
            des_body.items()
    }}

    doc = await self.get(doc_id)
    if not doc:
   		return False
    await doc.update(update_query)
    return doc

update() 메소드는 하나의 ID와 pydantic 스키마(모델)를 인수로 받는다. 스키마에는 클라이언트가 보낸 PUT 요청에 의해 변경된 필드가 저장된다. 변경된 요청 바디는 딕셔너리에 저장된 다음 None 값을 제외하도록 필터링된다. 이 작업이 완료되면 변경 쿼리에 저장되고 beanie의 update() 메소드를 통해 실행된다.

삭제 처리

데이터베이스에서 레코드 삭제 메소드:

async def delete(self, id:PydanticObjectId) -> bool:
    doc = await self.get(id)
    if not doc:
        return False
    await doc.delete()
    return True

해당 레코드가 있는지 확인하고, 있으면 삭제

라우트 변경

routes/events.py

from fastapi import APIRouter, Body, HTTPException, status
from beanie import PydanticObjectId
from database.connection import Database

from models.events import Event, EventUpdate
from typing import List

event_database = Database(Event)

event_router = APIRouter(
    tags=["Events"],
)

events = []

@event_router.get("/", response_model=List[Event])
async def retrieve_all_event() -> List[Event]:
    evnets = await event_database.get_all()
    return events

@event_router.get("/{id}", response_model=Event)
async def retrieve_event(id: PydanticObjectId) -> Event:
    event = await event_database.get(id)
    for event in events:
        if event.id == id:
            return event
        
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Event with supplied ID does not exist"
    )

@event_router.post("/new")
async def create_event(body: Event = Body(...)) -> dict:
    await event_database.save(body)
    events.append(body)
    return{
        "message": "Event created successfully."
    }

@event_router.delete("/{id}")
async def delete_event(id: int) -> dict:
    event = event_database.delete(id)
    for event in events:
        if event.id == id:
            events.remove(event)
            return{
                "message": "Event deleted successfully."
            }
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Event with supplied ID does not exist"
    )

@event_router.delete("/")
async def delete_all_event() -> dict:
    events.clear()
    return{
        "message": "Event deleted successfully."
    }

@event_router.put("/{id}", response_model = Event)
async def update_event(id: PydanticObjectId, body: EventUpdate) -> Event:
    updated_event = await event_database.update(id, body)
    if not update_event:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Event with supplied ID does not exist"
        )
    return update_event

routes/users.py

from fastapi import APIRouter, HTTPException, status
from models.users import User, UserSignIn
from database.connection import Database

user_database = Database(User)

user_router = APIRouter(
    tags=["User"],
)

users = {}

@user_router.post("/signup")
async def sign_new_user(user: User) -> dict:
    user_exist = await User.find_one(User.email == user.email)
    if user.email in users:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="User with supplied username exists"
        )
    await user_database.save(user)
    return {
        "message": "User successfully registered!"
    }

@user_router.post("/signin")
async def sign_user_in(user: UserSignIn) -> dict:
    user_exist = await User.find_one(User.email == user.email)
    if user.email not in users:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User does not exist"
        )
    if user_exist.password == user.password:
        return{
            "message": "User signed in successfully."
        }
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid details passed"
    )

main.py

from fastapi import FastAPI
from database.connection import Settings
from routes.users import user_router
from routes.events import event_router
import uvicorn

app = FastAPI()
settings = Settings()


app.include_router(user_router, prefix="/user")
app.include_router(event_router, prefix="/event")

@app.on_event("startup")
async def init_db():
    await settings.initialize_database()

dotenv 라이브러리 설치

pip install pydantic[dotenv]

몽고 DB 데이터베이스가 상주할 폴더를 생성한 후 몽고 DB 인스턴스를 시작

mkdir store
mongod --dbpath store

애플리케이션 실행

python main.py

테스트:

curl -X 'POST' 'http://127.0.0.1:8000/event/new' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ "title": "FastAPI Book Launch", "image": "fastapi-book.jpeg", "description": "We will be discussing the contents of the FastAPI book in this event. Ensure to come with your own copy to win gifts!", "tags": ["python", "fastapi", "book", "launch"], "location": "Google Meet" }'
curl -X 'GET' 'http://127.0.0.1:8000/event/' -H 'accept: application/json'

단일 이벤트 조회: 앞선 실행 결과에서 나온 id를 사용

curl -X 'GET' 'http://127.0.0.1:8000/event/{id}' -H 'accept: application/json'
curl -X 'PUT' 'http://127.0.0.1:8000/event/{id}' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ "location": "Hybrid" }'
curl -X 'DELETE' 'http://127.0.0.1:8000/event/{id}' -H 'accept: application/json'
curl -X 'POST' 'http://127.0.0.1:8000/user/signup' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ "email": "fastapi@packt.com", "password": "Strong!!!", "events": [] }'
curl -X 'POST' 'http://127.0.0.1:8000/user/signin' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ "email": "fastapi@packt.com", "password": "strong!!" }'
profile
취준 진입

0개의 댓글

관련 채용 정보