Spark FlatMap

Q·2024년 5월 29일
0

FlatMap

Apache Spark에서 flatMap 함수는 복잡한 데이터 구조를 단순화하여 분석과 쿼리 작업을 용이하게 만드는 핵심 기능 중 하나이다. flatMap은 각 입력 요소를 여러 출력 요소로 변환할 수 있으며, 주로 중첩된 컬렉션을 단일 수준의 컬렉션으로 평탄화하는 데 사용된다.

  • 예시: [[1, 2], [3, 4]]의 배열에 flatMap을 적용하면 [1, 2, 3, 4]로 평탄화.

FlatMap의 기능

  • 변환 기능: 각 입력 요소를 0개 이상의 출력으로 매핑한다. 예를 들어, 중첩된 리스트 또는 배열의 각 요소를 독립적인 요소로 분리하여 평탄한 리스트를 생성한다.

  • 데이터 평탄화: flatMap은 중첩된 데이터 구조를 단일 배열로 평탄화하여 복잡한 쿼리를 단순화하고, 데이터 액세스를 빠르고 효율적으로 만든다.

예시 Flattener 코드

Flattener 코드는 중첩된 데이터 구조(예: JSON 또는 Parquet 파일)에서 평탄화 프로세스를 자동화하여 Spark DataFrame에서 쉽게 사용할 수 있게 구성

예시 JSON 데이터

{
  "eventTime": "2024-05-25T00:01:05.210581234",
  "eventType": "PRODUCT1",
  "id": 123456789,
  "data": {
    "product": {
      "pid": 99999999,
      "uid": 66666666,
      "name": "테스트 데이터",
      "price": 3280,
      "location": "서울",
    }
  }
}

spark executor

import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Spark 세션 초기화
val spark = SparkSession.builder()
  .appName("Data Flattening Example")
  .master("local[*]")
  .getOrCreate()

// JSON 데이터 읽기
val jsonData = spark.read.json("path_to_json_file.json")

// 스키마 평탄화
val flattenedColumns = ExampleFlattener.flatten(jsonData.schema, maxDepth = 4)
val flattenedDataFrame = jsonData.select(flattenedColumns: _*)

// 결과 출력
flattenedDataFrame.show()

Flattener

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, to_json}
import org.apache.spark.sql.types._

object ExampleFlattener {

  case class ColumnInfo(name: String, dataType: DataType)

  def flatten(
      schema: StructType,
      maxDepth: Int,
      leafColumns: Set[String] = Set.empty
  ): Array[Column] = {
    val flattened = flattenColumns(
      schema,
      depth = 1,
      maxDepth = maxDepth,
      leafColumns = leafColumns
    )

    flattened.map { column =>
      val alias = column.name.replace(".", "__")
      column.dataType match {
        case _: StructType | _: ArrayType | _: MapType => to_json(col(column.name)).as(alias)
        case _ => col(column.name).cast(StringType).as(alias)
      }
    }
  }

  private def flattenColumns(
      schema: StructType,
      depth: Int,
      maxDepth: Int,
      prefix: String = null,
      leafColumns: Set[String] = Set.empty
  ): Array[ColumnInfo] = {
    if (depth >= maxDepth) {
      schema.fields.map(field => ColumnInfo(getColumnName(prefix, field.name), field.dataType))
    } else {
      schema.fields.flatMap(field => {
        val columnInfo = ColumnInfo(getColumnName(prefix, field.name), field.dataType)
        field.dataType match {
          case st: StructType =>
            flattenColumns(st, depth + 1, maxDepth, columnInfo.name, leafColumns)
          case _ => Array(columnInfo)
        }
      })
    }
  }

  private def getColumnName(prefix: String, name: String): String =
    if (prefix == null) name else s"$prefix.$name"
}
  1. ExampleFlattener
  • 재귀적으로 스키마의 각 필드를 순회하면서 데이터 타입에 따라 적절한 처리를 수행
  • StructType, ArrayType, MapType 등의 복합 데이터 타입은 재귀적으로 더 깊이 탐색하며, maxDepth에 도달하거나 기타 조건에 부합할 경우 재귀를 중단한다.
  • 최대 깊이(maxDepth)를 지정함으로써 얼마나 깊게 탐색할지 제한할 수 있으며, 특정 리프 노드에서 중단할 수도 있다.
  1. 병합 과정
  • flattenColumns 함수는 스키마를 입력으로 받아 각 필드를 순회하면서 ColumnInfo 객체를 생성한다.
  • 중첩된 필드는 재귀적으로 탐색되며, flatMap을 사용하여 각 필드의 결과를 하나의 리스트로 병합한다.
  • 이 병합은 flatMap이 각 요소의 출력(여기서는 중첩 배열 또는 단일 요소)을 연속된 리스트로 결합하기 때문에 가능하다.
  1. 결과 생성
  • 최종 결과는 모든 필드의 ColumnInfo 객체를 포함하는 배열
  • 이 배열은 데이터프레임의 새로운 컬럼 구성
  • 각 중첩된 구조는 해당 경로 이름을 변경하고 JSON 문자열로 변환하여 평탄화

예상 결과

데이터가 아래의 형태인 경우,

{"person": 
	{"name": "John", "address": 
    	{"street": "123 Main St", "city": "New York"}
    }
}

평탄화는 다음과 같이 진행될 수 있다

person_name: "John"
person_address_street: "123 Main St"
person_address_city: "New York"
  • 이 평탄화는 복잡한 데이터 구조를 간단하게 만들어 분석 및 보고에 이상적인 형태로 만든다.
  • 데이터의 중첩 깊이나 구조에 따라 적절한 maxDepth 설정이 중요하며, 이를 통해 처리 성능과 메모리 사용을 최적화할 수 있다.
profile
Data Engineer

0개의 댓글