동기 서비스 클라이언트는 교착 상태에 취약하지만, 교착 상태 발생 시 문제를 알려주지는 않습니다.
ros2 service listros2 service type [service_name]
ros2 interface list
# request 영역
int64 a
int64 b
---
# response 영역
int64 sum
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 7, b: 8}"| 명령어 | 설명 |
|---|---|
ros2 service list | 현재 활성화된 모든 서비스 목록 보기 |
ros2 service type [서비스 이름] | 해당 서비스의 타입 확인 |
ros2 service find [서비스 타입] | 해당 타입의 서비스를 제공하는 서버 목록 출력 |
ros2 interface show [서비스 타입] | 서비스의 요청/응답 구조 확인 |
ros2 service call [서비스 이름] [서비스 타입] "{요청 데이터}" | 해당 서비스에 직접 요청 보내기 |
import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
class GripperServiceServer(Node):
def __init__(self):
super().__init__('gripper_service_server')
# 서비스 서버 오픈
self.srv = self.create_service(SetBool, 'gripper_control', self.call_back)
# gripper control publisher 생성
self.publisher = self.create_publisher(JointTrajectory, '/gripper_controller/command', 10)
self.get_logger().info('Gripper service server ready.')
def call_back(self, request, response):
###--- service 요청 받음 ---###
# True=open, False=close
open_gripper = request.data
###--- service 요청에 따라 그리퍼를 여닫는 동작을 수행 ---###
traj = JointTrajectory()
# 어느 관절을 움직일지 명시
traj.joint_names = ['gripper_left_finger_joint', 'gripper_right_finger_joint']
point = JointTrajectoryPoint()
# 얼만큼 벌릴지 명시
point.positions = [0.035, 0.035] if open_gripper else [0.0, 0.0]
point.time_from_start.sec = 1
traj.points.append(point)
# 관절 제어 토픽을 발행
self.publisher.publish(traj)
###--- 동작 끝 ---###
###--- 동작을 수행한 후, 그 결과를 서비스 service 응답 반환---###
response.success = True
response.message = 'Gripper opened' if open_gripper else 'Gripper closed'
self.get_logger().info(response.message)
return response
def main():
rclpy.init()
node = GripperServiceServer()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
import time
class GripperServiceClient(Node):
def __init__(self):
super().__init__('gripper_service_client')
self.client = self.create_client(SetBool,'gripper_control')
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = SetBool.Request()
def send_request(self):
self.response = self.client.call_async(self.req)
def main(args=None):
rclpy.init(args=args)
client = GripperServiceClient()
client.req.data = True
client.send_request()
# ROS가 동작중이면 계속 실행
while rclpy.ok():
rclpy.spin_once(client, timeout_sec=0.1)
# 응답이 오면 요청 결과값 출력 후 중단
if client.response.done():
try:
res = client.response.result()
except Exception as e :
client.get_logger().info('service call fail %r' % (e,))
else:
client.get_logger().info(f"response: {res.success}, {res.message}")
client.get_logger().info('call_async can doing another work ... ')
client.destroy_node()
rclpy.shutdown()
entry_points={
'console_scripts': [
'service_server = test_package.service_server:main',
'service_client = test_package.service_client:main',
],
},
colcon build --symlink-install --packages-select test_packageros2 run test_package service_server
ros2 run test_package service_client
mkdir srv# new_srv_data.srv
bool data
float64 linear
---
bool success
string message
rosidl_generate_interfaces(${PROJECT_NAME}
"srv/new_srv_data.srv"
)
or
set(srv_files "srv/new_srv_data.srv")
rosidl_generate_interfaces(${PROJECT_NAME}
${srv_files}
)
# 여러개 추가시
set(srv_files
"srv/NewSrvData.srv"
"srv/AnotherSrv.srv"
"srv/ExampleRequest.srv"
)
ros2 interface show [패키지명]/srv/new_srv_data