Modbus Heartbeat 구조

Junkyu_Kang·2025년 3월 19일

Modbus 연결만 며칠짼지

데이터는 잘 받아오고 저장도 잘 되는 것을 확인

근데 상태 이상감지 기능을 추가 개발해야하네?

생각해보니 당연한 수순..

고장나거나 이상이 생기면 즉각적으로 알아야하니까

원래는 알고리즘을 돌려 이상탐지를 추측하는 기능이 있었으니

고장이 났다 판단이 되면 Websocket으로 데이터를 넘겨 확인할 수 있게 해주어야 한다.

그럼 Modbus에서 heartbeat 구조는 어떻게 될까?

두개의 방법이 있다.

  1. slave에서 read base로 구현하는 방법
  2. master에서 write base로 구현하는 방법

둘다 각각 장단점을 가지고 있다.

뭐.. 이렇다고 한다.

그래서 지금 내가 할 생각은 각 sensor 마다 id가 있으니 id를 기점으로 구분할 수 있어야하고, 이는 상태확인으로 연결될 수 있어야 한다.

그럼 받는 데이터가 save 되기 전 기능을 하나 추가해서 해당 데이터를 읽을 때 heartbeat가 flase라면? 이상으로 감지하여 알림을 가게 하면 되지않을까?

그럼 WS으로 연결이 가능하게끔 될거라 판단이 들었다.

읽기 기반 간단 구조

from pymodbus.client.sync import ModbusTcpClient
import time

MASTER_IP = "192.168.0.100"  # Slave 장치의 IP
PORT = 502  # Modbus 기본 포트

client = ModbusTcpClient(MASTER_IP, port=PORT)

while True:
    try:
        response = client.read_holding_registers(1, 1)  # 주소 1번 레지스터 읽기
        if response.isError():
            print("Error reading register")
        else:
            print(f"Heartbeat OK, Value: {response.registers[0]}")
    except Exception as e:
        print(f"Modbus Exception: {e}")

    time.sleep(5)  # 5초마다 확인

쓰기 기반 간단 구조

## Mater 구조
from pymodbus.client.sync import ModbusTcpClient
import time

client = ModbusTcpClient("192.168.0.100", port=502)

heartbeat_counter = 0

while True:
    heartbeat_counter += 1  # Heartbeat 카운터 증가
    client.write_register(2, heartbeat_counter)  # 주소 2번 레지스터에 기록
    print(f"Heartbeat Sent: {heartbeat_counter}")
    
    time.sleep(5)  # 5초마다 heartbeat 전송
### slave
from pymodbus.server.sync import StartTcpServer, ModbusSlaveContext, ModbusServerContext
import time

# Modbus 슬레이브의 데이터 저장소
store = ModbusSlaveContext(
    di=None, co=None, hr={2: 0}, ir=None  # 주소 2번 레지스터 초기화
)
context = ModbusServerContext(slaves=store, single=True)

# Heartbeat 감지 함수
last_heartbeat = 0

def check_heartbeat():
    global last_heartbeat
    while True:
        new_value = context[0].getValues(3, 2, 1)[0]  # 주소 2번의 값 읽기
        if new_value != last_heartbeat:
            print(f"Heartbeat Updated: {new_value}")
            last_heartbeat = new_value
        else:
            print("No Heartbeat Update! Possible Master Failure!")

        time.sleep(10)  # 10초마다 확인

# 서버 시작
import threading
t = threading.Thread(target=check_heartbeat)
t.start()

StartTcpServer(context, address=("0.0.0.0", 502))

구현의 간단함만 보자면? 당연히 읽기 방법이 간단하다. slave에서 가져오면 되니까

나도 생각해보면 어쩌피 사업팀에서 보내주는 modbus data만 읽어오면 되니 read만 하면 되긴 한다.

그래도 하는김에 더 보자면

# paste.txt 파일에 추가/수정할 코드

import asyncio
import enum
from rich.console import Console
from rich.live import Live
import random, struct, time
from pymodbus.server import StartAsyncTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSlaveContext, ModbusServerContext
import pandas as pd

# 기존 코드 유지...

# ===== Heartbeat 관련 코드 추가 =====
# Heartbeat 레지스터 주소 설정 (임의의 사용하지 않는 주소 영역)
HEARTBEAT_REGISTER = 9000  # 9000번대 레지스터 사용
HEARTBEAT_TIMEOUT = 15     # 15초 타임아웃

class HeartbeatMonitor:
    def __init__(self, context, register_addr=HEARTBEAT_REGISTER, timeout=HEARTBEAT_TIMEOUT):
        """
        Slave 측 Heartbeat 모니터링 클래스
        
        Args:
            context: Modbus 서버 컨텍스트
            register_addr: Heartbeat 레지스터 주소
            timeout: Heartbeat 타임아웃 시간(초)
        """
        self.context = context
        self.register_addr = register_addr
        self.timeout = timeout
        self.last_heartbeat_time = time.time()
        self.last_heartbeat_value = 0
        self.master_alive = False
        self.logger = console.console
        
    async def monitor(self):
        """
        Master의 heartbeat 신호를 모니터링
        - Heartbeat 레지스터 값의 변화를 모니터링
        - 지정된 시간 내에 값이 변경되지 않으면 Master가 오프라인으로 간주
        """
        self.logger.print("[Heartbeat] 모니터링 시작 (타임아웃: {}초)".format(self.timeout))
        
        while True:
            try:
                # 홀딩 레지스터에서 현재 heartbeat 값 읽기
                current_value = self.context[SlaveId].getValues(
                    FunctionCode.ReadHoldingRegisters.value, 
                    self.register_addr, 
                    1
                )[0]
                current_time = time.time()
                
                # Heartbeat 값이 변경되었는지 확인
                if current_value != self.last_heartbeat_value:
                    if not self.master_alive:
                        self.logger.print("[Heartbeat] Master가 온라인 상태가 되었습니다.")
                        self.master_alive = True
                    
                    self.logger.print(f"[Heartbeat] 수신됨 (값: {current_value})")
                    self.last_heartbeat_value = current_value
                    self.last_heartbeat_time = current_time
                
                # Heartbeat 타임아웃 확인
                elapsed_time = current_time - self.last_heartbeat_time
                if self.master_alive and elapsed_time > self.timeout:
                    self.logger.print(f"[Heartbeat] 타임아웃: {elapsed_time:.1f}초 동안 신호 없음")
                    self.logger.print("[Heartbeat] Master가 오프라인 상태로 간주됩니다.")
                    self.master_alive = False
                    
                    # 오프라인 감지 시 취할 액션 추가
                    await self.on_master_disconnected()
                
                await asyncio.sleep(1)  # 1초마다 확인
            except Exception as e:
                self.logger.print(f"[Heartbeat] 모니터링 오류: {str(e)}")
                await asyncio.sleep(1)
    
    async def on_master_disconnected(self):
        """
        Master 연결 끊김 이벤트 핸들러
        안전 모드로 전환하거나 다른 조치를 취할 수 있음
        """
        # 예: 특정 레지스터에 안전 값 설정
        self.logger.print("[Heartbeat] 안전 모드로 전환합니다.")
        # 필요한 경우 여기에 안전 모드 코드 추가

# 기존 main 함수 수정
async def main():
    store = ModbusSlaveContext(
        di=ModbusSequentialDataBlock(0, [0] * MaxBlockSize),
        co=ModbusSequentialDataBlock(0, [0] * MaxBlockSize),
        hr=ModbusSequentialDataBlock(0, [0] * MaxBlockSize),
        ir=ModbusSequentialDataBlock(0, [0] * MaxBlockSize))
    context = ModbusServerContext(slaves=store, single=True)

    # 초기 Heartbeat 레지스터 값 설정
    context[SlaveId].setValues(FunctionCode.ReadHoldingRegisters.value, HEARTBEAT_REGISTER, [0])
    
    # Heartbeat 모니터 초기화 및 시작
    heartbeat_monitor = HeartbeatMonitor(context)
    monitor_task = asyncio.create_task(heartbeat_monitor.monitor())

    # 기존 서버 및 업데이트 태스크
    server_task = StartAsyncTcpServer(context, address=("localhost", 502))
    update_task = asyncio.create_task(update_context_data22(context))

    # 모든 태스크 실행
    await asyncio.gather(server_task, update_task, monitor_task)

# 기존 코드 유지...

위 같은 형식으로 port 설정이나 주소값 설정을 완료한 후에 heartbeat 모니터링을 시작하는 것을 볼 수 있다.

port는 동일해야하고 address는 겹치지않게 고유값을 정하는게 좋다.

# paste-2.txt에 있는 ModbusAdapter 클래스에 heartbeat 기능 추가

import enum
import struct
from pymodbus.client import ModbusTcpClient
import threading
import time
import logging

from device.adapters.adapter_base import AdapterBase, ConvRule
from device.adapters.enum_types import ResourceTypes

# 기존 코드 유지...

# Heartbeat 상수 정의
HEARTBEAT_REGISTER = 9000  # Slave와 동일한 주소 사용
HEARTBEAT_INTERVAL = 5     # 5초마다 heartbeat 전송

class ModbusAdapter(AdapterBase):
    def __init__(self, rsc_type, name, params, simulate=False, pubsub=False) -> None:
        self._params = params
        self.logger = logging.getLogger(name)
        
        # Heartbeat 관련 설정 추가
        self.heartbeat_register = params.get('_heartbeat_register', HEARTBEAT_REGISTER)
        self.heartbeat_interval = params.get('_heartbeat_interval', HEARTBEAT_INTERVAL)
        self.heartbeat_counter = 0
        self.heartbeat_running = False
        self.heartbeat_thread = None
        
        super().__init__(rsc_type, name, params, simulate=simulate, pubsub=pubsub)

    @property
    def info(self):
        return self.params

    def prepare(self):
        return super().prepare()

    def connect_device(self, **params) -> bool:
        connected = True
        self.client = ModbusTcpClient(**params)
        if not self.client.connect():
            # raise ConnectionError("Unable to connect to Modbus TCP server")
            connected = False
        else:
            # Heartbeat 시작
            self.start_heartbeat()
            
        return connected

    def disconnect_device(self):
        # Heartbeat 중지
        self.stop_heartbeat()
        self.client.close()

    def bind_event_handler(self):
        pass
    
    @property
    def start_address(self):
        return min(self.sensor_addresses) - (HoldingRegisterStartAddr + int(ZeroBased))
    
    @property
    def number_of_sensors(self) -> int:
        return (max(self.sensor_addresses) - (HoldingRegisterStartAddr + int(ZeroBased)) - self.start_address) + self.sensor_count

    def read_data_from_device(self, sensors: list) -> list:
        # 기존 코드 유지...
        # 이 부분은 변경 없음
        # ...
        
    def convert_values(self, rules: dict, sensor_data: list) -> dict:
        # 기존 코드 유지...
        # 이 부분은 변경 없음
        # ...
    
    # ===== Heartbeat 관련 메서드 추가 =====
    def start_heartbeat(self):
        """Heartbeat 전송 스레드 시작"""
        if not self.heartbeat_running:
            self.heartbeat_running = True
            self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
            self.heartbeat_thread.start()
            self.logger.info(f"Heartbeat 시작 (간격: {self.heartbeat_interval}초, 레지스터: {self.heartbeat_register})")
    
    def stop_heartbeat(self):
        """Heartbeat 전송 중지"""
        if self.heartbeat_running:
            self.heartbeat_running = False
            if self.heartbeat_thread:
                self.heartbeat_thread.join(timeout=2)
            self.logger.info("Heartbeat 중지")
    
    def _heartbeat_loop(self):
        """Heartbeat 전송 루프"""
        while self.heartbeat_running:
            try:
                self.send_heartbeat()
                time.sleep(self.heartbeat_interval)
            except Exception as e:
                self.logger.error(f"Heartbeat 전송 오류: {str(e)}")
                time.sleep(self.heartbeat_interval)
    
    def send_heartbeat(self):
        """Heartbeat 신호 전송 (카운터 증가 방식)"""
        try:
            self.heartbeat_counter = (self.heartbeat_counter + 1) % 65536  # 16비트 값 범위 내에서 순환
            result = self.client.write_register(self.heartbeat_register, self.heartbeat_counter)
            
            if hasattr(result, 'isError') and result.isError():
                self.logger.error(f"Heartbeat 신호 전송 실패: {result}")
                return False
                
            self.logger.debug(f"Heartbeat 신호 전송 성공 (값: {self.heartbeat_counter})")
            return True
        except Exception as e:
            self.logger.error(f"Heartbeat 전송 중 예외 발생: {str(e)}")
            return False

    # 기존 dispose 메서드 수정
    def dispose(self):
        self.stop_heartbeat()
        super().dispose()

# 기존 코드 유지...

마스터 쪽에서 data를 보낸다 생각할 때도 위 내용을 사업팀에서 진행해주어야 한다.

나는 주소값에 맞춰 데이터를 받아오고 그 내용에 대한 모니터링만 구현하면 되는 것

{
  "load": {
    "meter_name": {
      "ModbusAdapter": {
        "host": "localhost",
        "port": 502,
        "_cycletime": 1,
        "_capture": true,
        "_heartbeat_register": 9000,  // Heartbeat 레지스터 주소
        "_heartbeat_interval": 5      // Heartbeat 전송 간격(초)
      }
    }
  }
}

위 처럼 config에 설정 정보만 제대로 넣고 실행한다면 문제는 없을 것으로 예상..

일단 회사 코드를 올릴 순 없으니

enabler를 만들어 테스트 해본 결과 모니터링은 잘 된다.

실제 데이터 연동해보고 다시 후기 올리겠다..

profile
강준규

0개의 댓글