
사이드 프로젝트에서 SSE를 활용해 실시간 푸시 알림 기능을 구현했었다.
알림 전송에서의 안정성과 신뢰성을 높히기 위해 여러 방면을 고민하다가, Kafka를 도입하게 되었다. Kafka & SSE를 통해 구현한 실시간 이벤트 스트리밍 도입 과정을 기록하고자 한다.
Kafka가 무엇인지는 이전의 포스팅을 참고하면 된다.
Kafka 도입하게 된 배경
SSE(Server-Sent-Event)의 가장 큰 특징은 클라이언트와 서버가 커넥션을 유지한다는 점이다. 다수의 클라이언트가 연결될 경우, 서버가 모든 연결을 관리하기 때문에 서버 부하가 증가할 수 있다.
또한, 클라이언트가 네트워크 문제로 연결이 끊겼을 때, 손실된 데이터, 누락된 이벤트를 처리하는 로직을 별도로 구현해야한다.
위 2가지 이슈를 보완하기 위해 실시간 이벤트 스트리밍의 대명사로 알려진 Kafka를 도입하게 되었다.
Kafka 도입으로 얻는 이점
푸시 알림 생성 이벤트를 publish만 하면 된다. 같은 WAS에서 이벤트를 consume하지만, 비동기로 빠르게 효율적으로 처리하기 때문에, 서버 부하 부담을 줄여줄 수 있다.
필자는 Docker를 통해 kafka, zookeeper 컨테이너를 운용하고 있다.
docker-compose.yml
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.5.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
docker-compose 파일을 작성할 때는 항상 띄어쓰기를 주의해야한다.
의존성 추가

ProducerConfig

브로커 컨테이너를 ec2 인스턴스에 운용하고 있어서 ec2 dns 주소로 작성했다.
로컬 환경의 경우, localhost로 작성하면된다.
KafkaTemplate<String, ?> ?는 제네릭을 뜻하는게 아니고, 메시지를 produce할 때 사용할 DTO, Record 타입의 객체를 많이 쓰는 것 같다.
ConsumerConfig

addTrustPackages를 추가하지 않았을 때는, untrusted packages~ 라는 에러를 로그에서 봐서 구글링해보니 저렇게 명시해줘야 하는 걸 알게되었다.
메시지 전송을 위한 NotifyDto

푸시 알림 이벤트를 발행하는 케이스는 특정 유저가 미션에 참여한 경우, 인증글을 작성한 경우지만, 추후 확장성을 위해 NotificationType을 Enum으로 선언했다.
메시지 produce service

메세지 cosumer service

@kafkaListners 어노테이션을 통해 메세지 브로커에 들어있는 notify 토픽의 메세지를 consumer하는 서비스 로직이다.
연결이 수립되어 있는 클라이언트로는 푸시 알람과 함께 DB에 저장하고, 연결되어 있지 않은 유저는 알림 없이, DB에 알림을 저장하도록 구현했다.
이벤트 발행 메서드를 호출하는 service

포스트가 작성되었을 때, 작성한 당사자를 제외하고 해당 미션에 참여자 전체에게 푸시 알림, 알림 내역을 저장하는 이벤트를 발행하도록 작성했다.