Spark 완벽 가이드 ch5. 구조적 API 연산

Q·2023년 1월 11일
1

Spark 완벽 가이드

목록 보기
6/24
  • DataFrame의 구성
    • Row 타입의 레코드
    • 레코드에 수행할 연산 표현식을 나타내는 여러 컬럼
  • 스키마: 각 컬럼명과 데이터 타입을 정의
  • 파티셔닝: DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태를 정의
  • 파티셔닝 스키마: 파티션을 배치하는 방법을 정의
path="/FileStore/tables/2015_summary.json"
#DataFrame 생성
df= spark.read.format('json').load(path)
#스키마 출력
df.printSchema()
root 
-- DEST_COUNTRY_NAME: string (nullable = true) 
-- ORIGIN_COUNTRY_NAME: string (nullable = true) 
-- count: long (nullable = true)

스키마

  • 데이터소스에서 스키마를 얻거나 직접 정의 가능
  • 데이터를 읽기 전에 스키마를 정의해야 하는지 여부는 상황에 따라 달라짐
    • ETL 작업에 스파크를 사용한다면 직접 스키마를 정의해야함
    • ETL 작업 중 데이터 타입을 알기 힘든 CSV나 JSON 등의 데이터소스를 사용하는 경우, 스키마 추론 과정에서 읽어 들인 샘플로 스키마를 결정해버릴 수 있음
spark.read.format('json').load(path).schema
Out[4]: StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))
  • 스키마는 여러 개의 StructField 타입 필드로 구성된 StructType 객체임
  • StructField는 이름, 데이터 타입, 값이 null일 수 있는지 지정하는 boolean 값을 가짐
from pyspark.sql import types as T
#직접 스키마 정의
myManualSchema = T.StructType([
  T.StructField('DEST_COUNTRY_NAME', T.StringType(), True),
  T.StructField('ORIGIN_COUNTRY_NAME', T.StringType(), True),
  T.StructField('count', T.LongType(), False, metadata={'hello':'world'})
])
#스키마 적용
df2 = spark.read.format('json').schema(myManualSchema).load(path)

컬럼과 표현식

  • 스파크의 컬럼은 표현식을 사용해 레코드 단위로 계산한 값을 단순하게 나타내는 논리적인 구조
    • 표현식으로 컬럼을 선택, 조작, 제거 가능
  • 따라서 컬럼의 실젯값을 얻으려면 로우가 필요하고,
  • 로우를 얻으려면 DataFrame이 필요하다.
  • DataFrame을 통하지 않으면 외부에서 컬럼에 접근할 수 없다.
  • 컬럼 내용을 수정하려면 반드시 DataFrame의 스파크 트랜스포메이션을 사용해야함

컬럼

  • col함수나 column함수를 사용하여 컬럼을 생성하고 참조
from pyspark.sql import functions as F
#col함수로 컬럼 생성
F.col('someColumnName')
Out[7]: Column<'someColumnName'>
  • 컬럼이 DataFrame에 있을지 없을지는 알 수 없다.
  • 컬럼은 컬럼명을 카탈로그에 저장된 정보와 비교하기 전까지 미확인 상태
  • 4장에서 본 것 처럼, 분석기가 동작하는 단계에서 컬럼과 테이블을 분석함

명시적 컬럼 참조

  • DataFrame의 컬럼은 col 메서드로 참조함
  • col메서드를 사용해 명시적으로 컬럼을 정의하면 스파크는 분석기 실행 단계에서 컬럼 확인 절차를 생략함

표현식

  • 앞서 컬럼은 표현식이라고 했음 -> 표현식이란?
  • DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합
  • 여러 컬럼명을 입력으로 받아 식별하고, '단일 값'을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수
    • 여기서 단일 값은 Map이나 Array 같은 복합 데이터 타입일 수 있음
  • expr함수로 간단히 표현식 사용 가능
    • expr함수의 인수로 표현식을 사용하면 표현식을 분석해서 트랜스포메이션과 컬럼 참조를 알아낼 수 있음
F.expr("(((someCol + 5)*200) - 6) < otherCol")
Out[8]: Column<'((((someCol + 5) * 200) - 6) < otherCol)'>
  • 위 표현식은 실행 시점에 아래와 같은 논리적 트리로 컴파일됨
  • 어차피 실행 시점에 논리 트리로 컴파일되므로 DataFrame코드나 SQL코드나 위와 같은 표현식을 작성할 수 있고, 같은 성능을 발휘함

DataFrame 컬럼에 접근하기

  • printSchema메서드로 전체 컬럼 정보를 확인할 수 있지만 프로그래밍 방식으로 컬럼에 접근할 땐 df의 columns 속성을 사용
df.columns
Out[9]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

레코드와 로우

  • 스파크에서 DataFrame의 각 로우는 하나의 레코드
  • 스파크는 레코드를 Row 객체로 표현
  • Row 객체는 내부에 바이트 배열을 가짐
  • 바이트 배열 인터페이스는 오직 컬럼 표현식으로만 다룰 수 있으므로 사용자에게 절대 노출되지 않음
#first메서드로 로우 확인
df.first()
Out[10]: Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

로우 생성하기

  • Row 객체는 스키마 정보를 갖고 있지 않음
  • DataFrame만 유일하게 스키마를 가짐
  • 따라서 Row객체를 직접 생성하려면 DataFrame의 스키마와 같은 순서로 값을 명시해야함
from pyspark.sql import Row

myRow = Row("hello", None, 1, False)
myRow[0]
Out[12]: 'hello'
myRow[2]
Out[13]: 1

DataFrame의 트랜스포메이션

DataFrame 생성하기

myManualSchema=T.StructType([
  T.StructField("some", T.StringType(), True),
  T.StructField("col", T.StringType(), True),
  T.StructField("names", T.LongType(), True),
])
myRow = Row("hello", None,1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()
+-----+----+-----+ 
some| col|names| 
+-----+----+-----+ 
hello|null| 1| 
+-----+----+-----+

select와 selectExpr

  • select와 selectExpr메서드를 사용하면 데이터 테이블에 SQL을 실행하는 것 처럼 DataFrame에서도 SQL 사용 가능
#select dest_country_name from table limit 2
df.select("DEST_COUNTRY_NAME").show(2)
+-----------------+ 
DEST_COUNTRY_NAME| 
+-----------------+ 
United States| 
United States| 
+-----------------+ 
only showing top 2 rows
#select dest_country_name, origin_country_name from table limit 2
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
+-----------------+-------------------+ 
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| 
+-----------------+-------------------+ 
United States| Romania| 
United States| Croatia| 
+-----------------+-------------------+ 
only showing top 2 rows
#expr함수로 컬럼 참조하기
#expr함수는 단순 컬럼 참조나 문자열을 이용해 컬럼을 참조할 수 있음
df.select(F.expr("dest_country_name as destination")).show(2)
+-------------+ 
destination| 
+-------------+ 
United States| 
United States| 
+-------------+ 
only showing top 2 rows
  • 위처럼 select 메서드에 expr함수를 사용하는 패턴을 자주 사용함

-> 이를 간단하고 효율적으로 할 수 있는 메서드가 selectExpr

selectExpr

  • 새로운 DataFrame을 생성하는 복잡한 표현식을 간단하게 만드는 도구
df.selectExpr("dest_country_name as destination").show(2)
+-------------+ 
destination| 
+-------------+ 
United States| 
United States| 
+-------------+ 
only showing top 2 rows
#예제1: 출발지와 도착지가 같은지 나타내는 withCountry컬럼 추가
df.selectExpr('*', '(dest_country_name = origin_country_name) as withinCountry').show(2)
+-----------------+-------------------+-----+-------------+ 
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry| 
+-----------------+-------------------+-----+-------------+ 
United States| Romania| 15| false| 
United States| Croatia| 1| false| 
+-----------------+-------------------+-----+-------------+ 
only showing top 2 rows
#예제2: 컬럼에 대한 집계 함수 사용
df.selectExpr("avg(count)", "count(distinct(dest_country_name))").show(2)
+-----------+---------------------------------+ 
avg(count)|count(DISTINCT dest_country_name)| 
+-----------+---------------------------------+ 
1770.765625| 132| 
+-----------+---------------------------------+

리터럴(literal)

  • 때로는 새로운 컬럼이 아닌 명시적인 값(상숫값)을 스파크에 전달해야함
    • 생성된 변숫값이 특정 컬럼의 값보다 큰지 확인할 때
  • literal은 프로그래밍 언어의 리터럴값을 스파크가 이해할 수 있는 값으로 변환함
df.selectExpr('*', '1 as one').show(5)
+-----------------+-------------------+-----+---+
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|one| 
+-----------------+-------------------+-----+---+ 
United States| Romania| 15| 1| 
United States| Croatia| 1| 1| 
United States| Ireland| 344| 1| 
Egypt| United States| 15| 1| 
United States| India| 62| 1| 
+-----------------+-------------------+-----+---+ 
only showing top 5 rows
df.select('*', F.lit(1).alias('one')).show(5)
+-----------------+-------------------+-----+---+
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|one| 
+-----------------+-------------------+-----+---+ 
United States| Romania| 15| 1| 
United States| Croatia| 1| 1| 
United States| Ireland| 344| 1| 
Egypt| United States| 15| 1| 
United States| India| 62| 1| 
+-----------------+-------------------+-----+---+ 
only showing top 5 rows

withColumn

  • DataFrame에 신규 컬럼을 추가
 df.withColumn('withinCountry', F.expr('origin_country_name == dest_country_name')).show(5)
+-----------------+-------------------+-----+-------------+
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry| 
+-----------------+-------------------+-----+-------------+ 
United States| Romania| 15| false| 
United States| Croatia| 1| false| 
United States| Ireland| 344| false| 
Egypt| United States| 15| false| 
United States| India| 62| false| 
+-----------------+-------------------+-----+-------------+ 
only showing top 5 rows

withColumnRenamed

  • 컬럼명 변경
df.withColumnRenamed('dest_country_name', 'dest').columns
Out[31]: ['dest', 'ORIGIN_COUNTRY_NAME', 'count']

예약 문자와 키워드

  • 공백이나 하이픈(-) 같은 예약 문자는 컬럼명에 사용할 수 없음
  • 백틱(')문자를 이용해 이스케이핑 가능

withColumn메서드의 인자로 전달할 땐 이스케이프 문자 필요 없음

dfWithLongColName = df.withColumn(
  'This Long Column-Name',
  F.expr('origin_country_name')
)
dfWithLongColName.columns
Out[33]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count', 'This Long Column-Name']

표현식으로 컬럼을 참조할 땐 이스케이프 문자 필요

dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`")\
  .show(2)
+---------------------+-------+ 
This Long Column-Name|new col| 
+---------------------+-------+ 
Romania|Romania| 
Croatia|Croatia| 
+---------------------+-------+ 
only showing top 2 rows
  • 표현식 대신 문자열을 사용해서 명시적으로 컬럼을 참조하면 리터럴로 해석되기에 예약 문자가 포함된 컬럼을 참조할 수 있는 것임

대소문자 구분

  • 기본적으로 스파크는 대소문자를 가리지 않음
  • 다음과 같은 설정으로 스파크에서 대소문자를 구분하게 만들 수 있음

set spark.sql.caseSensitive=true

spark.sql('set spark.sql.caseSensitive=true')
Out[59]: DataFrame[key: string, value: string]

확인

df.columns
Out[60]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
df.select('COUNT').show(1)
--------------------------------------------------------------------------- 
AnalysisException Traceback (most recent call last) 
<command-2616171458017162> in <module> 
----> 1 df.select('COUNT').show(1)

/databricks/spark/python/pyspark/sql/dataframe.py in select(self, *cols) 
1690 [Row(name='Alice', age=12), Row(name='Bob', age=15)] 
1691 """ -> 
1692 jdf = self._jdf.select(self._jcols(*cols)) 
1693 return DataFrame(jdf, self.sql_ctx) 
1694 

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
1302 
1303 answer = self.gateway_client.send_command(command) 
-> 1304 return_value = get_return_value( 
1305 answer, self.gateway_client, self.target_id, self.name) 
1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 
121 # Hide where the exception came from that shows a non-Pythonic 
122 # JVM exception message. 
--> 123 raise converted from None 
124 else: 
125 raise 

AnalysisException: cannot resolve '`COUNT`' given input columns: [DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count]; 
'Project ['COUNT] 
+- Relation[DEST_COUNTRY_NAME#13,ORIGIN_COUNTRY_NAME#14,count#15L] json
df.select('count').show(1)
+-----+ 
count| 
+-----+ 
15| 
+-----+ 
only showing top 1 row
  • 대소문자 구분함

컬럼 제거하기

  • drop 메서드
dfWithLongColName.drop('ORIGIN_COUNTRY_NAME', 'DEST_COUNTRY_NAME')
Out[69]: DataFrame[count: bigint, This Long Column-Name: string]

컬럼의 데이터 타입 변경하기

  • cast 메서드
df.withColumn('count2', F.col('count').cast(T.StringType()))
Out[70]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: string]

로우 필터링하기

  • 로우를 필터링하려면 참과 거짓을 판별하는 표현식을 만들어야함
  • 그러면 표현식의 결과가 false인 로우를 걸러낼 수 있음
  • where나 filter메서드로 필터링 가능
df.filter(F.col('count')<2).show(2)
+-----------------+-------------------+-----+ 
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| 
+-----------------+-------------------+-----+ 
United States| Croatia| 1| 
United States| Singapore| 1| 
+-----------------+-------------------+-----+ 
only showing top 2 rows
df.where('count<2').show(2)
+-----------------+-------------------+-----+ 
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| 
+-----------------+-------------------+-----+ 
United States| Croatia| 1| 
United States| Singapore| 1| 
+-----------------+-------------------+-----+ 
only showing top 2 rows
  • 스파크는 자동으로 필터의 순서와 상관없이 동시에 모든 필터링 작업을 수행하기 때문에 항상 유용한 것은 아님
  • 따라서 여러 개의 AND 필터를 지정하려면 차례대로 필터를 연결하고 판단은 스파크에 맡겨야함

고유한 로우 얻기

  • distinct 메서드
df.select('ORIGIN_COUNTRY_NAME').distinct().count()
Out[75]: 125

무작위 샘플 만들기

  • sample 메서드
df.sample(withReplacement=False, fraction=0.5, seed=1).count()
Out[79]: 135
  • 복원 추출 여부: False
  • 표본 데이터 추출 비율:0.5

임의 분할하기

  • randomSplit 메서드
  • 학습 / 검증 / 테스트 셋을 만들 때 주로 사용
tmp_df = df.randomSplit([0.25,0.75],seed=1)
tmp_df[0].count()
Out[77]: 77
tmp_df[1].count()
Out[78]: 179

로우 합치기와 추가하기

  • union 메서드
  • 통합하려는 두 DataFrame은 반드시 동일한 스키마와 컬럼 수를 가져야함
  • 또한, union 메서드는 컬럼 위치를 기반으로 동작함
schema= df.schema
newRows= [
  Row('new_country', 'other_country', 5),
  Row('new_country_2', 'otheR_country_3', 1)
]

parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
df.union(newDF)\
.where('count=1')\
.where(F.col('ORIGIN_COUNTRY_NAME') != 'United States').show()
+-----------------+-------------------+-----+
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| 
+-----------------+-------------------+-----+ 
United States| Croatia| 1| 
United States| Singapore| 1| 
United States| Gibraltar| 1| 
United States| Cyprus| 1| 
United States| Estonia| 1| 
United States| Lithuania| 1| 
United States| Bulgaria| 1| 
United States| Georgia| 1| 
United States| Bahrain| 1| 
United States| Papua New Guinea| 1| 
United States| Montenegro| 1| 
United States| Namibia| 1| 
new_country_2| otheR_country_3| 1| 
+-----------------+-------------------+-----+

로우 정렬하기

  • sort나 orderBy메서드
df.sort('count').show(3)
+-----------------+-------------------+-----+
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| 
+-----------------+-------------------+-----+ 
Moldova| United States| 1| 
United States| Singapore| 1| 
United States| Croatia| 1| 
+-----------------+-------------------+-----+ 
only showing top 3 rows
df.orderBy(F.desc('count'), 'DEST_COUNTRY_NAME').show(3)
+-----------------+-------------------+------+ 
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count| 
+-----------------+-------------------+------+ 
United States| United States|370002| 
United States| Canada| 8483| 
Canada| United States| 8399| 
+-----------------+-------------------+------+ 
only showing top 3 rows

로우 수 제한하기

  • limit 메서드
df.limit(5).show()
+-----------------+-------------------+-----+
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| 
+-----------------+-------------------+-----+ 
United States| Romania| 15| 
United States| Croatia| 1| 
United States| Ireland| 344| 
Egypt| United States| 15| 
United States| India| 62| 
+-----------------+-------------------+-----+

repartition과 coalesce

  • 최적화 기법 중, 자주 필터링하는 컬럼을 기준으로 데이터를 분할하는 방법이 있다.
    • 이를 통해 파티셔닝 스키마와 파티션 수 등 클러스터 전반의 물리적 데이터 구성을 제어할 수 있다.
  • repartition 메서드로 파티션 재분배 가능

    • 전체 데이터를 셔플함
    • 향후에 파티션 수가 현재 파티션 수보다 많을 때나
    • 컬럼을 기준으로 파티션을 만드는 경우에만 사용
  • coalesce 메서드로 전체 데이터를 셔플하지 않고 파티션 병합 가능

df.rdd.getNumPartitions()
Out[100]: 1
#파티션 수 지정
df.repartition(5)
Out[99]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]
#특정 컬럼 기준 파티션 재분배
df.repartition(F.col('DEST_COUNTRY_NAME'))
Out[102]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]
#특정 컬럼을 기준으로 셔플을 수행해서 5개의 파티션으로 나누고, 
#전체 데이터를 셔플 없이 병합
df.repartition(5, col('DEST_COUNTRY_NAME')).coalesce(2)

드라이버로 로우 데이터 수집하기

  • 스파크는 드라이버에서 클러스터 상태 정보를 유지한다.
  • collect 메서드: 데이터프레임의 모든 데이터 수집
    • 대규모 데이터셋에 수행하면 드라이버가 비정상적으로 종료될 수 있음
  • take 메서드: 상위 N개의 row 반환
  • show 메서드: 여러 row를 보기 좋게 출력
collectDF = df.limit(10)
collectDF.collect()
Out[108]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), 
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344), 
Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15), 
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62), 
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1), 
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62), 
Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588), 
Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40), 
Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]
collectDF.take(5)
Out[105]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), 
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), 
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344), 
Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15), 
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]
collectDF.show(5)
+-----------------+-------------------+-----+ 
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| 
+-----------------+-------------------+-----+ 
United States| Romania| 15| 
United States| Croatia| 1| 
United States| Ireland| 344| 
Egypt| United States| 15| 
United States| India| 62| 
+-----------------+-------------------+-----+ 
only showing top 5 rows
collectDF.toLocalIterator()
  • toLocalIterator 메서드로 데이터셋의 파티션을 차례로 반복 처리 가능
    • iterator로 모든 파티션의 데이터를 드라이버에 전달
    • 병렬 연산 X -> 매우 큰 처리 비용 발생
profile
Data Engineer

0개의 댓글