[Spring Boot] MQTT를 이용한 외부 API로의 데이터 송수신

윤진원·2023년 5월 17일
1

Spring Boot

목록 보기
3/6
post-thumbnail

❗️문제 상황

현재 필자는 대학에서 Smart Green Campus라는 프로젝트를 진행중이다.

Smart Green Campus란?

  • 건축공학과, 컴퓨터공학과, 정보통신공학과가 협업하는 프로젝트로, 진행 과정은 다음과 같다.
  1. 건축공학과에서 학교의 일사량, 습도, 온도 등을 측정하여 Influx DB에 저장한다.
  2. 컴퓨터공학과에서 DB의 데이터를 가져와서 MQTT를 이용하여 정보통신공학과에 Publish!
  3. 정보통신공학과는 Broker, Subscriber를 구현하여 데이터를 수신받고, API 서버팀에 전송한다.
  4. API 서버팀은 최종 DB에 저장하고, FE팀이 웹 화면에 뿌려준다.

위의 과정에서 나는 컴퓨터공학과 측에서 송신하는 센싱값들을 MQTT를 이용해서 받고, API 서버팀에 보내주는 역할을 하고있다!


이 과정에서 나와 비슷한 사례들이 많이 있지 않아서 구현하는 데 쉽지 않았고, 그에 따른 경험을 공유하려 한다.

우선 MQTT가 뭔지 부터 알아보자.


💡문제 해결

MQTT란?

MQTT에 대한 설명은 Github에 포스팅한 자료가 있어 대체합니다!


Spring에서 MQTT 구현하기

  1. 우선, MQTT 의존성을 추가해주자.
  • build.gradle

    • MQTT 라이브러리를 사용하기 위한 의존성 + 외부 API를 호출하기 위한 webflux
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation 'org.springframework.boot:spring-boot-starter-webflux'

  1. MQTT Subscriber
  • MqttSubscriberService.java
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MqttSubscriberService implements MqttCallback {

    private MqttClient mqttClient;
    private MqttConnectOptions mqttOptions;

    @Autowired
    private WebClientService webClientService;

    public MqttSubscriberService init(final String server, final String clientId) throws MqttException {

        mqttOptions = new MqttConnectOptions();
        mqttOptions.setCleanSession(true);
        mqttOptions.setKeepAliveInterval(30);
        mqttClient = new MqttClient(server, clientId);
        mqttClient.setCallback(this);
        mqttClient.connect(mqttOptions);

        return this;
    }

    // 커넥션이 종료되면 호출 - 통신 오류로 연결이 끊어지는 경우 호출
    @Override
    public void connectionLost(final Throwable cause) {
        System.out.println("연결이 중단되었습니다.");
        System.out.println(cause);
    }

    // 메시지가 도착하면 호출
    @Override
    public void messageArrived(final String topic, final MqttMessage message) throws Exception {
        System.out.println("메시지도착");
        System.out.println(message);
        System.out.println("topic = " + topic + ", id = " + message.getId() + ", payload = " + new String(message.getPayload()));
        webClientService.post(message); // 외부 POST Api 호출
    }

    // 구독 신청
    public boolean subscribe(final String topic) throws MqttException {

        if (topic != null) {
            mqttClient.subscribe(topic, 0);
        }

        return true;
    }

    // 메시지의 배달이 완료되면 호출
    @Override
    public void deliveryComplete(final IMqttDeliveryToken token) {
    }
}

친절하게도 MQTT 라이브러리에서 메시지가 도착하면, 커넥션이 종료되면 등등 여러 메서드를 제공해줘서 구현이 그리 어렵지는 않다!

다른 설명들은 주석을 달아놓았다.


  1. 외부 API로 데이터를 송신(POST)하는 클래스
  • WebClientService.java
import io.wisoft.mqtt.config.MqttConfig;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import static org.springframework.http.HttpHeaders.CONTENT_TYPE;
import static org.springframework.http.MediaType.*;

@Service
public class WebClientService {

    MqttConfig mqttConfig = new MqttConfig();

    public void requestToApiServer(final MqttMessage message) {

        Map<String, Object> bodyMap = extractdata(message);

        final WebClient webClient = WebClient
                .builder()
                .baseUrl(mqttConfig.baseUrl)
                .defaultHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE)
                .build();

        final Map<String, Object> response = (Map<String, Object>) webClient
                .post()
                .uri(mqttConfig.apiUrl)
                .bodyValue(bodyMap)
                .retrieve()
                .bodyToMono(Map.class)
                .block();
    }

    public static void post(final MqttMessage message) {
        final WebClientService webClientService = new WebClientService();
        webClientService.requestToApiServer(message);
    }

    private Map<String, Object> extractdata(final MqttMessage message) {

        final Map<String, Object> bodyMap = new HashMap<>();

        final String str = new String(message.getPayload());
        final StringTokenizer st = new StringTokenizer(str, "=,");

        st.nextToken();
        final Long id = Long.valueOf(st.nextToken());

        st.nextToken();
        final Double number = Double.valueOf(st.nextToken());

        st.nextToken();
        final String measurement = st.nextToken();

        bodyMap.put("memberId", id);
        bodyMap.put("value", number);
        bodyMap.put("measurement", measurement);

        return bodyMap;
    }
}

현재 컴퓨터공학과에서 보내주는 센싱값의 포맷은 다음과 같다.

"memberId=사용자 아이디, measurement=데이터 종류, value=센싱값"

따라서, StringTokenizer 를 이용해서 =, 기준으로 나누었고, 각각 WebClient를 사용하여 body에 실어서 송신하는 메서드를 구현하였다.

단, API 서버 측의 baseurl와 API uri은 비공개 처리하였다.

사용하는 외부 API의 uri와, 서버 url에 따라 유동적으로 넣어주면 된다.


이제 의문이 들 수 있을 것이다.
Publisher는? Broker는 뭘 사용한거야? 데이터를 Publisher로 해서 임의로 보내고 받아보는 과정은 없는데?

지금부터 알아보자!


Mosquitto를 사용하여 Publisher, Broker 대체하기

  1. Mosquitto를 설치해준다.
  • 필자는 mac을 사용중이므로, 글 작성은 mac 기준으로 포스팅한다.
brew install mosquitto

  1. 일반적으로 설치했다면 다음과 같은 경로에 설치되었을 것이다. 다음 명령어로 mosquitto를 실행해준다.
/opt/homebrew/opt/mosquitto/sbin/mosquitto -c /opt/homebrew/etc/mosquitto/mosquitto.conf

  1. 터미널 탭을 하나 더 연 후, 다음 명령어로 MQTT Subscriber를 실행시켜준다.
mosquitto_sub -v -t '$SYS/broker/clients/connected'

  1. 터미널 탭을 하나 더 연 후, 다음 명령어로 MQTT Publisher를 실행시켜주고 메시지를 보내본다.
  • 서버 주소, Topic, 메시지 등은 사용자의 용도에 맞게 바꾸어주면 된다.
mosquitto_pub -h [서버 URL] -h 127.0.0.1 -p 1883 -t [토픽] -m [송신할 메시지]

  1. 애플리케이션 서버를 실행하고, Publisher로 메시지를 보내고 결과를 확인한다.
  • 필자의 애플리케이션 콘솔

  • 호출한 API 서버팀의 콘솔


Publisher로부터 메시지가 도착할 때마다 정상적으로 외부 API가 호출되고, 호출된 서버에서도 정상적으로 쿼리가 날아가여 데이터베이스에 저장되는 모습을 확인할 수 있다!


✅ 추가 정보

혹시라도 필자처럼 오류가 나는 사람을 위해 내용을 추가한다.

애플리케이션을 실행시켰더니, 오류가 정말 많이 뜬다!

Caused by: java.lang.UnsatisfiedLinkError: failed to load the required native library

Caused by: java.lang.UnsatisfiedLinkError: could not load a native library: netty_resolver_dns_native_macos_aarch_64
Caused by: java.io.FileNotFoundException: META-INF/native/libnetty_resolver_dns_native_macos.jnilib
Caused by: java.io.FileNotFoundException: META-INF/native/libnetty_resolver_dns_native_macos_aarch_64.jnilib

위와 같은 오류는 WebFlux를 한 번이라도 사용해보았다면 알 수 있을 것이다.
해결방법은 다음과 같다.

  • build.gradle에 다음과 같은 의존성을 추가한다.
implementation 'io.netty:netty-resolver-dns-native-macos:4.1.79.Final:osx-aarch_64'

👍🏻 결론

경량 메시지 프로토콜인 MQTT를 이용해서 협업을 하고, 프로젝트를 진행해보자! 그리 어렵지 않다.

profile
기억보단 기록을

1개의 댓글

comment-user-thumbnail
2023년 5월 17일

의존성 주입합시다.

답글 달기