[ST, ADS] Pyads로 Twincat과 연결해보자

Junkyu_Kang·2024년 12월 20일

저번 프로젝트에서 pyads로 Twincat과 연결하여 데이터를 받는 업무를 수행한 적 있다.

신기하게도 pyads를 사용하면 TwincatConnector를 만들어 하드웨어 장비와 연결할 수 있었고 데이터를 쉽게 받아올 수 있었따.

기본 적인 연결 로직을 보자.

class TwincatConnector(ConnectorBase):
    def __init__(self, resource_id: str, channel: str, sample_size: int=96, site_name: str='sitename'):
        super().__init__(resource_id, channel, sample_size, site_name)
        net_ip, ipaddr, variable = channel.split(":")
        self.AMS_NET_IP = net_ip
        self.PLC_IP = ipaddr
        self.variable = variable
        self.plc = None
        self.data_callback = None
        self._stop_event = asyncio.Event()  # 비동기 이벤트

    async def connect(self):
        try:
            self.plc = pyads.Connection(self.AMS_NET_IP, pyads.PORT_TC3PLC1, self.PLC_IP)
            await asyncio.to_thread(self.plc.open)  # 비동기로 PLC 열기
            print("PLC 연결 성공")
            return True
        except Exception as e:
            print(f"연결 실패: {e}")
            return False

    async def disconnect(self):
        self._stop_event.set()
        if self.plc and self.plc.is_open:
            await asyncio.to_thread(self.plc.close)
            print("PLC 연결 종료")

위 내용을 보면 나는 비동기로 1초마다 데이터를 받아와야했기에 thread를 설정해두었지만 필수는 아니다! init 내용을 보면 기본적인 연결에 필요한 내용을 확인할 수 있다.

AMS_NET_IP, PLC_IP가 가장 기본적인 인자이다.

해당 값을 기준으로 연결을 시도하기 때문에 확인 작업이 꼭 필요하며 나는 이 과정을 main에서 handler를 만들어 넘겨주었따.

async def main():
   global handler
   handler = TwincatConnector("NAME", "AMS_NET_IP:PLC_IP:VALUE")

   try:
       if await handler.connect():
           await handler.subscribe()
           await asyncio.Event().wait()
   except KeyboardInterrupt:
       print("프로그램 종료")
   finally:
       await handler.disconnect()

if __name__ == "__main__":
   asyncio.run(main())

뭐 대단한 걸 만든건 아니기 때문에 연결 로직 자체도 간단하다.
필요 인자를 handler로 넘겨주고 실행만 시키면 된다! 그리고 handler를 연결하고 구독하여 이벤트를 쓰레드로 발생시키면 비동기적으로 데이터를 받아오게 할 수 있따.

Pyads는 데이터를 받아올수도, 수정할 수도 있기 때문에 사용 방법에 대해 간략하게 소개하자면?

https://pyads.readthedocs.io/en/latest/documentation/routing.html#creating-routes-on-windows

해당 내용을 참고하면 된다. 뭐.. 간단하니까..

import pyads
SENDER_AMS = '1.2.3.4.1.1'
PLC_IP = '192.168.0.100'
PLC_USERNAME = 'plc_username'
PLC_PASSWORD = 'plc_password'
ROUTE_NAME = 'RouteToMyPC'
HOSTNAME = 'MyPC'  # or IP
>>>
pyads.open_port()
pyads.set_local_address(SENDER_AMS)
pyads.add_route_to_plc(SENDER_AMS, HOSTNAME, PLC_IP, PLC_USERNAME, PLC_PASSWORD, route_name=ROUTE_NAME)
pyads.close_port()

위 내용을 보면 connect에 들어갈 내용이 모두 보인다. PLC의 이름과 PW를 요구할 수 있다. 이건 나중에 twincat 설명할 때 쓰겠다!

 >>> import pyads
 >>> plc = pyads.Connection('127.0.0.1.1.1', pyads.PORT_TC3PLC1):
 >>> plc.open()
 >>>
 >>> plc.read_by_name('GVL.bool_value')  # datatype will be queried and cached
 True
 >>> plc.read_by_name('GVL.bool_value')  # cached datatype will be used
 True
 >>> plc.read_by_name('GVL.bool_value', cache_symbol_info=False)  # datatype will not be cached and queried on each call
 True
 >>> plc.read_by_name('GVL.int_value', pyads.PLCTYPE_INT)  # datatype is provided and will not be queried
 0
 >>> plc.write_by_name('GVL.int_value', 10)  # write to target
 >>> plc.read_by_name('GVL.int_value')
 10

>>> plc.close()

위 예제를 보면 read_by_name와 write_by_name으로 데이터를 읽을 수도, 작성할 수도 있는 기능을 확인할 수 있다.

물론 arrays도 가능하다.

plc.write_by_name('GVL.sample_array', [1, 2, 3], pyads.PLCTYPE_INT * 3)
plc.read_by_name('GVL.sample_array', pyads.PLCTYPE_INT * 3)
[1, 2, 3]

plc.write_by_name('GVL.sample_array[0]', 5, pyads.PLCTYPE_INT)
plc.read_by_name('GVL.sample_array[0]', pyads.PLCTYPE_INT)
5

생각보다 간단하다.

근데 문제는 해당 내용에 대한 데이터 크기를 지정해야 원활한 통신이 가능하다.

근데 특이한건? byte 단위로 설정해야할 수도 있다는 것..

class ST_TwinCATData(Structure):
    _fields_ = [
        ("Timestamp", c_uint64),   # 시간 분초 밀리초로 저장된 64비트 Unsigned Integer
        ("TotalActive", c_float * 4)   # 유효 전력 (실수형)
    ]

위 내용을 보면 timeStamp와 TotalActive의 값을 ctypes를 import하여 c_float와 unit64를 사용하는 것을 볼 수 있다.

해당 데이터에 대한 설정을 모두 해주어야 한다.

저렇게 보니 간단해 보이죠?

class ST_EL3443_Data(Structure):
   _fields_ = [
       ("Unbalance_rate", ST_DataInfo),
       ("Total_Basic_Frequency", ST_DataInfo),
       ("Total_Basic_Powerfactor", ST_DataInfo),
       ("Basic_Voltage", DataInfoArray),
       ("Basic_Current", DataInfoArray),
       ("Power_Active_Power", DataInfoArray),
       ("Power_Apparent_Power", DataInfoArray),
       ("Power_Reactive_Power", DataInfoArray),
       ("Power_Power_Factor", DataInfoArray),
       ("HarmonicU_180hz", DataInfoArray),
       ("HarmonicU_300hz", DataInfoArray),
       ("HarmonicI_180hz", DataInfoArray),
       ("HarmonicI_300hz", DataInfoArray),
       ("THD_U", DataInfoArray),
       ("THD_I", DataInfoArray),
   ]


class ST_EL3443_Alarm(Structure):
   _fields_ = [
       ("Unbalance_rate", ST_DataInfo),
       ("Total_Basic_Frequency", ST_DataInfo),
       ("Total_Basic_Powerfactor", ST_DataInfo),
       ("Basic_Voltage", DataInfoArray),
       ("Basic_Current", DataInfoArray),
       ("Power_Active_Power", DataInfoArray),
       ("Power_Apparent_Power", DataInfoArray),
       ("Power_Reactive_Power", DataInfoArray),
       ("Power_Power_Factor", DataInfoArray),
       ("HarmonicU_180hz", DataInfoArray),
       ("HarmonicU_300hz", DataInfoArray),
       ("HarmonicI_180hz", DataInfoArray),
       ("HarmonicI_300hz", DataInfoArray),
       ("THD_U", DataInfoArray),
       ("THD_I", DataInfoArray),
   ]


class ST_Livo(Structure):
   _fields_ = [
       ("Timestamp", c_uint32),
       ("Data", ST_EL3443_Data * 3),
       ("Alarm", ST_EL3443_Alarm * 3),
   ]

처음엔 이렇게 하나하나 다 지정해놨어요..

그리고 데이터를 받아올 땐 추출 과정을 따로 만들어서 하나의 함수로 만들었습니다.

def extract_data(item):
       return {
           "Timestamp": item.Timestamp,
           "Data": [
               {
                   "ValueReal": data.ValueReal,
                   "Under": data.AbnormalDetection.Under,
                   "Over": data.AbnormalDetection.Over,
               }
               for data in item.Data
           ],
           "Alarm": [
               {
                   "ValueReal": alarm.ValueReal,
                   "Under": alarm.AbnormalDetection.Under,
                   "Over": alarm.AbnormalDetection.Over,
               }
               for alarm in item.Alarm
           ]
       }

위와 같은 식으로 지정을 해놨구..


  def _read_el3443_data(self, base_path):
      data = {}
      for field_name in ST_EL3443_Data._fields_:
          field_type = field_name[0]
          if field_name[1] == ST_DataInfo:
              value_real = self.plc.read_by_name(
                  f"{base_path}.{field_type}.ValueReal", pyads.PLCTYPE_REAL
              )
              under = self.plc.read_by_name(
                  f"{base_path}.{field_type}.AbnormalDetection.Under",
                  pyads.PLCTYPE_BOOL,
              )
              over = self.plc.read_by_name(
                  f"{base_path}.{field_type}.AbnormalDetection.Over",
                  pyads.PLCTYPE_BOOL,
              )
              data[field_type] = {
                  "ValueReal": value_real,
                  "AbnormalDetection": {"Under": under, "Over": over},
              }
          elif isinstance(field_name[1], Array):
              array_data = []
              for i in range(1, 4):  # 1-based indexing for arrays
                  value_real = self.plc.read_by_name(
                      f"{base_path}.{field_type}[{i}].ValueReal", pyads.PLCTYPE_REAL
                  )
                  under = self.plc.read_by_name(
                      f"{base_path}.{field_type}[{i}].AbnormalDetection.Under",
                      pyads.PLCTYPE_BOOL,
                  )
                  over = self.plc.read_by_name(
                      f"{base_path}.{field_type}[{i}].AbnormalDetection.Over",
                      pyads.PLCTYPE_BOOL,
                  )
                  array_data.append(
                      {
                          "ValueReal": value_real,
                          "AbnormalDetection": {"Under": under, "Over": over},
                      }
                  )
              data[field_type] = array_data

      return data

필요없어 지운 코드지만 위처럼 해당 내용에 대한 매핑을 할 수 있게끔 함수화 하였지만 편의를 위해 클래스를 사용하는 걸 볼 수 있다!

근데 이게 데이터가 맨날 똑같은데 받아오면 무슨 소용인가 생각을 했다.

그래서 데이터가 바뀌면 trig를 보내 해당 데이터를 받아오는 과정을 추가 했다.

위에 처럼 함수화를 다 진행한 다음엔 간단하다.

    def read_data(self):
        try:
            data = self.plc.read_by_name("MAIN.Data", ST_EL3443_Data)
            alarm = self.plc.read_by_name("MAIN.Alarm", ST_EL3443_Alarm)
            livo = self.plc.read_by_name("MAIN.stLivo", ST_Livo)

            return {"data": data, "alarm": alarm, "livo": livo}
        except pyads.ADSError as e:
            print(f"ADS 에러: {e}")
            return None
        except Exception as e:
            print(f"일반 에러: {e}")
            return None

위 처럼 read_data를 설정하고 데이터안에 잘 넣어주기만 하면 된다.

위 코드만 보면 굉장히 스무스하게 할 거 같았지만... 어려웠다.. twincat하고 연결하는 과정부터 받아올 때 data scale 이슈로 문제가 생겨 며칠을 머리아팠던 거 같아.

thread로 계속 돌리고 있긴 했지만 주기적으로 데이터를 받아오는 과정과 데이터 변경시 받아오는 방법으로 계속 고민했었고 성능 면에서도 생각을 했었따..

그나저나 pyads로 연결해서 사용하는게 제법 신기하더라..

마무리

  1. 하드웨어에서 데이터를 받아올 땐 항상 데이터 형태와 크기를 생각해서 지정해주자.
  2. 받아오는 과정에서 클래스화가 불필요할 수 있지만 수정용이성과 읽기 쉬운 코드를 위해 만들어두자
  3. 쓰레드로 지정할 경우 데이터를 받는 조건을 만들어두자

3번에 대해서는 trig를 만들어 데이터가 바뀔 때 마다 1, 2로 확인 할 수 있게 하였다.

어렵진 않다! 설정이 짜증날뿐...이어서 twincat 초보자의 망망대해 헤엄치기.. 쓸게요!

profile
강준규

0개의 댓글