[트러블슈팅] Modbus Address와 Register 규약 + PyModbus

Junkyu_Kang·2025년 3월 21일

최근 몇주간 나에게 고통을 주었던 Modbus Protocol 연결 방법을 드디어 알게 되었다..

많은 시도를 해보았다.. 내가 고통을 오래 받은만큼 혹시 도움이 필요한 사람이 있다면 도움이 되었으면 한다..

기본적으로 현재 매핑되어있는 값의 대한 정보는
bindings.json 파일에 저장시켜놓았다.

예를 들어보자

{
    "solarmodule": {
        "SolarReadingProfile": {
            "solarReading": {
                "readingMMTR": {
                    "DmdVAh": {
                        "actVal": [
                            "40001",
                            "kWh",
                            "int64",
                            "0.1",
                            "",
                            "normal",
                            ""
                        ]
                    },
                    "DmdWh": {
                        "actVal": [
                            "40009",
                            "kWh",
                            "int64",
                            "0.1",
                            "",
                            "normal",
                            ""
                        ]
                    },
...

위 처럼 각 값에 대한 정보를 매핑시키는데 순서대로 주소, 값, 타입, scale... 등등을 해놓는다.

각각 쓰일 일이 있으니 질문이 있다면 따로..

그럼 이제 중요한 부분은 무엇일까

바로 Address와 Register다.

조건은 2개다.
1. Address는 서로 겹치지 아니해야하고, 각 고유 주소값의 영역을 보장해야한다.
2. Register는 꼭 타입 규격에 맞게 pack, unpack을 해야한다.

이유는 다음과 같다.

Modbus에서 Register는 2바이트의 공간을 차지한다.

그래서 당연히 초과하면 추가의 register를 보장해야한다.

그렇다면 register를 4byte만큼 사용한다면 address도 당연히 2개 만큼의 공간을 보장해야한다.

예를 들어보자

                        "actVal": [
                            "40001",
                            "kWh",
                            "int64",
                            "0.1",
                            "",
                            "normal",
                            ""
                        ]

위 값의 address는 40001이다 그렇다면 int64(long)이면 8byte의 공간이 필요하다.
그럼 address의 공간은 8/2 4개의 공간이 필요하다.
즉 40001 ~ 40004까지 다른 값의 address를 부여하면 안된다는 뜻이 된다.

더 자세하게 하면 곤란하니 방법만 차근차근 보자

우선 modbus를 slave, master 구조로 구현한다 생각하자.

slave는 값을 보내주는, 즉 테스트 환경을 구현한다.

아래 코드는 랜덤값을 구현하고 해당 값을 패킹하는 과정까지 구현한 것이다.

def generate_random_data_float(fc: FunctionCode):
    # float_data = [random.uniform(10.0, 90.0) for _ in range(120)]
    # float_data[0] = cr.get_load()
    # float_data[1] = cr.get_pv()
    load_value = cr.get_load()
    pv_value = cr.get_pv()
    float_data = [load_value] * 60 + [pv_value] * 60
    register_data = []
    for value in float_data:
        # Convert float to two 16-bit registers
        packed = struct.pack('>f', value)
        high, low = struct.unpack('>HH', packed)
        register_data.extend([high, low])
    return float_data, register_data

def generate_random_data_bool(fc: FunctionCode):
    data = [1 for _ in range(100)]
    return data

def generate_data_int32(fc: FunctionCode):
    int32_values = [random.randint(0, 100) for _ in range(30)]
    register_data = []
    
    for value in int32_values:
        packed = struct.pack('>i', value)
        high, low = struct.unpack('>HH', packed)
        register_data.extend([high, low])
    
    return int32_values, register_data

def generate_data_uint32(fc: FunctionCode):
    uint32_values = [random.randint(0, 100) for _ in range(30)]
    register_data = []
    
    for value in uint32_values:
        packed = struct.pack('>I', value) 
        high, low = struct.unpack('>HH', packed)
        register_data.extend([high, low])
    
    return uint32_values, register_data

def generate_data_int64(fc: FunctionCode):
    int64_values = [random.randint(0, 100) for _ in range(20)]
    register_data = []
    
    for value in int64_values:
        packed = struct.pack('>q', value)
        reg1, reg2, reg3, reg4 = struct.unpack('>HHHH', packed)
        register_data.extend([reg1, reg2, reg3, reg4])
    
    return int64_values, register_data

def generate_data_uint64(fc: FunctionCode):
    uint64_values = [random.randint(0, 100) for _ in range(20)]
    register_data = []
    
    for value in uint64_values:
        packed = struct.pack('>Q', value)
        reg1, reg2, reg3, reg4 = struct.unpack('>HHHH', packed)
        register_data.extend([reg1, reg2, reg3, reg4])
    
    return uint64_values, register_data

그럼 이 값들을 어떻게 사용할까?

사용한 라이브러리는 다음과 같다
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSlaveContext, ModbusServerContext

async def update_context_data33(context):
    with Live(console=console, refresh_per_second=1) as live:
        while True:
            origin_float, float_registers = generate_random_data_float(FunctionCode.ReadHoldingRegisters)
            
            origin_int32, int32_registers = generate_data_int32(FunctionCode.ReadHoldingRegisters)
            origin_uint32, uint32_registers = generate_data_uint32(FunctionCode.ReadHoldingRegisters)
            origin_int64, int64_registers = generate_data_int64(FunctionCode.ReadHoldingRegisters)
            origin_uint64, uint64_registers = generate_data_uint64(FunctionCode.ReadHoldingRegisters)
            
            basic_registers = generate_random_data(FunctionCode.ReadHoldingRegisters)
            
            holding_registers = (
                basic_registers +      # 일반 정수 값
                float_registers +      # 부동 소수점 값
                int32_registers +      # int32 값
                uint32_registers +     # uint32 값
                int64_registers +      # int64 값
                uint64_registers +     # uint64 값
                generate_random_data_bool(FunctionCode.ReadHoldingRegisters)  # 불리언 값
            )
            
            float_start = len(basic_registers)
            int32_start = float_start + len(float_registers)
            uint32_start = int32_start + len(int32_registers)
            int64_start = uint32_start + len(uint32_registers)
            uint64_start = int64_start + len(int64_registers)
            bool_start = uint64_start + len(uint64_registers)
            
            data_info = [
                "Float 데이터 (인덱스 {}-{}): {}".format(float_start, float_start + len(float_registers) - 1, origin_float[:5]),
                "Int32 데이터 (인덱스 {}-{}): {}".format(int32_start, int32_start + len(int32_registers) - 1, origin_int32[:5]),
                "UInt32 데이터 (인덱스 {}-{}): {}".format(uint32_start, uint32_start + len(uint32_registers) - 1, origin_uint32[:5]),
                "Int64 데이터 (인덱스 {}-{}): {}".format(int64_start, int64_start + len(int64_registers) - 1, origin_int64[:5]),
                "UInt64 데이터 (인덱스 {}-{}): {}".format(uint64_start, uint64_start + len(uint64_registers) - 1, origin_uint64[:5]),
            ]
            
            # 레지스터 크기 제한으로 인해 청크로 나누어 업데이트
            chunk_size = 125  # Modbus 프로토콜 제한
            for start in range(0, len(holding_registers), chunk_size):
                chunk = holding_registers[start:start + chunk_size]
                context[SlaveId].setValues(FunctionCode.ReadHoldingRegisters.value, start, chunk)
            
            info_message = "\n".join(data_info)
            live.update(f"홀딩 레지스터 업데이트 완료. 총 레지스터 수: {len(holding_registers)}\n{info_message}")
            
            await asyncio.sleep(1)  # 1초마다 업데이트

친절하게 확인용 print문도 남겨두었다. 설명과 함꼐.

다음 순서를 보자

random한 값을 만든다.
1. 랜덤값을 생성한다. 각 타입에 맞게 설정할 수 있으며 해당 코드에 ReadHoldingRegisters로 설정하여 넘긴다.
2. holdilg_registers 내부에 값을을 지정한다.
3. 각 값들의 start_register의 위치값을 알 수 있도록 지정한다.
4. 크기제한에 맞게 나누어 업데이트 해준다.

위 내용만 보면 쉬워보인다.
난 이게 참 어렵게 느껴졌고 디버깅을 걸어도 모르겠어서 정보를 찾는데 뭐가 나오질 않아서 고통스러웠다..

다른 자세한 코드는 회사 코드라 지우겠다..
따로 slave에서 받은 데이터를 master에서 다시 unpack하는 과정을 보자

    def convert_values(self, rules: dict, sensor_data: list) -> dict:
        data_array = self.list_to_bytes(sensor_data)
        # print("data_array: ", data_array)
        # self.logger.debug(f"Data array length: {len(data_array)}")
        result = {}
        for key, v in rules.items():
                try:
                    addr = int(v[ConvRule.Address]) - HoldingRegisterStartAddr
                    self.logger.debug(f"[{key}] Single rule: Original address {v[ConvRule.Address]}, adjusted addr: {addr}")
                except Exception as e:
                    self.logger.error(f"규칙 처리 에러 (키: {key}): {e}")
                    continue
                idx = (addr - self.start_address) * 2
                self.logger.debug(f"[{key}] Single rule: Computed index: {idx}")
                datatype = v[ConvRule.Type].lower()
                if datatype == 'float':
                    result[key] = struct.unpack('>f', data_array[idx:idx+4])[0]
                elif 'int64' in datatype:
                    result[key] = int(struct.unpack('>q', data_array[idx:idx+8])[0] * float(v[ConvRule.Scale]))
                elif 'uint64' in datatype:
                    result[key] = int(struct.unpack('>Q', data_array[idx:idx+8])[0] * float(v[ConvRule.Scale]))
                elif 'uint' in datatype:
                    result[key] = int(struct.unpack('>I', data_array[idx:idx+4])[0] * float(v[ConvRule.Scale]))
                elif 'int' in datatype:
                    result[key] = int(struct.unpack('>i', data_array[idx:idx+4])[0] * float(v[ConvRule.Scale]))
                elif datatype == 'bool':
                    if v[ConvRule.Category] == 'binarized' and v[ConvRule.Rule]:
                        analog = int(struct.unpack('>h', data_array[idx:idx+2])[0] * float(v[ConvRule.Scale]))
                        result[key] = eval(f'{analog} {v[ConvRule.Rule]}')
                    else:
                        result[key] = bool(struct.unpack('>H', data_array[idx:idx+2])[0])
                self.logger.debug(f"[{key}] Single rule: Converted value: {result[key]}")
        return result

일부 코드를 지우고 보면 다음과 같다.
해당 내용은 단일 규칙을 처리하는 과정이다.

보면 받은 데이터 타입에 따라 unpack 과정을 달리하는 것을 볼 수 있다.

내용을 보면 간단하게 idx를 보고 register의 크기를 재배정하는 것을 볼 수 있다.

struct.unpack, pack 정보의 대한 내용은 아래를 참고하자

링크텍스트

Modbus의 데이터 타입은 다음과 같다

데이터 타입크기레지스터 개수예제 (40001부터 시작)
INT1616-bit1개40001
INT3232-bit2개40001, 40002
FLOAT3232-bit2개40001, 40002
INT6464-bit4개40001 ~ 40004
FLOAT6464-bit4개40001 ~ 40004
STRING(6글자)96-bit6개40001 ~ 40003
BOOLEAN1-bit1개 (Coil)00001

잘 살펴보자..

데이터 타입변환 함수 예제설명
INT16struct.unpack(">h", b'\x12\x34')[0]Big-Endian 16-bit 정수 변환
INT32struct.unpack(">i", b'\x12\x34\x56\x78')[0]Big-Endian 32-bit 정수 변환
FLOAT32struct.unpack(">f", b'\x42\x40\x00\x00')[0]Big-Endian 32-bit 부동소수점 변환
INT64struct.unpack(">q", b'\x00\x00\x00\x00\x12\x34\x56\x78')[0]Big-Endian 64-bit 정수 변환
FLOAT64struct.unpack(">d", b'\x40\x09\x21\xfb\x54\x44\x2d\x18')[0]Big-Endian 64-bit 부동소수점 변환
Packstruct.pack(">HH", 16961, 16384)2개 레지스터(16-bit씩)로 32-bit 값 저장
Unpackstruct.unpack(">f", struct.pack(">HH", 16961, 16384))[0]2개 레지스터 값을 FLOAT으로 변환

물론 구현부에 보다 많은 내용이 필요하고
read_data_from_device에서 사이즈를 지정하는 등으 작업을 해야하지만
근본적으로 위 내용이 가장 난해하고 혼자서 보려면 어려울 것이다.

참고
링크텍스트
링크텍스트

마지막으로 강조하는건 절대 address와 register의 정의를 몰라선 안된다. 그럼 불러오는 과정에서 해괴한 값을 볼 수 있을 것이다.

profile
강준규

0개의 댓글