[십자말풀이] 십자말풀이판 데이터베이스 삽입

강태원·2024년 10월 21일
0

십자말풀이 게임

목록 보기
4/6

작업 레포지토리 : https://github.com/fnzksxl/word-puzzle

지난 포스트에서는 십자말풀이 게임판을 만드는 작업까지 진행했습니다.
이번 포스트에서는 생성된 십자말풀이 게임판을 DB에 삽입하고,
이를 검색해보는 작업까지 진행해보도록 하겠습니다.

DB에 게임판 삽입

퍼즐과 정답을 저장할 테이블 구조를 먼저 정의해보겠습니다.

class Puzzle(BaseMin, Base):
    __tablename__ = "puzzle"

    puzzle = Column(JSON, nullable=False)
    name = Column(VARCHAR(50), nullable=False, default=lambda: str(uuid.uuid4()))


class PuzzleAnswer(BaseMin, Base):
    __tablename__ = "puzzleanswer"

    puzzle_id = Column(Integer, ForeignKey("puzzle.id"), nullable=False)
    word_id = Column(Integer, ForeignKey("wordinfo.id"), nullable=False)
    num = Column(Integer, nullable=False)

Puzzle

2차원 리스트로 만들어진 퍼즐은 JSON 형태로 테이블에 넣도록 하겠습니다.
name으로 클라이언트가 식별할 수 있는 컬럼을 하나 추가해줬습니다.
<

Answer

퍼즐에 할당된 정답은 puzzle_id와 word_id를 외래키로 잡아
어떤 게임판의 정답인지, 정답이 어떤 단어인지를 알 수 있게 만들었습니다.
함께 저장되는 num으로 최초에 게임판 생성할 때와 동일한 데이터를 만들 수 있도록 했습니다.

word_id도 외래키로 저장되어있어 조회 시에 word 테이블과의 조인 작업이 불가피합니다. 추후에 데이터가 많이 쌓인다면 반정규화도 고려해볼만한 사항입니다.

DB에 게임판을 삽입하는 로직은 생성 이후에 곧바로 이루어져야 하는 흐름입니다.
따라서, 게임판 생성 서비스에 함수를 추가하겠습니다.

async def insert_map_answer_into_db(self) -> None:
        """
        생성된 맵과 정답을 DB에 삽입한다.
        """
        map_row = Puzzle(puzzle=self.map)
        self.db.add(map_row)
        self.db.flush()

        insert_data = [
            {"puzzle_id": map_row.id, "word_id": desc["id"], "num": desc["num"]}
            for desc in self.desc
        ]

        self.db.bulk_insert_mappings(PuzzleAnswer, insert_data)
        self.db.commit()

puzzle을 먼저 flush해서 puzzle_id를 추출합니다.
이후에 puzzle_id와 self.desc 속의 num, desc로
여러 row를 한 번에 bulk_insert 합니다.

DB에서 게임판 조회

조회 서비스 구현

게임판 조회 서비스 클래스를 생성하고 조회하는 함수를 추가해보겠습니다.

class PuzzleReadService:
    def __init__(self, puzzle_id: int, db: Session = Depends(get_db)):
        self.db = db
        self.puzzle_id = puzzle_id

    async def read_puzzle_from_db_by_id(self) -> Dict:
        """
        데이터베이스에서 퍼즐 ID로 퍼즐과 정답 정보를 읽어와 반환한다.
        Args:
            puzzle_id (int): 반환할 퍼즐 ID
        Returns:
            Dict: 퍼즐, 정답 정보가 담긴 사전 데이터
        """
        puzzle = self.db.query(Puzzle).filter(Puzzle.id == self.puzzle_id).first()
        if puzzle is None:
            raise PuzzleNotExistException()
        answer = (
            self.db.query(PuzzleAnswer.num, WordInfo.pos, WordInfo.desc, WordInfo.word)
            .filter(PuzzleAnswer.puzzle_id == self.puzzle_id)
            .join(WordInfo, PuzzleAnswer.word_id == WordInfo.id)
            .all()
        )

        answer_json = [
            {"num": num, "desc": {"pos": pos, "desc": desc, "word": word}}
            for num, pos, desc, word in answer
        ]

        return {"map": puzzle.puzzle, "desc": answer_json}
  1. puzzle_id을 클라이언트에서 입력받았을 때, 퍼즐만 간단하게 불러옵니다.
    예외) puzzle_id인 puzzle이 존재하지 않을 때.
  2. PuzzleAnswer.puzzle_id가 puzzle_id이면서, PuzzleAnswer.word_id가 WordInfo.id인 PuzzleAnswer 객체들을 모두 불러옵니다.

예외 처리

from fastapi import HTTPException


class PuzzleNotExistException(HTTPException):
    def __init__(self, detail: str = "존재하지 않는 퍼즐입니다."):
        super().__init__(status_code=404, detail=detail)

예외 처리를 담당하는 모듈을 따로 만들어 추가해주었습니다.

다르게 PuzzleAnswer를 찾아오는 방법

SQLAlchemy ORM에서 제공하는 RelationShip을 사용해서 가져올 수 있습니다.

# import relationship on models.py
from sqlalchemy.orm import relationship


class WordInfo(BaseMin, Base):
    __tablename__ = "wordinfo"

    word = Column(VARCHAR(5), nullable=False, index=True)
    desc = Column(VARCHAR(200), nullable=False)
    len = Column(Integer, nullable=False)
    pos = Column(VARCHAR(7), nullable=False)

    puzzle_answers = relationship("PuzzleAnswer", back_populates="word")


class PuzzleAnswer(BaseMin, Base):
    __tablename__ = "puzzleanswer"

    puzzle_id = Column(Integer, ForeignKey("puzzle.id"), nullable=False)
    word_id = Column(Integer, ForeignKey("wordinfo.id"), nullable=False)
    num = Column(Integer, nullable=False)

    word = relationship("WordInfo", back_populates="puzzle_answers")

models.py에서 relationship을 추가해주고,

from sqlalchemy.orm import joinedload

answer = (
    self.db.query(PuzzleAnswer)
    .option(joinedload(PuzzleAnswer.wordinfo))
    .filter(PuzzleAnswer.puzzle_id == self.puzzle_id)
    .all()
)

answer_josn = [
    {
        "num": answer.num,
        "pos": answer.wordinfo.pos,
        "desc": answer.wordinfo.desc,
        "word": answer.wordinfo.word,
    }
    for answer in puzzle_answers
]

위에서 작성한 쿼리문과 반복문을 수정해주었습니다.
joinedload는 eager loading 하도록 해주는 옵션으로
이를 사용해서 불러오지않으면, answer.wordinfo.* 할 때마다
새로운 쿼리를 날리게되므로 N+1문제가 발생할 수 있습니다.

조회 엔드포인트 작성

@router.get("/{puzzle_id}")
async def read_puzzle(
    puzzle_id: int, puzzle_service: PuzzleReadService = Depends(PuzzleReadService)
):
    return await puzzle_service.read_puzzle_from_db_by_id()

요청을 보내보면, 잘 받아오는 것을 알 수 있습니다.

다음번에는 회원가입/로그인 기능에 대해서 포스트해보겠습니다.
읽어주셔서 감사합니다.

profile
가치를 창출하는 개발자! 가 목표입니다

0개의 댓글

관련 채용 정보