build.fradle
에 작성한다.
// kafka
implementation 'org.springframework.kafka:spring-kafka'
application.properties
에 작성한다.
producer
의 역할을 할 수 있도록 했다.
직렬화를 통해 보내주어야 해서 serialization
으로 보내준다. 만약 consumer
역할을 추가해준다면 deserialization
으로 작성해주면 된다.
# kafka
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message){
this.kafkaTemplate.send("topic 값",message);
}
}
나는 토픽값으로 seol_roh_log_testing 로 작성했다.
끝! 생각보다 너무 간단했다.
(base) root@master:/home# /home/kafka-sparkhu/confluent-6.2.2/bin/kafka-console-consumer (--from-beginning) --topic spring boot에서 보낸 topic값 --bootstrap-server localhost:9092
--from-beginning
은 해당 명령어를 입력하기 전부터 받은 모든 목록을 읽는다.
도착 확인!