[Spark Structured Streaming] Kafka read, write

Woong·2022년 6월 23일
0

Apache Spark

목록 보기
15/22
post-custom-banner

의존성 추가

프로젝트 의존성 설정

  • spark-sql-kafka 에 대한 의존성 추가
    • sbt, gradle, maven 등 문법에 따라 적용
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.3.0

배포시 의존성 설정

  • --packages 옵션을 통해 지정
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 
  • offline 환경인 경우 의존성 라이브러리를 -jars 옵션으로 추가하여 대체 가능

Kafka read, write

Kafka source 에서 read

  • kafka.bootstrap.servers : broker 서버 설정
  • subscribe : 읽을 topic 목록
    • subscribePattern : 정규식 패턴으로 topic 설정
    • assign : json 형식으로 topic 별 partition 지정
      • ex) {"topicA":[0,1],"topicB":[2,4]}
    • ※ subscribe, subscribePattern, assign 중 하나만 적용 가능
  • 그외 optional 한 필드는 공식 Docs 참조
  • 아래와 같은 스키마로 읽어들인다.
    • key, value, topic, partition, offset, timestamp, timestampType, headers (optional)
def test(): Unit = {
        import sparkSession.implicits._

		val topicName = "mytopic"
		// Kafka input 설정
        val ds1 = sparkSession.readStream.format("kafka")
        	// broker, topic 설정
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", topicName)
            // serializer 설정
            .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
            .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
            // 읽기 시작할 offset 설정
            .option("startingOffsets", "latest")
            .load()
            // value 값에 대해 처리
            .select(from_json($"value".cast("string"), testSchema).alias("value"))
            .select("value.*")
            .withColumn("rdate", to_timestamp(substring(col("rdate"), 0, 14), "yyyyMMddHHmmss"))

Kafka 에 sink (write)

  • DataFrame 스키마가 아래와 같은 스키마여야만 한다.
    • key (optional) : String, binary
    • value : String, binary
    • headers (optional) : array
    • topic (*optional) : String
      • ※ 스키마에서 지정하지 않은 경우 'topic' option 으로 지정해주어야만 한다.
    • partition (optional) : int
// topic 을 option 으로 지정하고 key,value DataFrame 을 Write
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// topic 이 DataFrame에 명시된 key-value DataFrame 을 write
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

간단 예제

  • Kafka topic 에서 value 값을 읽어 일별로 JDBC에 sink 하는 예제
    • Window 5초마다 'device_id' 기준 'priority' max 값을 처리
def test(): Unit = {
        import sparkSession.implicits._

		val topicName = "mytopic"
		// Kafka input 설정
        val ds1 = sparkSession.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", topicName)
            .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
            .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
            .option("startingOffsets", "latest")
            .load()
            .select(from_json($"value".cast("string"), testSchema).alias("value"))
            .select("value.*")
            .withColumn("rdate", to_timestamp(substring(col("rdate"), 0, 14), "yyyyMMddHHmmss"))
            
		// DB 설정
        val driver = "org.mariadb.jdbc.Driver"
        val user = "testuser"
        val pwd = "testpassword"
        
        val connectionProperties = new Properties()
        connectionProperties.put("driver", driver)
        connectionProperties.put("user", user)
        connectionProperties.put("password", pwd)
        
        Class.forName(driver)

        val formatter = new SimpleDateFormat("yyyy_MM_dd")


        val stream = ds1
            .withWatermark("rdate", "5 seconds")
            .filter(col("device_id").isNotNull)
            .groupBy(window(col("rdate"), "5 seconds", "5 seconds"), col("device_id"))
            .agg(functions.max("priority").alias("priority"))
            .select("window.end", "device_id", "priority")
            .writeStream
            .foreachBatch( (batchDF: DataFrame, batchId: Long) =>
                batchDF.write.mode("append")
                    .jdbc(
                        "jdbc:mysql://localhost:3306/testdb",
                        "test_tb_%s".format(formatter.format(Calendar.getInstance().getTime)),
                        connectionProperties)
            )
            .outputMode("append")
            .start().awaitTermination()
    }

reference

post-custom-banner

0개의 댓글