이전 포스트에서 정리한 카프카를 로컬 환경에서 사용해보았다.
환경은 아래와 같다.
내가 생성한 예제는 notify-crawler 프로젝트가 Producer 역할, notify-server 프로젝트가 Consumer 역할을 맏도록 하였다.
이후 application.yml을 아래와 같이 설정하였다.
server:
port: 8080
두 어플리케이션의 포트번호가 달라야 하므로 하나는 8080
포트로 하나는 8081
포트로 설정하였다.
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final String TOPIC = "my_topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send(TOPIC, message);
}
}
코드를 보면 "my_topic"이라는 토픽에 메시지를 전송하는 것을 볼 수 있다.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
@Autowired
private KafkaProducer kafkaProducer;
@PostMapping("/publish")
public void sendMessageToKafka(@RequestParam("message") String message) {
kafkaProducer.sendMessage(message);
}
}
코드를 보면 /publish
로 POST
요청이 왔을 대 message
를 url에서 찾아서 프로듀서 서비스를 이용해 전송하는 것을 볼 수 있다.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my_topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
코드를 보면 my_topic
에서 메시지를 가져와서 시스템 출력으로 출력하는 것을 볼 수 있다.
카프카 다운로드 페이지에서 사진에 보이는 바이너리 코드를 다운로드 한 후 C 드라이브 안의 kafka 폴더
를 만들어 그 안에 압축을 푼다.
명령 프롬프트를 열고 Kafka 디렉토리로 이동한다.
cd C:\kafka\kafka_2.13-2.8.0
Zookeeper를 실행한다.
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
명령 프롬프트를 열고 Kafka 디렉토리로 이동한다.
cd C:\kafka\kafka_2.13-2.8.0
카프카 브로커를 실행한다.
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
메시지가 잘 전달된 것을 볼 수 있다.
참고: