라즈베리파이에서 입력받은 센서값을 서버에 전송해야만 했다. 맘같아선 라즈베리파이에서 바로 http 통신을 구현하고 싶었지만 mqtt 통신이 전력소모도 낮고 좀 더 가볍다는 장점이 있어서 사용해보기로했다.
그림을 보면 Broker라는 매개체를 통해 센서와 다른 장비들이 통신하는 것을 확인할 수 있다. 센서가 broker에게 데이터를 전송하면 다른 장비들이 broker를 통해 구독(subscribe)해서 센서의 데이터를 받아오는 구조이다. 뉴스에 나오는 브로커는 안좋은 의미로 쓰이는데 여기서는 그런 것 같진 않다. 이 때 센서가 broker에게 데이터를 전송하는 과정을 publish, 다른 장비들이 broker에게 데이터를 요구하는 과정을 subscribe라고 한다. publish 과정은 라즈베리파이 쪽에서 파이썬으로 처리할 것이고, 스프링부트로 구현한 백엔드 쪽에서는 subscribe만 구현하면 된다.
메시지를 publish-subscribe 하는 행위는 채널 단위로 일어난다. 이를 MQTT에서는 토픽이라고 부르고, 토픽은 슬래시(/)로 구분되는 계층 구조를 갖는다.
mqtt의 좀 더 자세한 설명은 mqtt 설명블로그를 참고하자.
implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-mqtt'
maven을 사용하는 사람들은 레퍼런스를 참고하자.
@Configuration
public class MqttConfig {
private static final String BROKER_URL = "tcp://{여러분의 ip}:{포트번호, 보통 1336}";
private static final String MQTT_CLIENT_ID = MqttAsyncClient.generateClientId();
private static final String TOPIC_FILTER = "parking_lot/0/#";
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inboundChannel() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(BROKER_URL, MQTT_CLIENT_ID, TOPIC_FILTER);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler inboundMessageHandler() {
return message -> {
String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
System.out.println("Topic:" + topic);
System.out.println("Payload" + message.getPayload());
};
}
}
초반에 브로커(mosquitto)가 설치되어 있는 AWS EC2에 보안설정을 해주지 않아 브로커에 접속이 불가능했다. message.getPayload()를 통해 정상적으로 json 형태로 입력받았다. 추후에 이 json 타입의 데이터를 db에 저장하는 과정을 구현해야한다.
유명 서비스기업에서도 종종 mqtt를 도입하는 경우도 있다고 하니 그 이유에 대해서 좀 더 고민해봐야겠다.