Spring Boot Kafka Json 전송하기

김정수·2023년 10월 19일
post-thumbnail

Kafka?

일반적인 api 방식은 요청과 응답 사이의 동기적인 통신을 의미하며 클라이언트가 요청을 보내면 서버가 해당 요청을 처리하고 즉시 응답을 반환합니다. 클라이언트는 응답을 받을 때까지 기다리는 것입니다.

Kafka는 비동기적인 이벤트 스트리밍 서비스이고, 클라이언트는 요청에 대한 응답을 기다리지 않고 다른 수행을 작업 할 수 있습니다.

🔽여기서 동기와 비동기란?

  • 동기 : 동기적 작업은 순차적으로 진행됩니다. 그리고 한 작업이 시작되면 다음 작업은 이전 작업이 완료될 때까지 대기하면서 작업의 진행을 차단합니다.

  • 비동기 : 비동기적 작업은 동시에 다수의 작업을 처리할 수 있고, 작업이 완료되기를 기다리지 않고 다음 작업을 진행합니다. 그리고 요청을 보내면 즉시 다음 작업을 수행하며, 응답을 기다리지 않고 다른 작업을 처리합니다.

application.yml 설정

spirng:
	kafka:
   	bootstrap-servers: localhost:9092

kafka의 기본포트는 9092번이며, 한 프로젝트 안에서 테스트를 하기 위해 bootstrap-servers만 설정해 두었습니다.

객체 설정

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Getter
@NoArgsConstructor
@AllArgsConstructor
public class Chatmessage {

    private String sender;
    private String context;
}

메세지를 보낸 사람과 그 내용을 json으로 받는다고 하였을때, 먼저 객체부터 만들어줍니다.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean
    public ConsumerFactory<String, Chatmessage> consumerFactory() {

        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");

        return new DefaultKafkaConsumerFactory<>(
                config,
                new StringDeserializer(),
                new JsonDeserializer<>(Chatmessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Chatmessage> kafkaListener() {
        ConcurrentKafkaListenerContainerFactory<String, Chatmessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

그리고 ConsumerConfig를 설정합니다. 여기서 GROUP_ID_CONFIG는 서버간에 통신을 할 경우 식별하기 위한 입장권같은 개념으로 알고있습니다. 저는 "groupId"라고 설정을 해두었습니다.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "testTopic", groupId = "groupId", containerFactory = "kafkaListener")
    public void consume(Chatmessage message){
        System.out.println("name = " + message.getSender());
        System.out.println("consume message = " + message.getContext());
    }

}

그리고 Consumer Service에 @KafkaListener를 통해 설정해줍니다. 그리고 topic은 Producer와 같아야하니 주의 해주세요.

===========

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean
    public ProducerFactory<String, Chatmessage> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Chatmessage> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

그리고 ProducerConfig를 설정해줍니다

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaProducerService {
    private static final String TOPIC = "testTopic";
    private final KafkaTemplate<String, Chatmessage> kafkaTemplate;

    public void sendMessage(Chatmessage chatmessage) {
        System.out.println("chatmessage = " + chatmessage.getContext());

        kafkaTemplate.send(TOPIC, chatmessage);
    }
}

ProducerSerivce를 설정합니다 여기서 Topic은 ConsumerSerivce와 같아야합니다.

import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class KafkaController {
    private final KafkaProducerService producerService;

    @PostMapping("/kafka")
    public String sendMessage(@RequestBody Chatmessage chatmessage) {
        System.out.println("chatmessage = " + chatmessage);
        producerService.sendMessage(chatmessage);

        return "success";
    }
}

마지막으로 Controller의 POST를 통해 테스트를 하면 됩니다. 그리고 서버를 켜기전에 먼저 Kafka와 Zookeeper를 켜놓은 상태에서 테스트를 해야합니다.

profile
현재진행형

0개의 댓글