IoT 프로젝트_4(실시간 IoT 데이터 스트림 구축: 센서에서 클라우드까지)

김두현·2025년 3월 25일
0

SmartHome_IoT

목록 보기
6/6
post-thumbnail

이제 본격적인 프로젝트 진행에 앞서 가정에 설치할 센서 목록도 정하고 tech stack 도 픽스된 상태이다.

이번 포스팅에서는 우리 프로젝트의 코어 로직인 각 가정에 설치된 센서 데이터를 어떻게 회사 서버 데이터베이스까지 전달하는지? 에 대해 담겨있다.

먼저 센서 데이터가 어떻게 경유해 database까지 전달되는 지 간단하게 요약하면,

💡smarthome 프로젝트의 database는 회사의 private cloud에 위치

Sensor(Edge) → Gateway(Hub) → MQ(Message Queue) → Subscriber→ DB

센서에서 발생한 데이터는 위의 각 노드를 순서대로 경유해 최종 서버 database까지 전달돼 저장될 수 있다. 좀 더 각 노드를 자세히 살펴보면,

  • sensor는 센서 데이터를 생성하는 엣지 노드로, zigbee 기반의 다양한 값을 측정하는 디바이스를 의미
  • Gateway 는 센서와 직접 연결되는 hub 역할로 센서 데이터를 전달
  • MQ 는 gateway에서 전달된 메세지를 Subscriber(application)에게 전달 하는 역할 → MQTT Broker 사용
  • Subscriber은 메세지 큐에서 센서 데이터를 전달 받아 파싱 후 각 센서 타입에 맞게 DB에 삽입 → Python 으로 구현
  • DB 는 최종적으로 데이터가 저장되는 노드
    • InfluxDB: 온도, 습도 등의 센서 데이터 저장
    • MySQL: 사용자 계정, 게이트웨이 메타데이터, Pending 디바이스 정보 등

이제 각 단계에서 어떻게 데이터가 전달되고 저장되는지 자세하게 다뤄보자.


Sensor → Gateway

해당 단계에서는 센서별 제조사에서 제공한 메뉴얼을 통해 센서와 게이트웨이 간 수동으로 연결하는 작업이 필요하다.

프로젝트에서 사용하는 gateway는 Zigbee 프로토콜을 사용하는 센서라면 모두 연결이 가능하며, 센서와 게이트웨이 간 연결이 성공되면 센서는 특정 이벤트가 발생하거나 전송 주기에 맞게 gateway로 데이터를 전송한다.

gateway는 각 가정의 Wi-fi 네트워크와 연결이 필요하며 펌웨어에서 설정된 host로 데이터를 전송할 수 있다.

와이파이로 연결된 게이트웨이 ip 주소를 찾아 브라우저를 통해 접속하면 아래 그림처럼 게이트웨이와 센서 간 구상도를 GUI로 확인 가능하다.


Gateway → MQTT Broker → Subscriber

해당 단계부터는 클라우드 서버 내에 Mosquitto 브로커를 설치가 된 상태에서 진행할 수 있다.

MQTT 관점에서 보면 각 노드를 아래와 같은 역할을 한다.

  • 게이트웨이가 Publisher(게시자),
  • python으로 구현된 프로그램이 Subscriber(구독자) 역할
  • Wi-fi와 연결된 게이트웨이는 host로 가구에서 수집한 데이터를 broker에게 게시

MQTT는 메세지를 발행할 때 topic 이라는 채널 단위로 실행되는데, 센서 데이터를 발행할 때 tele/<gateway_id>/SENSOR 토픽으로 게시하므로 Subscriber는 Broker에 wild card를 이용한 tele/+/SENSOR 토픽을 구독해 여러 게이트웨이에서 발행한 데이터 전부를 수신할 수 있게 구현했다.

이와 같은 작업으로 센서가 게이트웨이로 데이터를 전송하면 게이트웨이는 브로커로 토픽을 발행하고, Subscriber는 해당 토픽을 구독했으므로 센서 데이터를 최종 수신할 수 있다.

💡이제 수신된 센서 데이터는 타입에 따라 파싱을 거치고 DB에 저장되는데 자세한 부분은 다음 단계에서 코드와 같이 설명하겠다.

Subscriber→ DB

본격적으로 MQTT Subscriber 프로그램을 구현할 차례이다. 테스트로 간단한 구독자 프로그램을 구현해 수신한 데이터를 출력하면 다음 그림과 같은 결과를 얻을 수 있다.

코드는 아래와 같은 큰 단계로 나누어 개발했다.

  1. MQTT 클라이언트 초기화 및 연결 설정

    • 파이썬 기반의 paho-mqtt 라이브러리를 사용해 MQTT 클라이언트 생성
    • TLS(SSL 인증서)와 사용자 인증 정보를 설정하여 보안 연결을 구성
    • 지정된 브로커 주소와 포트(예: 8883)로 연결을 시도
    import paho.mqtt.client as mqtt
    
    from datetime import datetime
    import ssl, json, random
    
    # MQTT 초기 설정
    broker_address = "[MQTT HOST]"
    port = 8883
    username = "[MQTT Username]"
    password = "[MQTT Password]"
    sensor_topic = "tele/+/SENSOR"
    state_topic = "tele/+/STATE"
    
    mqtt_client = mqtt.Client(f"client_{ random.randint(0, 100) }")
    mqtt_client.tls_set("~/cert/ca.crt", tls_version=ssl.PROTOCOL_TLSv1_2)
    mqtt_client.tls_insecure_set(True)
    
    mqtt_client.username_pw_set(username, password)
    con_res = mqtt_client.connect(broker_address, port)
    print("------------------------------------------------------------")
    print(f'con_res: {con_res}')
    
    # 여러 토픽을 구독 가능
    mqtt_client.subscribe(sensor_topic, 0)
    mqtt_client.subscribe(state_topic, 0)
    mqtt_client.loop_forever()
  2. 데이터베이스 초기화 및 연결 설정

    • mysql-connector 라이브러리 → 로컬 내 MySQL 초기화 및 연결
    • influxdb_client 라이브러리 → 로컬 내 InfluxDB 초기화 및 연결
      • InfluxDB api를 호출하기 위해서는 사전에 token 발급이 필요
    import influxdb_client
    from influxdb_client.client.write_api import SYNCHRONOUS
    import mysql.connector.pooling
    
    # Init InfluxDB
    token = "[TOKEN]"
    org = "[ORG]"
    url = "http://localhost:8086"
    bucket="smarthome"
    
    write_client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
    write_api = write_client.write_api(write_options=SYNCHRONOUS)
    
    # Init MySQL
    dbconfig = {
        "host": "127.0.0.1",
        "user": "[USER]",
        "password": "[PASSWOD]",
        "database": "smarthome",
        "port": 3306,
        "charset" : "utf8"
    }
    conn_pool = mysql.connector.pooling.MySQLConnectionPool(pool_name="mypool",
                                                            pool_size=10,
                                                            **dbconfig)
  1. 이벤트 핸들러 등록

    • MQTT Broker에서 센서 데이터를 수신하기 위해 tele/+/SENSOR 토픽을 구독
      • 추가적으로 tele/+/STATE 토픽을 구독해 주기적으로 게이트웨이 상태 데이터를 수신
    • on_message 메소드 등록 → 브로커로부터 메세지가 도착하면 호출되는 콜백 함수
      • 게이트웨이가 토픽에 메세지를 게시하면 MQTT Broker가 토픽을 구독한 채널로 메세지를 전송
    #  서버 연결시 on_connect 실행
    def on_connect(client, userdata, flags, rc):
        print(f'flags: {flags}')
        print("rc: " + str(rc))
    
    # 브로커에게 메시지가 도착하면 on_message 실행
    def on_message(client, userdata, msg):
        try:
            data = json.loads(str(msg.payload.decode('utf-8')))
        except Exception as es:
            print(f"error: {es}")
            print(f'msg: {msg}')
            return
        ...
    
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message
    mqtt_client.on_subscribe = on_subscribe
  1. 데이터 파싱 및 처리 후 데이터베이스에 저장
💡수신한 데이터는 JSON 형식
  • 수신된 메시지의 토픽을 분리하여 게이트웨이 모델 정보를 추출
    • tele/+/SENSOR (wild card) 토픽을 구독했으므로 슬래시(/)를 기준으로 split하면 게이트웨이 ID 추출 가능
  • 수신한 JSON 데이터 특정 key 여부에 따른 분기 처리
    • ZbReceived key가 존재하면 → 센서가 측정한 값
    • SHTCX key가 존재하면 → 게이트웨이에서 측정한 값(온습도, 이슬점)
  • 분기에 따라 센서 데이터와 게이트웨이 데이터를 point로 변환 후 TSDB에 저장
    • 각각 데이터가 저장되는 Measurement 가 다름
  • 신규 센서(디바이스) 등록 시 해당 정보 데이터베이스 저장 자동화 → 다음 포스팅…?
def write_basic_data_tsdb(gateway_model, gateway_data):
    point = (
                influxdb_client.Point("GatewayData")
                .tag("mac_address" , gateway_model)
                .field("temperature", gateway_data["Temperature"])
                .field("humidity", gateway_data["Humidity"])
                .field("dew_point", gateway_data["DewPoint"])
            )
    write_api.write(bucket=bucket, org=org, record=point)

def write_sensor_data_tsdb(gateway_model, sensor_value):
		...
    point = influxdb_client.Point("SensorData").tag("mac_address" , gateway_model).tag("Device", sensor_value['Device'])
    for key, value in sensor_value.items():
        point.field(key, str(value))
            
    write_api.write(bucket=bucket, org=org, record=point)

def on_message(client, userdata, msg):
		...
		if topic_split[2] == "SENSOR":
        # 게이트웨이 기본 데이터 (온습도)
        if data.__contains__('SHTCX'):
            write_basic_data_tsdb(topic_split[1], data['SHTCX'])
        # Zigbee 센서 데이터
        elif data.__contains__('ZbReceived'):
            sensor_value = data['ZbReceived']
            # 센서 데이터 InfluxDB 저장
            write_sensor_data_tsdb(gateway_model, sensor_value)
		...

이제 nohup 명령어를 사용해 구현한 프로그램을 백그라운드로 서버 내에서 종료되지 않게 계속 실행되게 한 뒤 로그 파일로 리다이렉션하면 세팅 완료이다!!

<nohup python mqtt_clinet.py > mqtt.log &

최종 구상도

내가 구현한 IoT 데이터 스트림 구상도를 간략하게 그려보면 아래와 같이 도식화할 수 있다.

해당 다이어그램은 각 가정의 센서에서 시작해 Gateway를 통해 MQTT Broker로, 그리고 Python 기반의 Consumer에 의해 실시간으로 파싱되어 InfluxDB와 MySQL에 저장되는 데이터 파이프라인을 한눈에 보여준다.

profile
끄적끄적

0개의 댓글