MQTT
와 ECS
를 활용하여 주차장의 주차 공간에 대해서 출력하는 프로그램을 만들어 보려고 한다.
만드는 이유는, MQTT
통신을 연습하고 이해하는 동시에 ECS
에 대해 이해도를 높이고자 한다.
ECS
의 경우 분산 시스템이나, 실시간으로 상태가 변경되어야 하는 모델에서 유용하다고 한다.
아직 정확히는 모르지만 공부해나가는 중이다.
간단한 시나리오는 다음과 같다.
1. 주차장내에 주차 공간을 감시하고 있다.
2. 차량이 들어와서 주차한 공간에 대해서 MQTT Publisher가 데이터를 보내준다.
3. MQTT Subscriber가 해당 토픽을 구독하고 있다가 데이터를 받아 파싱 후 주차 공간의 상태를 변경해준다.
4. 실시간으로 현재 주차장 상태를 표시해준다.
참고로 MQTT Broker
가 설치되어 있어야 한다.
use std::{process, time::Duration};
use serde::{Deserialize, Serialize};
const MQTT_BROKER: &str = "tcp://localhost:1883"; // 사용할 MQTT 브로커 주소
const MQTT_TOPIC: &str = "parking/status"; // 구독할 토픽
// -------MQTT 메시지 구조체 ---------
#[derive(Serialize,Deserialize, Debug)]
struct MqttMessage {
space_id: String,
occupied: bool,
}
fn main() {
/* ----------- MQTT Client 생성 -----------*/
let mut cli = paho_mqtt::Client::new(MQTT_BROKER).unwrap_or_else(|e| {
println!("Error Creating the client: {:?}", e);
process::exit(1);
});
cli.set_timeout(Duration::from_secs(5));
/* ----------- 주차장 상태 생성, 사용자 입력-----------*/
let mut space_id = String::new();
std::io::stdin().read_line(&mut space_id).unwrap();
space_id = space_id.trim().to_string();
println!("{}", space_id);
if let Err(e) = cli.connect(None) {
println!("Unable to connect: {:?}", e);
process::exit(1);
}
let mqtt_msg = MqttMessage {
space_id: space_id,
occupied: true,
};
/* ----------- MQTT 메시지 생성, 주차장 상태를 json으로 직렬화 함-----------*/
let msg = paho_mqtt::MessageBuilder::new()
.topic(MQTT_TOPIC)
.payload(serde_json::to_string(&mqtt_msg).unwrap())
.qos(1)
.finalize();
/* ----------- MQTT 메시지 publish -----------*/
if let Err(e) = cli.publish(msg) {
println!("Error sending message: {:?}", e);
}
cli.disconnect(None).unwrap();
}
간단하게 사용자의 입력(주차장 공간 번호)을 받아 json
으로 직렬화 후 MQTT 메시지로 만들어 publish
한다.
use std::{sync::{mpsc::{self, Receiver, Sender}, Arc, Mutex}, thread, time::Duration};
use bevy::{prelude::*, time::common_conditions::on_timer};
use serde::Deserialize;
const MQTT_BROKER: &str = "tcp://localhost:1883"; // 사용할 MQTT 브로커 주소
const MQTT_TOPIC: &str = "parking/status"; // 구독할 토픽
const CLIENT_ID: &str = "bevy_parking_system";
// ------ 컴포넌트 정의 --------
#[derive(Component)]
struct ParkingSpaceId(String);
#[derive(Component)]
struct Occupancy(bool);
// -------MQTT 메시지 구조체 ---------
#[derive(Deserialize, Debug)]
struct MqttMessage {
space_id: String,
occupied: bool,
}
// ------ 리소스 정의 --------
#[derive(Resource)]
struct MqttReceiver(Receiver<String>);
unsafe impl Sync for MqttReceiver{}
fn main() {
App::new()
.add_plugins(MinimalPlugins)
.add_systems(Startup, (
setup_parking_spaces,
setup_mqtt_listener,
))
.add_systems(Update,
(
update_parking_status_from_mqtt,
current_parking_status_showing.run_if(on_timer(Duration::from_secs(5))),
)
)
.run();
}
/* ------------------ ECS System ------------------*/
// Bevy 앱 시작 시, 주차 공간 엔티티들을 미리 생성하는 시스템
fn setup_parking_spaces(mut commands: Commands) {
println!("주차 공간 엔티티를 생성합니다.");
commands.spawn((
ParkingSpaceId("A-1".to_string()),
Occupancy(false),
Name::new("Parking Space A-1"),
));
commands.spawn((
ParkingSpaceId("A-2".to_string()),
Occupancy(false),
Name::new("Parking Space A-2"),
));
}
// Bevy 앱 시작 시, MQTT 클라이언트를 설정하고 별도의 스레드에서 실행하는 시스템
fn setup_mqtt_listener(mut commands: Commands) {
println!("MQTT 리스너를 설정합니다....");
// 스레드 간 통신을 위한 채널 생성
let (tx, rx) = mpsc::channel::<String>();
// 수신단을 Bevy 리소스로 등록하여 다른 시스템에서 접근 가능하게 함.
commands.insert_resource(MqttReceiver(rx));
std::thread::spawn(move || {
println!("MQTT thread start!!");
run_mqtt_sub(tx.clone());
println!("MQTT thread end!!");
});
}
// 매 프레임 실행되며, 채널에 수신된 MQTT 메시지가 있는지 확인하고 처리하는 시스템
fn update_parking_status_from_mqtt(
receiver: Res<MqttReceiver>,
mut query: Query<(&ParkingSpaceId, &mut Occupancy, &Name)>, // 엔티티 쿼리
) {
for payload in receiver.0.try_iter() {
match serde_json::from_str::<MqttMessage>(&payload) {
Ok(msg) => {
println!("파싱된 메시지: space_id={}, occupied={}", msg.space_id, msg.occupied);
// 모든 주차 공간 엔티티 순회
for (space_id, mut occupancy, name) in &mut query {
if space_id.0 == msg.space_id {
// 해당 엔티티의 Occupancy 컴포넌트 값을 업데이트
if occupancy.0 != msg.occupied {
occupancy.0 = msg.occupied;
println!("{}의 상태가 {}으로 변경되었습니다.", name, occupancy.0);
}
break;
}
}
},
Err(e) => {
println!("MQTT 메시지 파싱 실패: {:?}, 원본: {}", e, payload);
},
}
}
}
fn current_parking_status_showing(
query: Query<(&ParkingSpaceId, &Occupancy, &Name)>
) {
for (space_id, occupancy, name) in query.iter() {
println!("-------------------------- Parking Status --------------------------");
println!("name: {}, space_id: {}, occupancy: {}", name, space_id.0, occupancy.0);
println!("----------------------------------------------------");
}
}
/* ------------------ MQTT Logic ------------------*/
fn run_mqtt_sub(sender: Sender<String>) {
// MQTT 클라이언트 생성
let create_opts = paho_mqtt::CreateOptionsBuilder::new()
.server_uri(MQTT_BROKER)
.client_id(CLIENT_ID)
.finalize();
let client = paho_mqtt::Client::new(create_opts).expect("Error");
let rx = client.start_consuming();
let conn_opts = paho_mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(false)
.finalize();
let qos = 1;
match client.connect(conn_opts) {
Ok(rsp) => {
if let Some(conn_rsp) = rsp.connect_response() {
println!("Connected to {} with Mqtt Version {}", conn_rsp.server_uri, conn_rsp.mqtt_version);
if let Err(e) = client.subscribe(MQTT_TOPIC, qos) {
println!("fail to subscribe err: {:?}", e);
} else {
println!("success [{}] subscribe!", MQTT_TOPIC);
}
if conn_rsp.session_present {
} else {
}
}
},
Err(e) => println!("connect error: {}", e),
}
println!("Waiting for messages on topics {:?}", MQTT_TOPIC);
for msg in rx.iter() {
if let Some(msg) = msg {
println!("{}", msg);
let msg_str = msg.payload_str().to_string();
sender.send(msg_str).unwrap();
}
else if client.is_connected() {
break;
}
}
if client.is_connected() {
println!("Disconnecting...");
client.disconnect(None).unwrap();
}
}
main.rs
에 소스를 몰아넣어서 꽤 길다. 추후에 분리하여 정리할 예정이다. 개념과 동작 원리에 대해서 집중하기 위해서 정리는 하지 않았다.
먼저 run_mqtt_sub
함수에서는 다음과 같은 역할을 한다.
브로커서버
에 연결토픽
구독payload
를 mpsc
의 sender를 통해서 전송 최초 실행 시 setup_parking_spaces
함수가 호출되어 ParkingSpaceID
, Occupancy
, Name
컴포넌트를 가지는 주차 공간 엔티티
틀 생성하였다.
간단하게 2개만 생성해 주었다.
또한 setup_mqtt_listener
함수가 호출되어 mqtt 전용 스레드를 생성한다.
App::new()
.add_plugins(MinimalPlugins)
.add_systems(Startup, (
setup_parking_spaces,
setup_mqtt_listener,
))
위와 같이 Startup
태그 같은 유닛 구조체를 넘겨준 뒤 튜플로 한 번만 실행될 함수의 이름을 넘겨주면 된다.
매 프레임마다 update_parking_status_from_mqtt
함수를 호출해 Receiver
의 버퍼에 데이터가 있는지 확인한다.
만약에 존재하지 않는다면 처리하지 않고 반환되고, 존재한다면 payload
로 넘어온 json
데이터를 구조체로 역직렬화하여 파싱한다.
파싱 한 데이터를 비교하여 주차장의 space_id
와 일치한다면 받은 데이터의 값으로 변경시켜 준다.
여기서 Receive
를 어떻게 사용하는지 궁금했다.
commands.insert_resource(MqttReceiver(rx));
이 코드를 통해서 리소스를 설정해주는 것 같다. 정확히 내부적으로 어떻게 처리하는지는 확인하지 못했다. 그러나 전역 데이터로 가지고 있어 매개변수로 받아서 사용할 수 있다.
MQTT
로 구독한 토픽에 대해 데이터를 받아서 mpsc
로 데이터를 전송했을 떄만 업데이트를 시키고
current_parking_status_showing.run_if(on_timer(Duration::from_secs(5))),
이 함수가 5초에 한 번씩 호출이된다. 엔티티가 변경되었을 때만 실행시킬 수 있는지 찾아보았다.
Changed<T>
필터를 사용하여 해당 프레임에서 변경된 T
타입의 컴포넌트만 선택할 수 있다.
fn current_parking_status_showing(
query: Query<(&ParkingSpaceId, &Occupancy, &Name), Changed<Occupancy>>
) {
for (space_id, occupancy, name) in query.iter() {
println!("-------------------------- Parking Status --------------------------");
println!("name: {}, space_id: {}, occupancy: {}", name, space_id.0, occupancy.0);
println!("----------------------------------------------------");
}
}
Changed<Occupancy>>
을 추가해 주었다.
테스트해 본 결과
변경된 컴포넌트만 찾아서 호출이 잘 되는 것을 확인할 수 있다.
아직은 ecs의 정확한 개념과 논리에 대해서 완벽하게 이해하지는 못했다. 그러나 MQTT
를 활용하여 간단한 프로그램을 만들어 보았다.
끝!