
qos_profile = QoSProfile([option]=QoS[option]Policy.[setting])
History: 정해진 크기만큼 데이터를 보관하는 기능qos_profile = QoSProfile(history=QoSHistoryPolicy.KEEP_LAST, depth=10)| option | explanation |
|---|---|
| KEEP_LAST | 정해진 메시지 큐 크기(depth)만큼의 데이터를 보관 |
| KEEP_ALL | 모든 데이터를 보관 |
Reliability: TCP 통신 방식으로, 데이터 손실을 방지하여 신뢰도를 우선(Reliable)로 설정하거나, UDP 통신 방식과 같이 통신 속도 최우선(best effort)로 설정qos_profile = QoSProfile(reliability=QoReliabilityPolicy.BEST_EFFORT)| option | explanation |
|---|---|
| BEST_EFFORT | 데이터 송신에 집중, 전송 속도를 중시하며 네트워크 상태에 따라 유실이 발생 |
| RELIABLE | 데이터 수신에 집중, 신뢰성을 중시하며 유실 발생시 재전송을 통해 수신을 보장 |
Durability: 데이터 수신하는 서브스크라이버가 생성되거 전, 데이터의 사용 유무를 설정qos_profile = QoSProfile(duarability=QoSDurabilityPolicy.TRANSIENT_LOCAL)| option | explanation |
|---|---|
| TRANSIENT_LOCAL | 서브스크립션이 생성되기 전의 데이터도 보관(publisher에만 적용 가능) |
| VOCATILE | 서브스크립션이 생성되기 전의 데이터는 무효 |
Deadline: 정해진 주기 내 데이터의 발신 및 수신이 없는 경우 이벤트 함수 실행qos_profile = QoSProfile(depth=10, deadline=Duration(0.1))| option | explanation |
|---|---|
| deadline_duration | deadline을 확인하는 주기 |
Lifespan: 정해진 주기 내 수신되는 데이터에만 유효 판정, 이외 데이터는 삭제qos_profile = QoSProfile(lifespan=Duration(0.01))| option | explanation |
|---|---|
| lifespan_duration | lifespan을 확인하는 주기 |
Liveliness: 정해진 주기 내 노드 또는 토픽의 생사를 확인qos_profile = QoSProfile(liveliness=AUTOMATIC, liveliness_lease_duration=Duration(1.0))| option | explanation |
|---|---|
| liveliness | 자동 또는 매뉴얼로 확인할지를 지정하는 옵션(AUTOMATIC, MANUAL_BY_ALONE, MANUAL_BY_TOPIC) 중 선택 |
| lease_duration | Liveliness을 확인하는 주기 |

from rclpy.qos import
(qos_profile_default, # 기본 qos 설정
qos_profile_sensor_data, # 센서 데이터 스트림에 적합한 qos
qos_profile_services_default,
qos_profile_system_default,
QoSProfile)
class Node_class(Node):
def __init__(self):
super().__init__('node_name')
self.publisher_ = self.create_publisher(<msg type>, 'msg_name', qos_profile)
from rclpy.qos import
QoSProfile,
QoSReliabilityPolicy,
QoSHistoryPolicy,
QoSDurabilityPolicy
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10,
durability=QoSDurabilityPolicy.VOLATILE
)
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10,
durability=QoSDurabilityPolicy.VOLATILE
)
qos_profile: QosProfile = qos_profile_services_default
RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFUALT = 0
RMW_QOS_DEADLINE_DEFAULT {0, 0}
RMW_QOS_LIFESPAN_DEFAULT {0, 0}
RMW_QOS_LIVELINESS_LEASE_DURATION_DEFAULT {0, 0}
액션 토픽 : qos_profile_services_default를 기본 설정
피드백 퍼블리셔: QoSProfile, rmw_qos_profile_default를 초기값으로 사용
액션 상태 퍼블리셔: qos_profile_action_status_default를 기본값으로 사용
파이썬의 경우 goal_service_qos_profile, result_service_qos_profile, cancel_service_qos_profile,feedback_pub_qos_profile,status_pub_qos_profile에 대한 기본 설정을 사용

py_pubsub/src/publisher_member_function.py을 수정하여 확인
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10,
durability=QoSDurabilityPolicy.VOLATILE
)
class MinimalPublisher(Node): # rclpy의 node로부터 상속되는 클래스의 선언
def __init__(self): # 생성자를 정의
super().__init__('minimal_publisher') # 부모 클래스 호출
self.publisher_ = self.create_publisher(String, 'topic', qos_profile=qos_profile) # string 메시지 형식을 사용하여 topic 토픽에 발행하는 발행자 생성
timer_period = 0.5 # seconds, 발행주기 설정
self.timer = self.create_timer(timer_period, self.timer_callback) # 주기적으로 timer_callback() 함수 호출
self.i = 0 # 카운트 변수 초기화
def timer_callback(self):
msg = String() # string 메세지 객체 생성
msg.data = 'Hello World: %d' % self.i # 메시지 데이터 설정
self.publisher_.publish(msg) # publisher를 사용하여 메시지를 'topic' 토픽에 발생
self.get_logger().info('Publishing: "%s"' % msg.data) # 로그에 발행한 메시지 출력
self.i += 1 # 카운트 증가
def main(args=None):
rclpy.init(args=args) # ros2 초기화
minimal_publisher = MinimalPublisher() # minimalpublisher 클래스 인스턴스화
rclpy.spin(minimal_publisher) # 노드실행 및 이벤트 루프 시작
minimal_publisher.destroy_node()
rclpy.shutdown() # ros2 종료
if __name__ == '__main__':
main()

$ sudo tc qdisc add dev lo root netem loss 45%
$ ros2 run demo_nodes_cpp listener_best_effort
$ ros2 run demo_nodes_cpp talker


ros2 run quality_of_service_demo_py deadline 700 --publish-for 3000 --pause-for 1000

ros2 run quality_of_service_demo_py lifespan 1000 --publish-count 10 --subscribe-after 3000

liveliness_lease_duartion = Duration(seconds=parsed_args.liveliness_lease_duration/1000.0)
liveliness_policy = POLICY_MAP[parsed_args.policy]
ros2 run quality_of_service_demo_py liveliness 1000 --kill-publisher-after 2000 --policy AUTOMATIC
qos_profile = QosProfile(
depth=10,
liveliness=liveliess_policy,
liveliness_lease_duartion = liveliness_lease_duration)
