Microservices and Go: Unlocking the Secrets of Event-Driven Architecture
ํ๋ ์ํํธ์จ์ด ๊ฐ๋ฐ์ ๋ง์ดํฌ๋ก์๋น์ค ์ค์ฌ์ผ๋ก ๋น ๋ฅด๊ฒ ๋ณํํ๊ณ ์์ต๋๋ค.
์ด๋ ํ๋์ ๊ฑฐ๋ํ ์ ํ๋ฆฌ์ผ์ด์ ์ ์๊ณ ๋ ๋ฆฝ์ ์ผ๋ก ๋ฐฐํฌ ๊ฐ๋ฅํ ์๋น์ค๋ค๋ก ๋ถํ ํ๋ ๋ชจ๋ํ ๋ฐฉ์์ ๋๋ค.
์ด๋ฌํ ์๋น์ค๋ค์ ๋น๋๊ธฐ์ ์ผ๋ก ํต์ ํ๋ฉฐ, ๊ทธ ๊ธฐ๋ฐ์๋ ์ด๋ฒคํธ ๊ธฐ๋ฐ ์ํคํ ์ฒ(Event-Driven Architecture, EDA)๊ฐ ์์ต๋๋ค.
EDA๋ ๋์ ํ์ฅ์ฑ, ์ ์ฐ์ฑ, ๊ทธ๋ฆฌ๊ณ ์๋น์ค ๊ฐ ๊ฒฐํฉ๋ ๊ฐ์๋ฅผ ๊ฐ๋ฅํ๊ฒ ํด์ค๋๋ค.
โก ๋น ๋ฅด๊ณ ๊ฐ๋ฒผ์
Go๋ ์ปดํ์ผ ์ธ์ด๋ก, ๋์ ์ฑ๋ฅ๊ณผ ๋ฎ์ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ ์๋ํฉ๋๋ค.
๐ฆ ๋ด์ฅ๋ ๋์์ฑ ์ฒ๋ฆฌ ๊ธฐ๋ฅ
Goroutine๊ณผ Channel์ ํ์ฉํด ๊ณ ๋ถํ ์์ฒญ๋ ์์ฝ๊ฒ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
๐ฅ ๋ฏธ๋๋ฉํ๊ณ ํจ์จ์ ์ธ ๊ฐ๋ฐ
ํ์ค ๋ผ์ด๋ธ๋ฌ๋ฆฌ๊ฐ ์๊ณ ๋จ์ํ์ฌ, ๊ฐ๋ฐ ์๋์ ๋ฐฐํฌ ์๋ ๋ชจ๋ ํฅ์๋ฉ๋๋ค.
์ด๋ฒคํธ ๊ธฐ๋ฐ ์ํคํ ์ฒ(Event-Driven Architecture, EDA)๋ ๋ค์๊ณผ ๊ฐ์ ๋ฐฉ์์ผ๋ก ๋์ํ๋ ์ค๊ณ ํจํด์ ๋๋ค.
๋ง์ดํฌ๋ก์๋น์ค์์๋ ์๋น์ค ๊ฐ ํต์ ์ ์ด๋ฒคํธ ๊ธฐ๋ฐ์ผ๋ก ๋น๋๊ธฐ ์ฒ๋ฆฌํฉ๋๋ค.
์ฆ, ์ง์ HTTP ํธ์ถ์ ๋ณด๋ด๋ ๋์ , ์ด๋ฒคํธ๋ฅผ ๋ธ๋ก์ปค์ ๋ฐํ(publish)ํ๊ณ , ๋ค๋ฅธ ์๋น์ค๋ ํ์ํ ๋ ๊ทธ ์ด๋ฒคํธ๋ฅผ ๊ตฌ๋ (consume)ํฉ๋๋ค.
์ด๋ฒคํธ ๋ธ๋ก์ปค๋ ์์ฐ์์ ์๋น์ ์ฌ์ด๋ฅผ ์ค๊ฐํ๋ ํต์ฌ ๋ฏธ๋ค์จ์ด์ ๋๋ค. ๋ํ์ ์ธ ๋ธ๋ก์ปค๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค:

// Event Struct
type OrderEvent struct {
OrderID string `json:"orderId"`
Status string `json:"status"`
}
// Publish Event
func publishOrderEvent(producer sarama.SyncProducer, topic string, event OrderEvent) error {
jsonData, err := json.Marshal(event)
if err != nil {
return err
}
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(jsonData),
}
_, _, err = producer.SendMessage(msg)
return err
}
# Initialize project
go mod init github.com/example/event-driven-go
# Add required packages
go get -u github.com/Shopify/sarama # Kafka Client
go get -u github.com/gin-gonic/gin # HTTP Router
์ด์ Kafka๋ก ์ด๋ฒคํธ๋ฅผ ๋ฐํํ๋ ํ๋ก๋์๋ฅผ ๋ง๋ค์ด์ผ ํฉ๋๋ค.
์ด๋ฅผ ์ํด producer.go ํ์ผ์ ์์ฑํ๊ณ , Kafka ๋ธ๋ก์ปค์ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ ๋ก์ง์ ์์ฑํฉ๋๋ค.
OrderEvent ๋ฐํํ๊ธฐ (main.go)package main
import (
"encoding/json"
"log"
"time"
"github.com/Shopify/sarama"
)
// ์ฃผ๋ฌธ ์ด๋ฒคํธ ๊ตฌ์กฐ์ฒด ์ ์
type OrderEvent struct {
OrderID string `json:"orderId"`
Status string `json:"status"`
}
func main() {
brokers := []string{"localhost:9092"}
topic := "order_events"
// Kafka ํ๋ก๋์ ์์ฑ
producer, err := createKafkaProducer(brokers)
if err != nil {
log.Fatal("Failed to create Kafka producer:", err)
}
defer producer.Close()
// ์ด๋ฒคํธ ์์ฑ
event := OrderEvent{
OrderID: "123456",
Status: "CREATED",
}
// Kafka๋ก ์ด๋ฒคํธ ๋ฐํ
err = publishOrderEvent(producer, topic, event)
if err != nil {
log.Println("Failed to publish event:", err)
}
}
func createKafkaProducer(brokers []string) (sarama.SyncProducer, error) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
return sarama.NewSyncProducer(brokers, config)
}
โ ๋๊ธฐ(Sync) ํ๋ก๋์๋ก ์์ฑ
โ ๋ฉ์์ง ์ ์ก ์ฑ๊ณต ์ฌ๋ถ ํ์ธ์ ์ํด
Return.Successes์ค์
func publishOrderEvent(producer sarama.SyncProducer, topic string, event OrderEvent) error {
jsonData, err := json.Marshal(event)
if err != nil {
return err
}
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(jsonData),
}
_, _, err = producer.SendMessage(msg)
return err
}
๐น OrderEvent ๊ตฌ์กฐ์ฒด๋ฅผ JSON์ผ๋ก ์ง๋ ฌํ
๐น Kafka ํ ํฝ์ผ๋ก ๋ฉ์์ง๋ฅผ ์ ์ก
์ด๋ฒคํธ ์ปจ์๋จธ๋ Kafka๋ก๋ถํฐ ์ด๋ฒคํธ๋ฅผ ๊ตฌ๋ (consume)ํ๊ณ , ๊ทธ์ ๋ฐ๋ผ ๋ก์ง์ ์ฒ๋ฆฌํ๋ ์ญํ ์ ํฉ๋๋ค.
consumer.go ์์ ์ฝ๋package main
import (
"encoding/json"
"fmt"
"log"
"github.com/Shopify/sarama"
)
type OrderEvent struct {
OrderID string `json:"orderId"`
Status string `json:"status"`
}
func main() {
brokers := []string{"localhost:9092"}
topic := "order_events"
consumeOrderEvents(brokers, topic)
}
func consumeOrderEvents(brokers []string, topic string) {
consumer, err := sarama.NewConsumer(brokers, nil)
if err != nil {
log.Fatal("Error creating Kafka consumer:", err)
}
defer consumer.Close()
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if err != nil {
log.Fatal("Error starting partition consumer:", err)
}
defer partitionConsumer.Close()
for msg := range partitionConsumer.Messages() {
var event OrderEvent
err := json.Unmarshal(msg.Value, &event)
if err != nil {
log.Println("Error decoding message:", err)
continue
}
processOrderEvent(event)
}
}
func processOrderEvent(event OrderEvent) {
fmt.Printf("Processing Order ID: %s, Status: %s\n", event.OrderID, event.Status)
}
๐ฆ 1. ์ด๋ฒคํธ ์์ฑ (Event Sourcing)
์ด๋ฒคํธ ์์ฑ์ ์์คํ ์ํ์ ๋ณํ๋ฅผ ์ผ์ผํค๋ ์ด๋ฒคํธ์ ์ฐ์์ ์ธ ํ๋ฆ์ ์ ์ฅํ๋ ์ํคํ ์ฒ ํจํด์ ๋๋ค.
์ด ๋ฐฉ์์์๋ ๋จ์ํ ํ์ฌ ์ํ๋ง์ ์ ์ฅํ๋ ๊ฒ์ด ์๋๋ผ, ๋ชจ๋ ์ํ ๋ณํ์ ๊ธฐ๋ก์ ์ ์ฅํจ์ผ๋ก์จ
์ธ์ ๋ ์ง ์ด๋ฒคํธ์ ํ์คํ ๋ฆฌ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํ์ฌ ์ํ๋ฅผ ์ฌ๊ตฌ์ฑํ ์ ์์ต๋๋ค.
๐พ 2. CQRS (Command Query Responsibility Segregation)
CQRS๋ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ๋, ์ฐ๊ธฐ ์์ (Command)๊ณผ ์ฝ๊ธฐ ์์ (Query)์ ์์ ํ ๋ถ๋ฆฌํ๋ ํจํด์ ๋๋ค.
์ด๋ ๊ฒ ์ญํ ์ ๋๋๋ฉด ๊ฐ๊ฐ์ ๋ชจ๋ธ์ด ๊ณ ์ ์ ์๊ตฌ์ฌํญ์ ๋ง์ถฐ ์ต์ ํ๋ ์ ์์ด,
๊ฒฐ๊ณผ์ ์ผ๋ก ์์คํ ์ ํ์ฅ์ฑ๊ณผ ์ฑ๋ฅ์ ํฌ๊ฒ ํฅ์์ํฌ ์ ์์ต๋๋ค.
๐ฅ 3. ๋ฐ๋ ๋ ํฐ ํ (Dead Letter Queue, DLQ)
DLQ๋ ์ฒ๋ฆฌ์ ์คํจํ ๋ฉ์์ง๋ฅผ ๋ฐ๋ก ์ ์ฅํ๋ ํ์ ๋๋ค.
์ ์์ ์ธ ํ๋ฆ์์ ๋ฌธ์ ๊ฐ ๋ฐ์ํ์ ๋ ํด๋น ๋ฉ์์ง๋ฅผ ๋ถ๋ฆฌ ๋ณด๊ดํ์ฌ,
์ถํ ์์ธ์ ๋ถ์ํ๊ณ ์ฌ์ฒ๋ฆฌํ ์ ์๋๋ก ํด์ค๋๋ค.
์ด๋ ์์คํ ์ ์ ๋ขฐ์ฑ(reliability)์ ๋์ด๊ณ , ๋๋ฒ๊น ์ ์์ํ๊ฒ ๋ง๋ค์ด ์ค๋๋ค.
๋จ์ ํ ์คํธ (Unit Testing)
Go์์๋ ์ด๋ฒคํธ ๋ฐํ ๋ก์ง์ ๋ํด ๋จ์ ํ ์คํธ๋ฅผ ์์ฑํ ์ ์์ต๋๋ค.
์๋ฅผ ๋ค์ด, ์๋์ ๊ฐ์ด Mock Kafka ํ๋ก๋์๋ฅผ ์ฌ์ฉํด ํ ์คํธํ ์ ์์ต๋๋ค:
go
์ฝ๋ ๋ณต์ฌ
func TestPublishOrderEvent(t *testing.T) {
mockProducer := mocks.NewSyncProducer(t, nil)
event := OrderEvent{
OrderID: "123456",
Status: "CREATED",
}
err := publishOrderEvent(mockProducer, "test_topic", event)
assert.NoError(t, err)
}
์ด ํ
์คํธ๋ publishOrderEvent ํจ์๊ฐ ์ค๋ฅ ์์ด ์ ์์ ์ผ๋ก ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋์ง ํ์ธํฉ๋๋ค.
ํตํฉ ํ ์คํธ (Integration Testing)
์ค์ Kafka ํ๊ฒฝ์์ ์ด๋ฒคํธ ํ๋ฆ์ ํ ์คํธํ๋ ค๋ฉด Docker๋ฅผ ์ฌ์ฉํด Kafka๋ฅผ ์คํ์ํค๊ณ ,
์ค์ ์๋๋ฆฌ์ค๋ฅผ ํตํด ์ ์ฒด ๋ง์ดํฌ๋ก์๋น์ค ๊ฐ ์ด๋ฒคํธ ํ๋ฆ์ ๊ฒ์ฆํ ์ ์์ต๋๋ค.
์ด ๋ฐฉ๋ฒ์ ์ค์ ์ด์ ํ๊ฒฝ์ ๊ฐ๊น์ด ์ํฉ์ ์ฌํํ ์ ์์ด ์์ ์ ์ธ ๋ฐฐํฌ์ ํฐ ๋์์ด ๋ฉ๋๋ค.
์ ํ๋ฆฌ์ผ์ด์ ๋์ปค๋ผ์ด์ง (Dockerize the Application)
์๋๋ Go๋ก ์์ฑํ ๋ง์ดํฌ๋ก์๋น์ค๋ฅผ Docker ์ด๋ฏธ์ง๋ก ๋น๋ํ๊ธฐ ์ํ Dockerfile์
๋๋ค:
FROM golang:1.18
WORKDIR /app
COPY . .
RUN go mod download
RUN go build -o main .
CMD ["./main"]
์ด ์ค์ ์ ๋ค์๊ณผ ๊ฐ์ ๋จ๊ณ๋ฅผ ์ํํฉ๋๋ค:
main ๋ฐ์ด๋๋ฆฌ๋ฅผ ๋น๋Kubernetes์ ๋ฐฐํฌ (Kubernetes Deployment)
์ด์ Docker ์ด๋ฏธ์ง๊ฐ ์ค๋น๋์์ผ๋ฉด, Kubernetes์์ ์คํํ ์ ์๋๋ก ์๋์ ๊ฐ์ Deployment๋ฅผ ์ ์ํฉ๋๋ค:
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-service
spec:
replicas: 3
selector:
matchLabels:
app: order-service
template:
metadata:
labels:
app: order-service
spec:
containers:
- name: order-service
image: order-service:v1
ports:
- containerPort: 8080
์ด ์ค์ ์ ๋ค์์ ์๋ฏธํฉ๋๋ค:
order-service๋ผ๋ ์ด๋ฆ์ Deployment๋ฅผ ์์ฑorder-service:v1 ์ด๋ฏธ์ง๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์คํโ ๋ฉฑ๋ฑ์ฑ(Idempotency)
์ด๋ฒคํธ ์ฒ๋ฆฌ ๋ก์ง์ ๊ฐ์ ์ด๋ฒคํธ๊ฐ ์ฌ๋ฌ ๋ฒ ์ฒ๋ฆฌ๋๋๋ผ๋ ๋ถ์์ฉ ์์ด ๋์ํด์ผ ํฉ๋๋ค.
์ด๋ฅผ ํตํด ์ฌ์๋ ์์๋ ์์ ํ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํด์ง๋๋ค.
โ ์คํค๋ง ์งํ(Schema Evolution)
์์ฐ์(Producer)์ ์๋น์(Consumer) ๊ฐ์ ํธํ์ฑ์ ์ ์งํ๊ธฐ ์ํด,
์ด๋ฒคํธ ๊ตฌ์กฐ์ ๋ฒ์ ์ ๋ณด๋ฅผ ํฌํจํ๊ฑฐ๋, backward compatibility๋ฅผ ๊ณ ๋ คํ ์ค๊ณ๊ฐ ํ์ํฉ๋๋ค.
โ ๋ชจ๋ํฐ๋ง ๋ฐ ๋ก๊น (Monitoring & Logging)
์ด๋ฒคํธ ํ๋ฆ์ ์ ํํ ์ถ์ ํ๊ณ ๋ณ๋ชฉ ํ์์ ์๋ณํ ์ ์๋๋ก,
์ค๊ฐ ๋จ๊ณ์์์ ๋ก๊น ๊ณผ ๋ฉํธ๋ฆญ ์์ง์ ํ์์ ์ ๋๋ค.
์: Prometheus + Grafana, OpenTelemetry ๋ฑ ์ฌ์ฉ
โ ์ํท ๋ธ๋ ์ด์ปค ํจํด(Circuit Breaker Pattern)
ํ๋์ ์๋น์ค ์ฅ์ ๊ฐ ์ฐ์์ ์ธ ์ฅ์ ๋ก ํผ์ง๋ ๊ฒ์ ๋ฐฉ์งํ๊ธฐ ์ํด,
์ผ์์ ์ผ๋ก ํธ์ถ์ ์ฐจ๋จํ๊ณ , ์์คํ ์ ๋ณดํธํ๋ ์ํท ๋ธ๋ ์ด์ปค ํจํด์ ์ ์ฉํฉ๋๋ค.
์: goresilience, sony/gobreaker ๊ฐ์ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ํ์ฉ ๊ฐ๋ฅ
Go ์ธ์ด๋ก ๋ง์ดํฌ๋ก์๋น์ค๋ฅผ ๊ตฌ์ถํ๋ฉด์ ์ด๋ฒคํธ ๊ธฐ๋ฐ ์ํคํ ์ฒ(Event-Driven Architecture, EDA)๋ฅผ ๊ฒฐํฉํ๋ฉด,
๋์ ํ์ฅ์ฑ๊ณผ ์ฅ์ ์ ๊ฐํ ์์คํ ์ ๋ง๋ค ์ ์์ต๋๋ค.
Kafka, RabbitMQ, NATS ๊ฐ์ ๋ฉ์์ง ๋ธ๋ก์ปค๋ ์๋น์ค ๊ฐ์ ๊ฒฐํฉ๋๋ฅผ ๋ฎ์ถ๋ฉด์๋ ์ํํ ํต์ ์ ๋ณด์ฅํด ์ค๋๋ค.