20220901 [Spring Boot, MQTT]

Yeoonniiยท2022๋…„ 9์›” 2์ผ
0

TIL

๋ชฉ๋ก ๋ณด๊ธฐ
19/52
post-thumbnail

๋“ฑ๋ก/์ˆ˜์ •/์‚ญ์ œ ๋“ฑ ๋ฐ์ดํ„ฐ ๋ณ€๊ฒฝ์‹œ
MQTT๋ฅผ ์ด์šฉํ•˜์—ฌ ๋ฐœํ–‰(publish)๊ณผ ๊ตฌ๋…(subscribe)์„ ํ†ตํ•ด ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ •๋ณด ์ˆ˜์‹ 

์ƒˆ ํ”„๋กœ์ ํŠธ network_exam ์ƒ์„ฑ ํ›„
๋ฌผํ’ˆ๊ด€๋ฆฌ ์„œ๋ฒ„๊ตฌํ˜„ ํŒŒ์ผ ๋ณต์‚ฌํ•˜์—ฌ ๊ทธ๋Œ€๋กœ ๊ฐ€์ ธ์˜ค๊ณ  ์ถ”๊ฐ€๋กœ itemview ์™€ broker๋งŒ ์ƒ์„ฑ

๐Ÿ”Œ ์„œ๋ฒ„์—ฐ๊ฒฐ ๋ฐ ํ†ต์‹ 

MQTT๋Š” ๋ฐœํ–‰(publish)๊ณผ ๊ตฌ๋…(subscribe)์„ ํ†ตํ•ด ๋ฐ์ดํ„ฐ๋ฅผ ์ฃผ๊ณ  ๋ฐ›๋Š”๋‹ค
publish๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๋‚ด๋Š” ๊ฒƒ, subscribe๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›๋Š” ๊ฒƒ์ด๋‹ค

  • ๊ตฌ๋…(subscribe)์„ ํ†ตํ•œ ์‹ค์‹œ๊ฐ„ ์ •๋ณด๋ณ€ํ™” ๊ฐ์ง€ ์ˆ˜์‹ 
    MqttCallback์€ ์‚ฌ์šฉ์ž๊ฐ€ ์ „๋‹ฌํ•˜๋Š” ์ •๋ณด๋ฅผ ์‹ค์‹œ๊ฐ„ ๊ฐ์ง€ํ•œ๋‹ค
    ItemView์—์„œ ItemMqttClient๋ฅผ ๊ตฌ๋…subscribeํ–ˆ๊ธฐ ๋•Œ๋ฌธ์— ๋ฐ์ดํ„ฐ ๋ณ€ํ™”๊ฐ€ ์‹ค์‹œ๊ฐ„์œผ๋กœ ๊ฐ์ง€ ๊ฐ€๋Šฅํ•˜๋‹ค
    ItemMqttClient์˜ messageArrived์€ ๋ฉ”์„ธ์ง€ ์ˆ˜์‹ ์‹œ ์ž๋™์œผ๋กœ ์‹คํ–‰๋˜๋ฉฐ ๋ถ€๋ชจ์ธ MqttCallback์„ ์ƒ์†implements๋ฐ›์€ ๋ฉ”์„œ๋“œ์ด๋‹ค
  • MQTT์—์„œ ๋ฐ์ดํ„ฐ ๋ฐœ์†ก(publish)
    ItemMqttClient์˜ sendMessage๋Š” ์‹ค์‹œ๊ฐ„ ๊ฐ์ง€์™€ ๋ณ„๊ฐœ๋กœ ์ง„ํ–‰๋œ๋‹ค
    ItemMqttClient ์—์„œ sendMessage๋Š” MQTT์—์„œ ๋ฐ์ดํ„ฐ ๋ฐœ์†กpublish์‹œ ์‹คํ–‰๋œ๋‹ค
    โžก๏ธ ex) ItemMqttClient์—์„œ TERMNAL์— ๋ฉ”์„ธ์ง€ ๋ฐœ์†ก

๐Ÿ“  ์‹ค์‹œ๊ฐ„ ์ •๋ณด๋ณ€ํ™” ๊ฐ์ง€(subscribe)

  1. app.java์—์„œ ItemView.java ์‹คํ–‰ํ•œ๋‹ค
  2. ItemView.java๊ฐ€ ItemMqttClient.java ์‹คํ–‰ํ•œ๋‹ค
    ์ด๋•Œ ItemView๋Š” ์ž์‹ ์˜ ๊ฐ์ฒดthis๋ฅผ ItemMqttClient์—๊ฒŒ ์ „๋‹ฌํ•ด ์ค€๋‹ค
    โš™๏ธ ItemMqttClient์—์„œ ๋‚˜์ค‘์—printitem()์„ ํ˜ธ์ถœ ํ•  ์ˆ˜ ์žˆ๊ฒŒ ํ•˜๊ธฐ ์œ„ํ•ด ์ฃผ๋Š” ๊ฒƒ!
  3. ์‚ฌ์šฉ์ž๊ฐ€ ๋ณด๋‚ธ ๋ฉ”์‹œ์ง€๋ฅผ MqttCallback์ด ๊ฐ์ง€ํ•˜์—ฌ messageArrived๋ฅผ ์‹คํ–‰ํ•œ๋‹ค
  4. messageArrived๋Š” ItemView์—์„œ ์ถœ๋ ฅํ•˜๊ธฐ ์œ„ํ•ด printitem()์„ ํ˜ธ์ถœ ํ•ด์•ผํ•œ๋‹ค
    โžก๏ธ printitem()์„ ํ˜ธ์ถœ์„ ์œ„ํ•ด ์ƒ์„ฑ์ž์— ItemView์˜ ๊ฐ์ฒด์ธ itemObject = obj;๋ฅผ ์ƒ์„ฑํ•œ๋‹ค
  5. messageArrived๋Š” printItem()์„ ์ด์šฉํ•ด ItemView.java๋กœ topic๊ณผ msg๋ฅผ ๋ณด๋‚ธ๋‹ค
    โžก๏ธ itemObject.printItem( topic, msg );
  6. ItemView.java์˜ printItem๋ฉ”์„œ๋“œ๋Š” topic๊ณผ msg๋ฅผ ์ถœ๋ ฅํ•œ๋‹ค

๐Ÿ“ฌ ๋ฐœํ–‰(publish)๊ณผ ๊ตฌ๋…(subscribe)

๐Ÿ“ app.java

์ถœ๋ ฅ์€ ItemView์—์„œ ์‹คํ–‰
โžก๏ธ app.java ์‹คํ–‰์‹œ ItemView๊ฐ€ ์‹คํ–‰ ๋จ

import com.example.view.ItemView;

public class App 
{
    public static void main( String[] args )
    {
        new ItemView();
    }
}

๐Ÿ“ broker/ItemMqttClient.java

  • MqttCallback ๋ฅผ ์ƒ์† ๋ฐ›์•„ ์˜ค๋ฒ„๋ผ์ด๋”ฉํ•˜์—ฌ ์‚ฌ์šฉํ•œ๋‹ค = implements MqttCallback
    โžก๏ธ implements : ๋ถ€๋ชจ๋Š” ์„ ์–ธ๋งŒ ํ•˜๋ฉฐ, ๋ฐ˜๋“œ์‹œ ์ž์‹์ด ๋ถ€๋ชจ์˜ ์ •์˜๋ฅผ ์˜ค๋ฒ„๋ผ์ด๋”ฉํ•ด์„œ ์‚ฌ์šฉํ•œ๋‹ค
    ๐ŸŒŽ ์ฐธ๊ณ  extends, implements์˜ ์ฐจ์ด
  • ํ•œ๊ธ€ ์ถœ๋ ฅ ์„ค์ •์‹œ "UTF-8" ์‚ฌ์šฉ

    String msg = new String( message.getPayload(), "EUC-KR" );
    ๐ŸŸฅ ์ถœ๋ ฅ์‹œ ํ•œ๊ธ€์ด ๊นจ์ ธ์„œ ์ถœ๋ ฅ๋œ๋‹ค
    String msg = new String( message.getPayload(), "UTF-8" );
    ๐ŸŸฆ ํ•œ๊ธ€์ด ๊นจ์ง์—†์ด ์ถœ๋ ฅ๋จ

1. ์›น์„œ๋ฒ„ ๋ธŒ๋กœ์ปค ์„ค์ •

// ์›น์„œ๋ฒ„ ๋ธŒ๋กœ์ปค ์„ค์ •์‹œ => ws://127.0.0.1:1884, tcp://127.0.0.1.1883
    private String broker = "tcp://1.234.5.158:11883";
    private String clientId = "db207_" + System.currentTimeMillis();

    private String userid = "ds606";
    private String password = "ds606";

    // ์ „์†ก๋˜๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๊ด€ํ•˜๋Š” ๋ฐฉ๋ฒ•
    private MemoryPersistence persistence = new MemoryPersistence();
    private MqttAsyncClient client = null;

2. ๋ฐ์ดํ„ฐ ์ถœ๋ ฅ์ด ์ด๋ฃจ์–ด์งˆ ItemView์˜ printitem() ํ˜ธ์ถœ ์œ„ํ•ด itemObject ์ƒ์„ฑ

// ItemView ํ˜ธ์ถœ ๊ฐ€๋Šฅํ•œ itemobject
    private ItemView itemObject;

3. MQTT ์ ‘์†์„ ์œ„ํ•œ ์ƒ์„ฑ์ž ์ƒ์„ฑ

// ์ƒ์„ฑ์ž ์ƒ์„ฑ
    public ItemMqttClient(ItemView obj) {
        itemObject = obj;
        try {
            client = new MqttAsyncClient(broker, clientId, persistence);

            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(userid);
            options.setPassword(password.toCharArray());
            options.setCleanSession(true);

            client.connect(options);
            client.setCallback(this); // 1. ๊ฐ์ฒด๋ฅผ ์ „๋‹ฌํ•จ.

            System.out.println(this.client);// ์ ‘์†๊ฐ์ฒด
            Thread.sleep(1000);
            System.out.println("์ ‘์†์„ฑ๊ณต");
        } catch (Exception e) {
            System.out.println("์ ‘์†์‹คํŒจ");
            e.printStackTrace();
        }
    }

4. ๊ตฌ๋…ํ•˜๊ธฐ/๊ตฌ๋…ํ•ด์ œ/๋ฉ”์‹œ์ง€๋ณด๋‚ด๊ธฐ ์ƒ์„ฑ

// ๊ตฌ๋…ํ•˜๊ธฐ
    public void subscribe() {
        try {
            client.subscribe("/ds207/#", 0);
            System.out.println("๊ตฌ๋…์„ค์ • ์„ฑ๊ณต");
        } catch (MqttException e) {
            e.printStackTrace();
            System.out.println("๊ตฌ๋…์„ค์ • ์‹คํŒจ");
        }
    }

    // ๊ตฌ๋…ํ•ด์ œ
    public void unSubscribe() {
        try {
            client.unsubscribe("/ds207/#");
            System.out.println("๊ตฌ๋…ํ•ด์ œ ์„ฑ๊ณต");
        }
        catch(Exception e){
            e.printStackTrace();
            System.out.println("๊ตฌ๋…ํ•ด์ œ ์‹คํŒจ");
        }
    }

    // ๋ฉ”์‹œ์ง€๋ณด๋‚ด๊ธฐ
    public void sendMessage(String msg) {
        try {
            MqttMessage message = new MqttMessage();
            message.setQos(0);
            message.setPayload( msg.getBytes() );
            if(client != null) {
                client.publish("/ds207/test207", message);
            }
        }
        catch(Exception e){
            e.printStackTrace();
            System.out.println("๋ฉ”์‹œ์ง€ ๋ณด๋‚ด๊ธฐ ์‹คํŒจ");
        }
    }

5. implements MqttCallback ์— ๋Œ€ํ•œ ์˜ค๋ฒ„๋ผ์ด๋“œ ์ƒ์„ฑ

  • messageArrived ์˜ค๋ฒ„๋ผ์ด๋“œ ์ž‘์„ฑ์‹œ MqttMessage โ‡’ String์œผ๋กœ ๋ณ€๊ฒฝํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค
    ์•„๋ž˜์—์„œ ํ™•์ธ ํ•ด๋ณด๋ฉด getPayload() ๋Š” byteํ˜•ํƒœ์˜ []๋ฐฐ์—ด๋กœ ์˜ค๊ธฐ ๋•Œ๋ฌธ์— String์œผ๋กœ ๋ณ€๊ฒฝํ•ด์ค€๋‹ค
@Override
    public void connectionLost(Throwable arg0) {
        // ์„œ๋ฒ„ ์—ฐ๊ฒฐ์ด ๋Š๊ฒผ์„๋•Œ
        System.out.println("์„œ๋ฒ„ ์—ฐ๊ฒฐ ๋Š๊ฒผ์„ ๋•Œ");

    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // ๋ฉ”์„ธ์ง€๊ฐ€ ๋„์ฐฉํ–ˆ์„๋•Œ
        System.out.println("๋ฉ”์„ธ์ง€๊ฐ€ ์™”์„ ๋•Œ");
        // MqttMessage => String์œผ๋กœ ๋ณ€๊ฒฝ
        String msg = new String( message.getPayload(), "UTF-8" ); //ํ•œ๊ธ€์ด ๊นจ์ง์—†์ด ์ถœ๋ ฅ๋จ

        // System.out.println(topic + ", " + msg); => itemview์—์„œ ์ถœ๋ ฅํ•ด์•ผํ•จ

        // itemobject.printItem ์‚ฌ์šฉํ•˜์—ฌ
        // printItem์•ˆ์— ๋ฐ์ดํ„ฐ ์ „์†กํ•ด์„œ view์ชฝ์—์„œ ์ถœ๋ ฅ!

        // String -> JSON parser
        itemObject.printItem( topic, msg );
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        // ๋‚ด๊ฐ€ ์ „์†กํ•˜๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ์ •ํ™•ํžˆ ์ „์†ก ์™„๋ฃŒ ํ–ˆ์„๋•Œ
        System.out.println("๋ฉ”์„ธ์ง€ ์ „์†ก์„ ์™„๋ฃŒ ํ–ˆ์„ ๋•Œ");
    }

๐Ÿ“ view/ItemView.java

๊ตฌ๋… ๋ฐ ์ถœ๋ ฅ
1. ItemView์˜ ๊ฐ์ฒด๋ฅผ ItemMqttClient์—๊ฒŒ ์ „๋‹ฌํ•ด์คŒ
โžก๏ธ ๋‚˜์ค‘์— printItem() ์‚ฌ์šฉ์„ ์œ„ํ•ด ItemMqttClient์—๊ฒŒ ์ž์‹ ์˜ ๊ฐ์ฒดthis๋ฅผ ItemMqttClient์—๊ฒŒ ์ „๋‹ฌ
2. ๊ตฌ๋…ํ•˜๊ธฐ
โžก๏ธ obj.subscribe();

package com.example.view;

import com.example.broker.ItemMqttClient;

public class ItemView {

    public ItemView() {
        // 1. ItemMqttClient ํ˜ธ์ถœ
        // ๋‚˜์ค‘์— printItem ์“ฐ๊ธฐ ์œ„ํ•ด ItemView์˜ ๊ฐ์ฒด๋ฅผ ItemMqttClient์—๊ฒŒ ์ „๋‹ฌ
        ItemMqttClient obj = new ItemMqttClient(this);

        // 2. ๊ตฌ๋…ํ•˜๊ธฐ
        obj.subscribe();
    }

    // ๋ณ€๊ฒฝ๋‚ด์šฉ์˜ ์ถœ๋ ฅ์€ ์—ฌ๊ธฐ์„œ
    public void printItem(String topic, String msg){
        System.out.println(topic + "," + msg);
    }
}

๐Ÿ“ AppTest.java

AppTest์—์„œ ์‚ฌ์šฉ์ž๊ฐ€ ์ „์†กํ•œ ๋ฉ”์„ธ์ง€๋Š” MqttCallback์ด ๊ฐ์ง€ํ•˜์—ฌ messageArrived๊ฐ€ ๋ฐ›๋Š”๋‹ค

...

public class AppTest 
{
    // ์•„์ดํ…œ ์„œ๋น„์Šค ๊ฐ์ฒด ์ƒ์„ฑ
    ItemService iService = new ItemServiceImpl();

    // ๋„คํŠธ์›Œํฌ mqtt ๊ฐ์ฒด ์ƒ์„ฑ
    ItemMqttClient mqttClient = new ItemMqttClient(null);

    @Test
    public void insert()
    {   //๋ฌผํ’ˆ ๋“ฑ๋ก
        Item item = new Item();
        item.setNo(17L);
        item.setName("err");
        item.setPrice(8860L);

        // ์„œ๋น„์Šค๋ฅผ ํ†ตํ•ด ์ถ”๊ฐ€ insertItem
        iService.insertItem(item);

        // JSON๋ฌธ์„œ๋กœ ๋ณ€๊ฒฝํ•˜์—ฌ sendMessage์— ๋ณด๋‚ด๊ธฐ
        JSONObject jobj = new JSONObject();
        jobj.put("type", 1);
        mqttClient.sendMessage(jobj.toString());
        mqttClient.sendMessage( item.getNo() + "๋ฒˆ ํ•ญ๋ชฉ์„ ์ถ”๊ฐ€ํ–ˆ์Šต๋‹ˆ๋‹ค");
        

    }

    @Test
    public void update()
    { //๋ฌผํ’ˆ ์ˆ˜์ •
        Item item = new Item(6L, "MMM", 1500L, null);

        // ์„œ๋น„์Šค ํ˜ธ์ถœํ•˜์—ฌ ์ˆ˜์ • updateItem
        iService.updateItem(item);

        // json๋ฌธ์„œ๋กœ ๋ณ€๊ฒฝํ•˜์—ฌ ๋ณด๋‚ด๊ธฐ
        JSONObject jobj = new JSONObject();
        jobj.put("type", "update");
        mqttClient.sendMessage(jobj.toString());

    }

    @Test
    public void delete()
    { // ๋ฌผํ’ˆ ์‚ญ์ œ
        Item item = new Item();
        item.setNo(15L);

        iService.deleteItem(item);

		// json๋ฌธ์„œ๋กœ ๋ณ€๊ฒฝํ•˜์—ฌ ๋ณด๋‚ด๊ธฐ
        JSONObject jobj = new JSONObject();
        jobj.put("type", "delete");
        // mqttClient.sendMessage(jobj.toString());
        mqttClient.sendMessage(item.getNo() + "๋ฒˆ ํ•ญ๋ชฉ์„ ์‚ญ์ œํ–ˆ์Šต๋‹ˆ๋‹ค");

    }
}

0๊ฐœ์˜ ๋Œ“๊ธ€