version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
redis:
image: redis
container_name: redis
ports:
- "6379:6379"
postgres:
image: postgres
container_name: postgres
ports:
- "5432:5432"
environment:
POSTGRES_DB: "postgres"
POSTGRES_USER: "admin"
POSTGRES_PASSWORD: "admin"
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.0.0
container_name: elasticsearch
environment:
- ELASTIC_PASSWORD=elastic
ports:
- "9200:9200"
kibana:
image: docker.elastic.co/kibana/kibana:8.0.0
container_name: kibana
ports:
- "5601:5601"
environment:
ELASTICSEARCH_HOSTS: "http://elasticsearch:9200"
TCP/IP 프로토콜 위에서 동작하는 발행 구독 기반의 경량 메시징 프로토콜
IoT와 같은 제한된, 혹은 대규모 트래픽 전송을 위해 만들어진 프로토콜이기에 TCP/IP 프로토콜 위에서 동작하지만 동시에 굉장히 가벼우며, 많은 통신 제약들을 해결해준다.
MQTT의 발행 구독 메시징 패턴은 오로지 브로커를 통해서만 통신할 수 있다.
프로세스 / 프로그램 간에 데이터 를 교환하는 통신 방법 중 하나로, 메시지를 임시로 저장하는 간단한 버퍼라고 생각하면 된다. 여기서 메시지
란 요청 응답 데이터 등 이다.
서비스 품질 레벨
0 : 최대 한 번
- 메시지는 최대 한 번 전달되거나 전혀 전달되지 않는다. 메시지는 저장되지 않으며 손실될 수 있다. 가장 빠른 전송모드 이며 실행 후 삭제
라고도 한다.
1 : 최소 한 번
- 기본 전송 모드로 송신자가 수신자 에게 수신확인을 받을 때 까지 다시 송신한다.
2 : 정확히 한 번
- 메시지가 처리 될 때 까지 송신자와 수신자는 메시지를 로컬에 저장, 가장 안전 하지만 가장 느린 전송 모드
MQTT 브로커 : https://mosquitto.org/download/
라이브러리
private MqttClient client;
public Publisher() {
try {
client = new MqttClient("tcp://localhost:1883","pub1");
client.connect();
}catch (MqttException e){
e.printStackTrace();
}
}
MqttClient
- MQTT에서 메시지를 발행하거나 구독 할 수 있는 객체 public boolean send(String topic, String msg){
try{
MqttMessage message = new MqttMessage();
message.setPayload(msg.getBytes());
client.publish(topic,message);
}catch (MqttException e){
e.printStackTrace();
}
return true;
}
setPayload(byte[] payload)
- 메시지의 본문을 설정publish(String topic, MqttMessage message)
- 브로커에 메시지를 발행 public boolean close(){
if(client!=null){
try {
client.disconnect();
client.close();
} catch (MqttException e) {
e.printStackTrace();
}
}
return true;
}
disconnect()
- 브로커와의 통신을 종료close()
- MQTT client 를 종료 public static void main(String[] args) throws InterruptedException {
Publisher sender = new Publisher();
for(int i =0; i<10; i++){
Thread.sleep(1000);
sender.send("test",String.valueOf(i));
}
sender.close();
}
private MqttClient mqttClient;
private MqttConnectOptions mqttOptions;
public Subscriber init(String server, String clientId){
try {
mqttOptions = new MqttConnectOptions();
mqttOptions.setCleanSession(true);
mqttOptions.setKeepAliveInterval(30);
mqttClient = new MqttClient(server, clientId);
mqttClient.setCallback(this);
mqttClient.connect(mqttOptions);
} catch (MqttException e) {
e.printStackTrace();
}
return this;
}
MqttConnectOptions
- MQTT 연결에 관련된 옵션을 구성하는데 사용되는 객체setCleanSession(boolean cleanSession)
- 클린세션 사용여부, true 로 설정할 시 클라이언트가 연결을 끊으면 브로커는 해당 클라이언트와 관련된 모든 세션정보를 삭제setKeepAliveInterval(int keepAliveInterval)
- 브로커와 연결유지 시간 설정 setCallback(MqttCallback callback)
- 클라이언트에 MQTT 콜백을 등록, 메시지 수신 및 기타 이벤트를 처리하기 위해 사용자 정의 콜백 메서드가 자동으로 호출된다.this는 현재 클래스에서 정의한 콜백 메서드를 처리할 수 있도록 한다.connect(MqttConnectOptions options)
- 인수로 넘어온 옵션을 적용하고 브로커와 연결을 시도
public class Subscriber implements MqttCallback{
@Override
public void connectionLost(Throwable throwable) {
}
//메세지가 도착하면 호출되는 메소드드
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("messageArrived : " + mqttMessage);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
try {
System.out.println("deliveryComplete : " + iMqttDeliveryToken.getMessage());
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}
connectionLost
- 연결이 끊어졌을 때 호출되는 메서드messageArrived
- 브로커로 부터 메시지가 도착했을 때 호출되는 메서드deliveryComplete
- 메시지가 성공적으로 전송되었을 때 호출되는 메서드 public boolean subscriber(String topic){
boolean result = true ;
try {
if(topic!=null){
mqttClient.subscribe(topic, 0);
}
} catch (MqttException e) {
e.printStackTrace();
result = false;
}
return result;
}
public static void main(String[] args) {
Subscriber subscriber = new Subscriber();
subscriber.init("tcp://localhost:1883", "sub1").subscriber("test");
}
subscribe(String topicFilter, int qos)
- 특정 topic을 구독 하고 QoS를 설정메시지 유형 | 방향 | 설명 |
---|---|---|
CONNECT | Client → Server | 서버 연결 요청 |
CONNACK | Client ← Server | 연결 요청에 대한 ACK |
PUBLISH | Client ↔ Server | Publish 메시지 전송 |
PUBACK | Client ↔ Server | Publish에 대한 ACK (QoS 1) |
PUBREC | Client ↔ Server | Publish 전달 확인 Flow 1 (QoS 2) |
PUBREL | Client ↔ Server | Publish 전달 확인 Flow 2 (QoS 2) |
PUBCOMP | Client ↔ Server | Publish 전달 확인 Flow 3 (QoS 2) |
SUBSCRIBE | Client → Server | 서버 구독 요청 |
SUBACK | Client ← Server | 구독 요청에 대한 ACK |
UNSUBSCRIBE | Client → Server | 서버 구독 해제 요청 |
UNSUBACK | Client ← Server | 서버 구독 해제 요청에 대한 ACK |
PINGREQ | Client → Server | PING 요청 |
PINGRESP | Client ← Server | PING 요청에 대한 응답 |
DISCONNECT | Client → Server | 서버 연결 해제 요청 |
Reserved | 예약된 메시지 유형 |
Remote Dictionary Server
Key, Value 구조의 비정형 데이터를 저장하고 관리하기 위한 오픈소스 기반의 비관계형 DMBS 로 인메모리 데이터 구조(RAM에 데이터 저장 및 관리 하는 방식) 를 가진 저장소 이다.
속도가 빠르지만 서버가 꺼지면 모든 데이터가 사라진다는 단점이 있다.
스프링에서 Redis를 사용하는 방법은
1. RedisTemplate
2. RedisRepository
두 가지 방법이 있다.
@SpringBootApplication
@EnableCaching //추가
public class MiddlewareApplication {
public static void main(String[] args) {
SpringApplication.run(MiddlewareApplication.class, args);
}
}
스프링의 캐시 기능을 활성화해 아래와 같은 애노테이션을 사용 할 수 있다.
@Cacheable
- 캐시가 있으면 그 정보를 가져오고 없으면 등록 @Cacheput
- 실행결과를 강제로 캐시에 저장@CacheEvict
- 캐시에서 특정 항목 또는 모든 항목을 제거하는 데 사용#application.properties
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
@RequiredArgsConstructor
@Configuration
@EnableRedisRepositories
public class MiddlewareConfig {
private final RedisProperties redisProperties;
@Bean
public RedisConnectionFactory redisConnectionFactory(){
return new LettuceConnectionFactory(redisProperties.getHost(), redisProperties.getPort());
}
@Bean
public RedisTemplate<?, ?> redisTemplate() {
RedisTemplate<byte[], byte[]> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
return redisTemplate;
}
}
RedisConnectionFactory
- Redis 연결 설정 빈
Lettuce
- Reids 자바 클라이언트
LettuceConnectionFactory
- Lettuce를 사용하여 Redis에 대한 연결을 설정하는 클래스
RedisTemplate
- Redis에 데이터를 저장하거나 조회 하기 위한 빈
setKeySerializer, setValueSerializer
- RedisTemplate를 사용할 때 동작에는 문제가 없지만 redis-cli을 통해 직접 데이터를 보려고 할 때 알아볼 수 없는 형태로 출력되기 때문에 적용
@EnableRedisRepositories
- 스프링 데이터 Redis 리포지토리를 활성화, Redis에서 데이터를 읽고 쓸 수 있는 리포지토리 빈을 생성
@Service
@RequiredArgsConstructor
public class RedisService {
private final RedisTemplate redisTemplate;
public String getValues(String key){
ValueOperations<String, String> values = redisTemplate.opsForValue();
return values.get(key);
}
public void setValues(String key, String value){
ValueOperations<String, String> values = redisTemplate.opsForValue();
values.set(key,value);
}
public void setSets(String key,String... values){
redisTemplate.opsForSet().add(key,values);
}
public Set getSets(String key){
return redisTemplate.opsForSet().members(key);
}
}
메서드 | 설명 |
---|---|
opsForValue | Strings를 쉽게 Serialize / Deserialize 해주는 interface |
opsForList | List를 쉽게 Serialize / Deserialize 해주는 interface |
opsForSet | Set을 쉽게 Serialize / Deserialize 해주는 interface |
opsForZSet | ZSet을 쉽게 Serialize / Deserialize 해주는 interface |
opsForHash | Hash를 쉽게 Serialize / Deserialize 해주는 interface |
@Data
@RedisHash("Person")
public class Person {
private String name;
private int age;
@Id
private long id;
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
@RedisHash
- 특정 해쉬값을 키, 해당 클래스의 인스턴스를 값으로 적재한다.public interface PersonRedisRepository extends CrudRepository<Person, Long> {
}
스프링 Data 사용
CrudRepository<T,ID>
- 기본적인 CRUD 작업을 제공객체관계형 DBMS로 개발자는 기존 데이터 type에서 확장된 type형태를 자유롭게 정의하여 사용할 수 있다.
상속기능 제공
테이블의 칼럼을 다차원의 가변적인 배열로 정의 할 수 있다.
'MVCC (Multi-Version Concurrency Control)'
- MGA(Multi-Version Concurrency Control)
를 사용하여 동시성 관리를 해 ,이전 공간이 재사용 될 수 없는 dead tuple 상태로 저장공간을 두게 되어서 이러한 현상이 지속될 경우, 공간 부족 및 데이터IO의 비효율을 유발하여 성능저하의 원인이 된다. 때문에 주기적으로 vacuum
기능을 수행하여 재사용 가능하도록 관리해 주어야 한다.
MGA(Multi-Version Concurrency Control)
- 튜플을 update할 때 새로운 값으로 replace 처리하는 것이 아니라, 새로운 튜플을 추가하고 이전 튜플은 유효 범위를 마킹하여 처리하는 방식https://techblog.woowahan.com/9478/
CREATE TABLE person (
id serial PRIMARY KEY,
age integer,
name varchar(255)
);
private final NamedParameterJdbcTemplate template;
public Person save(Person person){
String sql = "insert into person(name, age) values(:name, :age)";
SqlParameterSource param = new BeanPropertySqlParameterSource(person);
KeyHolder keyHolder = new GeneratedKeyHolder();
template.update(sql, param, keyHolder);
long key = keyHolder.getKey().longValue();
person.setId(key);
return person;
}
public Optional<Person> findById(Long id){
String sql = "select name, age from person where id=:id";
Map<String, Object> param = Map.of("id", id);
Person person = template.queryForObject(sql,param, BeanPropertyRowMapper.newInstance(Person.class));
return Optional.ofNullable(person);
}
public List<Person> findAll(){
String sql = "select name, age from persron";
return template.query(sql, BeanPropertyRowMapper.newInstance(Person.class));
}
public void update(Long id,String name, int age){
String sql = "update person set name=:name, age=:age where id=:id";
SqlParameterSource param = new MapSqlParameterSource()
.addValue("name",name)
.addValue("age",age)
.addValue("id",id);
template.update(sql, param);
}
public void delete(Long id) {
String sql = "delete from person where id = :id";
SqlParameterSource param = new MapSqlParameterSource()
.addValue("id", id);
template.update(sql, param);
}