이제 본격적인 프로젝트 진행에 앞서 가정에 설치할 센서 목록도 정하고 tech stack 도 픽스된 상태이다.
이번 포스팅에서는 우리 프로젝트의 코어 로직인 각 가정에 설치된 센서 데이터를 어떻게 회사 서버 데이터베이스까지 전달하는지? 에 대해 담겨있다.
먼저 센서 데이터가 어떻게 경유해 database까지 전달되는 지 간단하게 요약하면,
💡smarthome 프로젝트의 database는 회사의 private cloud에 위치
Sensor(Edge) → Gateway(Hub) → MQ(Message Queue) → Subscriber→ DB
센서에서 발생한 데이터는 위의 각 노드를 순서대로 경유해 최종 서버 database까지 전달돼 저장될 수 있다. 좀 더 각 노드를 자세히 살펴보면,
sensor는 센서 데이터를 생성하는 엣지 노드로, zigbee 기반의 다양한 값을 측정하는 디바이스를 의미Gateway 는 센서와 직접 연결되는 hub 역할로 센서 데이터를 전달MQ 는 gateway에서 전달된 메세지를 Subscriber(application)에게 전달 하는 역할 → MQTT Broker 사용Subscriber은 메세지 큐에서 센서 데이터를 전달 받아 파싱 후 각 센서 타입에 맞게 DB에 삽입 → Python 으로 구현DB 는 최종적으로 데이터가 저장되는 노드이제 각 단계에서 어떻게 데이터가 전달되고 저장되는지 자세하게 다뤄보자.
해당 단계에서는 센서별 제조사에서 제공한 메뉴얼을 통해 센서와 게이트웨이 간 수동으로 연결하는 작업이 필요하다.
프로젝트에서 사용하는 gateway는 Zigbee 프로토콜을 사용하는 센서라면 모두 연결이 가능하며, 센서와 게이트웨이 간 연결이 성공되면 센서는 특정 이벤트가 발생하거나 전송 주기에 맞게 gateway로 데이터를 전송한다.
gateway는 각 가정의 Wi-fi 네트워크와 연결이 필요하며 펌웨어에서 설정된 host로 데이터를 전송할 수 있다.
와이파이로 연결된 게이트웨이 ip 주소를 찾아 브라우저를 통해 접속하면 아래 그림처럼 게이트웨이와 센서 간 구상도를 GUI로 확인 가능하다.
해당 단계부터는 클라우드 서버 내에 Mosquitto 브로커를 설치가 된 상태에서 진행할 수 있다.
MQTT 관점에서 보면 각 노드를 아래와 같은 역할을 한다.
MQTT는 메세지를 발행할 때 topic 이라는 채널 단위로 실행되는데, 센서 데이터를 발행할 때 tele/<gateway_id>/SENSOR 토픽으로 게시하므로 Subscriber는 Broker에 wild card를 이용한 tele/+/SENSOR 토픽을 구독해 여러 게이트웨이에서 발행한 데이터 전부를 수신할 수 있게 구현했다.
이와 같은 작업으로 센서가 게이트웨이로 데이터를 전송하면 게이트웨이는 브로커로 토픽을 발행하고, Subscriber는 해당 토픽을 구독했으므로 센서 데이터를 최종 수신할 수 있다.
💡이제 수신된 센서 데이터는 타입에 따라 파싱을 거치고 DB에 저장되는데 자세한 부분은 다음 단계에서 코드와 같이 설명하겠다.
본격적으로 MQTT Subscriber 프로그램을 구현할 차례이다. 테스트로 간단한 구독자 프로그램을 구현해 수신한 데이터를 출력하면 다음 그림과 같은 결과를 얻을 수 있다.

코드는 아래와 같은 큰 단계로 나누어 개발했다.
MQTT 클라이언트 초기화 및 연결 설정
paho-mqtt 라이브러리를 사용해 MQTT 클라이언트 생성import paho.mqtt.client as mqtt
from datetime import datetime
import ssl, json, random
# MQTT 초기 설정
broker_address = "[MQTT HOST]"
port = 8883
username = "[MQTT Username]"
password = "[MQTT Password]"
sensor_topic = "tele/+/SENSOR"
state_topic = "tele/+/STATE"
mqtt_client = mqtt.Client(f"client_{ random.randint(0, 100) }")
mqtt_client.tls_set("~/cert/ca.crt", tls_version=ssl.PROTOCOL_TLSv1_2)
mqtt_client.tls_insecure_set(True)
mqtt_client.username_pw_set(username, password)
con_res = mqtt_client.connect(broker_address, port)
print("------------------------------------------------------------")
print(f'con_res: {con_res}')
# 여러 토픽을 구독 가능
mqtt_client.subscribe(sensor_topic, 0)
mqtt_client.subscribe(state_topic, 0)
mqtt_client.loop_forever()
데이터베이스 초기화 및 연결 설정
mysql-connector 라이브러리 → 로컬 내 MySQL 초기화 및 연결influxdb_client 라이브러리 → 로컬 내 InfluxDB 초기화 및 연결import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
import mysql.connector.pooling
# Init InfluxDB
token = "[TOKEN]"
org = "[ORG]"
url = "http://localhost:8086"
bucket="smarthome"
write_client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
write_api = write_client.write_api(write_options=SYNCHRONOUS)
# Init MySQL
dbconfig = {
"host": "127.0.0.1",
"user": "[USER]",
"password": "[PASSWOD]",
"database": "smarthome",
"port": 3306,
"charset" : "utf8"
}
conn_pool = mysql.connector.pooling.MySQLConnectionPool(pool_name="mypool",
pool_size=10,
**dbconfig)
이벤트 핸들러 등록
tele/+/SENSOR 토픽을 구독tele/+/STATE 토픽을 구독해 주기적으로 게이트웨이 상태 데이터를 수신on_message 메소드 등록 → 브로커로부터 메세지가 도착하면 호출되는 콜백 함수# 서버 연결시 on_connect 실행
def on_connect(client, userdata, flags, rc):
print(f'flags: {flags}')
print("rc: " + str(rc))
# 브로커에게 메시지가 도착하면 on_message 실행
def on_message(client, userdata, msg):
try:
data = json.loads(str(msg.payload.decode('utf-8')))
except Exception as es:
print(f"error: {es}")
print(f'msg: {msg}')
return
...
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.on_subscribe = on_subscribe
💡수신한 데이터는 JSON 형식
tele/+/SENSOR (wild card) 토픽을 구독했으므로 슬래시(/)를 기준으로 split하면 게이트웨이 ID 추출 가능ZbReceived key가 존재하면 → 센서가 측정한 값SHTCX key가 존재하면 → 게이트웨이에서 측정한 값(온습도, 이슬점)point로 변환 후 TSDB에 저장Measurement 가 다름def write_basic_data_tsdb(gateway_model, gateway_data):
point = (
influxdb_client.Point("GatewayData")
.tag("mac_address" , gateway_model)
.field("temperature", gateway_data["Temperature"])
.field("humidity", gateway_data["Humidity"])
.field("dew_point", gateway_data["DewPoint"])
)
write_api.write(bucket=bucket, org=org, record=point)
def write_sensor_data_tsdb(gateway_model, sensor_value):
...
point = influxdb_client.Point("SensorData").tag("mac_address" , gateway_model).tag("Device", sensor_value['Device'])
for key, value in sensor_value.items():
point.field(key, str(value))
write_api.write(bucket=bucket, org=org, record=point)
def on_message(client, userdata, msg):
...
if topic_split[2] == "SENSOR":
# 게이트웨이 기본 데이터 (온습도)
if data.__contains__('SHTCX'):
write_basic_data_tsdb(topic_split[1], data['SHTCX'])
# Zigbee 센서 데이터
elif data.__contains__('ZbReceived'):
sensor_value = data['ZbReceived']
# 센서 데이터 InfluxDB 저장
write_sensor_data_tsdb(gateway_model, sensor_value)
...
이제 nohup 명령어를 사용해 구현한 프로그램을 백그라운드로 서버 내에서 종료되지 않게 계속 실행되게 한 뒤 로그 파일로 리다이렉션하면 세팅 완료이다!!
<nohup python mqtt_clinet.py > mqtt.log &
내가 구현한 IoT 데이터 스트림 구상도를 간략하게 그려보면 아래와 같이 도식화할 수 있다.
해당 다이어그램은 각 가정의 센서에서 시작해 Gateway를 통해 MQTT Broker로, 그리고 Python 기반의 Consumer에 의해 실시간으로 파싱되어 InfluxDB와 MySQL에 저장되는 데이터 파이프라인을 한눈에 보여준다.

