package com.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
public class SimpleStreamApplication {
private static String APPLICATION_NAME = "streams-application";
private static String BOOTSTRAP_SERVERS = "localhost:9092";
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_COPY = "stream_log_copy";
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
builder
.stream(STREAM_LOG)
.to(STREAM_LOG_COPY);
try (KafkaStreams streams = new KafkaStreams(builder.build(), props);){
streams.start();
}catch (Exception e) {
}
}
}
간단하게 설명하지만, 이것을 발견하려고 얼마나 생쇼를 했는지 누구도 모를 것이다.
집에 있는 서버로 띄워보고 로컬으로 띄워보고, 카프카, 주키퍼 로그를 다 읽어보고, 에러를 구글에 검색해도, 해결이 되지 않았다.
위 코드에서 이상한 점을 찾을 수 있겠는가?
그렇다.. 책의 예제에서는
KafkaStreams streams = new KafkaStreams(builder.build(), props);
이렇게 생으로 선언을 하였고 IDE는 나에게 try-with-resource문을 추천해주었다. 아마 AutoClosable을 구현한 객체 때문일 것이라고 생각한다..(아니 당장 그 인터페이스를 지워야한다고 생각한다. 많은 사람들에게 혼동을 줄것이다.)
그래서 나는 AutoClosable 객체이구나? 하고 냉큼 try-with-resource문을 사용하였고, 결과는 실행하자 마자 종료되는 현상이 발생했다.
약 4시간 정도의 삽질 끝에, 로그를 보다 다음 문장을 발견했다.
State transition from PENDING_SHUTDOWN to NOT_RUNNING
어라? Pending Shutdown이라는 것은,,, 정식적으로 프로세스 종료를 요청했다는 것 아닌가?
그러면서 위를 보니
[main] INFO org.apache.kafka.streams.KafkaStreams - stream-client [streams-application-c5443fe7-ad6b-472e-8e3d-27a3857ea952] State transition from REBALANCING to PENDING_SHUTDOWN
메인쓰레드에서 이런걸 호출했다.
그렇다. KafkaStreams는 별도의 쓰레드에서 돌아가는데, main쓰레드는 start를 호출하면 blocking되는 것이 아니라 계속 진행되므로, try-with-resource 문에 의해 close가 호출되는 것이다.
일단 AutoClosable은 이런데 쓰면 안된다고 생각한다.
package com.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
public class SimpleStreamApplication {
private static String APPLICATION_NAME = "streams-application";
private static String BOOTSTRAP_SERVERS = "localhost:9092";
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_COPY = "stream_log_copy";
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
builder
.stream(STREAM_LOG)
.to(STREAM_LOG_COPY);
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}