현재 필자는 대학에서 Smart Green Campus라는 프로젝트를 진행중이다.
Smart Green Campus란?
- 건축공학과, 컴퓨터공학과, 정보통신공학과가 협업하는 프로젝트로, 진행 과정은 다음과 같다.
- 건축공학과에서 학교의 일사량, 습도, 온도 등을 측정하여 Influx DB에 저장한다.
- 컴퓨터공학과에서 DB의 데이터를 가져와서 MQTT를 이용하여 정보통신공학과에 Publish!
- 정보통신공학과는 Broker, Subscriber를 구현하여 데이터를 수신받고, API 서버팀에 전송한다.
- API 서버팀은 최종 DB에 저장하고, FE팀이 웹 화면에 뿌려준다.
위의 과정에서 나는 컴퓨터공학과 측에서 송신하는 센싱값들을 MQTT를 이용해서 받고, API 서버팀에 보내주는 역할을 하고있다!
이 과정에서 나와 비슷한 사례들이 많이 있지 않아서 구현하는 데 쉽지 않았고, 그에 따른 경험을 공유하려 한다.
우선 MQTT가 뭔지 부터 알아보자.
MQTT에 대한 설명은 Github에 포스팅한 자료가 있어 대체합니다!
build.gradle
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
MqttSubscriberService.java
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MqttSubscriberService implements MqttCallback {
private MqttClient mqttClient;
private MqttConnectOptions mqttOptions;
@Autowired
private WebClientService webClientService;
public MqttSubscriberService init(final String server, final String clientId) throws MqttException {
mqttOptions = new MqttConnectOptions();
mqttOptions.setCleanSession(true);
mqttOptions.setKeepAliveInterval(30);
mqttClient = new MqttClient(server, clientId);
mqttClient.setCallback(this);
mqttClient.connect(mqttOptions);
return this;
}
// 커넥션이 종료되면 호출 - 통신 오류로 연결이 끊어지는 경우 호출
@Override
public void connectionLost(final Throwable cause) {
System.out.println("연결이 중단되었습니다.");
System.out.println(cause);
}
// 메시지가 도착하면 호출
@Override
public void messageArrived(final String topic, final MqttMessage message) throws Exception {
System.out.println("메시지도착");
System.out.println(message);
System.out.println("topic = " + topic + ", id = " + message.getId() + ", payload = " + new String(message.getPayload()));
webClientService.post(message); // 외부 POST Api 호출
}
// 구독 신청
public boolean subscribe(final String topic) throws MqttException {
if (topic != null) {
mqttClient.subscribe(topic, 0);
}
return true;
}
// 메시지의 배달이 완료되면 호출
@Override
public void deliveryComplete(final IMqttDeliveryToken token) {
}
}
친절하게도 MQTT 라이브러리에서 메시지가 도착하면, 커넥션이 종료되면 등등 여러 메서드를 제공해줘서 구현이 그리 어렵지는 않다!
다른 설명들은 주석을 달아놓았다.
POST
)하는 클래스WebClientService.java
import io.wisoft.mqtt.config.MqttConfig;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import static org.springframework.http.HttpHeaders.CONTENT_TYPE;
import static org.springframework.http.MediaType.*;
@Service
public class WebClientService {
MqttConfig mqttConfig = new MqttConfig();
public void requestToApiServer(final MqttMessage message) {
Map<String, Object> bodyMap = extractdata(message);
final WebClient webClient = WebClient
.builder()
.baseUrl(mqttConfig.baseUrl)
.defaultHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE)
.build();
final Map<String, Object> response = (Map<String, Object>) webClient
.post()
.uri(mqttConfig.apiUrl)
.bodyValue(bodyMap)
.retrieve()
.bodyToMono(Map.class)
.block();
}
public static void post(final MqttMessage message) {
final WebClientService webClientService = new WebClientService();
webClientService.requestToApiServer(message);
}
private Map<String, Object> extractdata(final MqttMessage message) {
final Map<String, Object> bodyMap = new HashMap<>();
final String str = new String(message.getPayload());
final StringTokenizer st = new StringTokenizer(str, "=,");
st.nextToken();
final Long id = Long.valueOf(st.nextToken());
st.nextToken();
final Double number = Double.valueOf(st.nextToken());
st.nextToken();
final String measurement = st.nextToken();
bodyMap.put("memberId", id);
bodyMap.put("value", number);
bodyMap.put("measurement", measurement);
return bodyMap;
}
}
현재 컴퓨터공학과에서 보내주는 센싱값의 포맷은 다음과 같다.
"memberId=사용자 아이디, measurement=데이터 종류, value=센싱값"
따라서, StringTokenizer
를 이용해서 =
와 ,
기준으로 나누었고, 각각 WebClient를 사용하여 body에 실어서 송신하는 메서드를 구현하였다.
단, API 서버 측의 baseurl와 API uri은 비공개 처리하였다.
사용하는 외부 API의 uri와, 서버 url에 따라 유동적으로 넣어주면 된다.
이제 의문이 들 수 있을 것이다.
Publisher는? Broker는 뭘 사용한거야? 데이터를 Publisher로 해서 임의로 보내고 받아보는 과정은 없는데?
지금부터 알아보자!
brew install mosquitto
/opt/homebrew/opt/mosquitto/sbin/mosquitto -c /opt/homebrew/etc/mosquitto/mosquitto.conf
mosquitto_sub -v -t '$SYS/broker/clients/connected'
mosquitto_pub -h [서버 URL] -h 127.0.0.1 -p 1883 -t [토픽] -m [송신할 메시지]
필자의 애플리케이션 콘솔
호출한 API 서버팀의 콘솔
Publisher로부터 메시지가 도착할 때마다 정상적으로 외부 API가 호출되고, 호출된 서버에서도 정상적으로 쿼리가 날아가여 데이터베이스에 저장되는 모습을 확인할 수 있다!
혹시라도 필자처럼 오류가 나는 사람을 위해 내용을 추가한다.
애플리케이션을 실행시켰더니, 오류가 정말 많이 뜬다!
Caused by: java.lang.UnsatisfiedLinkError: failed to load the required native library
Caused by: java.lang.UnsatisfiedLinkError: could not load a native library: netty_resolver_dns_native_macos_aarch_64
Caused by: java.io.FileNotFoundException: META-INF/native/libnetty_resolver_dns_native_macos.jnilib
Caused by: java.io.FileNotFoundException: META-INF/native/libnetty_resolver_dns_native_macos_aarch_64.jnilib
위와 같은 오류는 WebFlux를 한 번이라도 사용해보았다면 알 수 있을 것이다.
해결방법은 다음과 같다.
build.gradle
에 다음과 같은 의존성을 추가한다.implementation 'io.netty:netty-resolver-dns-native-macos:4.1.79.Final:osx-aarch_64'
경량 메시지 프로토콜인 MQTT를 이용해서 협업을 하고, 프로젝트를 진행해보자! 그리 어렵지 않다.
의존성 주입합시다.