Modbus 연결만 며칠짼지
데이터는 잘 받아오고 저장도 잘 되는 것을 확인
근데 상태 이상감지 기능을 추가 개발해야하네?
생각해보니 당연한 수순..
고장나거나 이상이 생기면 즉각적으로 알아야하니까
원래는 알고리즘을 돌려 이상탐지를 추측하는 기능이 있었으니
고장이 났다 판단이 되면 Websocket으로 데이터를 넘겨 확인할 수 있게 해주어야 한다.
그럼 Modbus에서 heartbeat 구조는 어떻게 될까?
두개의 방법이 있다.
둘다 각각 장단점을 가지고 있다.

뭐.. 이렇다고 한다.
그래서 지금 내가 할 생각은 각 sensor 마다 id가 있으니 id를 기점으로 구분할 수 있어야하고, 이는 상태확인으로 연결될 수 있어야 한다.
그럼 받는 데이터가 save 되기 전 기능을 하나 추가해서 해당 데이터를 읽을 때 heartbeat가 flase라면? 이상으로 감지하여 알림을 가게 하면 되지않을까?
그럼 WS으로 연결이 가능하게끔 될거라 판단이 들었다.
읽기 기반 간단 구조
from pymodbus.client.sync import ModbusTcpClient
import time
MASTER_IP = "192.168.0.100" # Slave 장치의 IP
PORT = 502 # Modbus 기본 포트
client = ModbusTcpClient(MASTER_IP, port=PORT)
while True:
try:
response = client.read_holding_registers(1, 1) # 주소 1번 레지스터 읽기
if response.isError():
print("Error reading register")
else:
print(f"Heartbeat OK, Value: {response.registers[0]}")
except Exception as e:
print(f"Modbus Exception: {e}")
time.sleep(5) # 5초마다 확인
쓰기 기반 간단 구조
## Mater 구조
from pymodbus.client.sync import ModbusTcpClient
import time
client = ModbusTcpClient("192.168.0.100", port=502)
heartbeat_counter = 0
while True:
heartbeat_counter += 1 # Heartbeat 카운터 증가
client.write_register(2, heartbeat_counter) # 주소 2번 레지스터에 기록
print(f"Heartbeat Sent: {heartbeat_counter}")
time.sleep(5) # 5초마다 heartbeat 전송
### slave
from pymodbus.server.sync import StartTcpServer, ModbusSlaveContext, ModbusServerContext
import time
# Modbus 슬레이브의 데이터 저장소
store = ModbusSlaveContext(
di=None, co=None, hr={2: 0}, ir=None # 주소 2번 레지스터 초기화
)
context = ModbusServerContext(slaves=store, single=True)
# Heartbeat 감지 함수
last_heartbeat = 0
def check_heartbeat():
global last_heartbeat
while True:
new_value = context[0].getValues(3, 2, 1)[0] # 주소 2번의 값 읽기
if new_value != last_heartbeat:
print(f"Heartbeat Updated: {new_value}")
last_heartbeat = new_value
else:
print("No Heartbeat Update! Possible Master Failure!")
time.sleep(10) # 10초마다 확인
# 서버 시작
import threading
t = threading.Thread(target=check_heartbeat)
t.start()
StartTcpServer(context, address=("0.0.0.0", 502))
구현의 간단함만 보자면? 당연히 읽기 방법이 간단하다. slave에서 가져오면 되니까
나도 생각해보면 어쩌피 사업팀에서 보내주는 modbus data만 읽어오면 되니 read만 하면 되긴 한다.
그래도 하는김에 더 보자면
# paste.txt 파일에 추가/수정할 코드
import asyncio
import enum
from rich.console import Console
from rich.live import Live
import random, struct, time
from pymodbus.server import StartAsyncTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSlaveContext, ModbusServerContext
import pandas as pd
# 기존 코드 유지...
# ===== Heartbeat 관련 코드 추가 =====
# Heartbeat 레지스터 주소 설정 (임의의 사용하지 않는 주소 영역)
HEARTBEAT_REGISTER = 9000 # 9000번대 레지스터 사용
HEARTBEAT_TIMEOUT = 15 # 15초 타임아웃
class HeartbeatMonitor:
def __init__(self, context, register_addr=HEARTBEAT_REGISTER, timeout=HEARTBEAT_TIMEOUT):
"""
Slave 측 Heartbeat 모니터링 클래스
Args:
context: Modbus 서버 컨텍스트
register_addr: Heartbeat 레지스터 주소
timeout: Heartbeat 타임아웃 시간(초)
"""
self.context = context
self.register_addr = register_addr
self.timeout = timeout
self.last_heartbeat_time = time.time()
self.last_heartbeat_value = 0
self.master_alive = False
self.logger = console.console
async def monitor(self):
"""
Master의 heartbeat 신호를 모니터링
- Heartbeat 레지스터 값의 변화를 모니터링
- 지정된 시간 내에 값이 변경되지 않으면 Master가 오프라인으로 간주
"""
self.logger.print("[Heartbeat] 모니터링 시작 (타임아웃: {}초)".format(self.timeout))
while True:
try:
# 홀딩 레지스터에서 현재 heartbeat 값 읽기
current_value = self.context[SlaveId].getValues(
FunctionCode.ReadHoldingRegisters.value,
self.register_addr,
1
)[0]
current_time = time.time()
# Heartbeat 값이 변경되었는지 확인
if current_value != self.last_heartbeat_value:
if not self.master_alive:
self.logger.print("[Heartbeat] Master가 온라인 상태가 되었습니다.")
self.master_alive = True
self.logger.print(f"[Heartbeat] 수신됨 (값: {current_value})")
self.last_heartbeat_value = current_value
self.last_heartbeat_time = current_time
# Heartbeat 타임아웃 확인
elapsed_time = current_time - self.last_heartbeat_time
if self.master_alive and elapsed_time > self.timeout:
self.logger.print(f"[Heartbeat] 타임아웃: {elapsed_time:.1f}초 동안 신호 없음")
self.logger.print("[Heartbeat] Master가 오프라인 상태로 간주됩니다.")
self.master_alive = False
# 오프라인 감지 시 취할 액션 추가
await self.on_master_disconnected()
await asyncio.sleep(1) # 1초마다 확인
except Exception as e:
self.logger.print(f"[Heartbeat] 모니터링 오류: {str(e)}")
await asyncio.sleep(1)
async def on_master_disconnected(self):
"""
Master 연결 끊김 이벤트 핸들러
안전 모드로 전환하거나 다른 조치를 취할 수 있음
"""
# 예: 특정 레지스터에 안전 값 설정
self.logger.print("[Heartbeat] 안전 모드로 전환합니다.")
# 필요한 경우 여기에 안전 모드 코드 추가
# 기존 main 함수 수정
async def main():
store = ModbusSlaveContext(
di=ModbusSequentialDataBlock(0, [0] * MaxBlockSize),
co=ModbusSequentialDataBlock(0, [0] * MaxBlockSize),
hr=ModbusSequentialDataBlock(0, [0] * MaxBlockSize),
ir=ModbusSequentialDataBlock(0, [0] * MaxBlockSize))
context = ModbusServerContext(slaves=store, single=True)
# 초기 Heartbeat 레지스터 값 설정
context[SlaveId].setValues(FunctionCode.ReadHoldingRegisters.value, HEARTBEAT_REGISTER, [0])
# Heartbeat 모니터 초기화 및 시작
heartbeat_monitor = HeartbeatMonitor(context)
monitor_task = asyncio.create_task(heartbeat_monitor.monitor())
# 기존 서버 및 업데이트 태스크
server_task = StartAsyncTcpServer(context, address=("localhost", 502))
update_task = asyncio.create_task(update_context_data22(context))
# 모든 태스크 실행
await asyncio.gather(server_task, update_task, monitor_task)
# 기존 코드 유지...
위 같은 형식으로 port 설정이나 주소값 설정을 완료한 후에 heartbeat 모니터링을 시작하는 것을 볼 수 있다.
port는 동일해야하고 address는 겹치지않게 고유값을 정하는게 좋다.
# paste-2.txt에 있는 ModbusAdapter 클래스에 heartbeat 기능 추가
import enum
import struct
from pymodbus.client import ModbusTcpClient
import threading
import time
import logging
from device.adapters.adapter_base import AdapterBase, ConvRule
from device.adapters.enum_types import ResourceTypes
# 기존 코드 유지...
# Heartbeat 상수 정의
HEARTBEAT_REGISTER = 9000 # Slave와 동일한 주소 사용
HEARTBEAT_INTERVAL = 5 # 5초마다 heartbeat 전송
class ModbusAdapter(AdapterBase):
def __init__(self, rsc_type, name, params, simulate=False, pubsub=False) -> None:
self._params = params
self.logger = logging.getLogger(name)
# Heartbeat 관련 설정 추가
self.heartbeat_register = params.get('_heartbeat_register', HEARTBEAT_REGISTER)
self.heartbeat_interval = params.get('_heartbeat_interval', HEARTBEAT_INTERVAL)
self.heartbeat_counter = 0
self.heartbeat_running = False
self.heartbeat_thread = None
super().__init__(rsc_type, name, params, simulate=simulate, pubsub=pubsub)
@property
def info(self):
return self.params
def prepare(self):
return super().prepare()
def connect_device(self, **params) -> bool:
connected = True
self.client = ModbusTcpClient(**params)
if not self.client.connect():
# raise ConnectionError("Unable to connect to Modbus TCP server")
connected = False
else:
# Heartbeat 시작
self.start_heartbeat()
return connected
def disconnect_device(self):
# Heartbeat 중지
self.stop_heartbeat()
self.client.close()
def bind_event_handler(self):
pass
@property
def start_address(self):
return min(self.sensor_addresses) - (HoldingRegisterStartAddr + int(ZeroBased))
@property
def number_of_sensors(self) -> int:
return (max(self.sensor_addresses) - (HoldingRegisterStartAddr + int(ZeroBased)) - self.start_address) + self.sensor_count
def read_data_from_device(self, sensors: list) -> list:
# 기존 코드 유지...
# 이 부분은 변경 없음
# ...
def convert_values(self, rules: dict, sensor_data: list) -> dict:
# 기존 코드 유지...
# 이 부분은 변경 없음
# ...
# ===== Heartbeat 관련 메서드 추가 =====
def start_heartbeat(self):
"""Heartbeat 전송 스레드 시작"""
if not self.heartbeat_running:
self.heartbeat_running = True
self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
self.heartbeat_thread.start()
self.logger.info(f"Heartbeat 시작 (간격: {self.heartbeat_interval}초, 레지스터: {self.heartbeat_register})")
def stop_heartbeat(self):
"""Heartbeat 전송 중지"""
if self.heartbeat_running:
self.heartbeat_running = False
if self.heartbeat_thread:
self.heartbeat_thread.join(timeout=2)
self.logger.info("Heartbeat 중지")
def _heartbeat_loop(self):
"""Heartbeat 전송 루프"""
while self.heartbeat_running:
try:
self.send_heartbeat()
time.sleep(self.heartbeat_interval)
except Exception as e:
self.logger.error(f"Heartbeat 전송 오류: {str(e)}")
time.sleep(self.heartbeat_interval)
def send_heartbeat(self):
"""Heartbeat 신호 전송 (카운터 증가 방식)"""
try:
self.heartbeat_counter = (self.heartbeat_counter + 1) % 65536 # 16비트 값 범위 내에서 순환
result = self.client.write_register(self.heartbeat_register, self.heartbeat_counter)
if hasattr(result, 'isError') and result.isError():
self.logger.error(f"Heartbeat 신호 전송 실패: {result}")
return False
self.logger.debug(f"Heartbeat 신호 전송 성공 (값: {self.heartbeat_counter})")
return True
except Exception as e:
self.logger.error(f"Heartbeat 전송 중 예외 발생: {str(e)}")
return False
# 기존 dispose 메서드 수정
def dispose(self):
self.stop_heartbeat()
super().dispose()
# 기존 코드 유지...
마스터 쪽에서 data를 보낸다 생각할 때도 위 내용을 사업팀에서 진행해주어야 한다.
나는 주소값에 맞춰 데이터를 받아오고 그 내용에 대한 모니터링만 구현하면 되는 것
{
"load": {
"meter_name": {
"ModbusAdapter": {
"host": "localhost",
"port": 502,
"_cycletime": 1,
"_capture": true,
"_heartbeat_register": 9000, // Heartbeat 레지스터 주소
"_heartbeat_interval": 5 // Heartbeat 전송 간격(초)
}
}
}
}
위 처럼 config에 설정 정보만 제대로 넣고 실행한다면 문제는 없을 것으로 예상..
일단 회사 코드를 올릴 순 없으니
enabler를 만들어 테스트 해본 결과 모니터링은 잘 된다.
실제 데이터 연동해보고 다시 후기 올리겠다..