[JAVA] SpringBoot + MQTT-Broker 연동하기

조수현·2023년 10월 12일

MQTT란?

MQTT는 M2M, IOT를 위한 프로토콜로서, 최소한의 전력과 패킷량으로 통신하는 프로토콜이다.

따라서 IOT와 모바일 어플리케이션 등의 통신에 매우 적합한 프로토콜이다.

MQTT-Broker?

MQTT 브로커는 MQTT 프로토콜의 핵심 구성 요소 중 하나로, MQTT 클라이언트 간에 메시지를 라우팅하고 중계한다.

_놀이터에서의 비유로 설명하면,
MQTT 브로커는 놀이터의 관리자와 같다.
놀이터에 있는 다양한 놀이기구에 참가하는 어린이들은 MQTT 클라이언트와 같다.
놀이터 관리자는 어린이들이 원하는 놀이기구로 보내는 역할을 하며, 어린이들이 놀이기구에서 얻는 경험을 공유하도록 한다.
또한, 관리자는 어떤 어린이가 어떤 놀이기구에 참가하고 싶어하는지 관리하고, 어떤 놀이기구에서 어떤 어린이들이 놀고 있는지 추적한다. 이런 식으로 MQTT 브로커는 클라이언트 간의 메시지 교환을 중개하고 관리하는 역할을 한다.

이런 MQTT-Broker를 서버단에서 제어하고 개입할 수 있다.
오늘은 그것을 해보았다.

✅ Setting

의존성 추가

implementation 'org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5'
implementation "org.bouncycastle:bcpkix-jdk15on:1.70"
implementation 'com.google.code.gson:gson:2.9.0'

위에부터 각각 mqtt 객체를 위한 라이브러리,
ssl 인증서 인증을 위한 라이브러리,
json 파싱을 위한 라이브러리다.

🔐 SSL 인증

ssl 인증은 mqtt에 security를 추가한 mqtts를 위해 하는 과정이다.
인증 없이도 사용하는 경우가 있지만
대부분 인증을 사용하기 때문에 인증 과정도 실습해보자.

public static SSLSocketFactory getSocketFactory(final String caCrtFile, final String crtFile, final String keyFile, final String password) throws Exception {
        Security.addProvider(new BouncyCastleProvider());

        // load CA certificate
        X509Certificate caCert = null;

        FileInputStream fis = new FileInputStream(caCrtFile);
        BufferedInputStream bis = new BufferedInputStream(fis);
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

        while (bis.available() > 0) {
            caCert = (X509Certificate) cf.generateCertificate(bis);
            // System.out.println(caCert.toString());
        }

        // load client certificate
        bis = new BufferedInputStream(new FileInputStream(crtFile));
        X509Certificate cert = null;
        while (bis.available() > 0) {
            cert = (X509Certificate) cf.generateCertificate(bis);
            // System.out.println(caCert.toString());
        }

        // load client private key
        PEMParser pemParser = new PEMParser(new FileReader(keyFile));
        Object object = pemParser.readObject();
        PEMDecryptorProvider decProv = new JcePEMDecryptorProviderBuilder()
                .build(password.toCharArray());
        JcaPEMKeyConverter converter = new JcaPEMKeyConverter()
                .setProvider("BC");
        KeyPair key;
        if (object instanceof PEMEncryptedKeyPair) {
            System.out.println("Encrypted key - we will use provided password");
            key = converter.getKeyPair(((PEMEncryptedKeyPair) object)
                    .decryptKeyPair(decProv));
        } else {
            System.out.println("Unencrypted key - no password needed");
            key = converter.getKeyPair((PEMKeyPair) object);
        }
        pemParser.close();

        // CA certificate is used to authenticate server
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("ca-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
        tmf.init(caKs);

        // client key and certificates are sent to server so it can authenticate
        // us
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        ks.setCertificateEntry("certificate", cert);
        ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
                new java.security.cert.Certificate[] { cert });
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
                .getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());

        // finally, create SSL socket factory
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

        return context.getSocketFactory();
    }

❗️중요 메서드

MqttClient에는 mqtt를 제어하는 중요 메서드들이 있다.

connect(MqttConnectionOptions option)
disconnect()
subscribe(String topic, int qos)
publish(String topic, MqttMessage message)
setCallback(CustomMqttCallbackService callback)

connect = 연결하기

disconnect = 연결끊기

subscribe = 구독하기(구독한 이후로 메세지를 받음)

publish = 발행하기(구독자에게 메세지를 발행함)

setCallback = 연결되어있는동안 생기는 콜백을 콜백한다.

이 메서드들을 활용해서 자바에서 메세지를 발행하고 받아보자.

💬 메세지 발행

 public String execute(PublishRequest request) throws Exception {
        MqttClient client = new MqttClient("ssl://localhost:1883", request.getClientId());
        client.connect(mqttFacade.getMqttConnectionOptions(caFilePath, clientCrtFilePath, clientKeyFilePath));
        MqttMessage message = new MqttMessage();
        message.setPayload(request.getMessage().getBytes("UTF-8"));
        client.publish(request.getTopic(), message);
        client.disconnect();
        return "Message publish completed! Message: " + request.getMessage() + " to topic: " + request.getTopic() + " with client id: " + request.getClientId() + "!";
}

나는 도커로 ubuntu안에 mqtt-broker를 만들었기때문에 ssl:/localhost:1883 으로 접속하면 됐다.
서버 상황에 따라 조정하자.

또한 caFilePath, clientCrtFilePath,clientKeyFilePath은 개인이 발급받은 ssl 인증서의 주소를 넣어주면 된다.

이렇게 해서 테스트를 해보면

메세지가 잘 발행되었다는 String을 리턴받았다.
또한 MQTTX로 접속해서 해당 milk라는 topic을 구독한 계정을 하나 만들어놓고 보니

내가 발행한 hello는 잘 도착하였다.

📰 구독하기

public String execute(PublishRequest request) throws Exception {
        MqttClient client = new MqttClient("ssl://localhost:1883", request.getClientId());
        client.connect(mqttFacade.getMqttConnectionOptions(caFilePath, clientCrtFilePath, clientKeyFilePath));
        MqttMessage message = new MqttMessage();
        message.setPayload(request.getMessage().getBytes("UTF-8"));
        client.publish(request.getTopic(), message);
        client.disconnect();
        return "Message publish completed! Message: " + request.getMessage() + " to topic: " + request.getTopic() + " with client id: " + request.getClientId() + "!";
    }

이것도 위의 발행처럼 조정하면 된다.
테스트 해보겠다.

잘 구독이 되었다고 return String이 도착했다!

💬 메세지 수신

메세지 수신에 대해서는 자바에서 실시간 통신을 지원하지 않기때문에 어떻게 받을 수 있을지 많이 생각해봤었다.
그래서 내린 결론은
callback 받는 메세지를 로그로 띄우자는 것이었다.
물론 return값으로는 실시간 통신이 되도록 못하지만
로그로는 몇초 이상 기다리게 한 후 그것을 띄우면 되니까
어느정도 메세지를 실시간적(?)으로 확인할 수 있기 때문이다.
따라서 로그는 MqttCallBack을 상속받은CustomCallBackService에서 메서드를 재정의하였다.

@AllArgsConstructor
@Getter
@Slf4j
public class CustomMqttCallbackService implements MqttCallback {

    private MqttClient client;
    private String topic;

    @Override
    public void disconnected(MqttDisconnectResponse disconnectResponse) {

    }

    @Override
    public void mqttErrorOccurred(MqttException exception) {

    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("message arrived");
        log.info("message arrived");
        log.info("topic: " + topic);
        log.info("return 페이로드: " + message.toString());

    }

    @Override
    public void deliveryComplete(IMqttToken token) {

    }

    @Override
    public void connectComplete(boolean reconnect, String serverURI) {

    }

    @Override
    public void authPacketArrived(int reasonCode, MqttProperties properties) {

    }
}

중간의 messageArrived의 내용을 메세지에 관해 로그를 찍도록 변경해주었다.
테스트를 하니

잘도착했다. ㅎㅎ
끝!

profile
꿈틀꿈틀 지렁이입니다 😎

0개의 댓글