Kafka란 분산 스트리밍 플랫폼으로, 주로 실시간 데이터 피드의 빅 데이터 처리를 목적으로 사용된다. Kafka는 메시지 큐와 유사하지만, 대용량 데이터 스트림을 저장하고 실시간으로 분석하거나 처리하는 데 중점을 둔다.
장점 : 데이터 복제와 확인 메커니즘에 따른 신뢰성, 다양한 소비자 패턴과 프로토콜을 지원하는 유연성, 분산 시스템과 수평 확장을 통한 확장성, 높은 처리랑과 지연 최소화하는 성능, 다양한 관리도구와 플러그인 시스템을 통한 관리 및 모니터링이 가능한 장점이 있다.
단점 : 초시 설정의 복잡성과 운영 관리가 필요한 점, 브로커 오버헤드 발생 가능성과 대규모 메시지 처리에 따른 성능 저하 가능성, 리소스 소비가 늘어나고 모니터링 및 유지보수에 따른 운영 비용 등이 있다.
스트리밍 플랫폼: Kafka는 대량의 데이터를 실시간으로 처리하고 전송할 수 있는 시스템이다. 이는 데이터를 스트리밍 방식으로 처리하는 데 적합하다.
분산 구조: Kafka는 분산 시스템으로 설계되어 있으며, 여러 서버에 걸쳐 데이터를 분산 저장하고 처리한다. 이로 인해 높은 가용성과 확장성을 제공할 수 있다.
높은 처리량: Kafka는 높은 처리량을 지원한다. 대량의 데이터도 빠르게 전송하고 처리할 수 있어 실시간 데이터 처리가 필요한 애플리케이션에 적합하다.
내구성: Kafka는 데이터를 디스크에 저장하여 높은 내구성을 제공한다. 데이터는 복제되어 여러 브로커에 저장되므로, 하나의 브로커가 실패하더라도 데이터 손실 없이 시스템이 계속 운영된다.
구독과 발행 모델: Kafka는 발행(Producer)과 구독(Consumer) 모델을 사용한다. 프로듀서는 데이터를 Kafka에 게시하고, 컨슈머는 이 데이터를 구독하여 처리한다.
토픽과 파티션: Kafka는 데이터를 토픽(Topic)으로 나누어 저장한다. 각 토픽은 여러 파티션(Partition)으로 나뉘어 있어 병렬 처리가 가능하다. 이는 데이터의 분산 저장과 높은 처리 성능을 지원한다.
지속성: Kafka는 데이터를 로그 형태로 저장하며, 로그 파일은 지정된 기간 동안 보존된다. 이를 통해 데이터를 오랜 기간 동안 저장하고 후속 분석에 활용할 수 있다.
연결성과 통합: Kafka는 다양한 데이터 소스와 데이터 싱크를 연결할 수 있는 강력한 연결성과 통합 기능을 제공한다. Kafka Connect와 Kafka Streams와 같은 기능을 통해 데이터의 흐름을 쉽게 관리하고 처리할 수 있다.
메시지(Message)
Kafka를 통해 전달되는 데이터 단위로, 키, 값, 타임스탬프, 메타데이터들로 구성된다.
프로듀서(Producer)
메시지를 생성하고 Kafka에 보내는 역할을 하는 애플리케이션. 특정 토픽(topic)에 메시지를 보낸다.
컨슈머(Consumer): 토픽에서 Kafka를 통해 데이터를 읽어가는 역할을 하는 애플리케이션. 기본적으로 sticky partitioning (특정 컨슈머가 특정 파티션에 붙어서 계속해서 데이터를 처리하는 방식으로 지역성을 높여 캐시 히트율을 증가시키고 전반적 처리 성능을 향상시킴)을 사용한다.
브로커(Broker): Kafka 클러스터를 구성하는 서버로 메시지를 저장하고 전송하는 역항르 한다. 하나의 Kafka는 여러 브로커로 구성될 수 있으며, 각 브로커는 하나이상의 토픽의 파티션을 관리한다.
토픽(Topic)
메시지를 저정하는 장소. 메시지는 토픽에 저장되었다가 컨슈머에게 전달된다.
토픽은 여러 파티션으로 나뉠 수 있으며 파티션은 메시지를 순서대로 저장하며 파티션을 통해 병렬 처리가 가능하다.
파티션(Partition)
토픽을 물리적으로 나눈 단위로, 각 파티션은 독립적으로 메시지를 저장하고 관리하며 병렬 처리를 지원한다. 클러스터 내의 여러 브로커에 분산시켜서 저장할 수 있다.
키(Key)
메시지를 특정 파티션에 할당하는데 사용되는 값으로 동일한 키를 가진 메시지는항상 동일한 파티션에 저장된다.
주키퍼(Zookeeper): Kafka의 메타데이터와 클러스터 상태를 관리하는 서버. 주키퍼는 브로커와 토픽, 파티션 등의 정보(메타데이터)를 관리하며, 클러스터의 안정성을 보장하며 브로커 간의 상호작용을 조정한다
RabbitMQ는 전통적인 메시지 브로커로, 작업 큐, 요청/응답 패턴, 비동기 작업처리 등 메시지의 안정적인 전달과 큐잉에 중점을 두는 전통적인 메시지 큐 사용 사례에 적합하다. 메시지 모델은 큐를 증심으로 메시지를 전달하고 메시지는 큐에 저장되고 메시지를 메모리나 디스크에 저장할 수 있으며 일반적으로 단기 저장을 목표로 한다.
Kafka는 분산 스트리밍 플랫폼으로 실시간 데이터 스트리밍, 로그 수집 및 분석, 이벤트 소싱 등 대규모 실시간 데이터 스트림 처리에 적합하다. 메시지 모델은 토픽을 중심으로 메시지를 저장하며 메시지는 토픽의 파티션에 저장되고, 메시지는 디스크에 저장하며 대부분 장기 저장을 목표로 한다.
도커 컴포즈를 이용하여 Kafka 컨테이너를 생성하기 위한 docker-compose.yml 파일이 있는 경로에서 도커 컴포즈를 실행한다.
Kafka UI는 설정 파일에 따라 localhost:8080으로 접속할 수 있다.
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
platform: linux/amd64
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: wurstmeister/kafka:latest
platform: linux/amd64
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092 #카프카 서버는 9092로 접속가능
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
kafka-ui:
image: provectuslabs/kafka-ui:latest
platform: linux/amd64
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_READONLY: "false"
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class ProducerApplicationKafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
public class ProducerController {
private final KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/send")
public String sendMessage(@RequestParam("topic") String topic,
@RequestParam("key") String key,
@RequestParam("message") String message) {
kafkaTemplate.send(topic, key, message);
return "Message sent success";
}
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka // Kafka 리스너를 활성화하는 어노테이션.
@Configuration
public class ConsumerApplicationKafkaConfig {
// ConsumerFactory는 Kafka 컨슈머 인스턴스를 생성하는 데 사용된다.
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
// ConcurrentKafkaListenerContainerFactory는 Kafka 메시지를 비동기적으로 수신하는 리스너 컨테이너를 생성하는 데 사용된다.
// 이 팩토리는 @KafkaListener 어노테이션이 붙은 메서드들을 실행할 컨테이너를 제공.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
// ConcurrentKafkaListenerContainerFactory를 생성합니다.
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class ConsumerService {
// @KafkaListener 어노테이션은 이 메서드를 Kafka 리스너로 설정한다.
// Kafka 토픽 "topic1"에서 메시지를 수신하면 이 메서드가 호출된다.
@KafkaListener(groupId = "group_a", topics = "topic1")
public void consumeFromGroupA(String message) {
log.info("Group A consumed message from topic1: " + message);
}
}
Producer 에서 consumer로 메시지가 토픽에 저장되어있다 전달되는 것을 확인할 수 있다. 메시지는 리스너의 해당 토픽으로 전달이 된다.