본 포스팅은 Spring boot 환경에서 Kafka를 연동시키는 방법 및 절차를 기술한다.
대용량의 실시간 로그 처리에 특화되어 있는 솔루션이며 데이터를 유실없이 안전하게 전달하는 것이 주목적인 메세지 시스템에서 안정적인 아키텍처와 빠른 퍼포먼스로 데이터를 처리할 수 있는 분산 스트리밍 플랫폼이다.
(zookeeper_2)
분산 애플리케이션을 사용하게 되면, 분산 애플리케이션 관리를 위한 안정적인 코디네이션 애플리케이션이 추가로 필요하게 된다. 안정적인 코디네이션 서비스로 검증된 주키퍼(Zookeeper)를 많이 사용하게 된다.
카프카를 사용하기 위해서는 주키퍼(Zookeeper) 사용이 필수적이다.
주키퍼 는 분산 애플리케이션을 위한 코디네이션 시스템이다. 분산 애플리케이션이 안정적인 서비스를 할 수 있도록 분산되어 있는 각 애플리케이션의 정보를 중앙에 집중하고 구성 관리, 그룹 관리 네이밍, 동기화 등의 서비스를 제공한다.
(zookeeper_3)
서버 여러 대를 앙상블(클러스터)로 구성하고, 분산 애플리케이션들이 각각 클라이언트가 되어 주키퍼 서버들과 커넥션을 맺은 후 상태 정보 등을 주고 받는다. (위의 그림에서 Server는 주키퍼, Client는 카프카가 된다.)
상태 정보들은 주키퍼의 지노드(znode)라고 불리는 곳에 Key-Value 형태로 저장하며, 지노드에 저장된 것을 이용하여 분산 애플리ㅔ이션들은 서로 데이터를 주고받게 된다. (znode를 일반 컴퓨터의 파일이나 폴더 개념으로 생각하면 쉬움)
지노드(znode)는 우리가 알고 있는 일반적인 디렉토리와 비슷한 형태로서 자식노드를 가지고 있는 계층형 구조로 구성되어 있다.
(znode)
각 지노드는 데이터 변경 등에 대한 유효성 검사 등을 위해 버전 번호를 관리(데이터가 변동될 때마다 지노드의 버전 번호가 증가)
주키퍼에 저장되는 데이터는 모두 메모리에 저장되어 처치량이 매우 크고 속도 또한 빠름
주키퍼는 좀 더 신뢰성 있는 서비스를 위해 앙상블(클러스터)이라는 호스트 세트를 구성할 수 있다. 앙상블로 구성되어 있는 주키퍼는 과반수 방식에 따라 살아 있는 노드 수가 과반 수 이상 유지된다면, 지속적인 서비스가 가능하다.
Spring에서 제공하는 Kafka 모듈을 이용해 API 형식으로 메시지를 받고 전송하는 기능을 제공하고 있다. 사용방법은 아래와 같다.
build.gradle 에서 Kafka에 대한 의존성을 설정한다. 설정 방법은 다음과 같다.
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
/* kafka */
implementation 'org.springframework.kafka:spring-kafka'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
}
Spring boot에서 Kafka와 연동하기 위한 설정을 한다. Spring의 application.yml 파일에서 설정할 수 있다.
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: foo
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer
spring.kafka.producer
KafkaController.java
post 방식으로 message 데이터를 받아서, Producer 서비스로 전달.
import com.victolee.kafkaexam.Service.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final KafkaProducer producer;
@Autowired
KafkaController(KafkaProducer producer) {
this.producer = producer;
}
@PostMapping
public String sendMessage(@RequestParam("message") String message) {
this.producer.sendMessage(message);
return "success";
}
}
KafkaProducer.java
KafkaTemplate에 Topic명과 Message를 전달한다. KafkaTemplate.send() 메서드가 실행되면, Kafka 서버로 메시지가 전송된다.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final String TOPIC = "exam";
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
System.out.println(String.format("Produce message : %s", message));
this.kafkaTemplate.send(TOPIC, message);
}
}
KafkaConsumer.java
Kafka로부터 메시지를 받으려면 @KafkaListener 어노테이션을 달아주면 된다.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "exam", groupId = "foo")
public void consume(String message) throws IOException {
System.out.println(String.format("Consumed message : %s", message));
}
}
log4j에서는 Kafka와 연동하여 바로 전송할 수 있도록 하는 서비스를 제공한다. 사용 방법은 아래와 같다.
# Root logger option
log4j.rootLogger=DEBUG, stdout, kafka
log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN
# Redirect log messages to console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.topic=kafkalogger
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.appender.kafka.level=INFO