Kafka는 실시간 데이터 피드를 처리하기 위해 설계된 분산 스트리밍 플랫폼입니다. 원래 LinkedIn에서 개발되어 Apache Software Foundation에 오픈 소스로 기여된 Kafka는 실시간 데이터 파이프라인 구축과 스트리밍 애플리케이션에 널리 사용됩니다. 기존의 메시지 브로커와 달리, Kafka는 대규모 데이터를 처리하는 데 최적화되어 있으며, 많은 빅 데이터 아키텍처에서 중요한 역할을 합니다.
신뢰성:
유연성:
확장성:
성능:
관리 및 모니터링:
설정 및 운영 복잡성:
성능 문제:
운영 비용:
러닝 커브:
Kafka 설치**
Docker Compose 파일 생성: 아래 내용을 docker-compose.yml로 저장합니다.
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
platform: linux/amd64
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: wurstmeister/kafka:latest
platform: linux/amd64
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
kafka-ui:
image: provectuslabs/kafka-ui:latest
platform: linux/amd64
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_READONLY: "false"
Docker Compose 실행: Kafka와 Zookeeper 컨테이너를 시작하려면 아래 명령어를 실행합니다.
docker compose up -d
Kafka UI 접속: 컨테이너가 실행된 후, 브라우저에서 localhost:8080에 접속하여 Kafka UI에 접속할 수 있습니다.
Spring Boot 프로젝트 생성: start.spring.io를 사용하여 새로운 프로젝트를 생성합니다. Web과 Spring Kafka 의존성을 포함합니다.
Gradle 디펜던시 설정: build.gradle 파일에 아래와 같은 디펜던시를 추가합니다.
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
애플리케이션 설정: application.properties 파일에서 Kafka 프로듀서 설정을 아래와 같이 구성합니다.
spring.application.name=producer
server.port=8090
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
Kafka 설정 클래스 작성: ProducerApplicationKafkaConfig.java 파일을 생성하고 아래 코드를 추가합니다.
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class ProducerApplicationKafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
프로듀서 컨트롤러 작성: ProducerController.java 파일을 생성하고 아래 코드를 추가합니다.
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
public class ProducerController {
private final ProducerService producerService;
@GetMapping("/send")
public String sendMessage(@RequestParam("topic") String topic,
@RequestParam("key") String key,
@RequestParam("message") String message) {
producerService.sendMessage(topic, key, message);
return "Message sent to Kafka topic";
}
}
프로듀서 서비스 작성: ProducerService.java 파일을 생성하고 아래 코드를 추가합니다.
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class ProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String key, String message) {
for (int i = 0; i < 10; i++) {
kafkaTemplate.send(topic, key, message + " " + i);
}
}
}
Spring Boot 프로젝트 생성: start.spring.io에서 새로운 프로젝트를 생성하고, Web과 Spring Kafka 의존성을 추가합니다.
Gradle 디펜던시 설정: build.gradle 파일을 아래와 같이 구성합니다.
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
애플리케이션 설정: application.properties 파일을 아래와 같이 구성합니다.
spring.application.name=consumer
server.port=8091
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Kafka 설정 클래스 작성: ConsumerApplicationKafkaConfig.java 파일을 생성하고 아래 코드를 추가합니다.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class ConsumerApplicationKafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
컨슈머 서비스 작성: ConsumerService.java 파일을 생성하고 아래 코드를 추가합니다.
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class ConsumerService {
@KafkaListener(groupId = "group_a", topics = "topic1")
public void consumeFromGroupA(String message
) {
log.info("Group A consumed message from topic1: " + message);
}
@KafkaListener(groupId = "group_b", topics = "topic1")
public void consumeFromGroupB(String message) {
log.info("Group B consumed message from topic1: " + message);
}
@KafkaListener(groupId = "group_c", topics = "topic2")
public void consumeFromTopicC(String message) {
log.info("Group C consumed message from topic2: " + message);
}
@KafkaListener(groupId = "group_c", topics = "topic3")
public void consumeFromTopicD(String message) {
log.info("Group C consumed message from topic3: " + message);
}
@KafkaListener(groupId = "group_d", topics = "topic4")
public void consumeFromPartition0(String message) {
log.info("Group D consumed message from topic4: " + message);
}
}
```
애플리케이션 실행: 위에서 작성한 프로듀서와 컨슈머 애플리케이션을 실행합니다.
Kafka UI 확인: Kafka UI에서 Topics 탭과 Consumers 탭을 확인합니다.
메시지 발행: 프로듀서 애플리케이션을 통해 토픽을 지정하고 메시지를 발행합니다. 예를 들어, 토픽을 topic1으로 지정하고 메시지를 발행하면, Kafka UI에서 해당 메시지가 수신된 것을 확인할 수 있습니다.
컨슈머 확인: 컨슈머 애플리케이션의 로그를 확인하여 메시지가 적절하게 수신되었는지 확인합니다. 그룹 ID에 따라 메시지가 분산 처리된 것을 확인할 수 있습니다.