Kafka 메세지 Consum 테스트

Divan·2022년 8월 31일
1
post-thumbnail

원래는 메세지 발행까지만 정리할려고 했는데... Consumer에 대한 정리요청도 들어와서 간단히.. 하겠습니다.
메세지를 소비하는것을 Container에서 "kafka-console-producer.sh"를 사용하여 메세지를 발행하고 Application에서 확인도 가능합니다. 하지만 Unmarshal하기 번거로움으로 하나의 쓰레드에서 발행하고 바로 소비하겠습니다.


메세지 소비 (Consumer)

1. connection 속성을 지정합니다.


현재는 변경없이 Default로 사용하겠습니다.
해동 속석으로 connection 생성

conf := sarama.NewConfig() // connection 속성을 지정
conf.Comsumer.....
conn, err := sarama.NewClient(connectionString, conf) // connection 생성

2. comsumer 생성 및 파티션 확인

topic에는 다수의 파티션의 존재 합니다. 그렇기에 파티션의 여러개라면 loop를 만들어서 각 파티션 별로 메세지를 받아올수 있습니다. 여기서는 partion을 하나만 있으므로 loop를 사용하지 않습니다. 여기서는 파티션의이 하나이므로, 아래의 코드에서는 partions에는 0 만존재 합니다.
kafka에 특성상 토픽에는 하나 이상의 partition 존해합니다. 파티션별로 읽어들이는 내용이 다르므로 메세지를 읽어 들이기 위해서는 파티션 지정이 필수적 입니다.
Partion 설명 : https://engkimbs.tistory.com/691

consumer, err := sarama.NewConsumerFromClient(conn)
partitions, err := consumer.Partitions(topic)

3. 파티션으로 부터 메세지 읽기

ConsumePartition 메서드에서는 토픽, 파티션, 읽고자하는 index를 지정합니다. 읽고자하는 index부터 최근 메세지까지 메세지를 읽어 드립니다.
pConsumer.Messages()를 실행시키면 계속 listen 상태로 대기하게 됩니다. 그렇기에 일반적으로는 go루틴를 만들고, 채널과 같이 사용합니다.. 아래 참고용 소스에서는 만들지 않았습니다.

partitonNo := 0
startFromIndex := 0

pConsumer, err := consumer.ConsumePartition(topic, partitonNo, startFromIndex)
count := 0
for msgGet := range pConsumer.Messages() {
	body := User{}
	err := json.Unmarshal(msgGet.Value, &body)
	panicIfErr(err)
	fmt.Println("no :", count, ", age : ", body.Age, ", name : " ,body.Name)
	count ++;
}

4. 참고용 코드

gorutine을 만들지 않았으므로 어플리케이션이 listen 상태로 대기합니다. 코드를 한번 실행마다 큐에 publish가 한번 발생하므로, 실행 횟수 만큼 토픽으로부터 읽어드리는 횟수도 비례하게 증가합니다.

  • 종료는 control + c

package main
import (
	"encoding/json"
	"fmt"
	"github.com/Shopify/sarama"
)
func panicIfErr(err error) {
	if err != nil {
		fmt.Println("err :", err)
	}
}
type User struct {
	Name string
	Age  int
}

func main() {

	conf := sarama.NewConfig()
	conf.Producer.Return.Successes = true
	conf.c
	connectionString := []string{
		"localhost:9092",
	}
	topic := "blog_test"
	conn, err := sarama.NewClient(connectionString, conf)
	panicIfErr(err)

	fmt.Println("conn :", conn)

	producer, err := sarama.NewSyncProducerFromClient(conn)
	panicIfErr(err)
	user := User{"Gopher", 7}
	jsonBody, err := json.Marshal(user)

	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(jsonBody),
	}
	_, _, err = producer.SendMessage(msg)
	panicIfErr(err)
	consumer, err := sarama.NewConsumerFromClient(conn)
	pConsumer, err := consumer.ConsumePartition(topic, 0, 0)
	count := 0
	for msgGet := range pConsumer.Messages() {
		body := User{}
		err := json.Unmarshal(msgGet.Value, &body)
		panicIfErr(err)
		fmt.Println("no :", count, ", age : ", body.Age, ", name : " ,body.Name)
		count ++;
	}
}
profile
하루 25분의 투자

0개의 댓글