Quick Start
- https://ksqldb.io/quickstart.html
- 실습 환경은 위에 링크에서 각자 환경에 맞게 셋팅하면 됨 (여기서는 docker로 환경 셋팅함)
시나리오
- Mountain View 주변의 라이더 찾기
- Mountain View 위치 (lat, long): (37.4133, -122.1162)
- 주변 기준 : 5마일 / 10마일
라이더의 위치에 대한 스트림
ksql> CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
WITH (kafka_topic='locations', value_format='json', partitions=1);
Message
----------------
Stream created
----------------
key | value |
---|---|
profileId | 라이더 ID |
latitude | 위도 |
longitude | 경도 |
parameter | description |
---|---|
kafka_topic | 스트림을 흘려줄 topic의 이름 토픽이 없으면 생성하고 이미 있으면 해당 토픽에 스트림 생성 |
value_format | json {"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205} |
partitions | 토픽 생성시 지정한 파티션 수 이미 토픽이 생성돼 있다면 필요 없는 파라미터 |
라이더의 현재 위치에 대한 구체화 뷰 (materialized view)
ksql> CREATE TABLE currentLocation AS
SELECT profileId,
LATEST_BY_OFFSET(latitude) AS la,
LATEST_BY_OFFSET(longitude) AS lo
FROM riderlocations
GROUP BY profileId
EMIT CHANGES;
Message
----------------------------------------------
Created query with ID CTAS_CURRENTLOCATION_3
-------------------------T---------------------
...
라이더 스트림 정보 입력
...
ksql> SELECT * from currentLocation;
>
+--------------------------------+--------------------------------+--------------------------------+
|PROFILEID |LA |LO |
+--------------------------------+--------------------------------+--------------------------------+
|18f4ea86 |37.3903 |-122.0643 |
|4a7c7b41 |37.4049 |-122.0822 |
|4ab5cbad |37.3952 |-122.0813 |
|4ddad000 |37.7857 |-122.4011 |
|8b6eae59 |37.3944 |-122.0813 |
|c2309eec |37.7877 |-122.4205 |
Query terminated
얼마나 많은 라이더가 MountainView에서 얼마나 떨어져 있는지 보여주는 구체화 뷰 (materialized view)
ksql> CREATE TABLE ridersNearMountainView AS
> SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
> COLLECT_LIST(profileId) AS riders,
> COUNT(*) AS count
> FROM currentLocation
> GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
Message
-----------------------------------------------------
Created query with ID CTAS_RIDERSNEARMOUNTAINVIEW_5
-----------------------------------------------------
Mountain View에서 5마일 이내에 들어온 라이더의 위치의 스트림 정보를 가져옴
Push query: 실시간으로 output이 나옴
아래와 같이 select 문 실행 후 insert하면 Mountain View와 5마일 이내에 라이더가 들어오면 output row가 추가된다.
ksql> SELECT * FROM riderLocations
> WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
+--------------------------------+--------------------------------+--------------------------------+
|PROFILEID |LATITUDE |LONGITUDE |
+--------------------------------+--------------------------------+--------------------------------+
|4ab5cbad |37.3952 |-122.0813 |
|8b6eae59 |37.3944 |-122.0813 |
|4a7c7b41 |37.4049 |-122.0822 |
Press CTRL-C to interrupt
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
>INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
>INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
>INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
>INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
>INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
현재 Mountain View에서 10마일 이내 라이더의 위치의 스트림 정보를 가져옴
Pull query: 기존의 SQL과 비슷 (traditional request-response model)
ksql> SELECT * from ridersNearMountainView WHERE distanceInMiles <= 10;
+--------------------------------+--------------------------------+--------------------------------+
|DISTANCEINMILES |RIDERS |COUNT |
+--------------------------------+--------------------------------+--------------------------------+
|0.0 |[4ab5cbad, 8b6eae59, 4a7c7b41] |3 |
|10.0 |[18f4ea86] |1 |
Query terminated