[Apache Kafka] 4 - JAVA와 연동하기(Producer / Consumer)

leeng·2024년 7월 10일
0

kafka

목록 보기
4/4

Apache Kafka와 JAVA를 연동하는 간단한 예제 코드를 알아보자.

1. JAVA 코드로 Producer 만들기

public class ProducerJAVA {
	// 로그 설정
    private static final Logger log = LoggerFactory.getLogger(ProducerDemoKeys.class.getSimpleName());

    public static void main(String[] args) {
        // create Producer Properties
        Properties properties = new Properties();

// 부트스트랩 서버 설정 
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // properties.setProperty( "bootstrap.servers", "127.0.0.1:9092"); 로도 설정 가능

// Key serializer 설정  
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // properties.setProperty("key.serializer", StringSerializer.class.getName()); 로도 설정 가능

// Value serializer 설정        
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // properties.setProperty("value.serializer", StringSerializer.class.getName()); 로도 설정 가능

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        String topic = "demo_java"; // 토픽 설정

        for (int j = 0; j < 3; j++) {
            for (int i = 0; i < 10; i++) {
                String key = "id_" + i; // 키 설정 (생략 가능)
                String value = "hello world " + i; // value 설정

                ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
                producer.send(record,
                        (metadata, exception) -> { // 메시지 전송 callback
                            if(exception == null) {
                                log.info("key: {}", key);
                                log.info("partition: {}", metadata.partition());
                            } else {
                                log.error("Error!! ", exception);
                            }
                        });
            }
        }

        producer.flush(); // 카프카 메세지 전송은 비동기 방식이기 때문에 플러시 안해주면 전송하기 전에 프로그램 끝나버림
        producer.close();
    }
}


2. JAVA 코드로 Consumer 만들기

public class ConsumerJAVA {
	// 로그 설정
    private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class.getSimpleName());

    public static void main(String[] args) {
        String groupId = "my-java-application"; // 컨슈머 그룹 ID 설정
        String topic = "wikimedia.recentchange.connect"; // 토픽 설정
        
        // create Consumer Properties
        Properties properties = new Properties();
        
        // 부트스트랩 서버 설정
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
       
        // Key deserializer 설정
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        
        // value deserializer 설정
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        
        // 컨슈머 그룹 ID 설정
        properties.setProperty("group.id", groupId);
        
        // offset reset 설정
        // latest -> 지금부터 보낸 메세지만 읽겠다 / earliest -> 현재 컨슈머 그룹의 커밋된 오프셋을 찾지 못할 때 가장 처음의 오프셋부터 메시지를 읽겠다
        properties.setProperty("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic)); // 구독할 토픽 설정. 복수 개의 토픽 설정 가능함.

		// 메시지 폴링 루프
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // 최대 1000 밀리초만큼 기다리면서 브로커로부터 메시지를 가져옴
            for (ConsumerRecord<String, String> record : records) {
                log.info("Key: {} , Value: {} ", record.key(), record.value());
                log.info("Partition: {} , Offset: {} ", record.partition(), record.offset());
            }
        }
    }
}
profile
기술블로그보다는 기록블로그

0개의 댓글