개요
- Spark Structured Streaming 에서는 여러 streaming aggregations 를 지원하지 않는다.
- 에러 메시지
Multiple streaming aggregations are not supported with streaming DataFrames/Datasets
무엇이 안되는 것인가?
- 아래처럼 여러번 transform을 할 수 없다.
* Stream input 을 읽어서
* DataFrame1 로 transform 하고
* 이를 DataFrame2 로 transform <-- 지원 X
- 여러 종류의 aggregation 은 당연히 지원된다.
dataFrame.agg(
functions.max("priority").alias("priority"),
sum("cnt").alias("cnt")
)
foreachBatch 를 활용한 대응 방안
- foreachBatch 를 통해 추가적인 aggregation 을 적용하는 방안
ds1
.withWatermark("rdate", "5 seconds")
.filter(col("device_id").isNotNull)
.groupBy(window(col("rdate"), "5 seconds", "5 seconds"), col("group_id"), col("device_id"))
.agg(
functions.last("priority").alias("priority"),
sum("category").alias("category")
)
.select("window.end", "group_id", "device_id", "priority", "category")
.writeStream
.foreachBatch( (batchDF: DataFrame, batchId: Long) =>
sumGroupPriority(batchDF, batchId)
)
.outputMode("append")
.start()
.awaitTermination()
- 2번째 transform 을 foreachBatch 에 등록할 함수에서 지정
def sumGroupPriority(df:DataFrame, batchId:Long): Unit = {
val formatter = new SimpleDateFormat("yyyy_MM_dd")
val connectionProperties = new Properties()
connectionProperties.put("driver", "org.mariadb.jdbc.Driver")
connectionProperties.put("user", "myuser")
connectionProperties.put("password", "mypassword")
val subDataFrame = df.groupBy("end", "group_id")
.agg(sum("priority").alias("priority"),
sum("category").alias("category")
)
.select("end", "group_id", "priority", "category")
.withColumnRenamed("end", "rdate")
subDataFrame.write.mode("append")
.jdbc(
"jdbc:mysql://localhost:3306/test",
"test_tb_%s".format(
formatter.format(Calendar.getInstance().getTime)),
connectionProperties)
}
reference