[Kafka] 삽질 기록 - KafkaStreams try-with-resource 사용금지!!

유알·2023년 9월 2일
0

[Kafka]

목록 보기
4/5
package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class SimpleStreamApplication {
    private static String APPLICATION_NAME = "streams-application";
    private static String BOOTSTRAP_SERVERS = "localhost:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_COPY = "stream_log_copy";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder
                .stream(STREAM_LOG)
                .to(STREAM_LOG_COPY);
        try (KafkaStreams streams = new KafkaStreams(builder.build(), props);){
            streams.start();
        }catch (Exception e) {

        }

    }
}

간단하게 설명하지만, 이것을 발견하려고 얼마나 생쇼를 했는지 누구도 모를 것이다.
집에 있는 서버로 띄워보고 로컬으로 띄워보고, 카프카, 주키퍼 로그를 다 읽어보고, 에러를 구글에 검색해도, 해결이 되지 않았다.

위 코드에서 이상한 점을 찾을 수 있겠는가?
그렇다.. 책의 예제에서는

KafkaStreams streams = new KafkaStreams(builder.build(), props);

이렇게 생으로 선언을 하였고 IDE는 나에게 try-with-resource문을 추천해주었다. 아마 AutoClosable을 구현한 객체 때문일 것이라고 생각한다..(아니 당장 그 인터페이스를 지워야한다고 생각한다. 많은 사람들에게 혼동을 줄것이다.)

그래서 나는 AutoClosable 객체이구나? 하고 냉큼 try-with-resource문을 사용하였고, 결과는 실행하자 마자 종료되는 현상이 발생했다.

약 4시간 정도의 삽질 끝에, 로그를 보다 다음 문장을 발견했다.
State transition from PENDING_SHUTDOWN to NOT_RUNNING
어라? Pending Shutdown이라는 것은,,, 정식적으로 프로세스 종료를 요청했다는 것 아닌가?
그러면서 위를 보니

[main] INFO org.apache.kafka.streams.KafkaStreams - stream-client [streams-application-c5443fe7-ad6b-472e-8e3d-27a3857ea952] State transition from REBALANCING to PENDING_SHUTDOWN

메인쓰레드에서 이런걸 호출했다.

그렇다. KafkaStreams는 별도의 쓰레드에서 돌아가는데, main쓰레드는 start를 호출하면 blocking되는 것이 아니라 계속 진행되므로, try-with-resource 문에 의해 close가 호출되는 것이다.
일단 AutoClosable은 이런데 쓰면 안된다고 생각한다.

고친 코드

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class SimpleStreamApplication {
    private static String APPLICATION_NAME = "streams-application";
    private static String BOOTSTRAP_SERVERS = "localhost:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_COPY = "stream_log_copy";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder
                .stream(STREAM_LOG)
                .to(STREAM_LOG_COPY);

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.cleanUp();
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

    }
}
profile
더 좋은 구조를 고민하는 개발자 입니다

0개의 댓글

Powered by GraphCDN, the GraphQL CDN