[MQTT] Spring Boot MQTT Subscriber (with Raspberry PI)

주재완·2025년 1월 29일
0

MQTT

목록 보기
2/2
post-thumbnail

Spring Boot 에서 데이터 수집

백엔드로 센서 데이터를 보내서 처리를 할 필요가 있어 Spring Boot로 보내는 작업에 대해 작성합니다.

의존성

plugins {
	java
	id("org.springframework.boot") version "3.4.2"
	id("io.spring.dependency-management") version "1.1.7"
}

group = "com.example"
version = "0.0.1-SNAPSHOT"

java {
	toolchain {
		languageVersion = JavaLanguageVersion.of(17)
	}
}

repositories {
	mavenCentral()
}

dependencies {
	implementation("org.springframework.boot:spring-boot-starter-web")
	testImplementation("org.springframework.boot:spring-boot-starter-test")
	testRuntimeOnly("org.junit.platform:junit-platform-launcher")
	implementation("org.springframework.boot:spring-boot-starter-integration")
	implementation("org.springframework.integration:spring-integration-mqtt")
}

tasks.withType<Test> {
	useJUnitPlatform()
}

build.gradle 입니다. 모바일 환경을 고려하고 있어 우선 Kotlin 형식으로 받기는 했지만, 다른 방식도 크게 다르지는 않습니다. 다만, 기존 Spring Web 이외에 다른 의존성이 몇가지 필요합니다.

  • spring-boot-starter-integration : HTTP, TCP 등과 같은 전송 로직을 추상화하는 역할을 합니다.
  • spring-integration-mqtt : 이름 그대로 MQTT를 사용할 수 있습니다.

패키지 구조

실제 프로젝트를 진행하면 이것보다 훨씬 복잡한 구조를 가지겠지만, 현재는 메세지 테스트만을 진행하므로 간단하게 구성하였습니다.

src
 ├── main
 │   ├── java/com/example/demo
 │   │   ├── config
 │   │   │   ├── MqttConfig.java  <-- MQTT 설정 파일
 │   │   ├── handler
 │   │   │   ├── MqttMessageHandler.java  <-- MQTT 메시지 처리 핸들러
 │   │   ├── DemoApplication.java  <-- Spring Boot 메인 애플리케이션
 │   ├── resources
 │   │   ├── application.properties  <-- MQTT 브로커 설정

MqttConfig

  • 현재 서버로 데이터가 들어오는 것을 확인하는 것이기에 inboundChannel로 구성
  • 공식 문서 등에 여러 구현체들이 나와 있어 해당 문서를 참고하면서 구현하는 것이 좋습니다.
package com.example.demo.config;

import com.example.demo.handler.MqttMessageSubscriber;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker.url}")
    private String brokerUrl;

    @Value("${mqtt.client.id}")
    private String clientId;

    @Value("${mqtt.topic}")
    private String topic;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() { // MQTT 클라이언트 관련 설정
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setAutomaticReconnect(true);
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MessageProducer inboundChannel(
            MqttPahoClientFactory mqttClientFactory,
            MessageChannel mqttInputChannel
    ) { // inboundChannel 어댑터
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                brokerUrl,
                clientId,
                mqttClientFactory,
                topic
        );
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel);
        return adapter;
    }

    @Bean
    public MessageChannel mqttInputChannel() { // MQTT 구독 채널 생성
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel") // MQTT 구독 핸들러
    public MessageHandler inboundMessageHandler() {
        return new MqttMessageSubscriber();
    }
}

MqttMessageSubscriber

  • 메세지 핸들러를 받아 단순히 받은 메세지를 출력
package com.example.demo.handler;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageSubscriber implements MessageHandler {

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println("Received MQTT message: " + message.getPayload());
    }
}

application.properties

mqtt.broker.url = tcp://[브로커 IP 주소]:1883
mqtt.client.id = test-client
mqtt.topic = floor/room/temp
mqtt.username = [유저 이름]
mqtt.password = [비밀 번호]

Test

우선 Raspberry PI에 mosquitto 설정을 마쳤으면 다음과 같이 실행해줍니다. Raspberry PI가 브로커의 역할을 합니다.

Raspberry PI

브로커 설정

cd /etc/mosquitto # 경로 변경
sudo /etc/init.d/mosquitto stop # 실행 중인 브로커 중지
sudo mosquitto -c mosquitto.conf -v # 설정 파일에 맞게 브로커 실행

Publisher

mosquitto_pub -h [브로커의 IP 주소] -t floor/room/temp -m "Hello Spring Boot!"

Spring Boot

console

Received MQTT message: Hello Spring Boot!

참고

profile
언제나 탐구하고 공부하는 개발자, 주재완입니다.

0개의 댓글

관련 채용 정보