Shopify/sarama kafka 메세지 발행 테스트

Divan·2022년 8월 30일
3
post-thumbnail

Golang에서 카프카와 연결을 위한 인기있는 라이브러리가 2가지 정도 있다.
1. github/shopify/sarama : 순수한 go언어로 만들어졌다. MIT라이센스
2. github/confluentinc/confluent-kafka-go : C언어로 만들어진것을 Go언어로 래퍼하여 제공한다. shopify라이브러리보다 빠르지만, 구현하기가 다소 어렵다.

저는 shopify/sarama를 사용하기로 결정 하였습니다.

  • AWS 혹은 Localhost docker로 세팅하는법은 Link를 클링해주세요

1. 카프카 연결

sarama.NewConfig()를 이용하여 커넥션에 대한 Config를 설정합니다, 카프카는 기본적으로 클러스터링을 하기에 connectionString은 배열의 형태를 가집니다. 저는 개발환경을 셋업하였으므로 하나의 connectionString을 입력하겠습니다.

conf := sarama.NewConfig()
conf.Producer.Return.Successes = true
connectionString := []string{
	"<IP>:9092",
}

conn, err := sarama.NewClient(connectionString, conf)

2. 카프카로 메세지 발행

samara 패키지에는 메세지 발행을 위한 두가지 방법이 구현 되어있다. sarama.SyncProducer와 samara.AsynProducer가 있다.
비동기 방식은 높은 처리량을 처리할때 좋다고 합니다. 단순한 메세지는 덩치가큰 비동기식 보다 동기식을 더 선호한다고 합니다.

producer, err := sarama.NewSyncProducerFromClient(conn)
panicIfErr(err)
user := User{"Gopher", 7}
jsonBody, err := json.Marshal(user)

msg := &sarama.ProducerMessage{
	Topic: "test",
	Value: sarama.ByteEncoder(jsonBody),
}
_, _, err = producer.SendMessage(msg)

---

3. 메세시 확인

우선 container를 통하여 kafka에 토픽을 comsumer를 통하여 확인하겠습니다.

> docker exec -it <docker Container id  4자리 이상 or container anme> bin/bash
 > ./kafka-console-consumer.sh 
 			\--bootstrap-server <ip>:9092 
            \--topic msg_test --from-beginning

아래의 같이 소스가 실행될때 마다 메세지를 확인 가능합니다.


참고용 소스

package main
import (
	"encoding/json"
	"fmt"
	"github.com/Shopify/sarama"
)
func panicIfErr(err error) {
	if err != nil {
		fmt.Println("err :", err)
	}
}
type User struct {
	Name string
	Age  int
}
func main() {

	conf := sarama.NewConfig()
	conf.Producer.Return.Successes = true
	connectionString := []string{
		"127.0.01:9092",
	}
	conn, err := sarama.NewClient(connectionString, conf)
	panicIfErr(err)
    
	producer, err := sarama.NewSyncProducerFromClient(conn)
	panicIfErr(err)
	user := User{"Gopher", 7}
	jsonBody, err := json.Marshal(user)
	msg := &sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.ByteEncoder(jsonBody),
	}
	_, _, err = producer.SendMessage(msg)
	panicIfErr(err)
}
profile
하루 25분의 투자

1개의 댓글

comment-user-thumbnail
2022년 8월 31일

깔끔한 정리네요 😀

답글 달기