먼저 IntelliJ 다운로드를 해준다.
새 프로젝트를 생성하자.

다음과 같이 설정하여 프로젝트를 생성하겠다.

Controller 하나와 Broker 3개를 켜주었다.
https://kafka.apache.org/41/apis/#producer-api
Producer 코드를 작성하기에 앞서 공식 문서의 설명에 따라 Maven pom.xml 파일에 종속성을 추가해준다.


Kafka/src/main/java 폴더에 새로운 Java 클래스 Producer를 생성해준다.

https://kafka.apache.org/41/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
위 공식문서를 참고해 Producer를 만들어보자.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092, localhost:9093, localhost:9094");
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
int numOfRecords = 100;
for (int i = 0; i < numOfRecords; i++)
producer.send(new ProducerRecord<>("reviews", Integer.toString(i), Integer.toString(i)));
producer.close();
}
}
우리는 Kafka의 Producer를 사용할 것이기 때문에 코드를 수정해주었다.
맨 위 import문은 우리가 pom.xml파일에 종속성을 추가해주었기 때문에 KafkaProducer 입력 시 자동으로 입력된다.
후에 'reviews'라는 이름의 topic을 만들 예정이기 때문에 topic값은 reviews로 설정해주었다.
Topic을 생성해보자.
bin/kafka-topics.sh \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
--create \
--replication-factor 3 \
--partitions 5 \
--topic reviews
from kafka import KafkaConsumer
from topics import REVIEW_TOPIC
consumer = KafkaConsumer(
REVIEW_TOPIC,
bootstrap_servers=['localhost:9092', 'localhost:9093', 'localhost:9094'],
group_id='review-consumer-group',
)
for message in consumer:
print(f"Received message: {message.value.decode('utf-8')}")
python3 kafka_consumer.py
Python Consumer를 먼저 실행시켜주고,
Java Producer를 다음으로 실행시켜주면
다음과 같이 100개의 메시지가 produce&consume된 것을 확인할 수 있다!
