기본 설정
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
depends_on:
- zookeeper
Spring Web, Spring for Apache Kafka 추가한 후 프로젝트 생성application.yml 파일에 Kafka 설정을 추가spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: group_id
auto-offset-reset: earliest
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
메시지를 생성하고 처리하는 패턴으로, 모든 메시지 처리는 send 시 함께 보내는 topic을 기준으로 전달된다.
각 topic에 대해서 consumer는 group id를 기준으로 한 번씩 처리된다.
예를 들어, example_topic이라는 topic 기준으로 생성된 메시지는 group_id1, group_id2로 consumer가 붙어있다면 각각 처리되어 2번의 함수 실행이 유발된다는 뜻
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final String TOPIC = "example_topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
System.out.println("Producing message: " + message);
kafkaTemplate.send(TOPIC, message);
}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "example_topic", groupId = "group_id")
public void consume(String message) {
System.out.println("Consumed message: " + message);
}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerGroup1 {
@KafkaListener(topics = "example_topic", groupId = "group_id_1")
public void consumeGroup1(String message) {
System.out.println("Consumer Group 1 - Consumed message: " + message);
}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerGroup2 {
@KafkaListener(topics = "example_topic", groupId = "group_id_2")
public void consumeGroup2(String message) {
System.out.println("Consumer Group 2 - Consumed message: " + message);
}
}
Kafka에서는 3가지 압축 방식을 지원한다.
다만 압축하려는 데이터 타입에 따라 압축률과 속도가 다를 수 있다.
예를 들어, 텍스트 데이터는 압축률이 높게 나오지만, 바이너리 데이터나 이미 압축된 데이터는 압축률이 낮게 나올 수 있다.
Producer 측에만 설정하면 된다.
Consumer는 압축된 데이터를 자동으로 해제한다.
spring:
kafka:
producer:
compression-type: snappy