" Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology. "
카프카 스트림즈는 토픽에 적재된 데이터를 실시간으로 변환하여 다른 토픽에 적재하는 라이브러리이다.
(출처: 아파치 카프카 애플리케이션 프로그래밍 with 자바, 책)
그런데!!!
""" 카프카 스트림즈 애플리케이션은 Java와 Scala로만 작성하고 배포할 수 있다. """
대부분의 개발 조직은 각자 상황에 맞게 인프라와 언어, 프레임워크 등의 기술 스택을 정립해서 쓰게된다. 만약 파이썬이나 Go를 주로 쓰는 조직에서 Kafka를 도입하고자 한다고 가정하자. 그런데 위에서 언급했듯이 카프카 스트림즈 애플리케이션은 Java와 Scala로만 작성이 가능하다. 그렇다면 파이썬이나 Go만 사용하는 조직은 이를 어떻게 해결해야 할까? 간단하게는 2가지 방법이 있을 수 있다. 첫번째, Java를 공부해서 도입한다. 물론 컴퓨터 전공을 한다하면 Java 정도는 충분히 할 수 있다. 하지만 조직관점에서 봤을 때는 운영을 하는데 사용하는 언어가 늘어가는 것은 분명 관리 포인트가 늘어나 부담이 된다. 그렇기 때문에 쉽지 않은 결정이다. 두번째, Kafka를 안쓴다. (하하...^^) 여튼 둘 다 좋은 방법은 아닌 듯 하다. 그래서 이 글에서는 언어에 상관없이 카프카 스트림즈를 사용할 수 있는 방법은 찾아보려고 한다.
필자는 Python이 편하기 때문에 이 글에서는 파이썬 기준으로 설명한다.
바로 ksqlDB!!
" ksqlDB is the streaming database for Apache Kafka®. With ksqlDB, you can write event streaming applications by using a lightweight SQL syntax. "
위 그림은 ksqlDB를 이해하기 딱 좋은 그림이다. ksqlDB는 카프카 스트림즈를 추상화하여 SQL 문으로 구현할 수 있게 한 것이다. 즉, SQL문을 통해 카프카 스트림즈를 사용할 수 있기 때문에 언어의 문제가 사라진다. 실제도 공식 문서를 보면, 다음과 같은 상황에서 ksqlDB를 사용하라고 한다.
" Want the power of Kafka Streams but you aren't on the JVM: use the ksqlDB REST API from Python, Go, C#, JavaScript, shell"
ksqlDB에 대해서는 별도의 글로 다루도록 하고 이 글에서는 예제를 통해 카프카 스트림즈와 ksqlDB를 비교하여 어떻게 사용하는지에 초점을 맞추려고 한다.
모든 예제는 "아파치 카프카 애플리케이션 프로그래밍 with 자바" 책에서 가져온 것이다.
코드: https://github.com/bjpublic/apache-kafka-with-java/tree/master/Chapter3/3.5%20kafka-streams
stream_log 토픽의 데이터를 stream_log_copy 토픽으로 옮겨오는 예제
package com.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class SimpleStreamApplication {
private static String APPLICATION_NAME = "streams-application";
private static String BOOTSTRAP_SERVERS = "localhost:9092";
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_COPY = "stream_log_copy";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(STREAM_LOG);
stream.to(STREAM_LOG_COPY);
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
ksqlDB에서 데이터 정의
출처: https://docs.ksqldb.io/en/latest/reference/sql/data-definition/
" This section covers how you create the structures that store your events. ksqlDB abstracts events as rows with columns and stores them in streams and tables. "
-> 그렇기 때문에 위의 예제와 100% 동일할 수는 없다. 대신 "data" column에 log를 작성하는 예제로 변경하여 구현했다.
1) stream_log 토픽 streamLog 스트림 생성
ksql> CREATE STREAM streamLog (data VARCHAR)
WITH (kafka_topic='stream_log', value_format='json', partitions=1);
Message
----------------
Stream created
----------------
2) streamLogCopy 스트림 생성
ksql> CREATE STREAM streamLogCopy AS SELECT * FROM streamLog EMIT CHANGES;
Message
--------------------------------------------
Created query with ID CSAS_STREAMLOGCOPY_9
--------------------------------------------
3) Push Query: streamLogCopy
ksql> SELECT * from streamLogCopy EMIT CHANGES;
+----------------------------------------------------------------------------------------------------+
|DATA |
+----------------------------------------------------------------------------------------------------+
ksql> INSERT INTO streamLog (data) VALUES ('Hello');
ksql> INSERT INTO streamLog (data) VALUES ('Jimin');
ksql> SELECT * from streamLogCopy EMIT CHANGES;
+----------------------------------------------------------------------------------------------------+
|DATA |
+----------------------------------------------------------------------------------------------------+
|Hello |
|Jimin
$ kafka-console-consumer --bootstrap-server localhost:29092 --topic stream_log --from-beginning
{"DATA":"Hello"}
{"DATA":"Jimin"}
^CProcessed a total of 2 messages
$ kafka-console-consumer --bootstrap-server localhost:29092 --topic STREAMLOGCOPY --from-beginning
{"DATA":"Hello"}
{"DATA":"Jimin"}
^CProcessed a total of 2 messages
user-product 주문 정보가 들어오면 제품을 어디로 보내야할지 user-address 매핑 테이블을 보고 결정하는 예제
package com.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Properties;
public class KStreamJoinKTable {
private static String APPLICATION_NAME = "order-join-application";
private static String BOOTSTRAP_SERVERS = "localhost:9092";
private static String ADDRESS_TABLE = "address";
private static String ORDER_STREAM = "order";
private static String ORDER_JOIN_STREAM = "order_join";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
orderStream.join(addressTable, (order, address) -> order + " send to " + address).to(ORDER_JOIN_STREAM);
KafkaStreams streams;
streams = new KafkaStreams(builder.build(), props);
streams.start();
}
1) order 스트림 생성
CREATE STREAM order (user VARCHAR, product VARCHAR)
WITH (kafka_topic='order', value_format='json', partitions=1);
Message
----------------
Stream created
----------------
2) address 테이블 생성 및 user-address 매핑 데이터 준비
ksql> CREATE TABLE address (user VARCHAR PRIMARY KEY, address VARCHAR)
WITH (kafka_topic='address', value_format='json', partitions=1);
Message
---------------
Table created
---------------
ksql> INSERT INTO address (user, address) VALUES ('Jimin', 'Seoul');
ksql> INSERT INTO address (user, address) VALUES ('John', 'San Francisco');
ksql> SELECT * FROM address EMIT CHANGES;
+------------------------------------------------------------------+------------------------------------------------------------------+
|USER |ADDRESS |
+------------------------------------------------------------------+------------------------------------------------------------------+
|Jimin |Seoul |
|John |San Francisco |
Press CTRL-C to interrupt
3) order 스트림과 address 테이블 조인
ksql> CREATE STREAM orderJoin AS
SELECT
order.user AS user,
product,
address
FROM order
LEFT JOIN address ON order.user = address.user
EMIT CHANGES;
Message
-----------------------------------------
Created query with ID CSAS_ORDERJOIN_15
-----------------------------------------
4) orderJoin 스트림을 Push Query로 데이터 확인 & order 데이터 생성
ksql> SELECT * from orderJoin EMIT CHANGES;
+-------------------------------------------+-------------------------------------------+-------------------------------------------+
|USER |PRODUCT |ADDRESS |
+-------------------------------------------+-------------------------------------------+-------------------------------------------+
Press CTRL-C to interrupt
ksql> INSERT INTO order (user, product) VALUES ('Jimin', 'Mac book');
ksql> INSERT INTO order (user, product) VALUES ('John', 'iPad');
ksql> SELECT * from orderJoin EMIT CHANGES;
+-------------------------------------------+-------------------------------------------+-------------------------------------------+
|USER |PRODUCT |ADDRESS |
+-------------------------------------------+-------------------------------------------+-------------------------------------------+
|Jimin |Mac book |Seoul |
|John |iPad |San Francisco |
Press CTRL-C to interrupt
공식 문서: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/
Python 라이브러리: https://github.com/bryanyang0528/ksql-python
예제: https://velog.io/@jm94318/ksqlDB-실습
공식 문서에서 소개한 ksql 라이브러리를 사용하면 다음과 같이 ksqlDB를 사용할 수 있다.
import logging
from ksql import KSQLAPI
logging.basicConfig(level=logging.DEBUG)
client = KSQLAPI('http://localhost:8088', timeout=None)
result = client.ksql('show tables')
print(result)
results = client.query("""SELECT * FROM riderLocations
WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;""")
for result in results:
print(result)
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /info HTTP/1.1" 200 133
DEBUG:root:KSQL generated: SELECT * FROM riderLocations
WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
[{"header":{"queryId":"transient_RIDERLOCATIONS_71790220042559025","schema":"`PROFILEID` STRING, `LATITUDE` DOUBLE, `LONGITUDE` DOUBLE"}},
{"row":{"columns":["4ab5cbad",37.3952,-122.0813]}},
{"row":{"columns":["8b6eae59",37.3944,-122.0813]}},
{"row":{"columns":["4a7c7b41",37.4049,-122.0822]}},
import logging
from ksql import KSQLAPI
logging.basicConfig(level=logging.DEBUG)
client = KSQLAPI('http://localhost:8088')
client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);")
client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);")
client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);")
client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);")
client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);")
client.ksql("INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);")
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /info HTTP/1.1" 200 133
DEBUG:root:KSQL generated: INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
DEBUG:root:KSQL generated: INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
DEBUG:root:KSQL generated: INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
DEBUG:root:KSQL generated: INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
DEBUG:root:KSQL generated: INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
DEBUG:root:KSQL generated: INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
이것이 가능한 이유는...
ksqlDB는 클라이언트가 ksqlDB 엔진(SQL문과 쿼리문을 처리)에 접근할 수 있게 REST API 제공하기 때문이다. 파이썬 ksql 라이브러리는 내부적으로 REST API를 호출하여 SQL문을 처리하고 있다. 따라서 이 REST API를 사용하면 어떤 언어를 사용하더라도 스트림즈를 구현할 수 있다.
다음글에서는 ksqlDB의 개념과 구조, Kafka Streams와의 비교 등 좀 더 ksqlDB의 이론과 심화 내용을 다뤄보도록 할 예정이다.