Ros 노드 통신 중 발생 할 수 있는 문제를 알아보고, 해결방안을 모색해보자.
📒발생 할 수 있는 문제 List
1.노드 간 동기화 문제
2.ROS 전송속도
3.ROS 처리 지연 문제
4.ROS 타임 슬롯 문제
5.ROS 노드의 순차 실행
노드 간 통신을 할 때 Publisher, Subscriber 가 topic 을 가지고 통신을 하는 데, 만약 Subscriber가 데이터를 받을 준비가 안돼어 있다면 어떤 일이 발생할까? 아래의 코드를 가지고 문제를 직면해보고 해결방안을 모색해보자.
🐍sender_serial.py
import rospy
from std_msgs.msg import Int32
rospy.init_node('sender_serial')
pub = rospy.Publisher('sync_issue',Int32)
rate = rospy.Rate(2)
count = 1
while not rospy.is_shutdown():
pub.publish(count)
count+=1
rate.sleep()
🐍reciever_serial.py
import rospy
from std_msgs.msg import Int32
def callback(msg):
print msg.data
rospy.init_node('receiver_serial')
sub = rospy.Subscriber('sync_issue',Int32,callback)
rospy.spin()
위와 같이 publisher와 Subscriber 의 코드를 작성하고 실행해보자!
실행화면은 아래와 같다.
Publisher는 첫번째로 1을 보냈는데,데이터가 누락되었고, 두번째 데이터부터 받기 시작했다.
첫번째 데이터가 누락되는 문제가 발생한 것이다.
원인은 Subscriber가 준비가 안 됐을 때, Publisher 가 메세지를 전송하였다는게 문제였다.
이를 해결 하기 위해 어떤 방법을 사용해야 할까?
rospy의 get_num_connections() 함수를 사용하면 해결 할 수 있다.
해당 함수는 특정 노드에 연결된 노드의 수를 리턴하는 함수이다. 아래와 같이 코드를 수정하였다.
🐍sender_serial.py
import rospy
from std_msgs.msg import Int32
rospy.init_node('sender_serial')
pub = rospy.Publisher('my_topic',Int32)
rate = rospy.Rate(2)
count = 1
while not pub.get_num_connections() : #추가내용
pass
while not rospy.is_shutdown():
pub.publish(count)
count+=1
rate.sleep()
publisher 에 연결된 노드의 개수가 0개라면 무한루프에 걸려 메세지를 보낼 수 없다. 만약 노드가 연결된다면 무한루프에서 탈출하여 다음 메세지를 보내는 while문으로 진입 할 수 있다. 아래의 화면은 새롭게 고친 코드 실행 화면이다.
첫번째 메세지부터 문제 없이 받을 수 있는 것을 확인 할 수 있다.
노드끼리 통신을 할때 메세지의 크기에 따른 통신 속도의 차이에 대해서 알아보자. 과연 데이터 크기에 따라 통신속도가 얼마나 차이가 날까? 코드를 작성하여 통신속도를 직접 확인해보자.
🐍sender_speed.py
import rospy
from std_msgs.msg import String
rospy.init_node('sender_serial')
pub = rospy.Publisher('speed',String)
rate = rospy.Rate(1)
msg_size = 10**6 #1Mb
#msg_size = 5 * 10**6 #5Mb
#msg_size = 10**7 #10Mb
#msg_size = 2 * 10**7 #20Mb
#msg_size = 5 * 10**7 #50Mb
while not rospy.is_shutdown():
msg = "#" * msg_size + "," + str(rospy.get_time()) + "," +str(msg_size)
pub.publish(msg)
rate.sleep()
"#"의 개수에 따라 메세지의 크기가 달라진다. 보낼 때의 시간, 데이터 크기를 함께 담아 전달한다.
rospy
의 get_time()
을 통해 현재 시간을 초 단위로 확인 할 수 있었다.
🐍reciever_speed.py
import rospy
from std_msgs.msg import String
def callback(msg):
split_msg = msg.data.split(",")
sent_time = float(split_msg[1])
msgSize = int(split_msg[2])
current_time = rospy.get_time()
time_gap = current_time - sent_time
bytePerSec = msgSize / time_gap
print "time gap : " , time_gap , "msgSize : ", msgSize
print "Speed : ", bytePerSec , "Byte/s"
rospy.init_node('receiver_serial')
sub = rospy.Subscriber('speed',String,callback)
rospy.spin()
메세지를 받아 보낸시간과 받은 시간의 차이를 계산하고 메세지 전송 속도를 계산한다.
전송시간: 약 0.01초 이내
속도 : 약 120 MBps
전송시간: 약 0.013초 이내
속도 : 약 360 MBps
전송시간: 약 0.013초 이내
속도 : 약 750 MBps
전송시간: 약 0.016초 ~ 0.026초
속도 : 약 768 ~ 1255 MBps
전송시간: 약 0.09초 이내
속도 : 약 580 MBps 이내
위와 같은 차트로 메세지 크기별 전송속도를 정리 할 수 있다. 20MB까지는 메세지의 크기에 비례해 속도가 증가하는 모습이 보였지만, 50MB 크기의 메세지에서는 속도가 오히려 줄어든 모습이 보였다.
노드끼리 통신을 할때 도착한 데이터를 미처 처리하지 못하면 어떻게 될까? 데이터를 쌓아 둘 것인가 버릴 것인가? 코드를 작성하여 알아보자.
🐍sender_overflow.py
import rospy
from std_msgs.msg import Int32
rospy.init_node('sender')
pub = rospy.Publisher('node_delay', Int32)
rate = rospy.Rate(1000)
cnt = 1
while not pub.get_num_connections():
pass
while not rospy.is_shutdown():
pub.publish(cnt)
cnt+=1
rate.sleep()
1초에 1000번 1씩 증가하는 값을 보내는 코드.
🐍reciever_overflow.py
import rospy
from std_msgs.msg import Int32
def callback(msg):
rospy.sleep(5)
print(msg.data)
rospy.init_node('receiver')
sub = rospy.Subscriber('node_delay', Int32, callback, queue_size =1)
rospy.spin()
rospy.sleep()
를 통해 강제적으로 노드가 동작안하게 작성.
reciever 노드에서 sleep 함수로 5초간 강제로 쉬게하여 메세지의 간격이 약 5000단위로 나타나는 것을 확인 할 수 있었다. 노드가 다른 동작을 하는동안 받은 메세지는 버린다는 결론을 내릴 수 있다. 데이터를 버리지 않는 방법은 있을까?
🐍reciever_overflow.py
import rospy
from std_msgs.msg import Int32
def callback(msg):
rospy.sleep(5)
print(msg.data)
rospy.init_node('receiver')
sub = rospy.Subscriber('node_delay', Int32, callback, queue_size =10000)
rospy.spin()
queue_size
의 사이즈를 늘려 누락되는 메세지를 없앨 수 있다. 다만 queue_size
의 크기가 충분하지 않다면 누락은 발생 할 수 있다.
누락되는 메세지 없이 기록되는 모습을 확인 할 수 있다.
주기적 발송에서 타임슬롯을 오버하면 어떻게 될까? 어떤일을 처리하는데 시간을 너무 많이 쓰거나 일을 할 타이밍을 놓치면 어떤식으로 동작할까?
rospy.Rate(N)
노드는 위와같이 Rate()
를 통해 주파수를 정할 수 있고, 위의 함수는 1초동안 N번 일을 반복한다는 의미이다.
한번 동작하는 시간을 타임슬롯이라고 하고,아래의 표와 같이 보통 노드의 동작은 한번의 타임 슬롯 내에서 처리할 수 있도록 만들어 남는 기간동안 쉬는 시간을 가지게 된다. 하지만 일을 많이 시켜서 해당 시간내에 일을 완수하지 못하면 어떻게 될까? 코드를 통해 알아보자.
🐍teacher_int32_job.py
import rospy
import time
from std_msgs.msg import Int32
def do_job(count):
for i in range(0,count):
i+=1
pub.publish(i)
def list_append_time():
start.append(start_time)
end.append(end_time)
sleep.append(sleep_time)
rospy.init_node('teacher')
pub = rospy.Publisher('msg_to_student', Int32, queue_size = 0)
rate = rospy.Rate(5)
while not rospy.is_shutdown():
start =[]
end = []
sleep = []
num = input("input count number > ")
rate.sleep()
total_start = time.time()
for j in range(0,5) :
start_time = time.time()
do_job(num)
end_time = time.time()
rate.sleep()
sleep_time = time.time()
list_append_time()
total_end = time.time()
for t in range(0,5):
sleep[t] = sleep[t] -end[t]
end[t] = end[t] -start[t]
for result in range(0,5):
print "spend time > ", round(end[result],4), 's'
print "sleep time > ", round(sleep[result],4), 's'
print "------------------------------------"
print "total time > ", round((total_end - total_start),4),'s'
print "------------------------------------\n\n"
num
값 을 받아서 num
만큼 반복되는 일을 진행한다.
총 일을 5번 수행하고 5번의 일하는 시간과 sleep
시간을 측정한다.
결과화면을 통해 일의양이 많아지면 어떤식으로 동작하는지 확인해보자.
num
이 100일때는 일을 금방 끝내고 sleep
시간이 대부분인 것을 확인 할 수 있다.
num
이 1000일때도 num
100일 때 보다는 오래 걸리지만, sleep
의 시간이 훨씬 많다. 수를 더 많이 늘려보자.
num
값이 100000 일때 첫번째 일에서 0.2s 을 넘겨 sleep
시간 없이 다음 일을 진행하였고, 나머지 일들은 시간내로 끝낸 것을 확인 할 수 있다. 그럼 0.2s을 훨씬 넘는다면?
num
이 150000일때는 타임슬롯 값보다 훨씬 오랜시간 일을 하고, sleep
시간 없이 일을 진행하였으며, 1초가 넘는 시간동안 일을 하였다.
타임슬롯보다 많은 일을 시킨다면 타임슬롯을 초과하더라도 주어진 일을 수행하는 것을 확인 할 수 있었다.
협업해야 하는 노드를 순서대로 기동시킬수 있는가? 만약 여러개의 노드가 주어지고 어떤 동작을 순차적으로 진행하려면 어떻게 해야할까? 순서를 지정해주지 않는다면 노드가 여러개면 랜덤 순서로 노드가 작동한다. 순서를 지정하는 법에 대해 알아보자.
🐍Reciever.py
import rospy
from std_msgs.msg import String
def callback(msg):
print msg.data
rospy.init_node('reciever')
sub = rospy.Subscriber('node_numbering', String, callback)
rospy.spin()
🐍First.py
import rospy
from std_msgs.msg import String
rospy.init_node('first')
pub = rospy.Publisher('node_numbering', String)
rate = rospy.Rate(5)
while not pub.get_num_connections() :
pass
while not rospy.is_shutdown():
pub.publish('My name is First')
rate.sleep()
🐍Second.py
🐍Third.py
🐍Fourth.py
Second
~Fourth
의 구조는 first
와 흡사하기 때문에 생략한다.
🚀node_numbering.launch
<launch>
<node pkg = "node_numbering" type="Receiver.py" name = "receiver" output ="screen"/>
<node pkg = "node_numbering" type="First.py" name = "first"/>
<node pkg = "node_numbering" type="Second.py" name = "second"/>
<node pkg = "node_numbering" type="Third.py" name = "third"/>
<node pkg = "node_numbering" type="Fourth.py" name = "fourth"/>
</launch>
단순한 Subscriber 와 4개의 Publisher를 선언한 뒤 launch
를 사용하여 동시에 노드를 동작시켰다. 순서를 지정해주지 않았는데 어떤식으로 동작할까?
두 가지 결과를 보면 임의의 순서대로 노드를 처리하는 것을 확인 할 수 있었다. 두번의 시도 외에도 전부 다른 순서의 결과가 나타났다.
순서를 지정해주기 위해 어떤 일을 해야할까?
🐍Reciever.py
import rospy
from std_msgs.msg import String
name = "receiver"
pub_topic = "start_ctl"
sub_topic = "msg_to_receiver"
def callback(data):
rospy.loginfo("I heard %s", data.data)
rospy.init_node(name)
rospy.Subscriber(sub_topic, String, callback)
pub = rospy.Publisher(pub_topic, String, queue_size =1)
rate = rospy.Rate(10)
hello_str = String()
rospy.sleep(1)
sq = ["first","second", "third","fourth"]
pub_msg = String()
for i in sq :
pub_msg.data = i + ":go"
pub.publish(pub_msg)
rospy.sleep(3)
rospy.spin()
Subscriber 역할만 하던 노드에 Publisher 기능을 추가한다. "start_ctl"을 통해 first
~fourth
까지 순서대로 토픽을 보내준다. sq에 담긴 순서대로 진행 할 수 있다. 순서대로 first
~fourth
: go 라는 메세지를 보내고 3초간 sleep
한다. 다음 코드를 통해 메세지를 받은 뒤에 어떤식으로 동작하는지 알아보자.
🐍First.py
import rospy
from std_msgs.msg import String
name = "first"
pub_topic = "msg_to_receiver"
sub_topic = "start_ctl"
OK = None
def ctl_callback(data):
global OK
OK = str(data.data)
rospy.init_node(name)
rospy.Subscriber(sub_topic,String,ctl_callback)
while True :
if OK == None :
continue
d = OK.split(":")
if (len(d)==2) and (d[0] ==name) and (d[1] == "go"):
break
pub = rospy.Publisher(pub_topic, String,queue_size = 1)
rate = rospy.Rate(2)
hello_str = String()
while not rospy.is_shutdown():
hello_str.data = "my name is" + name
pub.publish(hello_str)
rate.sleep()
start_ctl 토픽으로 메세지를 받고 OK값에 대입한다. 이때 노드의 이름과 "go"가 포함되어 있으면 while
문에서 벗어나서 메세지를 보내기 시작한다.
🐍Second.py
🐍Third.py
🐍Fourth.py
Second
~Fourth
의 구조는 first
와 흡사하기 때문에 생략한다.
각 노드들이 순서대로 메세지를 보내는 것을 확인 할 수 있다.