
Maria DB 접속되는지 확인
Mongo DB 접속되는지 확인
Kafka 구동
가상 환경 생성
python3 -m venv django_cqrs_venv
가상 환경 활성화
Mac:
source django_cqrs_venv/bin/activate
Windows:
django_cqrs_venv\Scripts\activate
필요한 패키지 설치
pip install django
pip install djangorestframework
pip install mysqlclient
프로젝트 생성
django-admin startproject writebook .
애플리케이션 생성
python manage.py startapp writeapp
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'writeapp'
]DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'cqrs',
'USER': 'root',
'PASSWORD': 'mypassword',
'HOST':'127.0.0.1',
'PORT':''
}
}urls.py 파일을 수정해서 cqrs로 시작하는 요청은 app의 urls에서 처리하도록 설정
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('cqrs/', include("writeapp.urls"))
]
writeapp 에 urls.py 파일을 만들고 요청을 처리하는 부분을 작성
from django.urls import path
from .views import helloAPI
urlpatterns = [
path("hello/", helloAPI)
]
views.py 파일에 helloAPI 작성
from rest_framework.response import Response
from rest_framework.decorators import api_view
@api_view(['GET'])
def helloAPI(request):
return Response("hello world")
python manage.py runserver 127.0.0.1:7000
모델 생성:writeapp 의 models.py 파일에 작성
from django.db import models
class Book(models.Model):
bid = models.AutoField(primary_key=True)
title = models.CharField(max_length=50)
author = models.CharField(max_length=50)
category = models.CharField(max_length=50)
pages = models.IntegerField()
price = models.IntegerField()
published_date = models.DateField()
description = models.TextField()
python manage.py makemigrations writeapppython manage.py migrate
show tables;
인스턴스 단위로 JSON 데이터를 만들어서 전송할 수 있도록 Serializer 클래스를 생성(serializers.py)
from rest_framework import serializers
from .models import Book
class BookSerializer(serializers.ModelSerializer):
class Meta:
model = Book
fields = ['bid', 'title', 'author',
'category', 'pages', 'price',
'published_date', 'description']
POST 방식으로 요청을 하면 데이터를 삽입하는 요청 처리 함수를 작성(views.py 파일에 작성)
from rest_framework.response import Response
from rest_framework.decorators import api_view
from .models import Book
from .serializers import BookSerializer
from rest_framework import status
@api_view(['GET'])
def helloAPI(request):
return Response("hello world")
@api_view(['POST'])
def bookAPI(request):
#전송된 데이터 읽기
data = request.data
#숫자로 변환
data['pages'] = int(data['pages'])
data['price'] = int(data['price'])
#Model 형태로 변환
serializer = BookSerializer(data=data)
if serializer.is_valid():
serializer.save() #테이블에 저장
#성공한 경우
return Response(serializer.data,
status=status.HTTP_201_CREATED)
#실패한 경우
return Response(serializer.errors,
status = status.HTTP_400_BAD_REQUEST)
애플리케이션의 urls.py 파일에서 url 과 요청 처리 함수 연결
from django.urls import path
from .views import helloAPI, bookAPI
urlpatterns = [
path("hello/", helloAPI),
path("book/", bookAPI)
]
python manage.py runserver 127.0.0.1:7000{
"title":"cqrs",
"author":"adam",
"category":"cloud native",
"pages": 300,
"price": 25000,
"published_date":"2024-10-08",
"description":"마이크로 서비스 패턴"
}
화면에 성공 메시지가 출력되는지 확인

데이터베이스에서도 확인
select * from writeapp_book;

클라이언트 프로젝트가 저장될 디렉토리 생성
디렉토리에서 client application 생성
yarn create react-app cqrsclient
react 애플리케이션 실행
yarn start

아이콘 사용을 위한 패키지 설치
npm install --save --legacy-peer-deps @material-ui/core
npm install --save --legacy-peer-deps @material-ui/icons
yarn add axios데이터를 삽입하기 위한 컴포넌트 생성(AddBook.jsx)
import React, { useState } from "react";
import { TextField, Paper, Button, Grid } from "@material-ui/core";
function AddBook(props) {
const [title, setTitle] = useState("");
const [author, setAuthor] = useState("");
const [category, setCategory] = useState("");
const [pages, setPages] = useState("");
const [price, setPrice] = useState("");
const [published_date, setPublished_date] = useState("");
const [description, setDescription] = useState("");
const onTitleChange = (event) => {
setTitle(event.target.value);
};
const onAuthorChange = (event) => {
setAuthor(event.target.value);
};
const onCategoryChange = (event) => {
setCategory(event.target.value);
};
const onPagesChange = (event) => {
setPages(event.target.value);
};
const onPriceChange = (event) => {
setPrice(event.target.value);
};
const onPublished_dateChange = (event) => {
setPublished_date(event.target.value);
};
const onDescriptionChange = (event) => {
setDescription(event.target.value);
};
const onSubmit = (event) => {
event.preventDefault();
const book = {};
book.title = title;
book.author = author;
book.category = category;
book.pages = pages;
book.price = price;
book.published_date = published_date;
book.description = description;
props.add(book);
setTitle("");
setAuthor("");
setCategory("");
setPages("");
setPrice("");
setPublished_date("");
setDescription("");
};
return (
<Paper style={{ margin: 16, padding: 16 }}>
<Grid container>
<Grid xs={6} md={6} item style={{ paddingRight: 16 }}>
<TextField
onChange={onTitleChange}
value={title}
placeholder="Add Book Title"
fullWidth
/>
</Grid>
<Grid xs={6} md={6} item style={{ paddingRight: 16 }}>
<TextField
onChange={onAuthorChange}
value={author}
placeholder="Add Book Author"
fullWidth
/>
</Grid>
<Grid xs={3} md={3} item style={{ paddingRight: 16 }}>
<TextField
onChange={onCategoryChange}
value={category}
placeholder="Add Book Category"
fullWidth
/>
</Grid>
<Grid xs={3} md={3} item style={{ paddingRight: 16 }}>
<TextField
onChange={onPagesChange}
value={pages}
placeholder="Add Book Pages"
fullWidth
/>
</Grid>
<Grid xs={3} md={3} item style={{ paddingRight: 16 }}>
<TextField
onChange={onPriceChange}
value={price}
placeholder="Add Book Price"
fullWidth
/>
</Grid>
<Grid xs={3} md={3} item style={{ paddingRight: 16 }}>
<TextField
onChange={onPublished_dateChange}
value={published_date}
placeholder="Add Book Published_Date"
fullWidth
/>
</Grid>
<Grid xs={11} md={11} item style={{ paddingRight: 16 }}>
<TextField
onChange={onDescriptionChange}
value={description}
placeholder="Add Book Description"
fullWidth
/>
</Grid>
<Grid xs={1} md={1} item>
<Button
fullWidth
color="secondary"
variant="outlined"
onClick={onSubmit}
>
+
</Button>
</Grid>
</Grid>
</Paper>
);
}
export default AddBook;
App.js를 수정해서 AddBook 컴포넌트를 화면에 출력
import "./App.css";
import { Paper } from "@material-ui/core";
import AddBook from "./AddBook";
import Axios from "axios";
function App() {
//데이터 추가를 위한 함수
const add = (book) => {
console.log("book : ", book);
Axios.post("http://127.0.0.1:7000/cqrs/book/", book).then((response) => {
console.log(response.data);
if (response.data.bid) {
alert("저장에 성공했습니다.");
} else {
alert("코멘트를 저장하지 못했습니다.");
}
});
};
return (
<div className="App">
<Paper style={{ margin: 16 }}>
<AddBook add={add} />
</Paper>
</div>
);
}
export default App;
python manage.py runserver 127.0.0.1:7000
yarn start

write 프로젝트에 패키지 설치
pip3 install django-cors-headers
settings.py 파일의 INSTALLED_APP에 'corsheaders'를 추가
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'writeapp',
'corsheaders'
]
MIDDLEWARE = [
'corsheaders.middleware.CorsMiddleware',
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]settings.py 파일에 요청을 허락할 WHITELIST 작성
CORS_ORIGIN_WHITELIST = ['http://127.0.0.1:3000',
'http://localhost:3000']
CORS_ALLOW_CREDENTIALS = True
클라이언트에서 다시 요청



python3 -m venv read_venv
read_venv\Scripts\activatepip install django
pip install djangorestframework
pip install pymysql
pip install django-cors-headers
pip install pymongodjango-admin startproject readbook .python manage.py startapp readappsettings.py 파일의 INSTALL_APPS에 추가
'rest_framework',
'readapp',
'corsheaders'
settings.py 파일의 MIDDLEWARE 최상단에 추가
'corsheaders.middleware.CorsMiddleware',
CORS_ORIGIN_WHITELIST = ['http://127.0.0.1:3000',
'http://localhost:3000']
CORS_ALLOW_CREDENTIALS = Trueapps.py 파일에 앱이 시작되면 한 번만 수행하는 코드를 작성
from django.apps import AppConfig
class ReadappConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'readapp'
def ready(self):
print("시작하자 마자 한 번만 수행")
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'readapp.apps.ReadappConfig',
'corsheaders'
]
ready 메서드를 수정해서 MySQL의 데이터를 MongoDB로 복제
from django.apps import AppConfig
import pymysql
from pymongo import MongoClient
from datetime import datetime
class ReadappConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'readapp'
def ready(self):
print("시작하자 마자 한 번만 수행")
#mysql에 접속
con = pymysql.connect(host='127.0.0.1',
port=3306,
user='root',
passwd='mypassword',
db='cqrs',
charset='utf8')
#Mongo DB에 접속해서 기존 컬렉션 삭제
conn = MongoClient('127.0.0.1')
db=conn.cqrs
collect = db.books
collect.delete_many({})
#MySQL의 테이블 읽기
cursor = con.cursor()
cursor.execute("select * from writeapp_book")
data = cursor.fetchall()
#데이터 순회하면서 데이터를 읽어서 Mongodb에 삽입
for imsi in data:
#문자열을 날짜 형식으로 변환
date = imsi[6].strftime("%Y-%m-%d")
#Mongodb 데이터 형태 생성
doc = {'bid':imsi[0], 'title':imsi[1],
'author':imsi[2], 'category':imsi[3],
'pages':imsi[4], 'price':imsi[5],
'published_date': date, 'description':imsi[7]}
collect.insert_one(doc)
con.close()
use cqrs
db.books.find({})
readbook의 urls.py를 수정해서 cqrs로 시작하는 요청은 readapp의 urls가 처리하도록 수정
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('cqrs/', include("readapp.urls"))
]
readapp에 urls.py 파일을 만들고 요청과 처리 메서드를 연결
from django.urls import path
from .views import bookAPI
urlpatterns = [
path("books/", bookAPI)
]
readapp의 views.py 파일에 bookAPI 함수 작성
from rest_framework.response import Response
from rest_framework.decorators import api_view
from rest_framework import status
from pymongo import MongoClient
from bson import json_util
import json
@api_view(['GET'])
def bookAPI(request):
conn = MongoClient("127.0.0.1")
db = conn.cqrs
collect = db.books
#데이터 전체 조회
result = collect.find()
data = []
for r in result:
data.append(r)
return Response(json.loads(json_util.dumps(data)),
status=status.HTTP_201_CREATED)
실행
python manage.py runserver
브라우저에서 확인: http://127.0.0.1:8000/cqrs/books/

import "./App.css";
import { Paper } from "@material-ui/core";
import AddBook from "./AddBook";
import Axios from "axios";
import React, { useEffect, useState } from "react";
function App() {
//상테를 생성 - 변수를 생성하고 접근자 함수를 생성
const [items, setItems] = useState([]);
//화면이 출력되자마자 수행될 함수
useEffect(() => {
Axios.get("http://127.0.0.1:8000/cqrs/books/").then((response) => {
if (response.data) {
setItems(response.data);
} else {
alert("읽기 실패");
}
});
}, []);
//데이터 추가를 위한 함수
const add = (book) => {
console.log("book : ", book);
Axios.post("http://127.0.0.1:7000/cqrs/book/", book).then((response) => {
console.log(response.data);
if (response.data.bid) {
alert("저장에 성공했습니다.");
} else {
alert("코멘트를 저장하지 못했습니다.");
}
});
};
return (
<div className="App">
<Paper style={{ margin: 16 }}>
<AddBook add={add} />
</Paper>
{items.map((item, index) => (
<p key={index}>{item.title}</p>
))}
</div>
);
}
export default App;

현재 서버를 데이터 사용 용도에 따라 분리시켜서 구현하고 저장소도 분리시킴
: Polyglot 하다고 할 수 있으며 CQRS로 구현한 것은 맞는데 데이터 동기화가 이루어지지 않음
데이터가 추가될 때 읽기 전용 데이터베이스에도 데이터가 추가되어야 한다.
패키지 설치
pip install kafka-python
views.py 파일 수정
from rest_framework.response import Response
from rest_framework.decorators import api_view
from .models import Book
from .serializers import BookSerializer
from rest_framework import status
from kafka import KafkaProducer
import json
#메시지를 전송하는 kafka producer 클래스
class MessageProducer:
#카프카 서버와 토픽을 매개변수로 받아서 메시지를 전송하는 인스턴스를 생성
def __init__(self, broker, topic) -> None:
self.broker = broker
self.topic = topic
#카프카 프로듀서 설정
self.producer = KafkaProducer(
bootstrap_servers = self.broker,
value_serializer = lambda x : json.dumps(x).encode('utf-8'),
acks=0,
api_version=(2,5,0),
key_serializer=str.encode,
retries=3
)
def send_message(self, msg, auto_close=True):
try:
future = self.producer.send(self.topic, value=msg, key='key')
self.producer.flush()
future.get(timeout=2)
return {"status_code":200, "error":None}
except Exception as exc:
raise exc
@api_view(['GET'])
def helloAPI(request):
return Response("hello world")
@api_view(['POST'])
def bookAPI(request):
#전송된 데이터 읽기
data = request.data
#숫자로 변환
data['pages'] = int(data['pages'])
data['price'] = int(data['price'])
#Model 형태로 변환
serializer = BookSerializer(data=data)
if serializer.is_valid():
serializer.save() #테이블에 저장
# 삽입에 성공한 경우 카프카에게 메시지를 전송
broker = ["localhost:9092"]
topic = "cqrstopic"
#프로듀서 생성
pd = MessageProducer(broker,topic)
#메시지 전송
msg = {"task":"insert", "data":serializer.data}
res = pd.send_message(msg)
print(res)
#성공한 경우
return Response(serializer.data,
status=status.HTTP_201_CREATED)
#실패한 경우
return Response(serializer.errors,
status = status.HTTP_400_BAD_REQUEST)
프로젝트 실행 시 six 모듈 에러가 발생하는 경우
import sys
import six
if sys.version_info >= (3, 12, 0):
sys.modules["kafka.vendor.six.moves"] = six.moves클라이언트를 실행하여 데이터 추가하고 kafka에 메세지가 전달되는지 확인
클라이언트에서 데이터 삽입

터미널에서 토픽을 구독하여 kafka 메세지 수신 확인
docker exec -it kafka /bin/bash
cd /opt/kafka/bin
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cqrstopic --from-beginning

pip install kafka-python
pip3 install six==1.6.0애플리케이션의 apps.py 파일을 수정
from django.apps import AppConfig
import pymysql
from pymongo import MongoClient
from datetime import datetime
import sys
import six
if sys.version_info >= (3, 12, 0):
sys.modules["kafka.vendor.six.moves"] = six.moves
from kafka import KafkaConsumer
import threading
import json
#카프카 메시지를 읽는 클래스
class MessageConsumer:
def __init__(self, broker, topic):
self.broker = broker
self.consumer = KafkaConsumer(
topic,
bootstrap_servers = self.broker,
value_deserializer = lambda x:x.decode("utf-8"),
group_id="my-group",
auto_offset_reset = "earliest",
enable_auto_commit = True
)
#메시지를 읽어오는 메서드
def receive_message(self):
try:
for message in self.consumer:
result = json.loads(message.value)
imsi = result["data"]
doc = {"bid":imsi["bid"], "title":imsi["title"],
"author":imsi["author"],
"category":imsi["category"],
"pages":imsi["pages"],
"price":imsi["price"],
"published_date":imsi["published_date"],
"description":imsi["description"]}
#Mongo DB에 삽입
conn = MongoClient('127.0.0.1')
db=conn.cqrs
collect = db.books
collect.insert_one(doc)
conn.close()
except Exception as exc:
raise exc
class ReadappConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'readapp'
def ready(self):
print("시작하자 마자 한 번만 수행")
#mysql에 접속
con = pymysql.connect(host='127.0.0.1',
port=3306,
user='root',
passwd='rapa7630',
db='cqrs',
charset='utf8')
#Mongo DB에 접속해서 기존 컬렉션 삭제
conn = MongoClient('127.0.0.1')
db=conn.cqrs
collect = db.books
collect.delete_many({})
#MySQL의 테이블 읽기
cursor = con.cursor()
cursor.execute("select * from writeapp_book")
data = cursor.fetchall()
#데이터 순회하면서 데이터를 읽어서 Mongodb에 삽입
for imsi in data:
#문자열을 날짜 형식으로 변환
date = imsi[6].strftime("%Y-%m-%d")
#Mongodb 데이터 형태 생성
doc = {'bid':imsi[0], 'title':imsi[1],
'author':imsi[2], 'category':imsi[3],
'pages':imsi[4], 'price':imsi[5],
'published_date': date, 'description':imsi[7]}
collect.insert_one(doc)
con.close()
#메시지 수신자 생성
broker = ["localhost:9092"]
topic = "cqrstopic"
consumer = MessageConsumer(broker, topic)
#스레드로 메서드 호출
t = threading.Thread(target = consumer.receive_message)
t.start()
클라이언트에서 데이터를 삽입하고, 새로고침하여 Kafka를 통해 MongoDB에도 데이터가 추가되는지 확인
클라이언트에서 데이터 삽입

새로고침하여 삽입된 데이터가 출력되는지 확인

apps.js 파일을 수정
import "./App.css";
import { Paper } from "@material-ui/core";
import AddBook from "./AddBook";
import Axios from "axios";
import React, { useEffect, useState } from "react";
function App() {
//상태를 생성 - 변수를 생성하고 접근자 함수를 생성
const [items, setItems] = useState([]);
const [data, setData] = useState([0]);
//화면이 출력되자마자 수행될 함수
useEffect(() => {
Axios.get("http://127.0.0.1:8000/cqrs/books/").then((response) => {
if (response.data) {
setItems(response.data);
} else {
alert("읽기 실패");
}
});
}, [data]);
//데이터 추가를 위한 함수
const add = (book) => {
console.log("book : ", book);
Axios.post("http://127.0.0.1:7000/cqrs/book/", book).then((response) => {
console.log(response.data);
if (response.data.bid) {
alert("저장에 성공했습니다.");
//데이터가 추가될 때 상태를 변경해서
//데이터를 다시 출력하도록 합니다.
setData(1);
} else {
alert("코멘트를 저장하지 못했습니다.");
}
});
};
return (
<div className="App">
<Paper style={{ margin: 16 }}>
<AddBook add={add} />
</Paper>
{items.map((item, index) => (
<p key={index}>{item.title}</p>
))}
</div>
);
}
export default App;

