Data center location: San Francisco
Situation
Problem
Solution
Needed when stateful processing requires fine-grained control over the type of state, how it is updated, and when it is removed
Users can store any information they need for stream processing in Spark's state (the mapGroupsWithState example at the end of this section shows this)
Example
Counting events per window
from pyspark.sql import functions as F
from pyspark.sql import types as T
path = 'FileStore/tables/bin/activity-data'
# keep shuffle partitions small for this small, local example
spark.conf.set('spark.sql.shuffle.partitions',5)
# read a static copy of the data first; its inferred schema is reused for the stream below
static = spark.read.json(path)
display(static.limit(5))
Arrival_Time | Creation_Time | Device | Index | Model | User | gt | x | y | z |
---|---|---|---|---|---|---|---|---|---|
1424686735090 | 1424686733090638193 | nexus4_1 | 18 | nexus4 | g | stand | 3.356934E-4 | -5.645752E-4 | -0.018814087 |
1424686735292 | 1424688581345918092 | nexus4_2 | 66 | nexus4 | g | stand | -0.005722046 | 0.029083252 | 0.005569458 |
1424686735500 | 1424686733498505625 | nexus4_1 | 99 | nexus4 | g | stand | 0.0078125 | -0.017654419 | 0.010025024 |
1424686735691 | 1424688581745026978 | nexus4_2 | 145 | nexus4 | g | stand | -3.814697E-4 | 0.0184021 | -0.013656616 |
1424686735890 | 1424688581945252808 | nexus4_2 | 185 | nexus4 | g | stand | -3.814697E-4 | -0.031799316 | -0.00831604 |
# maxFilesPerTrigger: read at most 10 files per micro-batch to simulate a stream
streaming = spark.readStream.schema(static.schema).option('maxFilesPerTrigger', 10).json('/'+path)
streaming.printSchema()
root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)
display(static.withColumn('event_time', (F.col('Creation_Time')/1000000000).cast(T.TimestampType())).limit(5))
Arrival_Time | Creation_Time | Device | Index | Model | User | gt | x | y | z | event_time |
---|---|---|---|---|---|---|---|---|---|---|
1424686735090 | 1424686733090638193 | nexus4_1 | 18 | nexus4 | g | stand | 3.356934E-4 | -5.645752E-4 | -0.018814087 | 2015-02-23T10:18:53.090+0000 |
1424686735292 | 1424688581345918092 | nexus4_2 | 66 | nexus4 | g | stand | -0.005722046 | 0.029083252 | 0.005569458 | 2015-02-23T10:49:41.345+0000 |
1424686735500 | 1424686733498505625 | nexus4_1 | 99 | nexus4 | g | stand | 0.0078125 | -0.017654419 | 0.010025024 | 2015-02-23T10:18:53.498+0000 |
1424686735691 | 1424688581745026978 | nexus4_2 | 145 | nexus4 | g | stand | -3.814697E-4 | 0.0184021 | -0.013656616 | 2015-02-23T10:49:41.745+0000 |
1424686735890 | 1424688581945252808 | nexus4_2 | 185 | nexus4 | g | stand | -3.814697E-4 | -0.031799316 | -0.00831604 | 2015-02-23T10:49:41.945+0000 |
# add an event_time column: Creation_Time is in nanoseconds, so divide by 10^9 and cast to timestamp
withEventTime = streaming.selectExpr('*', "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")
withEventTime.groupBy(F.window(F.col('event_time'),"10 minutes")).count()\
.writeStream.queryName('pyevents_per_window').format('memory').outputMode('complete').start()
Out[11]: <pyspark.sql.streaming.StreamingQuery at 0x7fc1f07a4780>
spark.streams.active
Out[17]: [<pyspark.sql.streaming.StreamingQuery at 0x7fc1f07c8a20>]
spark.sql('select * from pyevents_per_window order by window').show()
+--------------------+------+
window| count|
+--------------------+------+
[2015-02-22 00:40...| 35|
[2015-02-23 10:10...| 11515|
[2015-02-23 10:20...| 99178|
[2015-02-23 10:30...|100443|
[2015-02-23 10:40...| 88681|
[2015-02-23 10:50...|160775|
[2015-02-23 11:00...|106232|
[2015-02-23 11:10...| 91382|
[2015-02-23 11:20...| 75181|
[2015-02-23 12:10...| 58984|
[2015-02-23 12:20...|106291|
[2015-02-23 12:30...|100853|
[2015-02-23 12:40...| 97897|
[2015-02-23 12:50...|105160|
[2015-02-23 13:00...|165556|
[2015-02-23 13:10...|162075|
[2015-02-23 13:20...|106075|
[2015-02-23 13:30...| 96480|
[2015-02-23 13:40...|167565|
[2015-02-23 13:50...|193453|
+--------------------+------+
only showing top 20 rows
spark.sql('select * from pyevents_per_window').printSchema()
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)
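The window column is a struct with start and end fields, so the window boundaries can be selected directly from the in-memory table. A minimal sketch of such a query (the column aliases are only illustrative, not from the original notebook):
# pull the window boundaries out of the struct (sketch)
spark.sql("select window.start as window_start, window.end as window_end, count from pyevents_per_window order by window_start").show(5, truncate=False)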
# add User as a second grouping key
withEventTime.groupBy(F.window(F.col('event_time'),'10 minutes'), 'User').count()\
.writeStream.queryName('pyevents_per_window2').format('memory').outputMode('complete').start()
Out[33]: <pyspark.sql.streaming.StreamingQuery at 0x7fc1f07b35c0>
spark.sql('select * from pyevents_per_window2 order by window,User').show()
+--------------------+----+------+
window|User| count|
+--------------------+----+------+
[2015-02-22 00:40...| a| 35|
[2015-02-23 10:10...| g| 11515|
[2015-02-23 10:20...| g| 99178|
[2015-02-23 10:30...| g|100443|
[2015-02-23 10:40...| g| 88681|
[2015-02-23 10:50...| g|160775|
[2015-02-23 11:00...| g|106232|
[2015-02-23 11:10...| g| 91382|
[2015-02-23 11:20...| g| 75181|
[2015-02-23 12:10...| c| 58984|
[2015-02-23 12:20...| c|106291|
[2015-02-23 12:30...| c|100853|
[2015-02-23 12:40...| c| 97897|
[2015-02-23 12:50...| c|105160|
[2015-02-23 13:00...| a| 66049|
[2015-02-23 13:00...| c| 99507|
[2015-02-23 13:10...| a|113530|
[2015-02-23 13:10...| c| 48545|
[2015-02-23 13:20...| a|106075|
[2015-02-23 13:30...| a| 96480|
+--------------------+----+------+
only showing top 20 rows
# sliding windows: 10-minute windows starting every 5 minutes
withEventTime.groupBy(F.window(F.col('event_time'),'10 minutes', '5 minutes')).count()\
.writeStream.queryName('events_per_window3').format('memory').outputMode('complete').start()
Out[36]: <pyspark.sql.streaming.StreamingQuery at 0x7fc2145bf358>
spark.sql('select * from events_per_window3 order by window').show()
+--------------------+------+
window| count|
+--------------------+------+
[2015-02-22 00:35...| 35|
[2015-02-22 00:40...| 35|
[2015-02-23 10:10...| 11515|
[2015-02-23 10:15...| 55686|
[2015-02-23 10:20...| 99178|
[2015-02-23 10:25...|101286|
[2015-02-23 10:30...|100443|
[2015-02-23 10:35...| 98969|
[2015-02-23 10:40...| 88681|
[2015-02-23 10:45...|132708|
[2015-02-23 10:50...|160775|
[2015-02-23 10:55...|120218|
[2015-02-23 11:00...|106232|
[2015-02-23 11:05...|101780|
[2015-02-23 11:10...| 91382|
[2015-02-23 11:15...| 92946|
[2015-02-23 11:20...| 75181|
[2015-02-23 11:25...| 29794|
[2015-02-23 12:05...| 14805|
[2015-02-23 12:10...| 58984|
+--------------------+------+
only showing top 20 rows
spark.streams.active
Out[42]: [<pyspark.sql.streaming.StreamingQuery at 0x7fc1f07a4e10>,
<pyspark.sql.streaming.StreamingQuery at 0x7fc1f07a4be0>,
<pyspark.sql.streaming.StreamingQuery at 0x7fc1f07a4f60>]
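Each of the queries above keeps running (and keeps state) until it is stopped explicitly. Not part of the original notebook, but a small sketch of cleaning them up once the memory-sink tables have been inspected:
# stop every active streaming query in this session (sketch)
for query in spark.streams.active:
    print('stopping query:', query.name)
    query.stop()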
# a 30-minute watermark: specifies how late (in event time) data is expected to arrive
withEventTime.withWatermark('event_time','30 minutes')\
.groupBy(F.window(F.col('event_time'), '10 minutes', '5 minutes'))\
.count().writeStream.queryName('pyevents_per_window4').format('memory').outputMode('complete').start()
Out[44]: <pyspark.sql.streaming.StreamingQuery at 0x7fc1f0730630>
# intermediate result, taken while the stream is still processing the input
spark.sql('select * from pyevents_per_window4 order by window').show()
+--------------------+------+
window| count|
+--------------------+------+
[2015-02-22 00:35...| 30|
[2015-02-22 00:40...| 30|
[2015-02-23 10:10...| 10082|
[2015-02-23 10:15...| 48775|
[2015-02-23 10:20...| 86822|
[2015-02-23 10:25...| 88541|
[2015-02-23 10:30...| 87831|
[2015-02-23 10:35...| 86645|
[2015-02-23 10:40...| 77644|
[2015-02-23 10:45...|116113|
[2015-02-23 10:50...|140591|
[2015-02-23 10:55...|105186|
[2015-02-23 11:00...| 92997|
[2015-02-23 11:05...| 89074|
[2015-02-23 11:10...| 79909|
[2015-02-23 11:15...| 81313|
[2015-02-23 11:20...| 65837|
[2015-02-23 11:25...| 26066|
[2015-02-23 12:05...| 12899|
[2015-02-23 12:10...| 51598|
+--------------------+------+
only showing top 20 rows
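# the same query run again after the stream has processed the rest of the input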
spark.sql('select * from pyevents_per_window4 order by window').show()
+--------------------+------+
window| count|
+--------------------+------+
[2015-02-22 00:35...| 35|
[2015-02-22 00:40...| 35|
[2015-02-23 10:10...| 11515|
[2015-02-23 10:15...| 55686|
[2015-02-23 10:20...| 99178|
[2015-02-23 10:25...|101286|
[2015-02-23 10:30...|100443|
[2015-02-23 10:35...| 98969|
[2015-02-23 10:40...| 88681|
[2015-02-23 10:45...|132708|
[2015-02-23 10:50...|160775|
[2015-02-23 10:55...|120218|
[2015-02-23 11:00...|106232|
[2015-02-23 11:05...|101780|
[2015-02-23 11:10...| 91382|
[2015-02-23 11:15...| 92946|
[2015-02-23 11:20...| 75181|
[2015-02-23 11:25...| 29794|
[2015-02-23 12:05...| 14805|
[2015-02-23 12:10...| 58984|
+--------------------+------+
only showing top 20 rows
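In complete output mode the sink keeps every window, so the watermark above never removes anything from the results. A hedged sketch of the same aggregation in update mode (the console sink and the query name pyevents_per_window_update are illustrative choices, not from the original), where only windows changed by the current trigger are emitted and state older than the watermark can be dropped:
# same windowed count, emitted in update mode so old window state can be cleaned up (sketch)
withEventTime.withWatermark('event_time', '30 minutes')\
.groupBy(F.window(F.col('event_time'), '10 minutes', '5 minutes'))\
.count()\
.writeStream.queryName('pyevents_per_window_update')\
.format('console').outputMode('update').start()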
# drop duplicate (User, event_time) records; the 5-second watermark bounds how long duplicate-tracking state is kept
withEventTime.withWatermark('event_time', '5 seconds')\
.dropDuplicates(['User', 'event_time'])\
.groupBy('User').count().writeStream\
.queryName('pydeduplicated')\
.format("memory")\
.outputMode("complete")\
.start()
Out[7]: <pyspark.sql.streaming.StreamingQuery at 0x7f2233c86358>
spark.sql("select * from pydeduplicated").collect()
Out[10]: [Row(User='a', count=80854),
Row(User='b', count=91239),
Row(User='c', count=77155),
Row(User='g', count=91673),
Row(User='h', count=77326),
Row(User='e', count=96022),
Row(User='f', count=92056),
Row(User='d', count=81245),
Row(User='i', count=92553)]
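# running the same query again a little later; the count for user 'e' has increased as new data arrived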
spark.sql("select * from pydeduplicated").collect()
Out[17]: [Row(User='a', count=80854),
Row(User='b', count=91239),
Row(User='c', count=77155),
Row(User='g', count=91673),
Row(User='h', count=77326),
Row(User='e', count=96286),
Row(User='f', count=92056),
Row(User='d', count=81245),
Row(User='i', count=92553)]
case class InputRow(user:String, timestamp:java.sql.Timestamp, activity:String)
case class UserState(user:String, var activity:String, var start:java.sql.Timestamp, var end:java.sql.Timestamp)
// define the update function
def updateUserStateWithEvent(state:UserState, input:InputRow):UserState = {
  // no timestamp, just ignore it
  if (Option(input.timestamp).isEmpty) {
    return state
  }
  // does the activity match for the input row
  if (state.activity == input.activity) {
    if (input.timestamp.after(state.end)) {
      state.end = input.timestamp
    }
    if (input.timestamp.before(state.start)) {
      state.start = input.timestamp
    }
  } else {
    // some other activity
    if (input.timestamp.after(state.end)) {
      state.start = input.timestamp
      state.end = input.timestamp
      state.activity = input.activity
    }
  }
  // return the updated state
  state
}
import org.apache.spark.sql.streaming.GroupStateTimeout

withEventTime
  .selectExpr("User as user",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp",
    "gt as activity")
  .as[InputRow]
  // group the state by user key
  .groupByKey(_.user)
  // updateAcrossEvents (defined elsewhere) folds each user's input rows into a UserState
  // via updateUserStateWithEvent, keeping it across triggers with GroupState
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("update")
  .start()