'Hellow World: [count]` 형식의 메시지 0.5초마다 'topic'이라는 토픽으로 발행하는 간단한 ROS2 Publisher 노드를 정의하고 실행
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10,
durability=QoSDurabilityPolicy.VOLATILE
)
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_=self.create_publisher(String, 'topic', qos_profile=qos_profile)
timer_period = 0.5
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i=0
def timer_callback(self):
msg = String()
msg.data='Hello World: %d' % self.i
self.publisher_.publish(msg)
self.get_logger.info('Publishing: "%s"' % msg.data)
self.i += 1
class <class_name>(Node) : Node 클래스를 상속받아 ROS2 노드를 정의def __init__(self): : 생성자, 클래스의 초기화 및 속성 작업super().__init__('<node_name>)self.publisher_=self.create_publisher(<msg_type>,<topic_name>,<QoSProfile>)self_timer = self.create_timer(<period>, <callback_function>)def timer_callback(self): : 타이머에 의해 주기적으로 호출되는 함수 정의msg = <msg_type> : 메시지 형식, 타입을 정의, 외부 파일을 호출하여 정의 가능msg.data = 'string...' % self.i : 메시지의 형식을 생성self.publisher_.publish(msg): 위에서 정의한 형식의 메시지를 발행self.get_logger.info(): 해당 메시지를 로깅하여 화면에 출력def main(args=None):
rclpy.init(args=args) # ros2 초기화
minimal_publisher = MinimalPublisher() # minimalpublisher 클래스 인스턴스화
rclpy.spin(minimal_publisher) # 노드실행 및 이벤트 루프 시작
minimal_publisher.destroy_node()
rclpy.shutdown() # ros2 종료
def main(args=None): : args라는 인자를 선택하지 않으면 None 값이 사용rclpy.init(args=args) : ROS2 초기화<node_name> = <class_name>(): 클래스 인스턴스 생성rclpy.spin(<node_name>): 노드 실행 및 이벤트 루프 시작<node_name>.destroy_node() : 노드 종료rclpy.shutdown() : ROS2 종료if __name__ == '__main__':
main()
class MinimalSubscriber(Node):
def __init__(self):
super().__init__('minimal_subscriber')
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
qos_profile=qos_profile) # 토픽에서 string 메시지를 구독하는 구독자 생성
self.subscription # prevent unused variable warning
class MinimalSubscriber(Node): Node 클래스를 상속받아 ROS2 노드를 정의
super().__init__(<node_name): 노드 초기화 및 노드 생성
self.subscription = self.create_subscription(<msg_type, <topic_name>,callback_function, <QoSProfile>)
: 주어진 토픽에 대한 구독자를 생성, msg가 토픽에 게시될 때마다 콜백함수가 호출
self.subscription : Python에서는 정의한 변수가 사용되지 않을 경우 "unused variable" 경고가 발생할 수 있다. 이 코드를 추가하여 변수 사용을 인식시켜 경고 발생을 방지한다.
def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)
- 토픽에 게시된 메시지를 수신할 때 호출되는 콜백함수
- 수신된 메시지의 내용을 로그에 기록
rclpy.init(args=args)
minimal_subscriber = MinimalSubscriber()
rclpy.spin(minimal_subscriber)
# Destroy the node explicitly
minimal_subscriber.destroy_node()
rclpy.shutdown()
동일
| publisher | self.publisher_ = self.create_publisher(msg_type,topic_name,qos_profile=qos_profile) |
| subscriber | self.subscription = self.create_subscription(msg_type,topic_name,callback_function,qos_profile=qos_profile) |