MQTT_GoLang Library - Broker & Client (1)

wldbs._.·2024년 10월 8일
1

MQTT & Go

목록 보기
3/10
post-thumbnail

Go언어에서 MQTT 프로토콜을 사용할 수 있도록 여러 라이브러리를 제공한다.
추후 진행할 프로젝트에서 브로커는 Mochi로, 클라이언트는 Paho로 진행하기로 결정했다.

출처



1. 일대일 통신 (1:1)

브로커 Go 코드

1) Broker Code_GoLang : port 1883

package main

import (
	"log"       // 로깅을 위한 패키지
	"os"        // 운영 체제와 상호작용하기 위한 패키지
	"os/signal" // 운영 체제의 신호를 처리하기 위한 패키지
	"syscall"   // 시스템 호출 인터페이스를 제공하는 패키지

	mqtt "github.com/mochi-mqtt/server/v2"       // Mochi MQTT 서버를 위한 패키지
	"github.com/mochi-mqtt/server/v2/hooks/auth" // 인증을 처리하는 훅 패키지
	"github.com/mochi-mqtt/server/v2/listeners"  // 리스너 설정을 위한 패키지
)
func main() {
	// 서버가 종료될 때까지 신호를 대기하기 위한 채널 생성
	sigs := make(chan os.Signal, 1) // 신호 수신, buffer = 1
	done := make(chan bool, 1)      // 서버가 종료될 때 사용, buffer = 1

	// SIGINT(인터럽트, Ctrl + C) 또는 SIGTERM(종료, kill)을 받으면 신호 채널에 전달
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	// 새로운 MQTT 서버 생성
	server := mqtt.New(nil) // 기본 설정 사용하여 서버 생성

	// 모든 연결 요청을 허용하도록 인증 훅 추가
	// 훅(Hook)은 서버의 특정 이벤트나 행동에 개입할 수 있도록 해주는 함수나 로직
	_ = server.AddHook(new(auth.AllowHook), nil) // nil : 추가 설정 없이 기본 동작을 사용

	// 해당 코드 필수, 생략하고 코드 실행하면 2024/10/07 14:59:59 발행자 클라이언트 브로커에 연결 실패: not Authorized 오류 출력
	// 기본 포트(1883)에서 TCP 리스너를 생성
	tcp := listeners.NewTCP(listeners.Config{ // 1883 포트에서 수신할 TCP 리스너를 생성
		ID:      "t1",    // 리스너 ID 설정
		Address: ":1883", // 리스너 주소 및 포트 설정
	})

	// 서버에 TCP 리스너를 추가
	// -> 서버는 리스너가 수신하는 모든 연결을 처리
	err := server.AddListener(tcp)
	if err != nil { // 리스너 추가 시 오류가 발생하면 로그 출력하고 프로그램 종료
		log.Fatal(err)
	}
	// 신호 수신될 때까지 대기하는 고루틴
	go func() {
		// 화살표(<-) 왼쪽에 아무것도 없는 것은 "채널에서 값을 수신하여 변수에 저장하지 않겠다"는 의미
		<-sigs         // sigs 채널에 데이터 들어올 때까지 대기 -> SIGINT나 SIGTERM 신호가 들어올 때까지 고루틴이 멈춰있다가, 신호가 들어오면 다음 구문으로 이동
		server.Close() // 서버 정리 작업: mochi-mqtt 서버의 모든 리스너를 종료하고, 서버 인스턴스를 안전하게 종료
		done <- true   // 신호가 수신되면 done 채널에 true 보냄 -> 프로그램 안전하게 종료됨
	}()

	// 서버 실행하는 고루틴
	go func() {
		err := server.Serve() // 서버 시작, 클라이언트 연결 수신, 서버가 실행 중인 동안 이 고루틴은 종료되지 않음
		if err != nil {       // 서버 실행 중 오류 발생하면 로그 출력하고 프로그램 종료
			log.Fatal(err)
		}
	}()

	// 프로그램 종료될 때까지 대기
	<-done // 메인 함수는 done 채널에서 값을 받을 때까지 대기
	// done 채널은 main() 함수 위쪽에서 정의된 done <- true에 의해 값이 전달된다

	// 신호를 대기하는 고루틴이 done <- true를 보내면 메인 고루틴에서 <-done에 의해 값이 수신되고 프로그램이 종료된다.
	// -> 이 경우 서버를 실행 중인 고루틴(server.Serve())도 함께 강제 종료됨 => 서버는 비정상적으로 종료될 수 있다. => server.Close()
}

부가 설명

1. TCP Listener란?
: 네트워크 소켓을 열어서 특정 포트(예: 1883)에서 클라이언트 연결을 대기하는 역할
-> 서버가 클라이언트와 통신하기 위해서는 반드시 리스너가 필요 [클라이언트 연결 대기, 연결 수락, 연결 관리 역할]

클라이언트가 서버의 특정 포트에 연결하려고 하면, TCP 리스너가 그 요청을 받아들여 서버의 애플리케이션 로직(예: 메시지 처리, 데이터베이스 작업 등)으로 연결을 넘겨준다!

2. Hook이란?
: auth.AllowHook은 인증 절차 없이 모든 클라이언트를 받아들이는 동작 수행, 기본적으로 모든 인증 요청을 허용하는 훅 (=인증 및 권환 확인 절차 pass)

  • 클라이언트의 연결을 무조건 허용하지 않을 경우 -> username, password, tls/ssl 인증서 등으로 클라이언트 인증 후 연결!

3. 코드 내 _란?
: Go 언어에서 _는 빈 식별자(blank identifier)로, 값을 무시하거나 필요 없는 값을 처리할 때 사용
-> server.AddHook() 함수가 반환하는 값을 사용하지 않겠다는 의미


2) Client Code_GoLang

클라이언트 Go 코드

package main

import (
	"fmt"
	"log"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)
// 메시지 수신 핸들러: 주제에 대해 발행한 메시지가 구독자에게 발행된 경우에 실행되는 callback 타입
// 메시지 핸들러가 설정되지 않으면(nil), MQTT 클라이언트는 특정 메시지를 수신할 때 아무런 동작을 수행하지 않는다!
var msgHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("수신한 메시지: 토픽 [%s] - 메시지 [%s]\n", msg.Topic(), msg.Payload())
}
func main() {
	// 발행자 클라이언트 옵션 설정
	publisherOpts := mqtt.NewClientOptions().
		AddBroker("tcp://localhost:1883"). // 브로커 주소
		SetClientID("publisherClient").    // 발행자 클라이언트 ID 설정
		SetUsername("username").           // 사용자 이름 설정 (필요시)
		SetPassword("password")            // 비밀번호 설정 (필요시)

	// 발행자 클라이언트 객체 생성
	publisherClient := mqtt.NewClient(publisherOpts)
	if token := publisherClient.Connect(); token.Wait() && token.Error() != nil { // Token defines the interface for the tokens used to indicate when actions have completed.
		log.Fatalf("발행자 클라이언트 브로커에 연결 실패: %v\n", token.Error())
	}
	fmt.Printf("발행자 %s 가 브로커 [%s]에 연결됨\n", publisherOpts.ClientID, publisherOpts.Servers[0].String())
	// => 발행자 publisherClient 가 브로커 [tcp://localhost:1883]에 연결됨
	// 발행자 객체의 Servers 필드 -> 첫 번째 브로커의 주소 가져와 (여러 브로커 연결된 경우 대비, 배열로 저장) 문자열로 반환
	// 구독자 클라이언트 옵션 설정
	subscriberOpts := mqtt.NewClientOptions().
		AddBroker("tcp://localhost:1883").   // 브로커 주소
		SetClientID("subscriberClient").     // 구독자 클라이언트 ID 설정
		SetDefaultPublishHandler(msgHandler) // 기본 메시지 핸들러 설정

	// 구독자 클라이언트 객체 생성
	subscriberClient := mqtt.NewClient(subscriberOpts)
	if token := subscriberClient.Connect(); token.Wait() && token.Error() != nil {
		log.Fatalf("구독자 브로커에 연결 실패: %v\n", token.Error())
	}
	fmt.Printf("구독자 %s 가 브로커 [%s]에 연결됨\n", subscriberOpts.ClientID, subscriberOpts.Servers[0].String())
	// test/topic 구독
	if token := subscriberClient.Subscribe("test/topic", 0, msgHandler); token.Wait() && token.Error() != nil {
		fmt.Printf("구독 오류: %v\n", token.Error())
	} else {
		fmt.Printf("구독자 %s 가 test/topic 구독 완료\n", subscriberOpts.ClientID)
	}

	// 발행자 클라이언트가 메시지 발행
	message := "Hello, MQTT!"
	token := publisherClient.Publish("test/topic", 0, false, message)
	token.Wait()
	fmt.Printf("발행자 %s 가 메시지 발행: %s\n", publisherOpts.ClientID, message)

	// 3sec 동안 대기하여 메시지 수신 대기
	time.Sleep(3 * time.Second)

	// 250millisec 기다린 후 클라이언트 종료
	subscriberClient.Disconnect(250)
	publisherClient.Disconnect(250)

	fmt.Println("클라이언트 종료됨")
}

부가 설명

1. token이란?
: 클라이언트가 서버와의 비동기 작업(연결, 메시지 발행, 구독 등)의 상태와 결과를 추적하고 관리

  • In the context of the Paho MQTT library,
    a token is a structure used to track the state of an MQTT operation,
    such as a connection, publish, subscribe, or unsubscribe action.

  • It represents an asynchronous operation and
    allows you to track whether the operation has completed,
    whether it has succeeded or failed,
    and retrieve any error messages if the operation failed.

    • In summary, while token may seem like a data type,
      it is more accurately described as a state-tracking mechanism for MQTT operations in the Paho library.
  • Technically, the token in Paho is a struct that implements the Token interface in Go.

  • The Token interface provides methods such as:
    1) Wait(): Blocks the execution until the operation represented by the token is complete.
    2) WaitTimeout(timeout time.Duration): Blocks the execution for a specified time or until the operation is complete.
    3) Error(): Returns the error if the operation failed, otherwise it returns nil.

Since it implements an interface, it's not just a basic data type,
but a mechanism to handle the state of the operation.
It encapsulates the process of checking whether the MQTT operation was successful or encountered any errors.


2. 코드 분석
if token := subscriberClient.Connect(); token.Wait() && token.Error() != nil {
// 1. 객체가 브로커에 연결을 시도 후 결과 나타내는 token 객체 반환
// 2. 연결 완료될 때까지 블로킹하여 대기, 호출 끝나면 연결 시도 끝났음을 의미
// - 성공했는지 실패했는지는 token 객체의 상태 통해 알 수 있다!
// 3. 연결 시도 중 발생한 에러가 nil이 아니라면 연결 시도 실패했음을 의미
log.Fatalf("구독자 브로커에 연결 실패: %v\n", token.Error())
// subscriberClient가 MQTT 브로커에 연결을 시도한 후,
// 만약 연결이 실패하면 프로그램을 종료하고 "구독자 브로커에 연결 실패"라는 에러 메시지와 함께 발생한 에러 내용을 출력
}


3. token의 주요 기능
1) 작업 상태 확인:
token은 클라이언트와 서버 간의 작업(연결, 메시지 발행 등)이 성공적으로 완료되었는지, 아니면 오류가 발생했는지를 확인.

2) Wait() 메서드:
token.Wait()는 해당 작업이 완료될 때까지 기다리는 메서드.
예를 들어, 브로커에 연결이 완료될 때까지 대기하거나 메시지가 발행될 때까지 대기.

3) Error() 메서드:
token.Error()는 작업이 성공적으로 완료되었는지, 아니면 오류가 발생했는지를 확인.
작업이 성공하면 nil을 반환하고, 오류가 발생하면 오류 내용을 반환.


4. Token 인터페이스 - paho.mqtt.golang
: token 자체는 Token 인터페이스의 구현체

-> Wait() bool, WaitTimeout(time.Duration) bool, Done() <-chan struct{}, Error() error 메서드 정의

[구현체]

  • ClientToken은 클라이언트 연결 작업을 추적
  • PublishToken은 메시지 발행 작업을 관리
  • SubscribeToken은 구독 작업을 추적

4. callback function
: 콜백 함수는 전달인자로 다른 함수에 전달되는 함수

[위키백과]
프로그래밍에서 콜백(callback) 또는 콜백 함수(callback function)는 다른 코드의 인수로서 넘겨주는 실행 가능한 코드를 말한다.
콜백을 넘겨받는 코드는 이 콜백을 필요에 따라 즉시 실행할 수도 있고, 아니면 나중에 실행할 수도 있다.

일반적으로 콜백수신 코드로 콜백 코드(함수)를 전달할 때는 콜백 함수의 포인터 (핸들), 서브루틴 또는 람다함수의 형태로 넘겨준다.
콜백수신 코드는 실행하는 동안에 넘겨받은 콜백 코드를 필요에 따라 호출하고 다른 작업을 실행하는 경우도 있다.
다른 방식으로는 콜백수신 코드는 넘겨받은 콜백 함수를 '핸들러'로서 등록하고, 콜백수신 함수의 동작 중 어떠한 반응의 일부로서 나중에 호출할 때 사용할 수도 있다 (비동기 콜백).

  • 콜백은 코드 재사용을 할 때 유용하다.

5. 메시지 핸들러 설정
if token := subscriberClient.Subscribe("test/topic", 0, msgHandler); token.Wait() && token.Error() != nil

  • MQTT 클라이언트에서 메시지 핸들러(MessageHandler)가 nil로 설정된 경우,
    이는 클라이언트가 기본 메시지 처리 함수를 제공하지 않았다는 것을 의미
  • 메시지 핸들러가 설정되지 않으면, MQTT 클라이언트는 특정 메시지를 수신할 때 아무런 동작을 수행하지 않는다.

즉, 구독한 주제에서 메시지가 도착해도, 메시지 핸들러가 없기 때문에 그 메시지를 처리할 로직이 없는 상태가 된다.
이런 경우에는 기본적인 콜백 함수가 호출되지 않고, 클라이언트는 해당 메시지를 무시하게 된다.

메시지를 수신하기 위해서는 메시지 핸들러가 반드시 설정되어야 함

  • 이를 위해 SetDefaultPublishHandler 메서드를 사용하거나, Subscribe 메서드 호출 시 개별 주제마다 메시지 핸들러를 설정해야 한다

추가) SetDefaultPublishHandler 메서드란?
: Eclipse Paho MQTT Go 라이브러리에서 제공하는 메서드로, MQTT 클라이언트가 수신한 메시지를 처리하기 위한 기본 핸들러를 설정하는 데 사용
: 클라이언트가 구독하고 있는 주제로부터 메시지를 수신할 때 호출된다
: 메시지가 수신되었지만, 해당 주제에 대한 별도의 핸들러가 설정되지 않은 경우에 호출된다



1-1 WireShark

QoS=0

  • : 발행자의 연결(1883) -> 구독자의 연결(1883) -> 구독자의 구독(test/topic) -> 발행자의 메시지 발행 -> 브로커의 메시지 발행 (전달 받아) -> 연결 끊기

QoS=1

  • : 발행자의 연결(1883) -> 구독자의 연결(1883) -> 구독자의 구독(test/topic) -> 발행자의 발행 -> 브로커의 확인 -> 브로커의 발행 -> 구독자의 확인 -> 연결 끊기

QoS 다르게

    • : 구독자의 QoS를 1로, 발행자의 QoS를 0으로 -> 구독자의 Requested QoS=1, 구독자의 Granted QoS=1, 발행 시 QoS level=0
    • : 구독자의 QoS를 0로, 발행자의 QoS를 2으로 -> 구독자의 Requested QoS=0, 구독자의 Granted QoS=2, 발행 시 QoS level=0

-> 프로토콜이 mqtt이고, tcp 포트가 1883인 부분만 보고 싶다 -> tcp.port==1883 and mqtt 로 필터 설정
1. MQTT Protocol, Port 1883
2. 발행자와 구독자의 연결 요청 및 확인
3. 구독자의 구독 요청 및 확인
4. 발행자의 메시지 발행
5. 연결 끊기

profile
공부 기록용 24.08.05~ #LLM #RAG

0개의 댓글