카프카(Kafka)란 데이터를 스트림 파이프라인을 통해 실시간으로 관리하기 위한 메시징 분산 스트리밍 플랫폼이다.
Pub-Sub 모델의 메시지 큐 형태로 동작하며 분산환경에 특화되어 있다.

클러스터 (Cluster)
여러 대의 컴퓨터들이 연결되어 하나의 시스템처럼 동작하는 컴퓨터들의 집합
프로듀서 (Producer)
데이터를 만들어 전달하는 역할
컨슈머 (Consumer)
전달받은 데이터를 브로커(Broker)에 요청하여 데이터를 소비하는 역할
브로커 (Broker)
실행된 카프라 서버를 말한다.
- Producer와 Consumer는 별도의 애플리케이션으로 구성되는 반면, 브로커는 카프카 자체이다.
- Broker(각 서버)는 Kafka Cluster 내부에 존재한다.
서버 내부에 메시지를 저장하고 관리하는 역할을 수행한다.
Zookeeper
분산 애플리케이션 관리를 위한 코디네이션 시스템
분산 메시지큐의 메타 정보를 중앙에서 관리하는 역할
토픽 (Topic)
각각의 메시지를 목적에 맞게 구분할 때 사용.
메시지를 전송하거나 소비할 때 Topic을 반드시 입력해야 함.
Consumer는 자신이 담당하는 Topic의 메시지를 처리한다.
한 개의 토픽은 한 개 이상의 파티션으로 구성된다.
파티션 (Partition)
분산 처리를 위해 사용된다.
Topic 생성 시, Partition 개수를 지정할 수 있다. (파티션 개수 변경 가능 * 추가만 가능)
파티션이 1개라면 모든 메시지에 대해 순서가 보장
파티션 내부에서 각 메시지는 offset(고유 번호)로 구분
파티션은 여러 개의 브로커에 걸쳐서 저장된다.
파티션이 여러갤라면 kafka 클러스터가 라운드 로빈 방식으로 분배해서 분산처리되기 때문에 순서 보장되지 않음.
파티션이 많을 수록 처리량이 좋지만 장애 복구 시간이 늘어남.
Kafka zookeeper 설치docker-compose upservices:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- '2181:2181'
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- '9092:9092'
volumes:
- '/var/run/docker.sock:/var/run/docker.sock'
environment:
KAFKA_ADVERTISED_HOST_NAME: '127.0.0.1'
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
docker exec -it kafka /bin/bash
kafka-topics.sh --create --topic topic-test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3
kafka-topics.sh --list --bootstrap-server localhost:9092

cd opt/kafka_2.13-2.8.1/bin/
./kafka-console-consumer.sh --topic topic-test --bootstrap-server kafka:9092

./kafka-console-producer.sh --topic topic-test --broker-list kafka:9092

Dependency Add// kafka
implementation 'org.springframework.kafka:spring-kafka'
Producer@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-server}")
private String bootstrapServer;
@Bean
public ProducerFactory<String ,Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Slf4j
@RequiredArgsConstructor
@Component
public class KafkaEventSender {
private final KafkaTemplate<String ,Object> kafkaTemplate;
public void send(String topic, Object data) {
log.info("Kafka Producer Send (Topic : {} Data : {})", topic, data.toString());
kafkaTemplate.send(topic, data.toString());
}
}
@Test
void producer_test() {
kafkaEventSender.send(EventTopic.TOPIC_TEST.getTopic(), "test");
}
Consumer@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-server}")
private String bootstrapServer;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String ,Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@Slf4j
@Component
public class KafkaTestConsumer {
@KafkaListener(topics = "topic-test", groupId = "group_1")
public void listener(ConsumerRecord<String,String> record) {
log.info(record.toString());
}
}