FastApi + dependency_injector + rabbimq

CHOJUNGHO96·2024년 1월 24일

해당프로젝트 github link : https://github.com/CHOJUNGHO96/FastApi-dependency_injector-rabbitmq.git

FastApi + dependency_injector + rabbimq 구조

FastApi-dependency_injector-rabbimq
├── api
   ├── end_point.py
├── background
   ├── receiver.py
├── main.py
├── containers.py
├── pyproject.toml
├── poetry.lock
├── Dockerfile
└── docker-compose.yml



main.py 코드

from fastapi import FastAPI
from containers import RabbitMQContainer

# FastAPI 앱 인스턴스 생성
app = FastAPI()

# 의존성 컨테이너 구성 및 실행
RabbitMQContainer()

FastAPI 애플리케이션을 초기화하고 Rabbitmq 컨테이너초기화



containers.py 코드

from dependency_injector import containers, providers
import pika


class RabbitMQContainer(containers.DeclarativeContainer):
    wiring_config = containers.WiringConfiguration(packages=["api"])
    connection = providers.Singleton(
        pika.BlockingConnection,
        pika.ConnectionParameters(host="node1", port=5672, credentials=pika.PlainCredentials("admin", "admin"))
        )

api패키지와 Wiring 후 싱글톤으로 pika라이브러리의 rabbitmq 커넥션객체 생성



/api/end_point.py 코드

from dependency_injector.wiring import inject, Provide
from fastapi import Depends, HTTPException
from main import app
from containers import RabbitMQContainer
import pika


# 메시지 발행 엔드포인트

@app.post("/")
@inject
def publish_message(message: str, rabbitmq_connection: pika.BlockingConnection = Depends(Provide[RabbitMQContainer.connection])):
    channel = rabbitmq_connection.channel()
    channel.queue_declare(queue='hello')
    channel.basic_publish(exchange='', routing_key='hello', body=message)
    return {"message": "전송완료"}

Depends로 RabbitMQContainer.connection 의존성 주입받고 메세지를 큐에 쌓아줌



/background/receiver.py 코드

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="node1", port=5672, credentials=pika.PlainCredentials("admin", "admin")))
channel = connection.channel()
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    message = body.decode('utf-8')
    print(f" [x] 수신완료: {message}")


channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

try:
    print(' [*] 메세지 수신 대기중... 종료하려면 CTRL+C를 누르시오')
    channel.start_consuming()
except:
    print(" [*] 종료")
    channel.stop_consuming()

rabbitmq 커넥후 hello 채널연결 channel.start_consuming() 진행후 channel.basic_consume로 콜백함수를 인자로 넘겨서 큐에있는 메세지 수신



Dockerfile 코드

# Python 이미지 사용
FROM python:3.11

# 작업 디렉토리 설정
WORKDIR /app

# 의존성 설치
COPY pyproject.toml poetry.lock* /app/
RUN pip install poetry
RUN poetry install

# 소스 코드 복사
COPY . /app

# FastAPI 실행
CMD ["poetry", "run", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

pyproject.toml 에있는 라이브러리 사용을위해 poetry 의존성 설치후 FastApi 앱 실행



docker-compose.yml 코드

version: '3.8'
services:
  web:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - node1

  node1 :
    image: rabbitmq:3-management-alpine
    container_name: rabbitmq-stream
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: "admin"
      RABBITMQ_DEFAULT_PASS: "admin"

Dockerfil 기반으로 web 컨테이너 실행 및 rabbitmq 이미지 컨테이너 실행


## 실행TEST

1. message 입력후 api 요청

2. queues에 해당 message가 쌓임

3. 백그라운드로 consumer 실행시 queues에 먼저쌓인 메세지 구독


profile
백엔드개발자

0개의 댓글