
저번 프로젝트에서 pyads로 Twincat과 연결하여 데이터를 받는 업무를 수행한 적 있다.
신기하게도 pyads를 사용하면 TwincatConnector를 만들어 하드웨어 장비와 연결할 수 있었고 데이터를 쉽게 받아올 수 있었따.
기본 적인 연결 로직을 보자.
class TwincatConnector(ConnectorBase):
def __init__(self, resource_id: str, channel: str, sample_size: int=96, site_name: str='sitename'):
super().__init__(resource_id, channel, sample_size, site_name)
net_ip, ipaddr, variable = channel.split(":")
self.AMS_NET_IP = net_ip
self.PLC_IP = ipaddr
self.variable = variable
self.plc = None
self.data_callback = None
self._stop_event = asyncio.Event() # 비동기 이벤트
async def connect(self):
try:
self.plc = pyads.Connection(self.AMS_NET_IP, pyads.PORT_TC3PLC1, self.PLC_IP)
await asyncio.to_thread(self.plc.open) # 비동기로 PLC 열기
print("PLC 연결 성공")
return True
except Exception as e:
print(f"연결 실패: {e}")
return False
async def disconnect(self):
self._stop_event.set()
if self.plc and self.plc.is_open:
await asyncio.to_thread(self.plc.close)
print("PLC 연결 종료")
위 내용을 보면 나는 비동기로 1초마다 데이터를 받아와야했기에 thread를 설정해두었지만 필수는 아니다! init 내용을 보면 기본적인 연결에 필요한 내용을 확인할 수 있다.
AMS_NET_IP, PLC_IP가 가장 기본적인 인자이다.
해당 값을 기준으로 연결을 시도하기 때문에 확인 작업이 꼭 필요하며 나는 이 과정을 main에서 handler를 만들어 넘겨주었따.
async def main():
global handler
handler = TwincatConnector("NAME", "AMS_NET_IP:PLC_IP:VALUE")
try:
if await handler.connect():
await handler.subscribe()
await asyncio.Event().wait()
except KeyboardInterrupt:
print("프로그램 종료")
finally:
await handler.disconnect()
if __name__ == "__main__":
asyncio.run(main())
뭐 대단한 걸 만든건 아니기 때문에 연결 로직 자체도 간단하다.
필요 인자를 handler로 넘겨주고 실행만 시키면 된다! 그리고 handler를 연결하고 구독하여 이벤트를 쓰레드로 발생시키면 비동기적으로 데이터를 받아오게 할 수 있따.
Pyads는 데이터를 받아올수도, 수정할 수도 있기 때문에 사용 방법에 대해 간략하게 소개하자면?
https://pyads.readthedocs.io/en/latest/documentation/routing.html#creating-routes-on-windows
해당 내용을 참고하면 된다. 뭐.. 간단하니까..
import pyads
SENDER_AMS = '1.2.3.4.1.1'
PLC_IP = '192.168.0.100'
PLC_USERNAME = 'plc_username'
PLC_PASSWORD = 'plc_password'
ROUTE_NAME = 'RouteToMyPC'
HOSTNAME = 'MyPC' # or IP
>>>
pyads.open_port()
pyads.set_local_address(SENDER_AMS)
pyads.add_route_to_plc(SENDER_AMS, HOSTNAME, PLC_IP, PLC_USERNAME, PLC_PASSWORD, route_name=ROUTE_NAME)
pyads.close_port()
위 내용을 보면 connect에 들어갈 내용이 모두 보인다. PLC의 이름과 PW를 요구할 수 있다. 이건 나중에 twincat 설명할 때 쓰겠다!
>>> import pyads
>>> plc = pyads.Connection('127.0.0.1.1.1', pyads.PORT_TC3PLC1):
>>> plc.open()
>>>
>>> plc.read_by_name('GVL.bool_value') # datatype will be queried and cached
True
>>> plc.read_by_name('GVL.bool_value') # cached datatype will be used
True
>>> plc.read_by_name('GVL.bool_value', cache_symbol_info=False) # datatype will not be cached and queried on each call
True
>>> plc.read_by_name('GVL.int_value', pyads.PLCTYPE_INT) # datatype is provided and will not be queried
0
>>> plc.write_by_name('GVL.int_value', 10) # write to target
>>> plc.read_by_name('GVL.int_value')
10
>>> plc.close()
위 예제를 보면 read_by_name와 write_by_name으로 데이터를 읽을 수도, 작성할 수도 있는 기능을 확인할 수 있다.
물론 arrays도 가능하다.
plc.write_by_name('GVL.sample_array', [1, 2, 3], pyads.PLCTYPE_INT * 3)
plc.read_by_name('GVL.sample_array', pyads.PLCTYPE_INT * 3)
[1, 2, 3]
plc.write_by_name('GVL.sample_array[0]', 5, pyads.PLCTYPE_INT)
plc.read_by_name('GVL.sample_array[0]', pyads.PLCTYPE_INT)
5
생각보다 간단하다.
근데 문제는 해당 내용에 대한 데이터 크기를 지정해야 원활한 통신이 가능하다.
근데 특이한건? byte 단위로 설정해야할 수도 있다는 것..
class ST_TwinCATData(Structure):
_fields_ = [
("Timestamp", c_uint64), # 시간 분초 밀리초로 저장된 64비트 Unsigned Integer
("TotalActive", c_float * 4) # 유효 전력 (실수형)
]
위 내용을 보면 timeStamp와 TotalActive의 값을 ctypes를 import하여 c_float와 unit64를 사용하는 것을 볼 수 있다.
해당 데이터에 대한 설정을 모두 해주어야 한다.
저렇게 보니 간단해 보이죠?
class ST_EL3443_Data(Structure):
_fields_ = [
("Unbalance_rate", ST_DataInfo),
("Total_Basic_Frequency", ST_DataInfo),
("Total_Basic_Powerfactor", ST_DataInfo),
("Basic_Voltage", DataInfoArray),
("Basic_Current", DataInfoArray),
("Power_Active_Power", DataInfoArray),
("Power_Apparent_Power", DataInfoArray),
("Power_Reactive_Power", DataInfoArray),
("Power_Power_Factor", DataInfoArray),
("HarmonicU_180hz", DataInfoArray),
("HarmonicU_300hz", DataInfoArray),
("HarmonicI_180hz", DataInfoArray),
("HarmonicI_300hz", DataInfoArray),
("THD_U", DataInfoArray),
("THD_I", DataInfoArray),
]
class ST_EL3443_Alarm(Structure):
_fields_ = [
("Unbalance_rate", ST_DataInfo),
("Total_Basic_Frequency", ST_DataInfo),
("Total_Basic_Powerfactor", ST_DataInfo),
("Basic_Voltage", DataInfoArray),
("Basic_Current", DataInfoArray),
("Power_Active_Power", DataInfoArray),
("Power_Apparent_Power", DataInfoArray),
("Power_Reactive_Power", DataInfoArray),
("Power_Power_Factor", DataInfoArray),
("HarmonicU_180hz", DataInfoArray),
("HarmonicU_300hz", DataInfoArray),
("HarmonicI_180hz", DataInfoArray),
("HarmonicI_300hz", DataInfoArray),
("THD_U", DataInfoArray),
("THD_I", DataInfoArray),
]
class ST_Livo(Structure):
_fields_ = [
("Timestamp", c_uint32),
("Data", ST_EL3443_Data * 3),
("Alarm", ST_EL3443_Alarm * 3),
]
처음엔 이렇게 하나하나 다 지정해놨어요..
그리고 데이터를 받아올 땐 추출 과정을 따로 만들어서 하나의 함수로 만들었습니다.
def extract_data(item):
return {
"Timestamp": item.Timestamp,
"Data": [
{
"ValueReal": data.ValueReal,
"Under": data.AbnormalDetection.Under,
"Over": data.AbnormalDetection.Over,
}
for data in item.Data
],
"Alarm": [
{
"ValueReal": alarm.ValueReal,
"Under": alarm.AbnormalDetection.Under,
"Over": alarm.AbnormalDetection.Over,
}
for alarm in item.Alarm
]
}
위와 같은 식으로 지정을 해놨구..
def _read_el3443_data(self, base_path):
data = {}
for field_name in ST_EL3443_Data._fields_:
field_type = field_name[0]
if field_name[1] == ST_DataInfo:
value_real = self.plc.read_by_name(
f"{base_path}.{field_type}.ValueReal", pyads.PLCTYPE_REAL
)
under = self.plc.read_by_name(
f"{base_path}.{field_type}.AbnormalDetection.Under",
pyads.PLCTYPE_BOOL,
)
over = self.plc.read_by_name(
f"{base_path}.{field_type}.AbnormalDetection.Over",
pyads.PLCTYPE_BOOL,
)
data[field_type] = {
"ValueReal": value_real,
"AbnormalDetection": {"Under": under, "Over": over},
}
elif isinstance(field_name[1], Array):
array_data = []
for i in range(1, 4): # 1-based indexing for arrays
value_real = self.plc.read_by_name(
f"{base_path}.{field_type}[{i}].ValueReal", pyads.PLCTYPE_REAL
)
under = self.plc.read_by_name(
f"{base_path}.{field_type}[{i}].AbnormalDetection.Under",
pyads.PLCTYPE_BOOL,
)
over = self.plc.read_by_name(
f"{base_path}.{field_type}[{i}].AbnormalDetection.Over",
pyads.PLCTYPE_BOOL,
)
array_data.append(
{
"ValueReal": value_real,
"AbnormalDetection": {"Under": under, "Over": over},
}
)
data[field_type] = array_data
return data
필요없어 지운 코드지만 위처럼 해당 내용에 대한 매핑을 할 수 있게끔 함수화 하였지만 편의를 위해 클래스를 사용하는 걸 볼 수 있다!
근데 이게 데이터가 맨날 똑같은데 받아오면 무슨 소용인가 생각을 했다.
그래서 데이터가 바뀌면 trig를 보내 해당 데이터를 받아오는 과정을 추가 했다.
위에 처럼 함수화를 다 진행한 다음엔 간단하다.
def read_data(self):
try:
data = self.plc.read_by_name("MAIN.Data", ST_EL3443_Data)
alarm = self.plc.read_by_name("MAIN.Alarm", ST_EL3443_Alarm)
livo = self.plc.read_by_name("MAIN.stLivo", ST_Livo)
return {"data": data, "alarm": alarm, "livo": livo}
except pyads.ADSError as e:
print(f"ADS 에러: {e}")
return None
except Exception as e:
print(f"일반 에러: {e}")
return None
위 처럼 read_data를 설정하고 데이터안에 잘 넣어주기만 하면 된다.
위 코드만 보면 굉장히 스무스하게 할 거 같았지만... 어려웠다.. twincat하고 연결하는 과정부터 받아올 때 data scale 이슈로 문제가 생겨 며칠을 머리아팠던 거 같아.
thread로 계속 돌리고 있긴 했지만 주기적으로 데이터를 받아오는 과정과 데이터 변경시 받아오는 방법으로 계속 고민했었고 성능 면에서도 생각을 했었따..
그나저나 pyads로 연결해서 사용하는게 제법 신기하더라..
3번에 대해서는 trig를 만들어 데이터가 바뀔 때 마다 1, 2로 확인 할 수 있게 하였다.
어렵진 않다! 설정이 짜증날뿐...이어서 twincat 초보자의 망망대해 헤엄치기.. 쓸게요!