Golang에서 카프카와 연결을 위한 인기있는 라이브러리가 2가지 정도 있다.
1. github/shopify/sarama : 순수한 go언어로 만들어졌다. MIT라이센스
2. github/confluentinc/confluent-kafka-go : C언어로 만들어진것을 Go언어로 래퍼하여 제공한다. shopify라이브러리보다 빠르지만, 구현하기가 다소 어렵다.
저는 shopify/sarama를 사용하기로 결정 하였습니다.
sarama.NewConfig()를 이용하여 커넥션에 대한 Config를 설정합니다, 카프카는 기본적으로 클러스터링을 하기에 connectionString은 배열의 형태를 가집니다. 저는 개발환경을 셋업하였으므로 하나의 connectionString을 입력하겠습니다.
conf := sarama.NewConfig()
conf.Producer.Return.Successes = true
connectionString := []string{
"<IP>:9092",
}
conn, err := sarama.NewClient(connectionString, conf)
samara 패키지에는 메세지 발행을 위한 두가지 방법이 구현 되어있다. sarama.SyncProducer와 samara.AsynProducer가 있다.
비동기 방식은 높은 처리량을 처리할때 좋다고 합니다. 단순한 메세지는 덩치가큰 비동기식 보다 동기식을 더 선호한다고 합니다.
producer, err := sarama.NewSyncProducerFromClient(conn)
panicIfErr(err)
user := User{"Gopher", 7}
jsonBody, err := json.Marshal(user)
msg := &sarama.ProducerMessage{
Topic: "test",
Value: sarama.ByteEncoder(jsonBody),
}
_, _, err = producer.SendMessage(msg)
---
우선 container를 통하여 kafka에 토픽을 comsumer를 통하여 확인하겠습니다.
> docker exec -it <docker Container id 4자리 이상 or container anme> bin/bash
> ./kafka-console-consumer.sh
\--bootstrap-server <ip>:9092
\--topic msg_test --from-beginning
아래의 같이 소스가 실행될때 마다 메세지를 확인 가능합니다.
참고용 소스
package main
import (
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
)
func panicIfErr(err error) {
if err != nil {
fmt.Println("err :", err)
}
}
type User struct {
Name string
Age int
}
func main() {
conf := sarama.NewConfig()
conf.Producer.Return.Successes = true
connectionString := []string{
"127.0.01:9092",
}
conn, err := sarama.NewClient(connectionString, conf)
panicIfErr(err)
producer, err := sarama.NewSyncProducerFromClient(conn)
panicIfErr(err)
user := User{"Gopher", 7}
jsonBody, err := json.Marshal(user)
msg := &sarama.ProducerMessage{
Topic: "test",
Value: sarama.ByteEncoder(jsonBody),
}
_, _, err = producer.SendMessage(msg)
panicIfErr(err)
}
깔끔한 정리네요 😀