Docker 설치
⇒ Windows나 Mac에서는 Docker Desktop을 다운로드 받아서 설치
⇒ Windows에 설치할 때는 WSL(Windows Subsystem for Linux - 윈도우즈 안에서 리눅스 환경을 직접 실행할 수 있게 해주는 도구)이 설치되어야 한다.
{ Docker desktop 다운로드 하고 로그인 한 후 wsl --update하면 끝 }
Kafka
⇒ 개요

⇒ 사용 이유
docker-compose.yml 파일 생성
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:latest
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
docker-compose up -d
docker ps
docker exec -it kafka /bin/bash
# vi가 없으면 파일 복사로 해결
vi /opt/kafka/config/server.properties
이 부분 주석을 해제
listeners=PLAINTEXT://:9092
이 부분은 추가
delete.topic.enable=true
auto.create.topics.enable=true
파일 복사
도커 컨테이너에서 호스트 컴퓨터로 파일 복사
docker cp [컨테이너이름]:[컨테이너 내부 경로][호스트 파일 경로]
docker cp kafka:/opt/kafka/config/server.properties C:\Users\USER\Downloads\Cloud\Kafka
호스트에서 도커 컨테이너로 파일을 복사
docker cp [호스트 파일 경로][컨테이너이름]:[컨테이너 내부 경로]
docker cp C:\Users\USER\Downloads\Cloud\Kafka\server.properties kafka:/opt/kafka/config/server.properties
⇒ kafka 테스트
docker exec -it kafka /bin/bash
cd /opt/kafka/bin
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic exam-topic
kafka-topics.sh --bootstrap-server localhost:9092 --list
kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic exam-topic
kafka-console-producer.sh --topic exam-topic --broker-list localhost:9092
kafka-console-consumer.sh --topic exam-topic --bootstrap-server localhost:9092 --from-beginning
CQRS
⇒ 개요
⇒ 전통적인 방식의 문제점
⇒ 해결책
⇒ 구현 이슈
⇒ CQRS를 사용해야 하는 경우
⇒ CQRS 사용이 권장되지 않는 경우
⇒ 이벤트 소싱과 CQRS 패턴
CQRS 구현
⇒ 애플리케이션은 파이썬의 Django와 React를 이용해서 개발하고 데이터베이스는 MySQL과 MongoDB 사용
⇒ 준비 작업
MySQL이나 Maria DB 같은 RDBMS를 설치하고 접속 확인
데이터 읽기에 사용할 MongoDB 설치하고 접속 확인
Kafka 설치하고 접속 확인
PostgreSQL을 설치
my-postgres는 컨테이너 이름 변경 가능
1234 도 postgres 사용자의 비밀번호
docker run --name my-postgres -e POSTGRES_PASSWORD=1234 -p 5432:5432 -d postgres
⇒ 데이터 기록 프로젝트
python -m venv myvenvmyvenv\Scripts\activate Linux & Mac : source myvenv/bin/activatepip install django
pip install djangorestframework
pip install psycopg2-binary # postgresql을 사용 시django-admin startproject writebook
cd writebook
python manage.py startapp writeapp
from pathlib import Path
import pymysql
pymysql.install_as_MySQLdb
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
"rest_framework",
"writeapp"
]
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'postgres',
'USER': 'postgres',
'PASSWORD': '1234',
'HOST': '127.0.0.1',
'PORT': '5432'
}
}
TIME_ZONE = 'Asia/Seoul'
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('cqrs/', include("writeapp.urls")) # cqrs/로 시작하는 url은 writeapp의 urls.py 파일이 처리
]
from django.urls import path
from .views import helloAPI
urlpatterns = [
path("hello/", helloAPI)
]
from rest_framework.response import Response
from rest_framework.decorators import api_view
@api_view(['GET'])
def helloAPI(request):
return Response('hello world')
python manage.py runserver 127.0.0.1:7000
http://127.0.0.1:7000/cqrs/hello
from django.db import models
# Create your models here.
class Book(models.Model):
bid = models.AutoField(primary_key=True)
title = models.CharField(max_length=50)
author = models.CharField(max_length=50)
category = models.CharField(max_length=50)
pages = models.IntegerField()
price = models.IntegerField()
published_date = models.DateField()
description = models.TextField()
python manage.py makemigrations writeapp
python manage.py migrate
from rest_framework import serializers
from .models import Book
class BookSerializer(serializers.ModelSerializer):
class Meta:
model = Book
fields = ['bid', 'title', 'category', 'pages', 'price', 'published_date', 'description']
from rest_framework.response import Response
from rest_framework.decorators import api_view
from .models import Book
from .serializers import BookSerializer
from rest_framework import status
@api_view(['GET'])
def helloAPI(request):
return Response('hello world')
# POST 방식의 요청이 온 경우 처리
@api_view(['POST'])
def bookAPI(request):
# 클라이너트에서 넘겨준 데이터를 가져오기
data = request.data
# 웹에서 넘어온 데이터는 문자열이므로 숫자로 변환
data['pages'] = int(data['pages'])
data['price'] = int(data['price'])
# 데이터베이스에 삽입 가능하도록 변환
serializer = BookSerializer(data=data)
# 삽입
if(serializer.is_valid()):
serializer.save()
return Response(serializer.data, status = status.HTTP_201_CREATED)
return Response(serializer.errors, status = status.HTTP_400_BAD_REQUEST)
from django.urls import path
from .views import helloAPI, bookAPI
urlpatterns = [
path("hello/", helloAPI),
path("book/", bookAPI)
]
⇒ Client 프로젝트
npm install --save --legacy-peer-deps @material-ui/core
npm install --save --legacy-peer-deps @material-ui/icons
npm install -g yarn
yarn add axios
import React , { useState }from "react"
import { TextField, Paper, Button, Grid } from "@material-ui/core";
function AddToDo(props) {
const [title, setTitle] = useState("");
const [author, setAuthor] = useState("");
const [category, setCategory] = useState("");
const [pages, setPages] = useState("");
const [price, setPrice] = useState("");
const [published_date, setPublished_date] = useState("");
const [description, setDescription] = useState("");
const onTitleChange = (event) => {
setTitle(event.target.value);
};
const onAuthorChange = (event) => {
setAuthor(event.target.value);
};
const onCategoryChange = (event) => {
setCategory(event.target.value);
};
const onPagesChange = (event) => {
setPages(event.target.value);
};
const onPriceChange = (event) => {
setPrice(event.target.value);
};
const onPublished_dateChange = (event) => {
setPublished_date(event.target.value);
};
const onDescriptionChange = (event) => {
setDescription(event.target.value);
};
const onSubmit = (event) => {
event.preventDefault();
const book = {}
book.title=title
book.author = author
book.category = category
book.pages = pages
book.price = price
book.published_date = published_date
book.description = description
props.add(book)
setTitle("")
setAuthor("")
setCategory("")
setPages("")
setPrice("")
setPublished_date("")
setDescription("")
};
return(
<Paper style={{ margin: 16, padding: 16 }}>
<Grid container>
<Grid xs={6} md={6} item style={{ paddingRight: 16 }}>
<TextField
onChange={onTitleChange}
value = {title}
placeholder="Add Book Title"
fullWidth
/>
</Grid>
<Grid xs={6} md={6} item style={{ paddingRight: 16 }}>
<TextField
onChange={onAuthorChange}
value = {author}
placeholder="Add Book Author"
fullWidth
/>
</Grid>
<Grid xs={3} md={3} item style={{ paddingRight: 16 }}>
<TextField
onChange={onCategoryChange}
value = {category}
placeholder="Add Book Category"
fullWidth
/>
</Grid>
<Grid xs={3} md={3} item style={{ paddingRight: 16 }}>
<TextField
onChange={onPagesChange}
value = {pages}
placeholder="Add Book Pages"
fullWidth
/>
</Grid>
<Grid xs={3} md={3} item style={{ paddingRight: 16 }}>
<TextField
onChange={onPriceChange}
value = {price}
placeholder="Add Book Price"
fullWidth
/>
</Grid>
<Grid xs={3} md={3} item style={{ paddingRight: 16 }}>
<TextField
onChange={onPublished_dateChange}
value = {published_date}
placeholder="Add Book Published_Date"
fullWidth
/>
</Grid>
<Grid xs={11} md={11} item style={{ paddingRight: 16 }}>
<TextField
onChange={onDescriptionChange}
value = {description}
placeholder="Add Book Description"
fullWidth
/>
</Grid>
<Grid xs={1} md={1} item>
<Button
fullWidth
color="secondary"
variant="outlined"
onClick={onSubmit}
>
+
</Button>
</Grid>
</Grid>
</Paper>
);
}
export default AddToDo;
import './App.css';
import { Paper } from '@material-ui/core';
import AddToDo from './AddBook';
import Axios from "axios"
function App() {
const add = (book) => {
Axios.post("http://127.0.0.1:7000/cqrs/book/", book).then(response) => {
if(response.data.bid) {
alert("삽입에 성공")
} else {
alert("삽입 실패")
}
}
}
return (
<div className="App">
<Paper style={{ margin: 16 }}>
<AddBook add = {add}/>
</Paper>
</div>
);
}
export default App;
⇒ 몽고 데이터베이스에서 데이터를 읽어오는 프로젝트
python -m venv myvenv
myvenv\Scripts\activate
pip install django
pip install djangorestframework
pip install psycopg2-binary # postgresql을 사용 시
pip install pymongo
pip install django-cors-headers # 자바스크립트 프로젝트에서 서버의 데이터를 ajax로 사용하도록 하기 위해서
django-admin startproject readbook
cd readbook
python manage.py startapp readapp
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
"rest_framework",
"readapp",
"corsheaders"
]
MIDDLEWARE = [
"corsheaders.middleware.CorsMiddleware",
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
CORS_ORIGIN_WHITELIST=['http://127.0.0.1:3000','http://localhost:3000']
CORS_ALLOW_CREDENTIALS=True
python manage.py runserver 127.0.0.1:8000
http://127.0.0.1:8000
from django.apps import AppConfig
import psycopg2
from pymongo import MongoClient
from datetime import datetime
class ReadappConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = 'readapp'
def ready(self):
print("시작하자마자 실행")
# postgresql에 접속
con = psycopg2.connect(
host = "localhost",
database = "postgres",
user="postgres",
password="1234",
port="5432"
)
# mongoDB 연결
conn = MongoClient('mongodb://localhost:27017')
# 기존 데이터 삭제
db = conn.cqrs
collect = db.books
collect.delete_many({})
# postgresql에서 데이터 읽어오기
cursor = con.cursor()
cursor.execute("select * from writeapp_book")
data = cursor.fetchall()
# print(data)
# 데이터를 읽어서 MongoDB에 삽입
for imsi in data:
date = imsi[6].strftime("%Y-%m-%d")
doc = {'bid':imsi[0], 'title':imsi[1], 'author':imsi[2],
'category':imsi[3], 'pages':imsi[4], 'price':imsi[5],
'published_date':date, 'description':imsi[7]}
collect.insert_one(doc)
con.close
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
"rest_framework",
"corsheaders",
"readapp.apps.ReadappConfig"
]
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('cqrs/', include('readapp.urls'))
]
from django.urls import path
from .views import bookAPI
urlpatterns = [
path("books/", bookAPI)
]
from django.shortcuts import render
from rest_framework.response import Response
from rest_framework.decorators import api_view
from rest_framework import status
from pymongo import MongoClient
from bson import json_util
import json
@api_view(['GET'])
def bookAPI(request):
conn = MongoClient('127.0.0.1')
db = conn.cqrs
collect = db.books
result = collect.find()
data = []
for r in result:
data.append(r)
return Response(json.loads(json_util.dumps(data)),
status=status.HTTP_201_CREATED)
python manage.py runserver 127.0.0.1:8000
http://127.0.0.1:8000/cqrs/books
⇒ 쓰기 프로젝트에서 Kafka를 연동해서 데이터 쓰기를 할 떄 kafka에 토픽을 전달
from rest_framework.response import Response
from rest_framework.decorators import api_view
from .models import Book
from .serializers import BookSerializer
from rest_framework import status
# 카프카에 게시를 하기 위한 라이브러리
from kafka import KafkaProducer
import json
# Producer 클래스
class MessageProducer:
def __init__(self, broker, topic):
self.broker = broker
self.topic = topic
self.producer = KafkaProducer(
bootstrap_servers=self.broker,
value_serializer=lambda x: json.dumps(x).encode("utf-8"),
acks=0,
api_version=(2, 5, 0),
key_serializer=str.encode,
retries=3,
)
def send_message(self, msg, auto_close=True):
try:
future = self.producer.send(self.topic, value=msg, key="key")
self.producer.flush()
if auto_close:
self.producer.close()
future.get(timeout=2)
return {"status_code":200, "error":None}
except Exception as exc:
raise exc
@api_view(['GET'])
def helloAPI(request):
return Response('hello world')
# POST 방식의 요청이 온 경우 처리
@api_view(['POST'])
def bookAPI(request):
# 클라이너트에서 넘겨준 데이터를 가져오기
data = request.data
# 웹에서 넘어온 데이터는 문자열이므로 숫자로 변환
data['pages'] = int(data['pages'])
data['price'] = int(data['price'])
# 데이터베이스에 삽입 가능하도록 변환
serializer = BookSerializer(data=data)
# 삽입
if(serializer.is_valid()):
# 데이터 삽입
serializer.save()
#카프카에 이벤트를 발행
broker = ["localhost:9092"]
topic = "cqrstopic"
pd = MessageProducer(broker, topic)
# 데이터와 작업 내역을 한꺼번에 전송
msg ={"task":"insert","data":serializer.data}
res = pd.send_message(msg)
print(res)
return Response(serializer.data, status = status.HTTP_201_CREATED)
return Response(serializer.errors, status = status.HTTP_400_BAD_REQUEST)
⇒ 테이터 읽기 프로젝트에서 카프카 토픽 추가
from django.apps import AppConfig
import psycopg2
from pymongo import MongoClient
from datetime import datetime
from kafka import KafkaConsumer
import json
import threading
# 카프카 컨슈머 클래스
class MessageConsumer:
def __init__(self, broker, topic):
self.broker = broker
self.consumer = KafkaConsumer(
topic, bootstrap_server=self.broker,
value_deserializer=lambda x: x.decode("utf-8"),
group_id="my-group", auto_offset_reset="earlist",
enable_auto_commit=True,)
def receive_message(self):
try:
for message in self.consumer:
# 카프카로부터 가져온 데이터를 JSON 파싱을 수행해서 딕셔너리로 변환
result = json.loads(message.value)
# data 부분만 추출
imsi = result["data"]
# data를 MongoDB에 삽입할 수 있는 구조로 변경
doc = {'bid':imsi['bid'],
'title':imsi['title'],
'author':imsi['author'],
'category':imsi['category'],
'pages':imsi['pages'],
'price':imsi['price'],
'published_date':imsi['published_date'],
'description':imsi['description']}
# mongo db에 데이터 삽입
conn = MongoClient('mongodb://localhost:27017')
db = conn.cqrs
collect = db.books
# 실제로는 task를 확인해서 작업해야 한다.
collect.insert_one(doc)
conn.close()
except Exception as exc:
raise exc
class ReadappConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = 'readapp'
def ready(self):
print("시작하자마자 실행")
# postgresql에 접속
con = psycopg2.connect(
host = "localhost",
database = "postgres",
user="postgres",
password="1234",
port="5432"
)
# mongoDB 연결
conn = MongoClient('mongodb://localhost:27017')
# 기존 데이터 삭제
db = conn.cqrs
collect = db.books
collect.delete_many({})
# postgresql에서 데이터 읽어오기
cursor = con.cursor()
cursor.execute("select * from writeapp_book")
data = cursor.fetchall()
# print(data)
# 데이터를 읽어서 MongoDB에 삽입
for imsi in data:
date = imsi[6].strftime("%Y-%m-%d")
doc = {'bid':imsi[0], 'title':imsi[1], 'author':imsi[2],
'category':imsi[3], 'pages':imsi[4], 'price':imsi[5],
'published_date':date, 'description':imsi[7]}
collect.insert_one(doc)
con.close
broker = ["localhost:9092"]
topic = "cqrstopic"
# broker와 topic 정보를 이용해서 컨슈머를 생성
consumer = MessageConsumer(broker,topic)
# 메시지를 받는 메서드를 스레드로 등록해서 실행
t = threading.Thread(traget=consumer.receive_message)
t.start()