❗카프카의 잘못된 사용 예제입니다. 카프카의 학습을 위해 적용한 예제로 사용을 철회하진 않을 것이나 다음과 똑같은 상황이라면 카프카를 사용하지 않을 것입니다.
카프카 잘못된 설계 고찰❗
나는 Kafka
를 선택했다.
위와 같은 이유로 선택했다.
Docker를 통해 카프카를 실행할 것이다. 설정값을 먼저 살펴 본 후 카프카에서 네트워크 설정을 알아보자.
로컬에서 테스트를 위한 docker compose 파일을 작성했다. 멀티모듈 최상단 docker 패키지에 위치한다.
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.0.1
container_name: kafka0
ports:
- "9092:9092"
- "29092:29092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER1:PLAINTEXT,LISTENER1_INTERNAL:PLAINTEXT
KAFKA_LISTENERS: LISTENER1://kafka0:9092,LISTENER1_INTERNAL://kafka0:29092
KAFKA_ADVERTISED_LISTENERS: LISTENER1://localhost:9092,LISTENER1_INTERNAL://kafka0:29092
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER1_INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
주키퍼
-> 카프카
순서로 실행시켜야한다. KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
값은 통신 프로토콜로 어떤 리스너가 어떤 프로토콜을 사용하는지 key, value로 매핑해줄 수 있다.
도커내 환경에서는 2개의 네트워크 설정이 필요하다.
현재 나의 설정에서는 LISTENER1
, LISTENER1_INTERNAL
두개의 설정이 있으며 LISTENER1
이 외부 통신용, LISTENER1_INTERNAL
이 내부 통신용이다. KAFKA_INTER_BROKER_LISTENER_NAME
설정값에 꼭 내부 통신 리스너를 명시해주어야한다.
KAFKA_LISTENERS
: 브로커가 클라이언트 요청을 수신할 수 있는 호스트 및 포트를 지정하는 데 사용된다.
KAFKA_ADVERTISED_LISTENERS
: 클라이언트가 Kafka 브로커에 연결할 때 사용하는 호스트 및 포트를 정의한다. 즉, 클라이언트가 연결할 수 있는 공개적인 주소를 지정하는 데 사용된다.
나는 도커를 사용하기 때문에 브로커로 바로 붙을 수 없다. 따라서 외부에서 접속 가능한 값을 api서버에 세팅해 주어야 하는데 이때 KAFKA_ADVERTISED_LISTENERS
에 설정된 리스너 중 외부 통신용 리스너를 사용하면된다.
다음의 명령어를 참고하여 도커 컴포즈를 로컬에서 실행하면 된다.
docker-compose -f kafka-docker-compose.yml up
docker-compose -f kafka-docker-compose.yml up --build
docker-compose -f kafka-docker-compose.yml down
다음 --describe 옵션을 통해 생성된 토픽을 확인할 수 있다.
docker exec -it kafka0 kafka-topics.sh --bootstrap-server kafka0:29092 --topic create-user-point --describe
필요한 토픽은 총 6개
다음의 명령어를 통해 토픽을 생성할 수 있다.
docker exec -it kafka0 kafka-topics.sh --create --bootstrap-server kafka0:29092 --topic create-user-point
study-internal:async
모듈에 작성했다.study-internal:async
모듈은 교체를 용이하게 하기 위해 아무 모듈도 의존하지 않는다.interface StudyPointService {
fun createStudyRoom(studyRoomId: Long, point: Int)
fun joinStudyRoom(studyRoomId: Long, userId: Long, point: Int)
}
@Component
class StudyPointProducer(
private val kafkaTemplate: KafkaTemplate<String, String>
): StudyPointService {
override fun createStudyRoom(studyRoomId: Long, point: Int) {
kafkaTemplate.send("create-study-point", studyRoomId.toString(), point.toString())
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
override fun joinStudyRoom(studyRoomId:Long, userId: Long, point: Int) {
kafkaTemplate.send("add-study-point", studyRoomId.toString(), point.toString())
kafkaTemplate.send("sub-user-point", userId.toString(), point.toString())
}
}
다음의 명령을 통해 프로듀서를 테스트해 볼 수 있다.
docker exec -it kafka0 kafka-console-producer.sh --broker-list kafka0:29092 --topic create-user-point
study-consumer
모듈을 작성했다.@KafkaListener
를 사용하여 파티션의 레코드를 처리한다.study-domain
모듈을 의존한다.@Component
class StudyPointConsumer (
private val studyRoomPointRepository: StudyRoomPointRepository,
private val studyRoomRepository: StudyRoomRepository,
private val studyRoomPointLogRepository: StudyRoomPointLogRepository,
) {
@KafkaListener(topics = ["create-study-point"], groupId = "point")
fun createStudyPoint(data: ConsumerRecord<String, String>) {
val studyRoomId = data.key().toLong()
val point = data.value().toInt()
val studyRoom = studyRoomRepository.findByIdOrNull(studyRoomId)!!
studyRoomPointRepository.save(StudyRoomPoint(studyRoom, point))
saveStudyRoomPointLog(studyRoom, point, ADDED, SYSTEM)
}
...
}
다음의 명령을 통해 컨슈머를 테스트해 볼 수 있다.
docker exec -it kafka0 kafka-console-consumer.sh --bootstrap-server kafka0:29092 --topic create-user-point --property print.key=true --from-beginning
카프카를 통해 처리되는 로직은 다음과 같다.
이 중 스터디방 가입 시, 스터디방 포인트 증가, 유저 포인트 감소는 카프카 이벤트의 트랜잭션 처리가 필요하다.
카프카 트랜잭션 정리한 것을 바탕으로 스프링 이벤트를 활용한 트랜잭션 처리를 해보겠다.
우선 Producer Transaction Prefix Id 설정, Consumer Trasaction 격리 레벨 설정이 필요하다.
Producer Transaction Prefix Id 설정
@Bean
fun producerFactory(): ProducerFactory<String, String> {
val config = HashMap<String, Any>()
config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.ACKS_CONFIG] = "1"
config[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = "point-transaction-id"
return DefaultKafkaProducerFactory(config)
}
TRANSACTIONAL_ID_CONFIG
를 point-transaction-id
로 설정한다.@Bean
fun consumerFactory(): ConsumerFactory<String, String> {
val config = HashMap<String, Any>()
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:29092"
config[ConsumerConfig.GROUP_ID_CONFIG] = "point"
config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] =
org.apache.kafka.common.serialization.StringDeserializer::class.java
config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] =
org.apache.kafka.common.serialization.StringDeserializer::class.java
config[ConsumerConfig.ISOLATION_LEVEL_CONFIG] = "read_committed"
return DefaultKafkaConsumerFactory(config)
}
ISOLATION_LEVEL_CONFIG
를 read_committed
로 설정한다.@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
fun joinStudyPoint(event: StudyRoomJoinedEvent) {
studyPointService.joinStudyRoom(event.studyRoomId, event.userId, event.studyPoint)
}
@TransactionalEventListener
를 통해 commit 후 카프카 트랜잭션을 새로 적용한다.@Transactional(propagation = Propagation.REQUIRES_NEW)
override fun joinStudyRoom(studyRoomId:Long, userId: Long, point: Int) {
kafkaTemplate.send("add-study-point", studyRoomId.toString(), point.toString())
kafkaTemplate.send("sub-user-point", userId.toString(), point.toString())
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
을 추가하여 의도적으로 카프카 트랜잭션이 시작한다는 것을 알리고 싶었다.