SQLAlchemy는 Python의 강력한 ORM(Object-Relational Mapping) 라이브러리입니다.
pip install sqlalchemy
# 데이터베이스 드라이버도 설치
pip install pymysql # MySQL
pip install psycopg2 # PostgreSQL
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Connect to the database.
engine = create_engine(
    "mysql+pymysql://user:password@localhost:3306/database",
    pool_pre_ping=True,  # check that a connection is alive before using it
    echo=True,  # print the emitted SQL to the log
)

# Create the session factory bound to the engine.
SessionLocal = sessionmaker(bind=engine)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime
# Declarative base class that the ORM models in this file inherit from.
Base = declarative_base()
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, autoincrement=True, primary_key=True)
    username = Column(String(50), nullable=False, unique=True)
    email = Column(String(100), nullable=False)
    # Filled in on the Python side with the current UTC time at insert.
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationship: the user's posts, paired with ``Post.author``.
    posts = relationship("Post", back_populates="author")
class Post(Base):
    """ORM model mapped to the ``posts`` table."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(String(2000))
    # Foreign key to the owning row in ``users``.
    user_id = Column(Integer, ForeignKey("users.id"))
    # Filled in on the Python side with the current UTC time at insert.
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationship: the owning user, paired with ``User.posts``.
    author = relationship("User", back_populates="posts")
# Create every table registered on Base's metadata.
Base.metadata.create_all(bind=engine)
def create_user(db: Session, username: str, email: str):
    """Insert a new ``User`` row and return the persisted instance.

    Args:
        db: Session used for the insert; this function commits it.
        username: Value stored in ``User.username``.
        email: Value stored in ``User.email``.

    Returns:
        The new ``User``, refreshed from the database after the commit.
    """
    created = User(email=email, username=username)
    db.add(created)
    db.commit()
    # Reload the row so database-generated values (such as the id) are set.
    db.refresh(created)
    return created
# Usage example.
with SessionLocal() as db:
    new_user = create_user(db, username="john_doe", email="john@example.com")
def get_users(db: Session, skip: int = 0, limit: int = 10):
    """Return one page of users, ordered by primary key.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).

    Returns:
        A list of ``User`` objects for the requested page.
    """
    return (
        db.query(User)
        # Without ORDER BY the database may return rows in any order, so
        # consecutive pages could overlap or skip rows.
        .order_by(User.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
def get_user_by_id(db: Session, user_id: int):
    """Return the ``User`` whose id equals ``user_id``, or ``None``."""
    matching = db.query(User).filter(User.id == user_id)
    return matching.first()
def get_user_by_username(db: Session, username: str):
    """Return the first ``User`` with the given username, or ``None``."""
    matching = db.query(User).filter(User.username == username)
    return matching.first()
# Conditional lookup.
def get_recent_users(db: Session):
    """Return the users whose ``created_at`` falls within the last 7 days."""
    from datetime import datetime, timedelta

    cutoff = datetime.utcnow() - timedelta(days=7)
    recent = db.query(User).filter(User.created_at >= cutoff)
    return recent.all()
def update_user_email(db: Session, user_id: int, new_email: str):
    """Set a new email on the user with the given id.

    Returns:
        The refreshed ``User`` after the commit, or ``None`` when no user
        with ``user_id`` exists (in which case nothing is committed).
    """
    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        return target

    target.email = new_email
    db.commit()
    db.refresh(target)
    return target
# Bulk update.
def update_users_bulk(db: Session):
    """Rewrite the email of every user whose address ends in ``@old.com``.

    The UPDATE is issued as one statement and then committed; objects
    already loaded in the session are not synchronized
    (``synchronize_session=False``).
    """
    old_domain_users = db.query(User).filter(User.email.like("%@old.com"))
    old_domain_users.update(
        {User.email: "updated@new.com"},
        synchronize_session=False,
    )
    db.commit()
def delete_user(db: Session, user_id: int):
    """Delete the user with the given id.

    Returns:
        The deleted ``User`` object, or ``None`` when no such user exists
        (in which case nothing is committed).
    """
    doomed = db.query(User).filter(User.id == user_id).first()
    if not doomed:
        return doomed

    db.delete(doomed)
    db.commit()
    return doomed
# Bulk delete.
def delete_old_users(db: Session):
    """Delete every user created more than 365 days ago, then commit."""
    from datetime import datetime, timedelta

    cutoff = datetime.utcnow() - timedelta(days=365)
    expired = db.query(User).filter(User.created_at < cutoff)
    expired.delete()
    db.commit()
# Using JOIN.
def get_users_with_posts(db: Session):
    """Return the users that are inner-joined to ``Post``."""
    joined = db.query(User).join(Post)
    return joined.all()
# LEFT JOIN.
def get_all_users_with_post_count(db: Session):
    """Return ``(username, post_count)`` rows for every user.

    The outer join keeps users without posts in the result.
    """
    from sqlalchemy import func

    post_count = func.count(Post.id).label('post_count')
    per_user = (
        db.query(User.username, post_count)
        .outerjoin(Post)
        .group_by(User.id)
    )
    return per_user.all()
# Lookup through the relationship.
def get_user_posts(db: Session, user_id: int):
    """Return the posts of the given user, or an empty list if not found."""
    owner = db.query(User).filter(User.id == user_id).first()
    if not owner:
        return []
    return owner.posts
from sqlalchemy import and_, or_, not_
def complex_user_search(db: Session, username: str = None, email_domain: str = None):
    """Search users by optional username substring and/or email domain.

    Args:
        db: Session used to run the query.
        username: When truthy, keep users whose username contains it.
        email_domain: When truthy, keep users whose email ends with
            ``@<email_domain>``.

    Returns:
        A list of matching ``User`` objects; all users when neither
        criterion is given.
    """
    criteria = []
    if username:
        criteria.append(User.username.like(f"%{username}%"))
    if email_domain:
        criteria.append(User.email.like(f"%@{email_domain}"))

    search = db.query(User)
    if criteria:
        # All supplied criteria must hold at once.
        search = search.filter(and_(*criteria))
    return search.all()
# Subquery.
def get_users_with_recent_posts(db: Session):
    """Return the users who authored at least one post in the last 7 days.

    Returns:
        A list of ``User`` objects whose id appears among the ``user_id``
        values of posts created within the past week.
    """
    from datetime import datetime, timedelta

    week_ago = datetime.utcnow() - timedelta(days=7)
    # Hand the SELECT itself to IN (...). Wrapping it with .subquery() makes
    # SQLAlchemy 1.4+ emit a warning while coercing the Subquery back into a
    # select() for use inside IN.
    recent_author_ids = db.query(Post.user_id).filter(
        Post.created_at >= week_ago
    )
    return db.query(User).filter(User.id.in_(recent_author_ids)).all()
def transfer_operation(
    db: Session,
    from_user_id: int = 1,
    to_user_id: int = 2,
    amount: int = 100,
):
    """Move ``amount`` from one user's balance to another's in one transaction.

    Args:
        db: Session that carries the transaction.
        from_user_id: Id of the user whose balance is decreased (default 1).
        to_user_id: Id of the user whose balance is increased (default 2).
        amount: Amount to transfer (default 100).

    Raises:
        Exception: Any error raised while loading, updating or committing
            is re-raised after the session has been rolled back.
    """
    # NOTE(review): the User model visible in this file declares no
    # ``balance`` column — confirm it exists on the real model.
    try:
        # Both updates belong to a single transaction.
        sender = get_user_by_id(db, from_user_id)
        receiver = get_user_by_id(db, to_user_id)

        sender.balance -= amount
        receiver.balance += amount

        db.commit()  # persist both changes together
    except Exception:
        db.rollback()  # undo everything on failure
        # Bare raise keeps the original traceback intact.
        raise
def complex_operation(db: Session):
    """Create a user, then run a risky step guarded by a savepoint.

    A failure inside the risky step only rolls back to the savepoint and is
    deliberately suppressed, so the surrounding transaction survives. Any
    other failure rolls the session back and is re-raised, so the caller
    can tell that the operation did not complete.

    Raises:
        Exception: Any error outside the savepoint-guarded step, after
            ``db.rollback()`` has been called.
    """
    try:
        # First step. create_user() commits on its own, so this row is
        # already persisted before the savepoint below is opened.
        create_user(db, "test", "test@example.com")

        # Open a savepoint.
        savepoint = db.begin_nested()
        try:
            # Risky step.
            risky_operation(db)
            savepoint.commit()
        except Exception:
            # Roll back to the savepoint only; the outer transaction stays
            # usable.
            savepoint.rollback()

        # Final commit.
        db.commit()
    except Exception:
        db.rollback()
        # Do not hide the failure from the caller.
        raise
from sqlalchemy.orm import Session
def get_db():
    """Yield a session from ``SessionLocal`` and close it afterwards."""
    # Leaving the ``with`` block closes the session, also when the consumer
    # raises or the generator is closed early.
    with SessionLocal() as db:
        yield db
# Usage with FastAPI.
@app.get("/users/")
def read_users(db: Session = Depends(get_db)):
    """Return the result of ``get_users`` for the injected session."""
    users = get_users(db)
    return users
from contextlib import contextmanager
@contextmanager
def get_db_session():
    """Context manager yielding a session that commits on success.

    The session is committed when the ``with`` body finishes normally,
    rolled back (and the exception re-raised) when an ``Exception`` occurs,
    and closed in every case.
    """
    session = SessionLocal()
    try:
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
    finally:
        session.close()
# Usage example.
with get_db_session() as db:
    user = create_user(db, username="john", email="john@example.com")
# Avoiding the N+1 problem.
def get_users_with_posts_optimized(db: Session):
    """Return all users with ``User.posts`` loaded via ``joinedload``."""
    from sqlalchemy.orm import joinedload

    eager_posts = joinedload(User.posts)
    return db.query(User).options(eager_posts).all()
# Selective loading.
def get_user_with_recent_posts(db: Session, user_id: int):
    """Return one user with only the posts from the last 7 days loaded.

    Args:
        db: Session used to run the query.
        user_id: Primary key of the user to load.

    Returns:
        The matching ``User`` (or ``None``), loaded with a ``selectinload``
        option for ``User.posts`` that is restricted to posts created
        within the past week.
    """
    from datetime import datetime, timedelta

    from sqlalchemy.orm import selectinload

    week_ago = datetime.utcnow() - timedelta(days=7)
    # Loader options have no .filter(); extra criteria are attached to the
    # relationship attribute itself with .and_() (SQLAlchemy 1.4+).
    recent_posts = User.posts.and_(Post.created_at >= week_ago)
    return (
        db.query(User)
        .options(selectinload(recent_posts))
        .filter(User.id == user_id)
        .first()
    )
def bulk_create_users(db: Session, users_data: list):
    """Bulk-insert one ``User`` per mapping in ``users_data`` and commit.

    Args:
        db: Session used for the bulk insert.
        users_data: Mappings whose items are passed to ``User(**data)``.
    """
    db.bulk_save_objects([User(**data) for data in users_data])
    db.commit()
def bulk_update_users(db: Session, updates: list):
    """Apply ``updates`` to ``User`` rows via a bulk update, then commit."""
    db.bulk_update_mappings(mapper=User, mappings=updates)
    db.commit()
# Good example.
def good_pattern():
    """Read a username inside a ``with`` block so the session is closed."""
    with SessionLocal() as db:
        return get_user_by_id(db, 1).username
# Bad example - the session is never closed.
def bad_pattern():
    """Anti-pattern kept for illustration: the session is left open."""
    leaked = SessionLocal()
    # db.close() is missing on purpose to show the mistake.
    return get_user_by_id(leaked, 1).username
def handle_concurrent_update(db: Session, user_id: int):
    """Update a user's ``last_login`` while holding a row lock.

    Args:
        db: Session that carries the transaction.
        user_id: Primary key of the user to update.

    Returns:
        None. When no user matches, the transaction is rolled back and
        nothing is updated. Errors are rolled back and passed to
        ``handle_concurrency_error``.
    """
    # NOTE(review): ``last_login`` is not declared on the User model visible
    # in this file, and ``handle_concurrency_error`` is not defined here —
    # confirm both exist in the real project.
    try:
        # Pessimistic lock: with_for_update() requests SELECT ... FOR UPDATE,
        # which locks the row. It is not optimistic locking.
        user = (
            db.query(User)
            .filter(User.id == user_id)
            .with_for_update()
            .first()
        )
        if user is None:
            # A missing user is not a concurrency failure; end the
            # transaction opened by the SELECT and stop.
            db.rollback()
            return
        user.last_login = datetime.utcnow()
        db.commit()
    except Exception as e:
        db.rollback()
        # Retry logic or error handling.
        handle_concurrency_error(e)
# Making use of indexes.
# NOTE(review): a ``User`` model for the same ``users`` table is declared
# earlier in this file — confirm the two are alternative snippets rather
# than meant to be loaded together.
class User(Base):
    """``users`` model variant with an indexed email column."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String(100), index=True, nullable=False)  # indexed
    username = Column(String(50), unique=True)  # unique constraint
# Inspecting the generated query.
def analyze_query(db: Session):
    """Print the SQL of a sample email filter with bound values inlined.

    The statement is only compiled and printed; it is not executed.
    """
    query = db.query(User).filter(User.email.like("%@gmail.com"))
    compiled = query.statement.compile(compile_kwargs={"literal_binds": True})
    print(str(compiled))