[ROS] ROS Bridge - Java로 ROS 통신 구현하기

김희정·2023년 7월 20일
3

ROS

목록 보기
4/4
post-thumbnail

💎 들어가며

ROS는 자체가 로봇이라는 너무나 생소하고 희귀한 도메인이기 때문에 레퍼런스가 많이 없습니다.

아래와 같은 메일을 받게 되어, 제 블로그가 미약하게나마 사람들에게 도움을 주었다는 사실에 기뻤습니다.

받은 메일

C#이나 Unity 계열은 아닌지라 소스와 같이 직접적인 도움은 드리지 못했지만, 동작원리에 대해 설명해주면 좋을 거 같다는 생각이 들었습니다.

그래서!!!

이번 포스팅에서는 통신을 위해 고생하실 분들을 위해 RosBridge 프로토콜을 구현한 메커니즘(mechanism, 동작원리) 에 대해 작성해보고자 합니다.



1. WebSocket

본격적인 메커니즘에 대해 설명하기에 앞서 저는 RosBridge 프로토콜 중 WebSocket 기반으로 통신을 구현했습니다.

이에 기본적인 웹소켓 지식에 대해 설명하고자 합니다.


1.1 What is WebSocket?

웹소켓이란 무엇인가?

이제 막 입사한 신입시절에 처음 웹(HTTP)과 소켓 프로그래밍(Socket), 웹소켓 통신(WebSocket) 등 여러가지 기술들을 혼합해서 경험하게 되어, 혼란스러웠던 경험이 있습니다.

그 차이점을 정리해보고자 궁금했던 점 위주로 문단을 구성해보았습니다.


🖥️ WebSocket = Web + Socket

웹소켓(WebSocket) 의 사전적 의미

인터넷을 통해 서로 연결된 컴퓨터들에 의해 정보를 공유하는 전 세계적인 네트워크인 웹 (Web, World Wide Web)과 네트워크의 연결지점을 의미하고 양방향 통신을 의미하는 소켓(Socket)이 결합된 단어입니다.

통신의 방향

웹소켓은 웹 브라우저와 웹 서버 사이에 양방향 통신 채널을 열어주는 기술로서, 웹에서 실시간 데이터를 주고받기 위한 통신 프로토콜을 지칭하는 용어입니다.


👻 Socket vs WebSocket

소켓웹소켓은 둘다 양방향 통신이지만, 목적과 동작 방식에 차이가 있습니다.


목적

  • 소켓: 네트워크 애플리케이션을 개발할 때 사용되며, 프로세스 간 통신을 구현하는데 사용됩니다.
  • 웹소켓: 웹 애플리케이션에서 실시간으로 데이터를 주고 받거나, 서버로부터 데이터를 푸시 받는 데 사용됩니다.

동작방식

  • 소켓: 프로세스들 간에 양방향 통신. TCP와 UDP 같은 전송 계층 프로토콜 사용.
  • 웹소켓: 웹 브라우저와 웹 서버 간에 양방향 실시간 통신. HTTP 프로토콜을 기반, 클라이언트와 서버 사이에 단일 TCP 연결 유지.

👻 HTTP vs WebSocket

웹에서 사용되는 프로토콜인 HTTP와 WebSocket의 통신 방향과 연결 유지에 대해 비교해보면 아래와 같습니다.

  • HTTP: 클라이언트의 요청이 있을 때만 서버가 응답하는 단방향 통신으로 클라이언트의 요청을 서버가 응답하면 커넥션을 닫는 방식으로, 각 요청마다 새로운 연결을 설정합니다.

  • WebSocket: 서버와 클라이언트가 서로 정보를 주고 받을 수 있는 전이중 통신으로, 클라이언트와 서버 사이에 단일 TCP 연결을 유지하여 지속적으로 통신할 수 있습니다.


😲 HTTP 기반의 WebSocket

앞서 통신 방향으로 서로 다른 프로토콜로 비교했지만, WebSocket은 사실 HTTP 프로토콜 기반입니다.

아니 아까는 다른 거라면서요?

웹소켓 연결을 설정할 때, 클라이언트는 HTTP 요청을 보내고 서버도 이에 대한 요청을 검증해 응답을 보내는 방식으로 웹소켓 연결을 시작합니다.

HTTP 기반으로 연결을 수립할 때 사용하고, 이후에는 독립적으로 동작

WebSocket은 HTTP 기반이지만 핸드쉐이크 과정을 거친 후에는 독립적으로 동작하며, HTTP와는 다른 방식으로 통신하게 됩니다.

Client 요청

클라이언트는 웹소켓 연결을 위해 HTTP 기반의 Header에 Upgrade 헤더를 포함하는 특수한 요청을 보냅니다.

GET /chat HTTP/1.1
Host: example.com
Connection: Upgrade
Upgrade: websocket

Server 응답

서버는 클라이언트의 웹소켓 요청을 받으면, 지원 가능한 웹소켓 프로토콜 버전을 확인하고 요청이 유효한지 검사합니다. 그리고 요청이 유효하다면 클라이언트와 웹소켓 연결을 위한 응답을 보냅니다. (핸드쉐이크 과정)

HTTP 핸드쉐이크는 클라이언트가 서버에 어떤 요청을 보내기 전에 서로간의 통신을 설정하고 동의하기 위해 이루어지는 과정입니다.

핸드쉐이크의 주요 목적은 연결 설정, 프로토콜 협상, 보안 설정 등이 있습니다.

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade

1.2 Server vs Client

WebSocket은 서버와 클라이언트가 서로 양방향 통신이기 때문에 내가 구현해야 될 로직이 서버Server인지, Client인지 헷갈릴 때가 있습니다.

Server와 Client를 같이 개발하는 경우가 대부분이기도 하고, 주로 서버를 개발하기 때문이기도 하죠. 서버 프로그램이 클라이언트가 된다고 생각하면 조금 헷갈리지 않나요?

WebSocket 사용 예시로는 웹 애플리케이션에서는 주로 백(Java)단에서 WebSocket 서버를 구성하고, 클라이언트(JavaScript)단에서 SocketJS 라이브러리로 WebSocket 클라이언트를 개발합니다.

또 단순히 Java 대 Java 프로그램(프로세스)끼리 연결하는 경우도 있습니다.

당연한 소리지만, 내가 연결해야 될 서버가 없다면 Server 개발, 있다면 Client 개발입니다.

RosBridge

RosBridgeROSWS 서버를 제공하기 때문에 프로그램에서 통신하기 위해서는 WebSocket Client로 구현해야합니다.



2. RosBridge 통신 구현

2.1 WebSocket Client를 구현하여 RosBridge Server와 연결

WebSocket Client

본격적인 개발에 들어가기 위해서는 일단 내가 서버 프로그램을 개발 중이더라도 RosBridge에서 만큼은 클라이언트라는 것을 인지해야 합니다.

RosBridge Server와 연결하기 위해서는 WebSocket Client 기능이 필요한데, 각 언어 별로 해당 라이브러리나 프레임워크를 이용하면 됩니다.


Java 라이브러리

저는 주 언어가 Java이기 때문에, Java WebSocket Client 라이브러리를 소개해볼까 합니다.

예제 소스는 내용이 길어져서 아래 [Appendix]에 추가하였으니 필요하신 분은 참고하세요. ^^


Spring WebSocket

Spring Boot의 starter 프로젝트의 websocket 모듈입니다.

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

Vert.x Core

Vert.x는 JVM 기반의 네트워크 라이브러리로 다양한 네트워크 방식으로 다양한 네트워킹 기능을 지원해주는 라이브러리입니다.

<!-- Vertx -->
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-core</artifactId>
    <version>4.4.4</version>
    <scope>provided</scope>
</dependency>

저는 Vert.x 기반으로 RosBridge Library 개발했습니다.

그 이유는 Vert.x의 Promise 패턴이 비동기 통신 시에 콜백 지옥에 빠지지 않도록 도와주는 점이 가장 매력적으로 다가왔습니다.


2.2 WebSocket Client의 기본 기능 구현

WebSocket Client 라이브러리를 서칭했으면 사용하고자 하는 프로젝트에 적용하고, 기본적으로 제공하는 기능들을 구현해야합니다.

  • 연결 (connect)
  • 연결 해제 (close)
  • 메세지 전송자 (sender)
  • 메세지 응답자 (receiver)

RosBridge는 이미 정의된 프로토콜이기 때문에, 우리가 보낼 수 있는 주고 받는 메세지는 한정적입니다. 이를 OP(Operations)라고 합니다.

  • OP를 JSON 형식으로 생성하여 전송
  • 응답 시 OP 별로 처리

아래는 OP인 advertise 예시입니다. (참조: 이전 포스팅)

🔔 advertise

advertise: Topic을 게시할 예정일 때 사용. ROS Master에게 Topic 등록을 요청한다.

{
    "op": "advertise",
	(optional) "id": <string>,
	"topic": <string>,
	"type": <string>
}

Topic - 게시를 알릴 Topic 명
Type - Topic 유형


2.3 메세지 전송자 구현

저는 OP들을 각각 객체로 구현하고, 메인클래스의 메소드 추가를 통해 전송자를 구현했습니다.

예시로 가장 간단한 Advertise 소스를 보여드리겠습니다.


OP 객체는 Lombok으로 생성자와 Builder, Getter 메소드를 구현했습니다.

package io.github.twinklekhj.ros.op;


import io.vertx.core.json.JsonObject;
import lombok.*;


/**
 * [RosOperation] 토픽 게시
 *
 * @author khj
 */

@Builder
@RequiredArgsConstructor
@ToString
@Getter
public class RosAdvertise implements RosOperation {
    private final Type op = Type.ADVERTISE_TOPIC;
    @Builder.Default
    private final String id = String.format("advertise_%s", RosOperation.current());
    @NonNull
    private final String topic;
    @NonNull
    private final String type;

    private static RosAdvertiseBuilder builder() {
        return new RosAdvertiseBuilder();
    }

    public static RosAdvertiseBuilder builder(String topic, String type) {
        return builder().topic(topic).type(type);
    }

    public static RosAdvertiseBuilder builder(RosCommand command) {
        return builder().topic(command.getName()).type(command.getType());
    }

    @Override
    public JsonObject getJsonObject() {
        return new JsonObject()
                .put("op", this.op.code)
                .put("topic", this.topic)
                .put("type", this.type)
                .put("id", this.id);
    }

    @Override
    public Type getOperation() {
        return this.op;
    }
}

아래와 같이 생성할 수 있겠네요

RosAdvertise op = new RosAdvertise(topic, type);
RosAdvertise op = RosAdvertise.builder(topic, type).build();

/**
 * [Topic] 토픽 발행 공고
 *
 * @param op 발행정보
 * @return 콜백함수
 */
public Promise<RosAdvertise> advertise(RosAdvertise op) {
    Promise<RosAdvertise> promise = Promise.promise();

    send(op).onSuccess(unused -> {
        this.publishedTopics.add(op.getTopic());
        promise.complete(op);
    }).onFailure(promise::fail);

    return promise;
}

/**
 * [RosBridge] 메세지 전송
 *
 * @param support - 보낼 메세지
 * @return 메세지 전송 성공 여부
 */
private Future<Void> send(RosOperation support) {
    JsonObject json = support.getJsonObject();
    return this.webSocket.write(json.toBuffer());
}

2.4 메세지 응답자 구현

메세지 응답자는 등록된 메소드로 모든 메세지를 받을 수 있기 때문에 해당 메소드에서 OP 별로 처리하였습니다.

public void onMessage(Buffer buffer) {
    JsonObject json = buffer.toJsonObject();
    if (json.containsKey("op")) {
        // publish
        String op = json.getString("op");
        switch (op) {
            case "publish":
                // 받은 토픽 처리
                Set<String> listeners = this.topicListeners.get(topic);
                if (this.topicListeners.containsKey(topic)) {
                    listeners.forEach(listener -> {
                        this.bus.publish(listener, json.getJsonObject("msg"));
                    });
                 }
                break;
            case "service_response":
                // 받은 서비스 메세지 처리
                RosResponse res = RosResponse.fromJsonObject(json);
                break;
            case "fragment":
                // 조각난 모음 처리
                break;
        }
    }
}

OP를 객체로 구현해놓으면 또 하나의 장점이 파싱시에 수월하다는 점입니다.

Topic 처리 부분 로직에 대해 간단하게 설명하면

  • Topic을 구독(subscribe)할 때, Listener를 받아서 topicListners에 등록
  • 이벤트 발생시 topicListeners에 등록된 Topic이 발행되면 해당 Topic에 등록된 listener들이 동작

2.5 메세지 객체 구현

마지막 팁으로, 자주 사용되는 메세지 객체를 클래스로 구현하는 것입니다. 이는 Publish나 Subscribe 할 때 사용됩니다.

몇 십개의 메세지 객체를 구현하면서 정말 힘든 작업이었습니다. 🤮🤮

하지만 만들어 놓으니 편하네요! 메세지 객체에 fromXXX()과 같은 파싱 메소드를 구현해 놓으면, 손쉽게 객체를 만들 수 있습니다.

저는 메세지 객체 구현시에 ros 공식 홈페이지(ros wiki) 를 정말 많이 참고했습니다. ros wiki에 명시된 내용이 프로토콜이나 다름없기 때문에, wiki에 있는 추상화된 메세지 타입을 구현한다고 보시면 됩니다.

예시) Int8 객체 - ros wiki

package io.github.twinklekhj.ros.type.std;

import io.github.twinklekhj.ros.type.RosMessage;
import io.vertx.core.json.JsonObject;
import lombok.ToString;

@ToString
public class Int8 extends RosMessage {
    public static final String TYPE = "std_msgs/Int8";
    public static final String FIELD_DATA = "data";

    private final byte data;

    public Int8() {
        this((byte) 0);
    }

    public Int8(byte data) {
        super(jsonBuilder().put(FIELD_DATA, data), Int8.TYPE);
        this.data = data;
    }

    public static Int8 fromJsonString(String jsonString) {
        return Int8.fromMessage(new RosMessage(jsonString, TYPE));
    }

    public static Int8 fromMessage(RosMessage m) {
        return Int8.fromJsonObject(m.getJsonObject());
    }

    public static Int8 fromJsonObject(JsonObject jsonObject) {
        byte data = jsonObject.containsKey(FIELD_DATA) ? jsonObject.getInteger(FIELD_DATA).byteValue() : 0;
        return new Int8(data);
    }

    public byte getData() {
        return this.data;
    }

    @Override
    public Int8 clone() {
        return new Int8(this.data);
    }
}


Appendix

Appendix A. Spring Example

아래와 같이 WebSocketClient 프로그램을 작성한 뒤, 스프링 ApplicationRunner를 통해 실행해주었습니다.

  • Client 프로그램
package io.github.twinklekhj.wsclient.ws;

import java.lang.reflect.Type;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.RestTemplateXhrTransport;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;

@Service
public class Client {
	private static Logger logger = LoggerFactory.getLogger(Client.class);
	final CountDownLatch latch = new CountDownLatch(1);

	@Value("${url.ws}")
	private String ws_url;
	
	public void start() {
		logger.info("Connect with WebSocket: {}", URI.create(ws_url));

		// sock js transport
		List<Transport> transports = new ArrayList<>(2);
		transports.add(new WebSocketTransport(new StandardWebSocketClient()));
		transports.add(new RestTemplateXhrTransport());

		SockJsClient sockjsClient = new SockJsClient(transports);
		WebSocketStompClient stompClient = new WebSocketStompClient(sockjsClient);
		stompClient.connect(ws_url, new StompSessionHandler() {
			@Override
			public void handleFrame(StompHeaders headers, Object payload) {
				String data = new String((byte[]) payload);
				
				// Data 가공
				logger.info("data - {}", data);
				latch.countDown();
			}

			@Override
			public Type getPayloadType(StompHeaders headers) {
				return byte[].class;
			}

			@Override
			public void handleTransportError(StompSession session, Throwable exception) {
				latch.countDown();
			}

			@Override
			public void handleException(StompSession session, StompCommand command, StompHeaders headers,
					byte[] payload, Throwable exception) {
				logger.info("headers: {}", headers);
				logger.error("error: {}", exception.getMessage());
				latch.countDown();
			}

			@Override
			public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
				logger.info("WebSocket 연결 상태: {}", session.isConnected());
			}
		});
	}
}
  • ApplicationRunner
package io.github.twinklekhj.wsclient;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import io.github.twinklekhj.wsclient.Client;

@Slf4j
@Component
public class WebSocketRunner implements ApplicationRunner {
	@Autowired
	private Client client;
	
	@Override
	public void run(ApplicationArguments args) throws Exception {
		log.info("WebSocket Client 프로그램을 실행합니다.");
		client.start();
	}
}

Appendix B. Vert.x Example

package io.github.twinklekhj.ros.core;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketConnectOptions;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Client extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        connect();
    }

    /**
     * [Client] WebSocket 연결
     *
     * @return future
     */
    public Future<WebSocket> connect() {
        // HttpClient 생성 Option
        HttpClientOptions httpOptions = new HttpClientOptions();

        // HttpClient 생성
        HttpClient client = this.vertx.createHttpClient(httpOptions);

        // WebSocketClient 생성 Option
        WebSocketConnectOptions wsOptions = new WebSocketConnectOptions();
        wsOptions.setHost("127.0.0.1");
        wsOptions.setPort(9090);
        wsOptions.setTimeout(1000);

        // WebSocketClient 생성
        Future<WebSocket> future = client.webSocket(wsOptions);
        future.onSuccess(webSocket -> {
            // 메세지 핸들러 추가
            webSocket.handler(this::onMessage);
        }).onFailure(throwable -> {
            log.error("WebSocket Connect Error! {}", throwable.getMessage());
        });

        return future;
    }

    /**
     * [Client] WebSocket 에서 받은 메세지 처리
     *
     * @param buffer 받은 메세지
     */
    private void onMessage(Buffer buffer) {
        // 메세지 처리
        String plain = buffer.toString();
        JsonObject json = buffer.toJsonObject();
        JsonArray array = buffer.toJsonArray();
    }
    
    /**
     * [Client] 메세지 전송
     *
     * @param json - 보낼 메세지
     * @return 메세지 전송 성공 여부
     */
    private Future<Void> send(JsonObject json) {
        if (props.isPrintProcessMsg()) {
            logger.info("ros:send message");
        }
        return this.webSocket.write(json.toBuffer());
    }
}


💎 References


💎 마치며

이상으로 RosBridge Protocol 구현을 마칩니다.

메커니즘을 작성하면서 제가 미처 생각하지 않았던 부분 (websocket 동작방식, handsake 과정 등)이나 연동이라는 추상적인 개념을 프로세스(전송자 구현, 응답자 구현 등)로 정리할 수 있게 되어, 저에게도 많은 도움이 되었습니다.

도움이 되셨으면 좋겠어요.😊

profile
Java, Spring 기반 풀스택 개발자의 개발 블로그입니다.

2개의 댓글

comment-user-thumbnail
2023년 7월 20일

항상 좋은 글 감사합니다.

답글 달기
comment-user-thumbnail
2023년 11월 23일

yr

답글 달기