person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "Michael Armbrust", 1, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")
조인 표현식
- 왼쪽과 오른쪽 데이터셋에 있는 하나 이상의 키값을 비교하고 왼쪽 데이터셋과 오른쪽 데이터셋의 결합 여부 결정
조인 타입
- 결과 데이터셋에 어떤 데이터가 있어야 하는지 결정
- 종류
- 내부 조인
- 외부 조인
- 왼쪽 외부 조인
- 오른쪽 외부 조인
- 왼쪽 세미 조인
- 왼쪽 안티 조인
- 자연 조인
- 교차 조인
내부 조인
joinExpression = person["graduate_program"] == graduateProgram['id']
wrongJoinExpression = person["name"] == graduateProgram["school"]
joinType = "inner"
person.join(graduateProgram, joinExpression, joinType).show()
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
id| name|graduate_program| spark_status| id| degree| department| school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkeley|
1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS|UC Berkeley|
2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
person.join(graduateProgram, wrongJoinExpression, joinType).show()
+---+----+----------------+------------+---+------+----------+------+
id|name|graduate_program|spark_status| id|degree|department|school|
+---+----+----------------+------------+---+------+----------+------+
+---+----+----------------+------------+---+------+----------+------+
- 데이터프레임 모두에 키가 존재하지 않으면 비어 있는 결과 데이터프레임을 얻게 됨
외부 조인
- 참이나 거짓으로 평가한 로우 포함
- 왼쪽이나 오른쪽 데이터프레임에 일치하는 로우가 없으면 해당 위치에 null삽입
joinType='outer'
person.join(graduateProgram, joinExpression, joinType).show()
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
id| name|graduate_program| spark_status| id| degree| department| school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkeley|
1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS|UC Berkeley|
2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS|UC Berkeley|
null| null| null| null| 2|Masters| EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
왼쪽 외부 조인
- 왼쪽 데이터프레임의 모든 로우와 왼쪽 데이터프레임과 일치하는 오른쪽 데이터프레임의 로우를 함께 포함
- 오른쪽 데이터프레임에 일치하는 로우가 없다면 해당 위치에 null삽입
joinType ='left_outer'
graduateProgram.join(person, joinExpression, joinType).show()
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
id| degree| department| school| id| name|graduate_program| spark_status|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
0|Masters|School of Informa...|UC Berkeley| 0| Bill Chambers| 0| [100]|
1| Ph.D.| EECS|UC Berkeley| 1| Matei Zaharia| 1|[500, 250, 100]|
1| Ph.D.| EECS|UC Berkeley| 2|Michael Armbrust| 1| [250, 100]|
2|Masters| EECS|UC Berkeley|null| null| null| null|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
오른쪽 외부 조인
- 오른쪽 데이터프레임의 모든 로우와 오른쪽 데이터프레임과 일치하는 왼쪽 데이터프레임의 로우를 함께 포함
- 왼쪽 데이터프레임에 일치하는 로우가 없다면 해당 위치에 null삽입
joinType ='right_outer'
person.join(graduateProgram, joinExpression, joinType).show()
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
id| name|graduate_program| spark_status| id| degree| department| school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkeley|
1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS|UC Berkeley|
2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS|UC Berkeley|
null| null| null| null| 2|Masters| EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
왼쪽 세미 조인
- 오른쪽 데이터프레임의 어떤 값도 포함하지 않으므로 다른 조인타입과 약간 다름
- 기존 조인 기능과 달리 데이터프레임 필터 정도로 볼 수 있음
- 오른쪽 데이터프레임은 단지 값의 존재 여부만 확인하는 용도
- 값이 존재하면 왼쪽 데이터프레임에 중복 키가 존재하더라도 해당 로우는 결과에 포함
joinType ='left_semi'
gradProgram2 = graduateProgram.union(spark.createDataFrame([
(0, "Masters", "Duplicated Row", "Duplicated School")]))
gradProgram2.join(person, joinExpression, joinType).show()
+---+-------+--------------------+-----------------+
id| degree| department| school|
+---+-------+--------------------+-----------------+
0|Masters|School of Informa...| UC Berkeley|
1| Ph.D.| EECS| UC Berkeley|
0|Masters| Duplicated Row|Duplicated School|
+---+-------+--------------------+-----------------+
왼쪽 안티 조인
- 왼쪽 세미 조인의 반대
- 오른쪽 데이터프레임에서 관련된 키를 찾을 수 없는 로우만 결과에 포함
joinType ='left_anti'
gradProgram2.join(person, joinExpression, joinType).show()
+---+-------+----------+-----------+
id| degree|department| school|
+---+-------+----------+-----------+
2|Masters| EECS|UC Berkeley|
+---+-------+----------+-----------+
자연 조인
- 조인하려는 컬럼을 암시적으로 추정
- 일치하는 컬럼을 찾고 그 결과를 반환함
- 암시적인 처리는 언제나 위험하다는 것 명심
ex) graduatePrograme의 id와 person의 id는 컬럼명은 같지만 서로 다른 의미인데 이걸 기준으로 잡아버림
- 데이터프레임의 join메서드는 지원X 링크
교차 조인(카테시안 조인)
- 조건절을 기술하지 않은 내부 조인
- 왼쪽 데이터프레임의 모든 로우를 오른쪽 데이터프레임의 모든 로우와 결합
joinType ='cross'
graduateProgram.join(person, joinExpression, joinType).show()
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
id| degree| department| school| id| name|graduate_program| spark_status|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
0|Masters|School of Informa...|UC Berkeley| 0| Bill Chambers| 0| [100]|
1| Ph.D.| EECS|UC Berkeley| 1| Matei Zaharia| 1|[500, 250, 100]|
1| Ph.D.| EECS|UC Berkeley| 2|Michael Armbrust| 1| [250, 100]| +---+-------+--------------------+-----------+---+----------------+----------------+---------------+
조인 사용 시 문제점
복합 데이터 타입의 조인
- 어려울 것 같지만 불리언을 반환하는 모든 표현식은 조인 표현식으로 간주할 수 있음
from pyspark.sql import functions as F
person.withColumnRenamed('id', 'personId').join(sparkStatus, F.expr('array_contains(spark_status,id)')).show()
+--------+----------------+----------------+---------------+---+--------------+
personId| name|graduate_program| spark_status| id| status|
+--------+----------------+----------------+---------------+---+--------------+
0| Bill Chambers| 0| [100]|100| Contributor|
1| Matei Zaharia| 1|[500, 250, 100]|500|Vice President|
1| Matei Zaharia| 1|[500, 250, 100]|250| PMC Member|
1| Matei Zaharia| 1|[500, 250, 100]|100| Contributor|
2|Michael Armbrust| 1| [250, 100]|250| PMC Member|
2|Michael Armbrust| 1| [250, 100]|100| Contributor|
+--------+----------------+----------------+---------------+---+--------------+
중복 컬럼명 처리
- 조인을 수행할 때 가장 까다로운 것은 결과 데이터프레임에서 중복된 컬럼명을 다루는 것
- 각 컬럼은 스파크 SQL 엔진인 카탈리스트 내에 고유 id가 존재하지만 직접 참조할 수 있는 값은 아님
gradProgramDupe= graduateProgram.withColumnRenamed('id', 'graduate_program')
joinExpr=gradProgramDupe['graduate_program'] == person['graduate_program']
person.join(gradProgramDupe, joinExpr).select('graduate_program').show()
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last) <command-3827894466668178> in <module> 1
joinExpr=gradProgramDupe['graduate_program'] == person['graduate_program'] ---->
2 person.join(gradProgramDupe, joinExpr).select('graduate_program').show()
/databricks/spark/python/pyspark/sql/dataframe.py in select(self, *cols) 1690 [Row(name='Alice', age=12), Row(name='Bob', age=15)] 1691
""" -> 1692 jdf = self._jdf.select(self._jcols(*cols)) 1693 return DataFrame(jdf, self.sql_ctx)
1694 /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302 1303 answer = self.gateway_client.send_command(command) -> 1304 return_value =
get_return_value( 1305 answer, self.gateway_client, self.target_id, self.name) 1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
121
raise converted from None 124 else: 125 raise AnalysisException: Reference 'graduate_program' is
ambiguous, could be: graduate_program, graduate_program.
해결 방법1: 다른 조인 표현식 사용
person.join(gradProgramDupe, 'graduate_program').show()
+----------------+---+----------------+---------------+-------+--------------------+-----------+
graduate_program| id| name| spark_status| degree| department| school|
+----------------+---+----------------+---------------+-------+--------------------+-----------+
0| 0| Bill Chambers| [100]|Masters|School of Informa...|UC Berkeley|
1| 1| Matei Zaharia|[500, 250, 100]| Ph.D.| EECS|UC Berkeley|
1| 2|Michael Armbrust| [250, 100]| Ph.D.| EECS|UC Berkeley|
+----------------+---+----------------+---------------+-------+--------------------+-----------+
- 그냥 컬럼명을 문자열로 전달하면 중복된 두 컬럼 중 하나가 자동으로 제거됨
해결 방법2: 조인 후 컬럼 제거
person.join(gradProgramDupe, joinExpr).drop(person['graduate_program']).show()
- 이 방법은 스파크의 SQL 분석 프로세스의 특성을 활용함
- 스파크는 명시적으로 참조된 컬럼(person['graduate_program'])을 검증하지 않으므로 스파크 코드 분석 단계를 통과함.
해결 방법3: 조인 전 컬럼명 변경
gradProgram3 = gradProgramDupe.withColumnRenamed('graduate_program', 'grad_id')
joinExpr2= person['graduate_program'] == gradProgram3['grad_id']
person.join(gradProgram3, joinExpr2).show()
+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+
id| name|graduate_program| spark_status|grad_id| degree| department| school|
+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+
0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkeley|
1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS|UC Berkeley|
2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS|UC Berkeley|
+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+
스파크의 조인 수행 방식
- 스파크가 조인을 수행하는 방식을 이해하기 위해서는 실행에 필요한 두 가지 핵심 전략을 이해해야함
- 수행 방식을 이해하면 빠르게 완료되는 작업과 절대 완료되지 않는 작업 간의 차이를 알 수 있음
네트워크 통신 전략
- 스파크는 조인 시 두 가지 클러스터 통신 방식을 활용함
Case1: 큰 테이블과 큰 테이블 조인
- 조인에 사용한 특정 키나 키 집합을 어떤 노드가 가졌는지에 따라
해당 노드와 데이터를 공유함
- 데이터가 잘 나뉘어 있지 않다면
네트워크가 복잡하고 많은 자원을 사용함
Case2: 큰 테이블과 작은 테이블 조인
- 작은 테이블 기준: 테이블이 단일 워커 노드의 메모리 크기(메모리 여유 공간 포함)에 적합할 정도
- 조인 연산 최적화 가능 -> 브로드캐스트 조인
-
작은 테이블을 클러스터의 전체 워커 노드에 각각 복제
- 그래서 너무 큰 테이블이면 안됨
- 시작 시 단 한 번만 복제가 수행되며, 이후에는 노드들 끼리
통신할 필요 없이 각자 작업 수행
- 큰 테이블의 각 파티션과 작은테이블의 조인 수행
- 개별적으로 조인이 수행되므로 CPU가 가장 큰 병목 구간이 됨
person.join(F.broadcast(gradProgram3), joinExpr2).explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [graduate_program
[grad_id
graduate_program
ExistingRDD[_1
bigint, true]),false), [id=
AS department
Case3: 아주 작은 테이블 사이의 조인
- 이 경우는 그냥 스파크가 조인 방식을 결정하도록 내버려두기