Apache Spark에서 flatMap 함수는 복잡한 데이터 구조를 단순화하여 분석과 쿼리 작업을 용이하게 만드는 핵심 기능 중 하나이다. flatMap은 각 입력 요소를 여러 출력 요소로 변환할 수 있으며, 주로 중첩된 컬렉션을 단일 수준의 컬렉션으로 평탄화하는 데 사용된다.
[[1, 2], [3, 4]]
의 배열에 flatMap을 적용하면 [1, 2, 3, 4]
로 평탄화.변환 기능: 각 입력 요소를 0개 이상의 출력으로 매핑한다. 예를 들어, 중첩된 리스트 또는 배열의 각 요소를 독립적인 요소로 분리하여 평탄한 리스트를 생성한다.
데이터 평탄화: flatMap은 중첩된 데이터 구조를 단일 배열로 평탄화하여 복잡한 쿼리를 단순화하고, 데이터 액세스를 빠르고 효율적으로 만든다.
Flattener 코드는 중첩된 데이터 구조(예: JSON 또는 Parquet 파일)에서 평탄화 프로세스를 자동화하여 Spark DataFrame에서 쉽게 사용할 수 있게 구성
{
"eventTime": "2024-05-25T00:01:05.210581234",
"eventType": "PRODUCT1",
"id": 123456789,
"data": {
"product": {
"pid": 99999999,
"uid": 66666666,
"name": "테스트 데이터",
"price": 3280,
"location": "서울",
}
}
}
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
// Spark 세션 초기화
val spark = SparkSession.builder()
.appName("Data Flattening Example")
.master("local[*]")
.getOrCreate()
// JSON 데이터 읽기
val jsonData = spark.read.json("path_to_json_file.json")
// 스키마 평탄화
val flattenedColumns = ExampleFlattener.flatten(jsonData.schema, maxDepth = 4)
val flattenedDataFrame = jsonData.select(flattenedColumns: _*)
// 결과 출력
flattenedDataFrame.show()
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, to_json}
import org.apache.spark.sql.types._
object ExampleFlattener {
case class ColumnInfo(name: String, dataType: DataType)
def flatten(
schema: StructType,
maxDepth: Int,
leafColumns: Set[String] = Set.empty
): Array[Column] = {
val flattened = flattenColumns(
schema,
depth = 1,
maxDepth = maxDepth,
leafColumns = leafColumns
)
flattened.map { column =>
val alias = column.name.replace(".", "__")
column.dataType match {
case _: StructType | _: ArrayType | _: MapType => to_json(col(column.name)).as(alias)
case _ => col(column.name).cast(StringType).as(alias)
}
}
}
private def flattenColumns(
schema: StructType,
depth: Int,
maxDepth: Int,
prefix: String = null,
leafColumns: Set[String] = Set.empty
): Array[ColumnInfo] = {
if (depth >= maxDepth) {
schema.fields.map(field => ColumnInfo(getColumnName(prefix, field.name), field.dataType))
} else {
schema.fields.flatMap(field => {
val columnInfo = ColumnInfo(getColumnName(prefix, field.name), field.dataType)
field.dataType match {
case st: StructType =>
flattenColumns(st, depth + 1, maxDepth, columnInfo.name, leafColumns)
case _ => Array(columnInfo)
}
})
}
}
private def getColumnName(prefix: String, name: String): String =
if (prefix == null) name else s"$prefix.$name"
}
데이터가 아래의 형태인 경우,
{"person":
{"name": "John", "address":
{"street": "123 Main St", "city": "New York"}
}
}
평탄화는 다음과 같이 진행될 수 있다
person_name: "John"
person_address_street: "123 Main St"
person_address_city: "New York"