이전 실습까지는 직접 알림을 보내는 작업을 했다면, 이번 시간부터는 카프카를 이용해서 알림 서비스를 구현해볼 것이다.
이벤트(상태변경)가 발생하면 그 이벤트를 다른 서비스들이 감지해서 처리하는 비동기 통신이다. 이를 이용해 서비스 간 결합력을 낮추기도 한다.
서비스간 독립성과 확장성이 중요할 때
처리 부하를 분산시킬 때
이벤트 기반 자동화를 원할 때
이벤트 드리븐 아키텍쳐의 핵심 기술 중 하나인 카프카를 이용해보자.
분산 메시지 큐 시스템으로, 대용량 데이터를 비동기적으로 처리할 수 있게 해주는 도구
브로커 : 서버
토픽 : 메시지 이름
프로듀서 : 메시지를 생산하여 브로커의 토픽 이름으로 보내는 애플리케이션
컨슈머 : 브로커의 토픽을 구독하여 저장된 메시지를 가져가서 처리
카프카를 설치한 디렉터리에 docker-compose.yml를 다운로드 한다.
yml은 다음과 같다.
version: "2"
services:
kafdrop:
image: obsidiandynamics/kafdrop:3.31.0
restart: "always"
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka:29092"
JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
depends_on:
- "kafka"
kafka:
image: obsidiandynamics/kafka
restart: "always"
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
파워셀에서 docker compose up -d를 실행한 후, http://localhost:9000/를 실행한다.

이런 창이 뜨면 성공!
기존에 만들어 놓은 user, post, alim 서비스에 의존성을 추가하고 새로고침을 눌러준다.
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
application.yml
kafka:
listener:
ack-mode: manual_immediate
consumer:
group-id: ${spring.application.name}
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
enable-auto-commit: false
auto-offset-reset: latest
max-poll-records: 10
properties:
spring.json.trusted.packages: "*"
spring.json.use.type.headers: false
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring.json.add.type.headers: false
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaMessageProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void send(String topic, Object message){
kafkaTemplate.send(topic, message);
}
}
@Getter
@Setter
public class SiteUserInfoEvent {
public static final String Topic = "userinfo"; -> 토픽 이름
private String action;
private String userId;
private String phoneNumber;
private LocalDateTime eventTime;
public static SiteUserInfoEvent fromEntity(String action, SiteUser siteUser) {
SiteUserInfoEvent message = new SiteUserInfoEvent();
message.action = action;
message.userId = siteUser.getUserId();
message.phoneNumber = siteUser.getPhoneNumber();
message.eventTime = LocalDateTime.now();
return message;
}
}
kafkaMessageProducer.send(SiteUserInfoEvent.Topic, message);
이후, 빌드를 해보면 설정한 토픽 이름과 아래와 같이 메시지를 확인할 수 있다.
