build.gradle
// Gradle build for a Spring Boot web application that produces to and consumes from Kafka.
plugins {
    id 'org.springframework.boot' version '2.5.6'
    // Lets the dependencies below omit explicit versions.
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

group = 'me.jinmin'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    // Anything declared as an annotation processor is also visible at compile time.
    compileOnly {
        extendsFrom(annotationProcessor)
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation('org.springframework.boot:spring-boot-starter-web')
    implementation('org.springframework.kafka:spring-kafka')

    // Lombok is needed only while compiling.
    compileOnly('org.projectlombok:lombok')
    annotationProcessor('org.projectlombok:lombok')

    testImplementation('org.springframework.boot:spring-boot-starter-test')
    testImplementation('org.springframework.kafka:spring-kafka-test')
}

test {
    // Run tests on the JUnit Platform.
    useJUnitPlatform()
}
application.yml
# Kafka client settings. Consumer and producer each declare their own
# bootstrap-servers, serializers and deserializers.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      # Start from the oldest available message when the group has no committed offset.
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
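spring.kafka.consumer
- bootstrap-servers : Kafka 브로커에 연결할 때 사용하는 호스트:포트 쌍. consumer 전용으로 오버라이딩한다.
- group-id : consumer가 속한 consumer 그룹의 식별자
- auto-offset-reset : 커밋된 offset 정보가 없을 때의 동작
  - latest : 가장 최근에 생산된 메시지로 offset reset
  - earliest : 가장 오래된 메시지로 offset reset
  - none : offset 정보가 없을 때, Exception 발생
- key-deserializer / value-deserializer : 메시지의 키/값을 역직렬화할 클래스 (JSON 데이터라면 JsonDeserializer)
spring.kafka.producer
- bootstrap-servers : producer 전용 호스트:포트 쌍
- key-serializer / value-serializer : 메시지의 키/값을 직렬화할 클래스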
docker-compose.yml
# Local development stack: one Zookeeper container and one Kafka broker container.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      # Address the broker advertises to clients; matches localhost:9092 in application.yml.
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      # Reaches Zookeeper through its service name on the compose network.
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
Producer
package me.jinmin.springkafkaautoconfig;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * Publishes plain-text messages to the {@code jinmin} Kafka topic through the
 * injected {@link KafkaTemplate}.
 */
@Service
@RequiredArgsConstructor
public class KafkaProducer {

    private static final String TOPIC = "jinmin";

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Prints the message to standard output and hands it to the template for
     * delivery to {@code TOPIC}.
     *
     * <p>A failed delivery is reported on standard error instead of being
     * silently dropped.
     *
     * @param msg the message payload to publish
     */
    public void sendMessage(String msg) {
        System.out.println(String.format("Produce message : %s", msg));
        // NOTE(review): relies on send(...) returning a ListenableFuture, as in the
        // spring-kafka line managed by Spring Boot 2.5.x — revisit when upgrading.
        kafkaTemplate.send(TOPIC, msg).addCallback(
                result -> { },
                ex -> System.err.println(
                        String.format("Failed to produce message : %s (%s)", msg, ex)));
    }

    /**
     * Misspelled original name, kept so existing callers keep compiling.
     *
     * @param msg the message payload to publish
     * @deprecated use {@link #sendMessage(String)}
     */
    @Deprecated
    public void sendMessgae(String msg) {
        sendMessage(msg);
    }
}
KafkaTemplate을 통해 해당되는 TOPIC에 메시지를 생산한다.
Consumer
package me.jinmin.springkafkaautoconfig;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
 * Listens on the {@code jinmin} topic as a member of consumer group {@code foo}
 * and echoes every received payload to standard output.
 */
@Service
public class KafkaConsumer {

    /**
     * Prints one consumed message.
     *
     * @param msg the payload of the consumed record
     * @throws IOException declared by the signature; nothing in the body throws it
     */
    @KafkaListener(groupId = "foo", topics = {"jinmin"})
    public void listen(String msg) throws IOException {
        final String line = String.format("Consumed message : %s", msg);
        System.out.println(line);
    }
}
KafkaListener를 통해 topic과 groupId에 해당되는 메시지를 소비한다.
Controller
package me.jinmin.springkafkaautoconfig;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that forwards a request parameter to {@link KafkaProducer}.
 */
@RestController
@RequiredArgsConstructor
public class Controller {

    private final KafkaProducer kafkaProducer;

    /**
     * Handles {@code POST /kafka}: passes the {@code message} request parameter
     * to the producer and answers with a fixed confirmation text.
     *
     * @param message the text to publish
     * @return the confirmation string
     */
    @PostMapping(path = "/kafka")
    public String sendMsg(@RequestParam(name = "message") String message) {
        final String confirmation = "Successful Sending!!";
        kafkaProducer.sendMessgae(message);
        return confirmation;
    }
}
docker-compose.yml 설정을 통해 Docker에 Zookeeper 및 Kafka 컨테이너를 먼저 실행하자.
docker container exec -it [container_name] bash
jinmin이라는 토픽을 제대로 생성
Run