출처: YOUTUBE: ROS 1 (09강 of 25강) - Developers and Creators
$ roslaunch gcamp_gazebo gazebo_world.launch
# gazebo 시뮬레이션이 완전히 켜질때까지 기다려 주세요!!
# 새 터미널을 엽니다.
$ rosrun service_tutorial robot_turning_srv.py
# 또 다른 터미널을 엽니다.
$ rosrun service_tutorial robot_turning_client.py
[INFO] [1610111418.391926, 0.000000]: ==== Robot Turning Server Started ====
> Type turning time duration:
#! /usr/bin/env python
"""
basic rospy service example
referenced from wiki.ros.org
url : http://wiki.ros.org/ROS/Tutorials/WritingServiceClient%28python%29
"""
import sys
import rospy
from service_tutorial.srv import ControlTurningMessage, ControlTurningMessageRequest
rospy.init_node("robot_turning_client")
rospy.loginfo("==== Robot Turning Server Started ====")
rospy.wait_for_service("/control_robot_angle")
service_client = rospy.ServiceProxy("/control_robot_angle", ControlTurningMessage)
request_srv = ControlTurningMessageRequest()
while not rospy.is_shutdown():
try:
t = input("> Type turning time duration: ")
vel = input("> Type turning angular velocity: ")
if vel > 1.5707:
raise ArithmeticError("Velocity too high !!")
request_srv.time_duration = t
request_srv.angular_vel = vel
break
except ArithmeticError as e:
rospy.logerr(e)
except Exception as e:
rospy.logerr("Not a number type number plz !!")
result = service_client(request_srv)
print(result)
# 이전 topic message
from sensor_msgs.msg import LaserScan
# service는 이렇게 .srv로 import
from service_tutorial.srv import ControlTurningMessage, ControlTurningMessageRequest
wait_for_service를 통해 server가 동작중인지 확인하고, ServiceProxy를 생성(client가 request를 해야 하는데, 그 대상이 되는 server가 있어야 하니)rospy.wait_for_service('/control_robot_angle')
while not rospy.is_shutdown():
try:
t = input('> Type turning time duration: ')
vel = input('> Type turning angular velocity: ')
if vel > 1.5707:
raise ArithmeticError("Velocity too high !!")
request_msg.time_duration = t
request_msg.angular_vel = vel
break
except ArithmeticError as e:
rospy.logerr(e)
except Exception as e:
rospy.logerr("Not a number type number plz !!")
#! /usr/bin/env python
import time
import rospy
from service_tutorial.srv import ControlTurningMessage, ControlTurningMessageResponse
from geometry_msgs.msg import Twist
def callback(request):
print(request)
# publish to cmd_vel
cmd_vel = Twist()
cmd_vel.angular.z = request.angular_vel
start_time = time.time()
print('\nRobot Turning...')
while time.time() - start_time < request.time_duration:
velocity_publisher.publish(cmd_vel)
print("Done ...")
cmd_vel.angular.z = 0.0
velocity_publisher.publish(cmd_vel)
response = ControlTurningMessageResponse()
response.success = True
return response
rospy.init_node('robot_turning_server')
rospy.loginfo("==== Robot Turning Server Started ====")
service_server = rospy.Service('/control_robot_angle', ControlTurningMessage, callback)
velocity_publisher = rospy.Publisher('/cmd_vel', Twist, queue_size=1 )
rospy.spin()
# client
from service_tutorial.srv import ControlTurningMessage, ControlTurningMessageRequest
# server
from service_tutorial.srv import ControlTurningMessage, ControlTurningMessageResponse
ControlTurningMessage srv 형식$ rossrv info ControlTurningMessage
[service_tutorial/ControlTurningMessage]:
uint32 time_duration
float64 angular_vel
---
bool success
[cpp_service_tutorial/ControlTurningMessage]:
uint32 time_duration
float64 angular_vel
---
bool success
파이썬 코드에서 사용하는 경우, [MessageName]Request, [MessageName]Response의 형태를 갖게 됨.
response = ControlTurningMessageResponse()
response.success = True
return response
ControlTurningMessage의 원천을 찾기 위해 rossrv 커맨드 사용(srv 조회 목적)$ rossrv list
control_msgs/QueryCalibrationState
control_msgs/QueryTrajectoryState
control_toolbox/SetPidGains
controller_manager_msgs/ListControllerTypes
controller_manager_msgs/ListControllers
controller_manager_msgs/LoadController
controller_manager_msgs/ReloadControllerLibraries
controller_manager_msgs/SwitchController
controller_manager_msgs/UnloadController
diagnostic_msgs/AddDiagnostics
diagnostic_msgs/SelfTest
dynamic_reconfigure/Reconfigure
(...)
service_tutorial에서 다음 세 가지 srv 사용 가능.
- ControlTurningMessage
- ControlTurningMessageRequest
- ControlTurningMessageResponse
request import
import 부분을 다시 정리하면, client에게 response는 불가. 따라서 request만 import 하면 됨. 하단과 같은 이름으로 가져옴.
from service_tutorial.srv import ControlTurningMessage, ControlTurningMessageResponse
init_node, loginfo가 보이고, Service Server class 생성 부분 보임.rospy.init_node('robot_turning_server')
rospy.loginfo("==== Robot Turning Server Started ====")
service_server = rospy.Service('/control_robot_angle', ControlTurningMessage, callback)
velocity_publisher = rospy.Publisher('/cmd_vel', Twist, queue_size=1 )
rospy.spin()
rospy.Sevice('service-server-name', 'srv', 'callback')
기본 정보들과 함께 초기화 → 요청 들어올 때마다 콜백 실행.
ControlTurningMessage를 인자로 넣어주어야 함.
rospy.Service('/control_robot_angle', ControlTurningMessageResponse, callback)
주의: server는 response만 사용하니까 ControlTurningMessageResponse를 넣어야 하지 않나? 하는 생각으로 실수할 수 있다고 함!
publishercmd_vel로 publish하는 publisher velocity_publisher = rospy.Publisher("/cmd_vel", Twist, queue_size=1)
callbackrequest srv에서 각속도, 시간을 가져와 cmd_vel로 publish하는 부분.def callback(request):
print(request)
# publish to cmd_vel
cmd_vel = Twist()
cmd_vel.angular.z = request.angular_vel
start_time = time.time()
print('\nRobot Turning...')
while time.time() - start_time < request.time_duration:
velocity_publisher.publish(cmd_vel)
print("Done ...")
cmd_vel.angular.z = 0.0
velocity_publisher.publish(cmd_vel)
response = ControlTurningMessageResponse()
response.success = True
return response
response.success = Trueroslaunch를 통해 gazebo를 실행시키는 것/일반적으로 gazebo를 실행시킬 때에는 차이 존재.
roslaunch로 실행시키는 경우에는 gazebo_ros를 통해 rosnode들도 같이 실행됨. 그래서 모델을 불러오고, 제거하고, 위치를 이동시키고, 중력 상수를 바꾸는 등 수많은 service들이 같이 실행됨.
그 중에서, 이번에 배운 모델 spawn을 포함하여, 모델 제거, 시뮬레이션 초기화를 request하는 service client들을 하단 예시로 첨부함. (응용/연습ㄱㄱ)
#! /usr/bin/env python
"""
Fuctions for Gazebo model Spawn and Deletion
created by kimsooyoung : https://github.com/kimsooyoung
"""
import rospy
import rospkg
from std_srvs.srv import Empty
from geometry_msgs.msg import Pose
from gazebo_msgs.srv import SpawnModel, DeleteModel
def GazeboSpawnModel( name, pkg_name, pose, namespace='', frame='world'):
# model_name
model_name = name
# model_xml
rospack = rospkg.RosPack()
model_path = rospack.get_path(pkg_name)+'/models/'
with open (model_path + model_name + '.urdf', 'r') as xml_file:
model_xml = xml_file.read().replace('\n', '')
# robot_namespace
robot_namespace = namespace
# initial_pose
initial_pose = Pose()
initial_pose.position = pose.position
# z rotation -pi/2 to Quaternion
initial_pose.orientation = pose.orientation
# reference_frame
reference_frame = frame
# service call
spawn_model_prox = rospy.ServiceProxy('gazebo/spawn_urdf_model', SpawnModel)
result = spawn_model_prox(
model_name, model_xml,
robot_namespace, initial_pose,
reference_frame)
''' result fromat
bool success
string status_message
'''
print(result.status_message)
def GazeboDeleteModel(name):
'''
string model_name
---
bool success
string status_message
'''
delete_srv = DeleteModel()
model_name = name
spawn_model_prox = rospy.ServiceProxy('gazebo/delete_model', DeleteModel)
result = spawn_model_prox(model_name)
print(result.status_message)
def GazeboResetSimulation():
spawn_model_prox = rospy.ServiceProxy('gazebo/reset_simulation', Empty)
result = spawn_model_prox()
rospy.loginfo('Gazebo Simulation Reset')
rospy.init_node("gazebo_spawn_handler")