Spark 완벽 가이드 ch8. 조인

Q·2023년 1월 17일
0

Spark 완벽 가이드

목록 보기
9/24
#일단 데이터셋 생성
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
  .toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
  .toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
  .toDF("id", "status")

조인 표현식

  • 왼쪽과 오른쪽 데이터셋에 있는 하나 이상의 키값을 비교하고 왼쪽 데이터셋과 오른쪽 데이터셋의 결합 여부 결정

조인 타입

  • 결과 데이터셋에 어떤 데이터가 있어야 하는지 결정
  • 종류
    • 내부 조인
    • 외부 조인
    • 왼쪽 외부 조인
    • 오른쪽 외부 조인
    • 왼쪽 세미 조인
    • 왼쪽 안티 조인
    • 자연 조인
    • 교차 조인

내부 조인

  • 참으로 평가되는 로우만 결합
joinExpression = person["graduate_program"] == graduateProgram['id']
wrongJoinExpression = person["name"] == graduateProgram["school"]
joinType = "inner"
#디폴트 조인타입은 내부조인
person.join(graduateProgram, joinExpression, joinType).show()
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+ 
id| name|graduate_program| spark_status| id| degree| department| school| 
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+ 
0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkeley| 
1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS|UC Berkeley| 
2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS|UC Berkeley| 
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
person.join(graduateProgram, wrongJoinExpression, joinType).show()
+---+----+----------------+------------+---+------+----------+------+
id|name|graduate_program|spark_status| id|degree|department|school| 
+---+----+----------------+------------+---+------+----------+------+ 
+---+----+----------------+------------+---+------+----------+------+
  • 데이터프레임 모두에 키가 존재하지 않으면 비어 있는 결과 데이터프레임을 얻게 됨

외부 조인

  • 참이나 거짓으로 평가한 로우 포함
  • 왼쪽이나 오른쪽 데이터프레임에 일치하는 로우가 없으면 해당 위치에 null삽입
joinType='outer'
person.join(graduateProgram, joinExpression, joinType).show()
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+ 
id| name|graduate_program| spark_status| id| degree| department| school| 
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+ 
0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkeley| 
1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS|UC Berkeley| 
2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS|UC Berkeley| 
null| null| null| null| 2|Masters| EECS|UC Berkeley| 
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+

왼쪽 외부 조인

  • 왼쪽 데이터프레임의 모든 로우와 왼쪽 데이터프레임과 일치하는 오른쪽 데이터프레임의 로우를 함께 포함
  • 오른쪽 데이터프레임에 일치하는 로우가 없다면 해당 위치에 null삽입
joinType ='left_outer'
graduateProgram.join(person, joinExpression, joinType).show()
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+ 
id| degree| department| school| id| name|graduate_program| spark_status| 
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+ 
0|Masters|School of Informa...|UC Berkeley| 0| Bill Chambers| 0| [100]| 
1| Ph.D.| EECS|UC Berkeley| 1| Matei Zaharia| 1|[500, 250, 100]| 
1| Ph.D.| EECS|UC Berkeley| 2|Michael Armbrust| 1| [250, 100]| 
2|Masters| EECS|UC Berkeley|null| null| null| null| 
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+

오른쪽 외부 조인

  • 오른쪽 데이터프레임의 모든 로우와 오른쪽 데이터프레임과 일치하는 왼쪽 데이터프레임의 로우를 함께 포함
  • 왼쪽 데이터프레임에 일치하는 로우가 없다면 해당 위치에 null삽입
joinType ='right_outer'
person.join(graduateProgram, joinExpression, joinType).show()
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+ 
id| name|graduate_program| spark_status| id| degree| department| school| 
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+ 
0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkeley| 
1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS|UC Berkeley| 
2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS|UC Berkeley| 
null| null| null| null| 2|Masters| EECS|UC Berkeley| 
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+

왼쪽 세미 조인

  • 오른쪽 데이터프레임의 어떤 값도 포함하지 않으므로 다른 조인타입과 약간 다름
    • 기존 조인 기능과 달리 데이터프레임 필터 정도로 볼 수 있음
  • 오른쪽 데이터프레임은 단지 값의 존재 여부만 확인하는 용도
    • 값이 존재하면 왼쪽 데이터프레임에 중복 키가 존재하더라도 해당 로우는 결과에 포함
joinType ='left_semi'
gradProgram2 = graduateProgram.union(spark.createDataFrame([
    (0, "Masters", "Duplicated Row", "Duplicated School")]))
gradProgram2.join(person, joinExpression, joinType).show()
+---+-------+--------------------+-----------------+ 
id| degree| department| school| 
+---+-------+--------------------+-----------------+ 
0|Masters|School of Informa...| UC Berkeley| 
1| Ph.D.| EECS| UC Berkeley| 
0|Masters| Duplicated Row|Duplicated School| 
+---+-------+--------------------+-----------------+

왼쪽 안티 조인

  • 왼쪽 세미 조인의 반대
  • 오른쪽 데이터프레임에서 관련된 키를 찾을 수 없는 로우만 결과에 포함
joinType ='left_anti'
gradProgram2.join(person, joinExpression, joinType).show()
+---+-------+----------+-----------+ 
id| degree|department| school| 
+---+-------+----------+-----------+ 
2|Masters| EECS|UC Berkeley| 
+---+-------+----------+-----------+

자연 조인

  • 조인하려는 컬럼을 암시적으로 추정
  • 일치하는 컬럼을 찾고 그 결과를 반환함
  • 암시적인 처리는 언제나 위험하다는 것 명심
    ex) graduatePrograme의 id와 person의 id는 컬럼명은 같지만 서로 다른 의미인데 이걸 기준으로 잡아버림
  • 데이터프레임의 join메서드는 지원X 링크

교차 조인(카테시안 조인)

  • 조건절을 기술하지 않은 내부 조인
  • 왼쪽 데이터프레임의 모든 로우를 오른쪽 데이터프레임의 모든 로우와 결합
joinType ='cross'
graduateProgram.join(person, joinExpression, joinType).show()
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+ 
id| degree| department| school| id| name|graduate_program| spark_status| 
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+ 
0|Masters|School of Informa...|UC Berkeley| 0| Bill Chambers| 0| [100]| 
1| Ph.D.| EECS|UC Berkeley| 1| Matei Zaharia| 1|[500, 250, 100]| 
1| Ph.D.| EECS|UC Berkeley| 2|Michael Armbrust| 1| [250, 100]| +---+-------+--------------------+-----------+---+----------------+----------------+---------------+

조인 사용 시 문제점

복합 데이터 타입의 조인

  • 어려울 것 같지만 불리언을 반환하는 모든 표현식은 조인 표현식으로 간주할 수 있음
from pyspark.sql import functions as F
#person 데이터프레임의 id컬럼을 personId로 바꿔줌(중복 컬럼명 처리)
person.withColumnRenamed('id', 'personId').join(sparkStatus, F.expr('array_contains(spark_status,id)')).show()
+--------+----------------+----------------+---------------+---+--------------+ 
personId| name|graduate_program| spark_status| id| status| 
+--------+----------------+----------------+---------------+---+--------------+ 
0| Bill Chambers| 0| [100]|100| Contributor| 
1| Matei Zaharia| 1|[500, 250, 100]|500|Vice President| 
1| Matei Zaharia| 1|[500, 250, 100]|250| PMC Member| 
1| Matei Zaharia| 1|[500, 250, 100]|100| Contributor| 
2|Michael Armbrust| 1| [250, 100]|250| PMC Member| 
2|Michael Armbrust| 1| [250, 100]|100| Contributor| 
+--------+----------------+----------------+---------------+---+--------------+

중복 컬럼명 처리

  • 조인을 수행할 때 가장 까다로운 것은 결과 데이터프레임에서 중복된 컬럼명을 다루는 것
    • 각 컬럼은 스파크 SQL 엔진인 카탈리스트 내에 고유 id가 존재하지만 직접 참조할 수 있는 값은 아님
gradProgramDupe= graduateProgram.withColumnRenamed('id', 'graduate_program') #일부러 person의 컬럼과 같게 바꿔보기
joinExpr=gradProgramDupe['graduate_program'] == person['graduate_program']
person.join(gradProgramDupe, joinExpr).select('graduate_program').show()
--------------------------------------------------------------------------- 
AnalysisException Traceback (most recent call last) <command-3827894466668178> in <module> 1 
joinExpr=gradProgramDupe['graduate_program'] == person['graduate_program'] ----> 
2 person.join(gradProgramDupe, joinExpr).select('graduate_program').show() 
/databricks/spark/python/pyspark/sql/dataframe.py in select(self, *cols) 1690 [Row(name='Alice', age=12), Row(name='Bob', age=15)] 1691 
""" -> 1692 jdf = self._jdf.select(self._jcols(*cols)) 1693 return DataFrame(jdf, self.sql_ctx) 
1694 /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
1302 1303 answer = self.gateway_client.send_command(command) -> 1304 return_value = 
get_return_value( 1305 answer, self.gateway_client, self.target_id, self.name) 1306 
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 
121 # Hide where the exception came from that shows a non-Pythonic 122 # JVM exception message. --> 123 
raise converted from None 124 else: 125 raise AnalysisException: Reference 'graduate_program' is 
ambiguous, could be: graduate_program, graduate_program.
  • AnalysisException 뜸

해결 방법1: 다른 조인 표현식 사용

person.join(gradProgramDupe, 'graduate_program').show() 
+----------------+---+----------------+---------------+-------+--------------------+-----------+ 
graduate_program| id| name| spark_status| degree| department| school| 
+----------------+---+----------------+---------------+-------+--------------------+-----------+ 
0| 0| Bill Chambers| [100]|Masters|School of Informa...|UC Berkeley| 
1| 1| Matei Zaharia|[500, 250, 100]| Ph.D.| EECS|UC Berkeley| 
1| 2|Michael Armbrust| [250, 100]| Ph.D.| EECS|UC Berkeley| 
+----------------+---+----------------+---------------+-------+--------------------+-----------+
  • 그냥 컬럼명을 문자열로 전달하면 중복된 두 컬럼 중 하나가 자동으로 제거됨

해결 방법2: 조인 후 컬럼 제거

person.join(gradProgramDupe, joinExpr).drop(person['graduate_program']).show() #조인 후 person의 컬럼 제거
  • 이 방법은 스파크의 SQL 분석 프로세스의 특성을 활용함
  • 스파크는 명시적으로 참조된 컬럼(person['graduate_program'])을 검증하지 않으므로 스파크 코드 분석 단계를 통과함.

해결 방법3: 조인 전 컬럼명 변경

gradProgram3 = gradProgramDupe.withColumnRenamed('graduate_program', 'grad_id')
joinExpr2= person['graduate_program'] == gradProgram3['grad_id']
person.join(gradProgram3, joinExpr2).show()
+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+ 
id| name|graduate_program| spark_status|grad_id| degree| department| school| 
+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+ 
0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkeley| 
1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS|UC Berkeley| 
2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS|UC Berkeley| 
+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+

스파크의 조인 수행 방식

  • 스파크가 조인을 수행하는 방식을 이해하기 위해서는 실행에 필요한 두 가지 핵심 전략을 이해해야함
    • 노드간 네트워크 통신 전략
    • 노드별 연산 전략
  • 수행 방식을 이해하면 빠르게 완료되는 작업과 절대 완료되지 않는 작업 간의 차이를 알 수 있음

네트워크 통신 전략

  • 스파크는 조인 시 두 가지 클러스터 통신 방식을 활용함
    • 셔플 조인
    • 브로드캐스트 조인

Case1: 큰 테이블과 큰 테이블 조인

  • 셔플 조인 발생


- 조인에 사용한 특정 키나 키 집합을 어떤 노드가 가졌는지에 따라 해당 노드와 데이터를 공유함 - 데이터가 잘 나뉘어 있지 않다면 네트워크가 복잡하고 많은 자원을 사용함

Case2: 큰 테이블과 작은 테이블 조인

  • 작은 테이블 기준: 테이블이 단일 워커 노드의 메모리 크기(메모리 여유 공간 포함)에 적합할 정도
  • 조인 연산 최적화 가능 -> 브로드캐스트 조인


- 작은 테이블을 클러스터의 전체 워커 노드에 각각 복제 - 그래서 너무 큰 테이블이면 안됨 - 시작 시 단 한 번만 복제가 수행되며, 이후에는 노드들 끼리 통신할 필요 없이 각자 작업 수행 - 큰 테이블의 각 파티션과 작은테이블의 조인 수행 - 개별적으로 조인이 수행되므로 CPU가 가장 큰 병목 구간이 됨
#DataFrame API는 옵티마이저에서 브로드캐스트 조인을 사용할 수 있도록 broadcast함수를 통해 힌트를 줄 수 있음
person.join(F.broadcast(gradProgram3), joinExpr2).explain()
== Physical Plan == 
AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [graduate_program#16L], 
[grad_id#1202L], Inner, BuildRight, false :- Project [_1#6L AS id#14L, _2#7 AS name#15, _3#8L AS 
graduate_program#16L, _4#9 AS spark_status#17] : +- Filter isnotnull(_3#8L) : +- Scan 
ExistingRDD[_1#6L,_2#7,_3#8L,_4#9] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
bigint, true]),false), [id=#3738] +- Project [_1#22L AS grad_id#1202L, _2#23 AS degree#31, _3#24 
AS department#32, _4#25 AS school#33] +- Filter isnotnull(_1#22L) +- Scan ExistingRDD[_1#22L,_2#23,_3#24,_4#25]

Case3: 아주 작은 테이블 사이의 조인

  • 이 경우는 그냥 스파크가 조인 방식을 결정하도록 내버려두기
profile
Data Engineer

0개의 댓글