[ROS STUDY] ROS 9강

‍이수빈·2025년 2월 10일

[ROS STUDY]

목록 보기
9/12

출처: YOUTUBE: ROS 1 (09강 of 25강) - Developers and Creators

service server 생성

Turning tinybot

$ roslaunch gcamp_gazebo gazebo_world.launch 
# gazebo 시뮬레이션이 완전히 켜질때까지 기다려 주세요!!

# 새 터미널을 엽니다.
$ rosrun service_tutorial robot_turning_srv.py

# 또 다른 터미널을 엽니다.
$ rosrun service_tutorial robot_turning_client.py
[INFO] [1610111418.391926, 0.000000]: ==== Robot Turning Server Started ====
> Type turning time duration: 
  • 속도: 회전하는 각속도, 단위: radian
  • 시간: 각속도로 회전하는 시간, 단위: 초

robot_turning_client.py

#! /usr/bin/env python

"""
basic rospy service example

referenced from wiki.ros.org

url : http://wiki.ros.org/ROS/Tutorials/WritingServiceClient%28python%29 
"""

import sys
import rospy
from service_tutorial.srv import ControlTurningMessage, ControlTurningMessageRequest

rospy.init_node("robot_turning_client")
rospy.loginfo("==== Robot Turning Server Started ====")

rospy.wait_for_service("/control_robot_angle")
service_client = rospy.ServiceProxy("/control_robot_angle", ControlTurningMessage)

request_srv = ControlTurningMessageRequest()

while not rospy.is_shutdown():
    try:
        t = input("> Type turning time duration: ")
        vel = input("> Type turning angular velocity: ")

        if vel > 1.5707:
            raise ArithmeticError("Velocity too high !!")

        request_srv.time_duration = t
        request_srv.angular_vel = vel
        break
    except ArithmeticError as e:
        rospy.logerr(e)
    except Exception as e:
        rospy.logerr("Not a number type number plz !!")

result = service_client(request_srv)

print(result)
  • (topic에서 message import 했던 것처럼)service에서 사용되는 srv import
# 이전 topic message
from sensor_msgs.msg import LaserScan 

# service는 이렇게 .srv로 import
from service_tutorial.srv import ControlTurningMessage, ControlTurningMessageRequest
  • wait_for_service를 통해 server가 동작중인지 확인하고, ServiceProxy를 생성(client가 request를 해야 하는데, 그 대상이 되는 server가 있어야 하니)
rospy.wait_for_service('/control_robot_angle')
  • 예외 처리 부분.(pi (3.141)의 절반이 되는 숫자 1.5707을 사용. 자주 사용되어서 외워두면 편하다고 함)
while not rospy.is_shutdown():
    try:
        t = input('> Type turning time duration: ')
        vel = input('> Type turning angular velocity: ')

        if vel > 1.5707:    
            raise ArithmeticError("Velocity too high !!")

        request_msg.time_duration = t
        request_msg.angular_vel = vel
        break
    except ArithmeticError as e:
        rospy.logerr(e)
    except Exception as e:
        rospy.logerr("Not a number type number plz !!")

robot_turning_srv.py

#! /usr/bin/env python

import time
import rospy
from service_tutorial.srv import ControlTurningMessage, ControlTurningMessageResponse
from geometry_msgs.msg import Twist

def callback(request):

    print(request)
    
    # publish to cmd_vel
    cmd_vel = Twist()
    cmd_vel.angular.z = request.angular_vel
    start_time = time.time()

    print('\nRobot Turning...')
    while time.time() - start_time < request.time_duration:
        velocity_publisher.publish(cmd_vel)
    
    print("Done ...")
    cmd_vel.angular.z = 0.0
    velocity_publisher.publish(cmd_vel)

    response = ControlTurningMessageResponse()
    response.success = True

    return response

rospy.init_node('robot_turning_server')
rospy.loginfo("==== Robot Turning Server Started ====")

service_server = rospy.Service('/control_robot_angle', ControlTurningMessage, callback)
velocity_publisher = rospy.Publisher('/cmd_vel', Twist, queue_size=1 )

rospy.spin()
  • import 부분 차이 존재(이전과 비교)
# client
from service_tutorial.srv import ControlTurningMessage, ControlTurningMessageRequest

# server
from service_tutorial.srv import ControlTurningMessage, ControlTurningMessageResponse
  • ControlTurningMessage srv 형식
$ rossrv info ControlTurningMessage
[service_tutorial/ControlTurningMessage]:
uint32 time_duration
float64 angular_vel
---
bool success

[cpp_service_tutorial/ControlTurningMessage]:
uint32 time_duration
float64 angular_vel
---
bool success

파이썬 코드에서 사용하는 경우, [MessageName]Request, [MessageName]Response의 형태를 갖게 됨.

    response = ControlTurningMessageResponse()
    response.success = True
		
	return response
  • ControlTurningMessage의 원천을 찾기 위해 rossrv 커맨드 사용(srv 조회 목적)
$ rossrv list
control_msgs/QueryCalibrationState
control_msgs/QueryTrajectoryState
control_toolbox/SetPidGains
controller_manager_msgs/ListControllerTypes
controller_manager_msgs/ListControllers
controller_manager_msgs/LoadController
controller_manager_msgs/ReloadControllerLibraries
controller_manager_msgs/SwitchController
controller_manager_msgs/UnloadController
diagnostic_msgs/AddDiagnostics
diagnostic_msgs/SelfTest
dynamic_reconfigure/Reconfigure
(...)
  • service_tutorial에서 다음 세 가지 srv 사용 가능.
    - ControlTurningMessage
    - ControlTurningMessageRequest
    - ControlTurningMessageResponse

  • request import
    import 부분을 다시 정리하면, client에게 response는 불가. 따라서 request만 import 하면 됨. 하단과 같은 이름으로 가져옴.

from service_tutorial.srv import ControlTurningMessage, ControlTurningMessageResponse
  • init_node, loginfo가 보이고, Service Server class 생성 부분 보임.
rospy.init_node('robot_turning_server')
rospy.loginfo("==== Robot Turning Server Started ====")

service_server = rospy.Service('/control_robot_angle', ControlTurningMessage, callback)
velocity_publisher = rospy.Publisher('/cmd_vel', Twist, queue_size=1 )

rospy.spin()
  • rospy.Sevice('service-server-name', 'srv', 'callback')
    기본 정보들과 함께 초기화 → 요청 들어올 때마다 콜백 실행.

  • ControlTurningMessage를 인자로 넣어주어야 함.

rospy.Service('/control_robot_angle', ControlTurningMessageResponse, callback)
주의: server는 response만 사용하니까 ControlTurningMessageResponse를 넣어야 하지 않나? 하는 생각으로 실수할 수 있다고 함!
  • publisher
    : 본 예제가 각속도, 시간을 request 받아 로봇을 움직이는 것. cmd_velpublish하는 publisher
	velocity_publisher = rospy.Publisher("/cmd_vel", Twist, queue_size=1)
  • callback
    : . client로부터 request가 올 때마다 실행될 부분. request srv에서 각속도, 시간을 가져와 cmd_velpublish하는 부분.
def callback(request):

    print(request)
    
    # publish to cmd_vel
    cmd_vel = Twist()
    cmd_vel.angular.z = request.angular_vel
    start_time = time.time()

    print('\nRobot Turning...')
    while time.time() - start_time < request.time_duration:
        velocity_publisher.publish(cmd_vel)
    
    print("Done ...")
    cmd_vel.angular.z = 0.0
    velocity_publisher.publish(cmd_vel)

    response = ControlTurningMessageResponse()
    response.success = True

    return response
  • response.success = True
    client에게 성공했다는 response 전송.
    현재 type이 bool이기에, True 입력.

gazebo services

roslaunch를 통해 gazebo를 실행시키는 것/일반적으로 gazebo를 실행시킬 때에는 차이 존재.

roslaunch로 실행시키는 경우에는 gazebo_ros를 통해 rosnode들도 같이 실행됨. 그래서 모델을 불러오고, 제거하고, 위치를 이동시키고, 중력 상수를 바꾸는 등 수많은 service들이 같이 실행됨.

그 중에서, 이번에 배운 모델 spawn을 포함하여, 모델 제거, 시뮬레이션 초기화를 request하는 service client들을 하단 예시로 첨부함. (응용/연습ㄱㄱ)

#! /usr/bin/env python

"""
Fuctions for Gazebo model Spawn and Deletion
created by kimsooyoung : https://github.com/kimsooyoung
"""

import rospy
import rospkg

from std_srvs.srv import Empty
from geometry_msgs.msg import Pose
from gazebo_msgs.srv import SpawnModel, DeleteModel

def GazeboSpawnModel( name, pkg_name, pose, namespace='', frame='world'):
    # model_name
    model_name = name

    # model_xml
    rospack = rospkg.RosPack()
    model_path = rospack.get_path(pkg_name)+'/models/'

    with open (model_path + model_name + '.urdf', 'r') as xml_file:
        model_xml = xml_file.read().replace('\n', '')

    # robot_namespace
    robot_namespace = namespace

    # initial_pose
    initial_pose = Pose()
    initial_pose.position = pose.position

    # z rotation -pi/2 to Quaternion
    initial_pose.orientation = pose.orientation

    # reference_frame
    reference_frame = frame

    # service call
    spawn_model_prox = rospy.ServiceProxy('gazebo/spawn_urdf_model', SpawnModel)
    result = spawn_model_prox(
        model_name, model_xml,
        robot_namespace, initial_pose,
        reference_frame) 

    ''' result fromat
    bool success
    string status_message
    '''

    print(result.status_message)
    

def GazeboDeleteModel(name):

    '''
    string model_name
    ---
    bool success
    string status_message
    '''

    delete_srv = DeleteModel()

    model_name = name

    spawn_model_prox = rospy.ServiceProxy('gazebo/delete_model', DeleteModel)
    result = spawn_model_prox(model_name)

    print(result.status_message)


def GazeboResetSimulation():
    spawn_model_prox = rospy.ServiceProxy('gazebo/reset_simulation', Empty)
    result = spawn_model_prox()

    rospy.loginfo('Gazebo Simulation Reset')

rospy.init_node("gazebo_spawn_handler")
profile
🏫 Kookmin University, Major in Electrical Engineering (First Major), AI Big Data & Management (Double Major), Smart Car ICT (Interdisciplinary Major)

0개의 댓글