Spring boot로 mqtt subscribe 구현하기

xeonu·2022년 7월 28일
0

spring boot

목록 보기
1/2

mqtt를 사용한 이유

라즈베리파이에서 입력받은 센서값을 서버에 전송해야만 했다. 맘같아선 라즈베리파이에서 바로 http 통신을 구현하고 싶었지만 mqtt 통신이 전력소모도 낮고 좀 더 가볍다는 장점이 있어서 사용해보기로했다.


mqtt의 간단한 통신원리

publish와 subscribe


그림을 보면 Broker라는 매개체를 통해 센서와 다른 장비들이 통신하는 것을 확인할 수 있다. 센서가 broker에게 데이터를 전송하면 다른 장비들이 broker를 통해 구독(subscribe)해서 센서의 데이터를 받아오는 구조이다. 뉴스에 나오는 브로커는 안좋은 의미로 쓰이는데 여기서는 그런 것 같진 않다. 이 때 센서가 broker에게 데이터를 전송하는 과정을 publish, 다른 장비들이 broker에게 데이터를 요구하는 과정을 subscribe라고 한다. publish 과정은 라즈베리파이 쪽에서 파이썬으로 처리할 것이고, 스프링부트로 구현한 백엔드 쪽에서는 subscribe만 구현하면 된다.

Topic을 통한 통신

메시지를 publish-subscribe 하는 행위는 채널 단위로 일어난다. 이를 MQTT에서는 토픽이라고 부르고, 토픽은 슬래시(/)로 구분되는 계층 구조를 갖는다.
mqtt의 좀 더 자세한 설명은 mqtt 설명블로그를 참고하자.


스프링 부트로 subscribe 구현

build.gradle에 의존성 주입

implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-mqtt'

maven을 사용하는 사람들은 레퍼런스를 참고하자.

MqttConfig 클래스 생성

@Configuration
public class MqttConfig {

    private static final String BROKER_URL = "tcp://{여러분의 ip}:{포트번호, 보통 1336}";
    private static final String MQTT_CLIENT_ID = MqttAsyncClient.generateClientId();
    private static final String TOPIC_FILTER = "parking_lot/0/#";

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inboundChannel() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(BROKER_URL, MQTT_CLIENT_ID, TOPIC_FILTER);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler inboundMessageHandler() {
        return message -> {
            String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
            System.out.println("Topic:" + topic);
            System.out.println("Payload" + message.getPayload());
        };
    }
}

초반에 브로커(mosquitto)가 설치되어 있는 AWS EC2에 보안설정을 해주지 않아 브로커에 접속이 불가능했다. message.getPayload()를 통해 정상적으로 json 형태로 입력받았다. 추후에 이 json 타입의 데이터를 db에 저장하는 과정을 구현해야한다.


유명 서비스기업에서도 종종 mqtt를 도입하는 경우도 있다고 하니 그 이유에 대해서 좀 더 고민해봐야겠다.

profile
백엔드 개발자가 되기위한 여정

0개의 댓글