의존성 추가
프로젝트 의존성 설정
- spark-sql-kafka 에 대한 의존성 추가
- sbt, gradle, maven 등 문법에 따라 적용
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.3.0
배포시 의존성 설정
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0
- offline 환경인 경우 의존성 라이브러리를
-jars
옵션으로 추가하여 대체 가능
Kafka read, write
Kafka source 에서 read
- kafka.bootstrap.servers : broker 서버 설정
- subscribe : 읽을 topic 목록
- subscribePattern : 정규식 패턴으로 topic 설정
- assign : json 형식으로 topic 별 partition 지정
- ex) {"topicA":[0,1],"topicB":[2,4]}
- ※ subscribe, subscribePattern, assign 중 하나만 적용 가능
- 그외 optional 한 필드는 공식 Docs 참조
- 아래와 같은 스키마로 읽어들인다.
- key, value, topic, partition, offset, timestamp, timestampType, headers (optional)
def test(): Unit = {
import sparkSession.implicits._
val topicName = "mytopic"
// Kafka input 설정
val ds1 = sparkSession.readStream.format("kafka")
// broker, topic 설정
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", topicName)
// serializer 설정
.option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
.option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
// 읽기 시작할 offset 설정
.option("startingOffsets", "latest")
.load()
// value 값에 대해 처리
.select(from_json($"value".cast("string"), testSchema).alias("value"))
.select("value.*")
.withColumn("rdate", to_timestamp(substring(col("rdate"), 0, 14), "yyyyMMddHHmmss"))
Kafka 에 sink (write)
- DataFrame 스키마가 아래와 같은 스키마여야만 한다.
- key (optional) : String, binary
- value : String, binary
- headers (optional) : array
- topic (*optional) : String
- ※ 스키마에서 지정하지 않은 경우 'topic' option 으로 지정해주어야만 한다.
- partition (optional) : int
// topic 을 option 으로 지정하고 key,value DataFrame 을 Write
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// topic 이 DataFrame에 명시된 key-value DataFrame 을 write
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
간단 예제
- Kafka topic 에서 value 값을 읽어 일별로 JDBC에 sink 하는 예제
- Window 5초마다 'device_id' 기준 'priority' max 값을 처리
def test(): Unit = {
import sparkSession.implicits._
val topicName = "mytopic"
// Kafka input 설정
val ds1 = sparkSession.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", topicName)
.option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
.option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
.option("startingOffsets", "latest")
.load()
.select(from_json($"value".cast("string"), testSchema).alias("value"))
.select("value.*")
.withColumn("rdate", to_timestamp(substring(col("rdate"), 0, 14), "yyyyMMddHHmmss"))
// DB 설정
val driver = "org.mariadb.jdbc.Driver"
val user = "testuser"
val pwd = "testpassword"
val connectionProperties = new Properties()
connectionProperties.put("driver", driver)
connectionProperties.put("user", user)
connectionProperties.put("password", pwd)
Class.forName(driver)
val formatter = new SimpleDateFormat("yyyy_MM_dd")
val stream = ds1
.withWatermark("rdate", "5 seconds")
.filter(col("device_id").isNotNull)
.groupBy(window(col("rdate"), "5 seconds", "5 seconds"), col("device_id"))
.agg(functions.max("priority").alias("priority"))
.select("window.end", "device_id", "priority")
.writeStream
.foreachBatch( (batchDF: DataFrame, batchId: Long) =>
batchDF.write.mode("append")
.jdbc(
"jdbc:mysql://localhost:3306/testdb",
"test_tb_%s".format(formatter.format(Calendar.getInstance().getTime)),
connectionProperties)
)
.outputMode("append")
.start().awaitTermination()
}
reference