미들웨어 정리 (1) - MQTT, Redis, PostgreSQL

NowHyeok·2023년 9월 19일
0

Middleware

목록 보기
1/3

도커 환경설정

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  redis:
    image: redis
    container_name: redis
    ports:
      - "6379:6379"
  postgres:
    image: postgres
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: "postgres"
      POSTGRES_USER: "admin"
      POSTGRES_PASSWORD: "admin"

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.0.0
    container_name: elasticsearch
    environment:
      - ELASTIC_PASSWORD=elastic
    ports:
      - "9200:9200"

  kibana:
    image: docker.elastic.co/kibana/kibana:8.0.0
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      ELASTICSEARCH_HOSTS: "http://elasticsearch:9200"


MQTT

TCP/IP 프로토콜 위에서 동작하는 발행 구독 기반의 경량 메시징 프로토콜

IoT와 같은 제한된, 혹은 대규모 트래픽 전송을 위해 만들어진 프로토콜이기에 TCP/IP 프로토콜 위에서 동작하지만 동시에 굉장히 가벼우며, 많은 통신 제약들을 해결해준다.

MQTT의 발행 구독 메시징 패턴은 오로지 브로커를 통해서만 통신할 수 있다.

Message Queue

프로세스 / 프로그램 간에 데이터 를 교환하는 통신 방법 중 하나로, 메시지를 임시로 저장하는 간단한 버퍼라고 생각하면 된다. 여기서 메시지란 요청 응답 데이터 등 이다.

QoS (Quality of Service)

서비스 품질 레벨

  • 0 : 최대 한 번 - 메시지는 최대 한 번 전달되거나 전혀 전달되지 않는다. 메시지는 저장되지 않으며 손실될 수 있다. 가장 빠른 전송모드 이며 실행 후 삭제 라고도 한다.

  • 1 : 최소 한 번 - 기본 전송 모드로 송신자가 수신자 에게 수신확인을 받을 때 까지 다시 송신한다.

  • 2 : 정확히 한 번 - 메시지가 처리 될 때 까지 송신자와 수신자는 메시지를 로컬에 저장, 가장 안전 하지만 가장 느린 전송 모드

사용해보기

다운로드

broker 실행

Publisher

MqttClient 객체 가져오기

    private MqttClient client;
    public Publisher() {
        try {
            client = new MqttClient("tcp://localhost:1883","pub1");
            client.connect();
        }catch (MqttException e){
            e.printStackTrace();
        }
    }
  • MqttClient - MQTT에서 메시지를 발행하거나 구독 할 수 있는 객체

메세지 전송 메서드

    public boolean send(String topic, String msg){
        try{
            MqttMessage message = new MqttMessage();
            message.setPayload(msg.getBytes());
            client.publish(topic,message);
        }catch (MqttException e){
            e.printStackTrace();
        }

        return true;
    }
  • setPayload(byte[] payload) - 메시지의 본문을 설정
  • publish(String topic, MqttMessage message) - 브로커에 메시지를 발행

MqttClient 종료

    public boolean close(){
        if(client!=null){
            try {
                client.disconnect();
                client.close();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
        return true;
    }
  • disconnect() - 브로커와의 통신을 종료
  • close() - MQTT client 를 종료

발행하기

    public static void main(String[] args) throws InterruptedException {
        Publisher sender = new Publisher();
        for(int i =0; i<10; i++){
            Thread.sleep(1000);
            sender.send("test",String.valueOf(i));
        }
        sender.close();
    }

Subscriber

설정

    private MqttClient mqttClient;
    private MqttConnectOptions mqttOptions;

    public Subscriber init(String server, String clientId){
        try {
            mqttOptions = new MqttConnectOptions();
            mqttOptions.setCleanSession(true);
            mqttOptions.setKeepAliveInterval(30);
            mqttClient = new MqttClient(server, clientId);
            mqttClient.setCallback(this);
            mqttClient.connect(mqttOptions);
        } catch (MqttException e) {
            e.printStackTrace();
        }
        return this;
    }
  • MqttConnectOptions - MQTT 연결에 관련된 옵션을 구성하는데 사용되는 객체
  • setCleanSession(boolean cleanSession) - 클린세션 사용여부, true 로 설정할 시 클라이언트가 연결을 끊으면 브로커는 해당 클라이언트와 관련된 모든 세션정보를 삭제
  • setKeepAliveInterval(int keepAliveInterval) - 브로커와 연결유지 시간 설정
  • setCallback(MqttCallback callback) - 클라이언트에 MQTT 콜백을 등록, 메시지 수신 및 기타 이벤트를 처리하기 위해 사용자 정의 콜백 메서드가 자동으로 호출된다.this는 현재 클래스에서 정의한 콜백 메서드를 처리할 수 있도록 한다.
  • connect(MqttConnectOptions options) - 인수로 넘어온 옵션을 적용하고 브로커와 연결을 시도

메서드 정의


public class Subscriber implements MqttCallback{

	@Override
    public void connectionLost(Throwable throwable) {

    }

    //메세지가 도착하면 호출되는 메소드드
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.println("messageArrived : " + mqttMessage);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        try {
            System.out.println("deliveryComplete : " + iMqttDeliveryToken.getMessage());
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
}
  • connectionLost - 연결이 끊어졌을 때 호출되는 메서드
  • messageArrived - 브로커로 부터 메시지가 도착했을 때 호출되는 메서드
  • deliveryComplete - 메시지가 성공적으로 전송되었을 때 호출되는 메서드

구독하기

    public boolean subscriber(String topic){
        boolean result = true ;
        try {
            if(topic!=null){
                mqttClient.subscribe(topic, 0);
            }
        } catch (MqttException e) {
            e.printStackTrace();
            result = false;
        }
        return result;
    }

    public static void main(String[] args) {
        Subscriber subscriber = new Subscriber();
        subscriber.init("tcp://localhost:1883", "sub1").subscriber("test");
    }
  • subscribe(String topicFilter, int qos) - 특정 topic을 구독 하고 QoS를 설정

결과

mosquitto

메시지 유형방향설명
CONNECTClient → Server서버 연결 요청
CONNACKClient ← Server연결 요청에 대한 ACK
PUBLISHClient ↔ ServerPublish 메시지 전송
PUBACKClient ↔ ServerPublish에 대한 ACK (QoS 1)
PUBRECClient ↔ ServerPublish 전달 확인 Flow 1 (QoS 2)
PUBRELClient ↔ ServerPublish 전달 확인 Flow 2 (QoS 2)
PUBCOMPClient ↔ ServerPublish 전달 확인 Flow 3 (QoS 2)
SUBSCRIBEClient → Server서버 구독 요청
SUBACKClient ← Server구독 요청에 대한 ACK
UNSUBSCRIBEClient → Server서버 구독 해제 요청
UNSUBACKClient ← Server서버 구독 해제 요청에 대한 ACK
PINGREQClient → ServerPING 요청
PINGRESPClient ← ServerPING 요청에 대한 응답
DISCONNECTClient → Server서버 연결 해제 요청
Reserved예약된 메시지 유형

subscriber

Redis

Remote Dictionary Server
Key, Value 구조의 비정형 데이터를 저장하고 관리하기 위한 오픈소스 기반의 비관계형 DMBS 로 인메모리 데이터 구조(RAM에 데이터 저장 및 관리 하는 방식) 를 가진 저장소 이다.

속도가 빠르지만 서버가 꺼지면 모든 데이터가 사라진다는 단점이 있다.

스프링에서 Redis를 사용하는 방법은
1. RedisTemplate
2. RedisRepository
두 가지 방법이 있다.

사용해보기

@SpringBootApplication
@EnableCaching //추가
public class MiddlewareApplication {
	public static void main(String[] args) {
		SpringApplication.run(MiddlewareApplication.class, args);
	}
}

@EnableCaching / @EnableCache

스프링의 캐시 기능을 활성화해 아래와 같은 애노테이션을 사용 할 수 있다.

  • @Cacheable - 캐시가 있으면 그 정보를 가져오고 없으면 등록
  • @Cacheput - 실행결과를 강제로 캐시에 저장
  • @CacheEvict - 캐시에서 특정 항목 또는 모든 항목을 제거하는 데 사용
#application.properties
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379

RedisTemplate

Config

@RequiredArgsConstructor
@Configuration
@EnableRedisRepositories
public class MiddlewareConfig {
    
    private final RedisProperties redisProperties;

    @Bean
    public RedisConnectionFactory redisConnectionFactory(){
        return new LettuceConnectionFactory(redisProperties.getHost(), redisProperties.getPort());
    }
    @Bean
    public RedisTemplate<?, ?> redisTemplate() {
        RedisTemplate<byte[], byte[]> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());

        return redisTemplate;
    }
}
  • RedisConnectionFactory - Redis 연결 설정 빈

  • Lettuce - Reids 자바 클라이언트

  • LettuceConnectionFactory - Lettuce를 사용하여 Redis에 대한 연결을 설정하는 클래스

  • RedisTemplate - Redis에 데이터를 저장하거나 조회 하기 위한 빈

  • setKeySerializer, setValueSerializer - RedisTemplate를 사용할 때 동작에는 문제가 없지만 redis-cli을 통해 직접 데이터를 보려고 할 때 알아볼 수 없는 형태로 출력되기 때문에 적용

  • @EnableRedisRepositories - 스프링 데이터 Redis 리포지토리를 활성화, Redis에서 데이터를 읽고 쓸 수 있는 리포지토리 빈을 생성

Service

@Service
@RequiredArgsConstructor
public class RedisService {

    private final RedisTemplate redisTemplate;

    public String getValues(String key){
        ValueOperations<String, String> values = redisTemplate.opsForValue();
        return values.get(key);
    }

    public void setValues(String key, String value){
        ValueOperations<String, String> values = redisTemplate.opsForValue();
        values.set(key,value);
    }

    public void setSets(String key,String... values){
        redisTemplate.opsForSet().add(key,values);
    }

    public Set getSets(String key){
        return redisTemplate.opsForSet().members(key);
    }
}
메서드설명
opsForValueStrings를 쉽게 Serialize / Deserialize 해주는 interface
opsForListList를 쉽게 Serialize / Deserialize 해주는 interface
opsForSetSet을 쉽게 Serialize / Deserialize 해주는 interface
opsForZSetZSet을 쉽게 Serialize / Deserialize 해주는 interface
opsForHashHash를 쉽게 Serialize / Deserialize 해주는 interface

RedisRepository

@Data
@RedisHash("Person")
public class Person {
    private String name;
    private int age;
    
    @Id
    private long id;

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
  • @RedisHash - 특정 해쉬값을 키, 해당 클래스의 인스턴스를 값으로 적재한다.
public interface PersonRedisRepository extends CrudRepository<Person, Long> {
}

스프링 Data 사용

  • CrudRepository<T,ID> - 기본적인 CRUD 작업을 제공

PostgreSQL

특징

  1. 객체관계형 DBMS로 개발자는 기존 데이터 type에서 확장된 type형태를 자유롭게 정의하여 사용할 수 있다.

  2. 상속기능 제공

  3. 테이블의 칼럼을 다차원의 가변적인 배열로 정의 할 수 있다.

  4. 'MVCC (Multi-Version Concurrency Control)' - MGA(Multi-Version Concurrency Control) 를 사용하여 동시성 관리를 해 ,이전 공간이 재사용 될 수 없는 dead tuple 상태로 저장공간을 두게 되어서 이러한 현상이 지속될 경우, 공간 부족 및 데이터IO의 비효율을 유발하여 성능저하의 원인이 된다. 때문에 주기적으로 vacuum 기능을 수행하여 재사용 가능하도록 관리해 주어야 한다.

  • MGA(Multi-Version Concurrency Control) - 튜플을 update할 때 새로운 값으로 replace 처리하는 것이 아니라, 새로운 튜플을 추가하고 이전 튜플은 유효 범위를 마킹하여 처리하는 방식

https://techblog.woowahan.com/9478/

사용해보기

테이블 생성

CREATE TABLE person (
    id serial PRIMARY KEY,
    age integer,
    name varchar(255)
);

Repository

  • NamedParameterJdbcTemplate 사용
    private final NamedParameterJdbcTemplate template;
  • Create
    public Person save(Person person){
        String sql = "insert into person(name, age) values(:name, :age)";

        SqlParameterSource param = new BeanPropertySqlParameterSource(person);

        KeyHolder keyHolder = new GeneratedKeyHolder();
        template.update(sql, param, keyHolder);

        long key = keyHolder.getKey().longValue();
        person.setId(key);
        return person;
    }
  • Read
    public Optional<Person> findById(Long id){
        String sql = "select name, age from person where id=:id";
        Map<String, Object> param = Map.of("id", id);
        Person person = template.queryForObject(sql,param, BeanPropertyRowMapper.newInstance(Person.class));
        return Optional.ofNullable(person);

    }

    public List<Person> findAll(){
        String sql = "select name, age from persron";
        return template.query(sql, BeanPropertyRowMapper.newInstance(Person.class));
    }
  • Update
    public void update(Long id,String name, int age){
        String sql = "update person set name=:name, age=:age where id=:id";
        SqlParameterSource param = new MapSqlParameterSource()
                .addValue("name",name)
                .addValue("age",age)
                .addValue("id",id);

        template.update(sql, param);
    }
  • Delete
    public void delete(Long id) {
        String sql = "delete from person where id = :id";
        SqlParameterSource param = new MapSqlParameterSource()
                .addValue("id", id);
        template.update(sql, param);
    }

0개의 댓글