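Transformations: almost the same concept as the existing (batch) transformations
There are a few restrictions, though
There is only one action: start()
A sink specifies the destination where the stream's results are stored
Sinks and the execution engine together are responsible for reliably and accurately tracking the progress of data processing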
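The progress-tracking idea can be made concrete with a toy model in plain Python. It is loosely based on the write-ahead approach behind Structured Streaming checkpoints: log a batch's offset range before running it, log a commit after the sink write, and replay an uncommitted batch after a restart. `ToyCheckpoint` and `run_batch` are hypothetical names. This is a simplified sketch, not Spark's implementation.

```python
class ToyCheckpoint:
    """Hypothetical, simplified stand-in for a streaming checkpoint directory."""

    def __init__(self):
        self.offsets = {}     # batch_id -> (start, end), written BEFORE processing
        self.commits = set()  # batch_ids whose output reached the sink

    def next_batch(self, available_end):
        """Return (batch_id, start, end), replaying an uncommitted batch first."""
        if self.offsets:
            last_id = max(self.offsets)
            if last_id not in self.commits:
                start, end = self.offsets[last_id]
                return last_id, start, end  # same range as before the crash
            start, batch_id = self.offsets[last_id][1], last_id + 1
        else:
            start, batch_id = 0, 0
        self.offsets[batch_id] = (start, available_end)  # write-ahead: log the plan first
        return batch_id, start, available_end

    def commit(self, batch_id):
        self.commits.add(batch_id)


def run_batch(cp, source, sink, crash=False):
    batch_id, start, end = cp.next_batch(len(source))
    if crash:
        return batch_id  # simulated failure before the sink write and the commit
    sink[batch_id] = source[start:end]  # keyed by batch_id, so a replay overwrites
    cp.commit(batch_id)
    return batch_id


source, sink, cp = ["a", "b", "c"], {}, ToyCheckpoint()
run_batch(cp, source, sink)              # batch 0 covers offsets 0..3
source += ["d", "e"]
run_batch(cp, source, sink, crash=True)  # batch 1 planned as 3..5, then "crashes"
run_batch(cp, source, sink)              # batch 1 replayed with the same range
```

Because the offset range is logged before the batch runs, the replay reads exactly the same records, and the sink ends up with each record once.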
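Supported output sinks (Spark 3.1): file, Kafka, foreach, foreachBatch, console (for debugging), memory (for debugging)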
To define a sink, you also have to define how the data is written out (the output mode)
Supported output modes (Spark 3.1): append, update, complete
Some queries and sinks support only certain output modes
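The toy below shows what each output mode would emit per trigger. It covers a running count like the `groupBy('gt').count()` example and a stateless filter like `simple_transform`. It is plain Python, not Spark, and the function names are hypothetical. The one Spark rule it encodes is that append mode is rejected for a streaming aggregation unless a watermark is defined.

```python
from collections import Counter


def run_count_query(batches, mode):
    """Simulate a streaming groupBy(key).count() over micro-batches."""
    if mode == "append":
        # Spark rejects append for streaming aggregations without a watermark
        raise ValueError("append needs a watermark for aggregations")
    state = Counter()
    emitted = []
    for batch in batches:
        before = dict(state)
        state.update(batch)
        if mode == "complete":
            emitted.append(dict(state))  # the whole result table on every trigger
        elif mode == "update":
            # only the rows whose value changed during this trigger
            emitted.append({k: v for k, v in state.items() if before.get(k) != v})
        else:
            raise ValueError(f"unknown mode: {mode}")
    return emitted


def run_filter_query(batches, predicate):
    """Append mode for a stateless filter: each trigger emits only its own new rows."""
    return [[row for row in batch if predicate(row)] for batch in batches]


batches = [["sit", "walk", "sit"], ["walk", "bike"]]
complete = run_count_query(batches, "complete")
update = run_count_query(batches, "update")
appended = run_filter_query([["stairsup", "sit"], ["stairsdown", "walk"]],
                            lambda gt: "stairs" in gt)
```

In the second trigger, `complete` repeats the unchanged `sit` row, `update` omits it, and `appended` contains only rows that arrived in that trigger.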
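Triggers define when data is output
By default, Spark checks for new data and updates the result right after it finishes processing the previous input
Processing-time triggers (which look for new data only at a fixed interval) are also supported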
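Here is a plain-Python sketch of when micro-batches would start under a processing-time trigger such as `trigger(processingTime='5 seconds')`. It follows the documented behavior: if a batch finishes early, the engine waits for the next interval boundary; if a batch overruns its interval, the next one starts as soon as it completes. The batch durations are hypothetical, and all times are in seconds.

```python
def processing_time_starts(durations, interval, start=0):
    """Start time of each micro-batch under a fixed-interval trigger."""
    now = start
    starts = []
    for duration in durations:
        starts.append(now)
        # the next interval boundary strictly after this batch's start
        next_boundary = (now // interval + 1) * interval
        # finished early: wait for the boundary; overran it: start right away
        now = max(now + duration, next_boundary)
    return starts


# 5-second interval; the second batch takes 7 s and overruns its slot
starts = processing_time_starts([2, 7, 1, 1], interval=5)
```

The third batch starts at 12 s instead of 10 s because the second batch overran. After that, the schedule snaps back to the 5-second grid.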
Event-time processing is supported as well
It is based on the timestamps recorded inside the records themselves, which may arrive in any order
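The core of event-time processing is that a record's group is decided by its own timestamp, not by its arrival order. Spark expresses this with `pyspark.sql.functions.window` over a timestamp column. This plain-Python sketch of 10-second tumbling windows uses made-up records and ignores watermarks and late-data handling.

```python
from collections import defaultdict


def tumbling_window_counts(records, window_sec):
    """Count records per fixed event-time window, using each record's own timestamp."""
    counts = defaultdict(int)
    for event_time, _activity in records:
        window_start = event_time - event_time % window_sec
        counts[(window_start, window_start + window_sec)] += 1
    return dict(sorted(counts.items()))


# (event_time_in_seconds, activity) listed in the order the records ARRIVED
arrivals = [(12, "sit"), (3, "walk"), (15, "sit"), (7, "bike"), (1, "stand")]
counts = tumbling_window_counts(arrivals, window_sec=10)
```

The records with timestamps 3, 7, and 1 land in the same window even though a later record (12) arrived before two of them.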
Two key ideas:
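# read the data once as a static DataFrame to infer its schema, then reuse that schema for the stream
path = 'FileStore/tables/bin/activity-data'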
static = spark.read.json(path)
dataSchema = static.schema
dataSchema
Out[3]: StructType(List(StructField(Arrival_Time,LongType,true),StructField(Creation_Time,LongType,true),StructField(Device,StringType,true),StructField(Index,LongType,true),StructField(Model,StringType,true),StructField(User,StringType,true),StructField(gt,StringType,true),StructField(x,DoubleType,true),StructField(y,DoubleType,true),StructField(z,DoubleType,true)))
display(static.take(5))
Arrival_Time | Creation_Time | Device | Index | Model | User | gt | x | y | z |
---|---|---|---|---|---|---|---|---|---|
1424686735090 | 1424686733090638193 | nexus4_1 | 18 | nexus4 | g | stand | 3.356934E-4 | -5.645752E-4 | -0.018814087 |
1424686735292 | 1424688581345918092 | nexus4_2 | 66 | nexus4 | g | stand | -0.005722046 | 0.029083252 | 0.005569458 |
1424686735500 | 1424686733498505625 | nexus4_1 | 99 | nexus4 | g | stand | 0.0078125 | -0.017654419 | 0.010025024 |
1424686735691 | 1424688581745026978 | nexus4_2 | 145 | nexus4 | g | stand | -3.814697E-4 | 0.0184021 | -0.013656616 |
1424686735890 | 1424688581945252808 | nexus4_2 | 185 | nexus4 | g | stand | -3.814697E-4 | -0.031799316 | -0.00831604 |
# use readStream instead of read; maxFilesPerTrigger=1 makes each trigger pick up one new file
streaming = spark.readStream.schema(dataSchema).option('maxFilesPerTrigger',1).json('/'+path)
# define the transformation
activityCounts = streaming.groupBy('gt').count()
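# define the action
# a destination (sink) for the query results must be specified
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
.format('memory').outputMode('complete').start()
# in a standalone application, activityQuery.awaitTermination() keeps the driver running
# until the query stops; it is left out here because it would block the notebook cell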
# list the currently running streams
spark.streams.active
Out[8]: [<pyspark.sql.streaming.StreamingQuery at 0x7f503b3e3c88>]
from time import sleep
# query the streaming result table once per second
for _ in range(3):
    spark.sql('select * from activity_counts').show()
    sleep(1)
+----------+-----+
|        gt|count|
+----------+-----+
|  stairsup|41829|
|       sit|49238|
|     stand|45539|
|      walk|53019|
|      bike|43185|
|stairsdown|37455|
|      null|41785|
+----------+-----+
+----------+-----+
|        gt|count|
+----------+-----+
|  stairsup|41829|
|       sit|49238|
|     stand|45539|
|      walk|53019|
|      bike|43185|
|stairsdown|37455|
|      null|41785|
+----------+-----+
+----------+-----+
|        gt|count|
+----------+-----+
|  stairsup|41829|
|       sit|49238|
|     stand|45539|
|      walk|53019|
|      bike|43185|
|stairsdown|37455|
|      null|41785|
+----------+-----+
from pyspark.sql import functions as F
simpleTransform = streaming.withColumn('stairs', F.expr("gt like '%stairs%'"))\
.where('stairs')\
.where('gt is not null')\
.select('gt', 'model', 'arrival_time', 'creation_time')\
.writeStream.queryName('simple_transform')\
.format('memory')\
.outputMode('append').start()
# two streams are now running
spark.streams.active
Out[11]: [<pyspark.sql.streaming.StreamingQuery at 0x7f503b3a8240>,
<pyspark.sql.streaming.StreamingQuery at 0x7f503b3a8be0>]
spark.sql('select distinct(gt) from simple_transform').show()
+----------+
|        gt|
+----------+
|  stairsup|
|stairsdown|
+----------+
for _ in range(3):
    spark.sql('select * from simple_transform').limit(3).show()
    sleep(1)
+--------+------+-------------+-------------------+
|      gt| model| arrival_time|      creation_time|
+--------+------+-------------+-------------------+
|stairsup|nexus4|1424785980959|1424787826278414112|
|stairsup|nexus4|1424785981157|1424785984637538373|
|stairsup|nexus4|1424785981366|1424785984843806684|
+--------+------+-------------+-------------------+
+--------+------+-------------+-------------------+
|      gt| model| arrival_time|      creation_time|
+--------+------+-------------+-------------------+
|stairsup|nexus4|1424697948398|1424699794447729066|
|stairsup|nexus4|1424697948595|1424699794648961976|
|stairsup|nexus4|1424697948810|1424697946816230086|
+--------+------+-------------+-------------------+
+----------+------+-------------+-------------------+
|        gt| model| arrival_time|      creation_time|
+----------+------+-------------+-------------------+
|stairsdown|nexus4|1424700667634|1424702513688330618|
|stairsdown|nexus4|1424700667837|1424702513889631513|
|stairsdown|nexus4|1424700668035|1424702514090864423|
+----------+------+-------------+-------------------+
deviceModelStats = streaming.cube('gt', 'model').avg()\
.drop("avg(Arrival_Time)")\
.drop("avg(Creation_Time)")\
.drop('avg(Index)')\
.writeStream.queryName('device_counts').format('memory')\
.outputMode('complete').start()
spark.streams.active
Out[15]: [<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b6898>,
<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b67f0>,
<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b6748>]
for _ in range(3):
    spark.sql('select * from device_counts').limit(5).show()
    sleep(1)
+----------+------+--------------------+--------------------+--------------------+
|        gt| model|              avg(x)|              avg(y)|              avg(z)|
+----------+------+--------------------+--------------------+--------------------+
|      null|  null|-9.03767393822733...|-0.00478809579938...|-0.00803015271059...|
|stairsdown|  null| 0.02425847993757609|-0.03760274457414847| 0.12621213764276576|
|stairsdown|nexus4| 0.02425847993757609|-0.03760274457414847| 0.12621213764276576|
|      null|nexus4|-0.01085041855216...|0.003269388401703...|0.007623247528790201|
|     stand|  null|-4.85009687984190...|7.843164314448882E-5| 6.53799922292491E-4|
+----------+------+--------------------+--------------------+--------------------+
+-----+------+--------------------+--------------------+--------------------+
|   gt| model|              avg(x)|              avg(y)|              avg(z)|
+-----+------+--------------------+--------------------+--------------------+
| null|nexus4|-0.01085041855216...|0.003269388401703...|0.007623247528790201|
| null|nexus4|-9.03767393822733...|-0.00478809579938...|-0.00803015271059...|
| null|  null|-9.03767393822733...|-0.00478809579938...|-0.00803015271059...|
| bike|nexus4|0.023557159293850122| -0.0103710797474206|-0.08150749488259697|
|stand|  null|-4.85009687984190...|7.843164314448882E-5| 6.53799922292491E-4|
+-----+------+--------------------+--------------------+--------------------+
+-----+------+--------------------+--------------------+--------------------+
|   gt| model|              avg(x)|              avg(y)|              avg(z)|
+-----+------+--------------------+--------------------+--------------------+
|stand|  null|-4.85009687984190...|7.843164314448882E-5| 6.53799922292491E-4|
|  sit|  null|-6.03518642992934...|3.315789799252582...|1.607174773661537E-4|
| walk|nexus4|-0.00477321663226...|0.007647625398144233|1.359592299788926E-4|
| null|nexus4|-0.01085041855216...|0.003269388401703...|0.007623247528790201|
| null|  null|-9.03767393822733...|-0.00478809579938...|-0.00803015271059...|
+-----+------+--------------------+--------------------+--------------------+
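`cube('gt', 'model')` aggregates over every combination of the grouping columns, including per-column subtotals and a grand total. That is why rows with `null` in `gt` and/or `model` appear above. In Spark, the rolled-up column shows up as null; `pyspark.sql.functions.grouping()` can tell a subtotal null apart from a null that is really in the data. The plain-Python sketch below applies the same rule to made-up rows. `cube_avg` is a hypothetical helper, and `None` stands for "rolled up".

```python
from itertools import product
from statistics import mean


def cube_avg(rows, keys, value):
    """Average `value` for every combination of `keys`; None marks a rolled-up key."""
    groups = {}
    for row in rows:
        # each row feeds 2**len(keys) groups: every key is either kept or rolled up
        for keep_mask in product([True, False], repeat=len(keys)):
            group = tuple(row[k] if keep else None for k, keep in zip(keys, keep_mask))
            groups.setdefault(group, []).append(row[value])
    return {g: mean(vs) for g, vs in groups.items()}


# hypothetical toy rows, not taken from the activity data
rows = [
    {"gt": "sit", "model": "nexus4", "x": 1.0},
    {"gt": "walk", "model": "nexus4", "x": 3.0},
]
averages = cube_avg(rows, ["gt", "model"], "x")
```

`averages[(None, "nexus4")]` is the subtotal over all activities for that model, and `averages[(None, None)]` is the grand total.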
historicalAgg = static.groupBy('gt', 'model').avg()
deviceModelStats = streaming.drop('Arrival_Time', 'Creation_Time','Index')\
.cube('gt', 'model').avg()\
.join(historicalAgg, ['gt','model'])\
.writeStream.queryName('device_counts2').format('memory')\
.outputMode('complete').start()
spark.streams.active
Out[18]: [<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b6fd0>,
<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b6eb8>,
<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b6128>,
<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b6e48>]
for _ in range(3):
    spark.sql('select * from device_counts2').show()
    sleep(1)
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|        gt| model|              avg(x)|              avg(y)|              avg(z)|   avg(Arrival_Time)|  avg(Creation_Time)|        avg(Index)|              avg(x)|              avg(y)|              avg(z)|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|      bike|nexus4|0.021949482812608244|-0.00749751739872...|-0.08270261150639441|1.424751134339993...|1.424752127369581...| 326459.6867328154| 0.02268875955086689|-0.00877912156368...|-0.08251001663412375|
|      walk|nexus4|-0.00285732139643...|0.003373111639646...|-0.00155088920916...|1.424746420641788...|1.424747351060679...|149760.09974990616|-0.00390116006094...|0.001052508689953713|-6.95435553042992...|
|stairsdown|nexus4| 0.02262820081168685|-0.03079508201793...| 0.12088580480524012|1.424744591412858...|1.424745503635642...|230452.44623187225|0.021613908669165325|-0.03249018824752615| 0.12035922691504057|
|       sit|nexus4|-6.44713381746548...|2.614244124546443E-4|-1.55316137749796...|1.424741207868231...|1.424742112220357...| 74577.84690275553|-5.49433244039586...|2.791446281700070...|-2.33994461689889...|
|     stand|nexus4|-3.89372639367013...|3.238874567024346...|2.323141750435506...|1.424743637921212...|1.424744579547465...|31317.877585550017|-3.11082189691723...| 3.21846166597532E-4|2.141300040636485E-4|
|      null|nexus4|-0.00981882283597...|-0.00189753526302...|0.003903477859879...|1.424749002876348E12|1.424749919482132...| 219276.9663669269|-0.00847688860109...|-7.30455258739180...|0.003090601491419...|
|  stairsup|nexus4|-0.02424235941716...|-0.00542728576322...|-0.10011072208887009|1.424745996101166E12|1.424746915892742...|227912.96550673083|-0.02479965287771...|-0.00800392344379...|  -0.100340884150604|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|        gt| model|              avg(x)|              avg(y)|              avg(z)|   avg(Arrival_Time)|  avg(Creation_Time)|        avg(Index)|              avg(x)|              avg(y)|              avg(z)|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|      bike|nexus4|0.021949482812608244|-0.00749751739872...|-0.08270261150639441|1.424751134339993...|1.424752127369581...| 326459.6867328154| 0.02268875955086689|-0.00877912156368...|-0.08251001663412375|
|      walk|nexus4|-0.00285732139643...|0.003373111639646...|-0.00155088920916...|1.424746420641788...|1.424747351060679...|149760.09974990616|-0.00390116006094...|0.001052508689953713|-6.95435553042992...|
|stairsdown|nexus4| 0.02262820081168685|-0.03079508201793...| 0.12088580480524012|1.424744591412858...|1.424745503635642...|230452.44623187225|0.021613908669165325|-0.03249018824752615| 0.12035922691504057|
|       sit|nexus4|-6.44713381746548...|2.614244124546443E-4|-1.55316137749796...|1.424741207868231...|1.424742112220357...| 74577.84690275553|-5.49433244039586...|2.791446281700070...|-2.33994461689889...|
|     stand|nexus4|-3.89372639367013...|3.238874567024346...|2.323141750435506...|1.424743637921212...|1.424744579547465...|31317.877585550017|-3.11082189691723...| 3.21846166597532E-4|2.141300040636485E-4|
|      null|nexus4|-0.00981882283597...|-0.00189753526302...|0.003903477859879...|1.424749002876348E12|1.424749919482132...| 219276.9663669269|-0.00847688860109...|-7.30455258739180...|0.003090601491419...|
|  stairsup|nexus4|-0.02424235941716...|-0.00542728576322...|-0.10011072208887009|1.424745996101166E12|1.424746915892742...|227912.96550673083|-0.02479965287771...|-0.00800392344379...|  -0.100340884150604|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|        gt| model|              avg(x)|              avg(y)|              avg(z)|   avg(Arrival_Time)|  avg(Creation_Time)|        avg(Index)|              avg(x)|              avg(y)|              avg(z)|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|      bike|nexus4|0.021949482812608244|-0.00749751739872...|-0.08270261150639441|1.424751134339993...|1.424752127369581...| 326459.6867328154| 0.02268875955086689|-0.00877912156368...|-0.08251001663412375|
|      walk|nexus4|-0.00285732139643...|0.003373111639646...|-0.00155088920916...|1.424746420641788...|1.424747351060679...|149760.09974990616|-0.00390116006094...|0.001052508689953713|-6.95435553042992...|
|stairsdown|nexus4| 0.02262820081168685|-0.03079508201793...| 0.12088580480524012|1.424744591412858...|1.424745503635642...|230452.44623187225|0.021613908669165325|-0.03249018824752615| 0.12035922691504057|
|       sit|nexus4|-6.44713381746548...|2.614244124546443E-4|-1.55316137749796...|1.424741207868231...|1.424742112220357...| 74577.84690275553|-5.49433244039586...|2.791446281700070...|-2.33994461689889...|
|     stand|nexus4|-3.89372639367013...|3.238874567024346...|2.323141750435506...|1.424743637921212...|1.424744579547465...|31317.877585550017|-3.11082189691723...| 3.21846166597532E-4|2.141300040636485E-4|
|      null|nexus4|-0.00981882283597...|-0.00189753526302...|0.003903477859879...|1.424749002876348E12|1.424749919482132...| 219276.9663669269|-0.00847688860109...|-7.30455258739180...|0.003090601491419...|
|  stairsup|nexus4|-0.02424235941716...|-0.00542728576322...|-0.10011072208887009|1.424745996101166E12|1.424746915892742...|227912.96550673083|-0.02479965287771...|-0.00800392344379...|  -0.100340884150604|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
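In the joined output, every row has `model = nexus4`. None of the `(gt, null)` subtotal rows from the earlier cube output (such as `stand|null`) survive. One plausible explanation: `historicalAgg` uses a plain `groupBy`, so it has no subtotal rows, and an inner equi-join never matches a null key, because `NULL = NULL` is not true in SQL. The plain-Python sketch below models that rule on hypothetical toy values. Note that `show()` prints a real NULL and the string `'null'` identically, so the `gt = null` row above cannot be classified from the printout alone.

```python
def inner_join(left, right):
    """Inner equi-join on (gt, model); a None in the key never matches, like SQL NULL."""
    index = {}
    for gt, model, hist_avg in right:
        if gt is not None and model is not None:
            index.setdefault((gt, model), []).append(hist_avg)
    joined = []
    for gt, model, avg in left:
        if gt is None or model is None:
            continue  # NULL = NULL is not true, so rolled-up cube rows find no partner
        for hist_avg in index.get((gt, model), []):
            joined.append((gt, model, avg, hist_avg))
    return joined


# hypothetical toy values, not taken from the output above
cube_rows = [("bike", "nexus4", 0.5), ("bike", None, 0.5),
             (None, "nexus4", 0.1), (None, None, 0.1)]
historical = [("bike", "nexus4", 0.4), ("walk", "nexus4", -0.2)]
joined = inner_join(cube_rows, historical)
```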
The simplest source is the file source
In practice, Parquet, text, JSON, and CSV are the most common formats
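The only difference between the streaming file source/sink and the static file source: in streaming, the maxFilesPerTrigger option controls how many files are read per trigger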
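To illustrate `maxFilesPerTrigger` (set to 1 in the `readStream` call above), the plain-Python sketch below splits a backlog of files into micro-batches. It assumes the file source's default ordering (`latestFirst` is false, so the oldest modification time goes first) and that every file is already present. The real source also remembers which files it has already processed across triggers, which this sketch omits. File names and times are hypothetical.

```python
def plan_file_batches(files, max_files_per_trigger):
    """Group (name, modification_time) pairs into micro-batches, oldest files first."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    ordered = sorted(files, key=lambda f: f[1])
    names = [name for name, _mtime in ordered]
    return [names[i:i + max_files_per_trigger]
            for i in range(0, len(names), max_files_per_trigger)]


files = [("part-2.json", 30), ("part-0.json", 10), ("part-1.json", 20)]
one_per_trigger = plan_file_batches(files, 1)
two_per_trigger = plan_file_batches(files, 2)
```

With a limit of 1, the backlog takes three triggers, which is why the counts in the streaming examples grow gradually instead of all at once.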
Apache Kafka is a distributed publish-subscribe system for data streams
Published messages are kept in fault-tolerant storage
Streams of records are stored in categories called topics
To read messages, the first step is to choose exactly one of these options: assign, subscribe, or subscribePattern
The second step is to set kafka.bootstrap.servers so that Spark can connect to the Kafka service
JSON or Avro is commonly used when reading from or writing to Kafka
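Kafka keys and values are raw bytes. Spark's Kafka source exposes them as binary `key` and `value` columns, which is why the examples below `CAST` them to STRING before any JSON parsing. Here is a plain-Python sketch of that round trip, using a subset of one activity row shown earlier. The helper names and the choice of `Device` as the message key are assumptions for illustration.

```python
import json


def to_kafka_record(row, key_field):
    """Serialize a dict row to (key, value) bytes, as a producer writing JSON might."""
    key = str(row[key_field]).encode("utf-8")
    value = json.dumps(row, sort_keys=True).encode("utf-8")
    return key, value


def from_kafka_value(value):
    """Decode the bytes (what CAST(value AS STRING) does), then parse the JSON text."""
    return json.loads(value.decode("utf-8"))


# a subset of the columns of one activity row shown earlier
row = {"Arrival_Time": 1424686735090, "Device": "nexus4_1", "gt": "stand", "x": 0.0003356934}
key, value = to_kafka_record(row, "Device")
decoded = from_kafka_value(value)
```

On the Spark side, the parsing step after the cast would typically be done with `from_json` and a schema, rather than row by row.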
# subscribe to topic1
df1 = spark.readStream.format('kafka')\
.option('kafka.bootstrap.servers', 'host1:port1, host2:port2')\
.option('subscribe', 'topic1').load()
# subscribe to multiple topics
df2 = spark.readStream.format('kafka')\
.option('kafka.bootstrap.servers', 'host1:port1, host2:port2')\
.option('subscribe', 'topic1,topic2').load()
# subscribe to every topic matching a pattern
df3 = spark.readStream.format('kafka')\
.option('kafka.bootstrap.servers', 'host1:port1, host2:port2')\
.option('subscribePattern', 'topic.*').load()
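Here is a plain-Python sketch of which existing topics each subscription option above would select. `subscribe` takes a comma-separated list of exact names. `subscribePattern` is a regular expression that must match the whole topic name; Kafka uses Java regex matching, which `re.fullmatch` approximates for simple patterns like `topic.*`. The topic list and the helper name are hypothetical.

```python
import re


def subscribed_topics(all_topics, subscribe=None, subscribe_pattern=None):
    """Topics a consumer would read for the 'subscribe' or 'subscribePattern' option."""
    if (subscribe is None) == (subscribe_pattern is None):
        raise ValueError("set exactly one of subscribe / subscribe_pattern")
    if subscribe is not None:
        wanted = {t.strip() for t in subscribe.split(",") if t.strip()}
        return sorted(t for t in all_topics if t in wanted)
    pattern = re.compile(subscribe_pattern)
    return sorted(t for t in all_topics if pattern.fullmatch(t))  # whole name must match


topics = ["topic1", "topic2", "logs", "mytopic"]
```

`mytopic` contains `topic` but is not selected by `topic.*`, because the pattern has to match from the first character.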
# specifying the topic with selectExpr
# 'topic' is a column of df1; since df1 subscribed to topic1 above, every row's topic value is 'topic1', so each row is written to topic1
df1.selectExpr('topic', 'CAST(key as STRING)', 'CAST(value as STRING)')\
.writeStream.format('kafka').option('kafka.bootstrap.servers', 'host1:port1, host2:port2')\
.option('checkpointLocation', '/FileStore/tables/bin/to/HDFS-compatible/dir').start()
Out[34]: <pyspark.sql.streaming.StreamingQuery at 0x7f503b3c3d30>
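# specifying the topic with option()
# placeholder checkpoint path: it must differ from the query above, because two running queries must not share one checkpoint directory
df1.selectExpr('CAST(key as STRING)', 'CAST(value as STRING)')\
.writeStream.format('kafka').option('kafka.bootstrap.servers', 'host1:port1, host2:port2')\
.option('checkpointLocation', '/FileStore/tables/bin/to/HDFS-compatible/dir2')\
.option('topic','topic1').start()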
Out[35]: <pyspark.sql.streaming.StreamingQuery at 0x7f5058b1e518>
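The two write examples differ in where the destination topic comes from. Per the Spark Kafka integration guide, a `topic` column is required when the `topic` option is not set, and when both are present the option overrides the column. Here is a plain-Python sketch of that routing rule; the rows and the helper name are hypothetical.

```python
def route_rows(rows, topic_option=None):
    """Destination topic per row: the 'topic' option, if set, overrides a 'topic' column."""
    routed = []
    for row in rows:
        topic = topic_option if topic_option is not None else row.get("topic")
        if topic is None:
            raise ValueError("no topic: set the 'topic' option or provide a 'topic' column")
        routed.append((topic, row["value"]))
    return routed


rows = [{"topic": "topic1", "value": "a"}, {"topic": "topic2", "value": "b"}]
by_column = route_rows(rows)
by_option = route_rows(rows, topic_option="topic3")
```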
# host and port to read data from
socketDF = spark.readStream.format('socket').option('host', 'localhost').option('port',9999).load()
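The socket source connects as a client to `host:port` and turns each newline-delimited UTF-8 line into a row of a single string column named `value`. It is meant for testing only and gives no end-to-end fault-tolerance guarantees. A listener is commonly started with `nc -lk 9999`. Below is a hypothetical pure-Python stand-in: `serve_lines` and `read_lines` are made-up names, and a local Python client stands in for Spark to show the line framing. Unlike `nc -lk`, it serves one client and then closes. Port 0 lets the OS pick a free port.

```python
import socket
import threading


def serve_lines(lines, host="127.0.0.1", port=0):
    """Accept one client, send each line plus a newline, then close (a stand-in for nc)."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    actual_port = server.getsockname()[1]  # the real port when port=0 picked a free one

    def handle():
        conn, _addr = server.accept()
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
        server.close()

    threading.Thread(target=handle, daemon=True).start()
    return actual_port


def read_lines(port, host="127.0.0.1"):
    """Connect as a client and read newline-delimited UTF-8 lines until the server closes."""
    with socket.create_connection((host, port)) as client:
        with client.makefile("r", encoding="utf-8") as stream:
            return [line.rstrip("\n") for line in stream]


port = serve_lines(["hello spark", "hello world"])
received = read_lines(port)
```

Each element of `received` corresponds to one row that the socket source would produce in its `value` column.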
# trigger every 5 seconds
activityCounts.writeStream.trigger(processingTime='5 seconds')\
.format('console').outputMode('complete').start()
Out[8]: <pyspark.sql.streaming.StreamingQuery at 0x7f634b17fcc0>
# run a single micro-batch over the data available now, then stop
activityCounts.writeStream.trigger(once=True)\
.format('console').outputMode('complete').start()
Out[9]: <pyspark.sql.streaming.StreamingQuery at 0x7f634b2ede48>