조인표현식이란?
동등조인(equi-join) : 왼쪽과 오른쪽 데이터 셋의 지정된 키가 동일한지 비교하여 데이터셋 결합
SQL조인과 유사하다고 생각하면 된다
joinExpression = df1["col"] == df2['col']
#내부조인
joinType = "inner"
# 조인df.join(조인df2, 조인표현식, 조인타입)
df1.join(df2, joinExpression, joinType).show()
테이블의 키를 평가한 로우를 조인함
일치하는 로우가 없다면 해당 위치에 null을 삽입
joinExpression = df1["col"] == df2['col']
#외부조인
joinType = "outer"
# 조인df.join(조인df2, 조인표현식, 조인타입)
df1.join(df2, joinExpression, joinType).show()
왼쪽이나 오른쪽 DF의 모든 로우와 일치하는 오른쪽이나 왼쪽 DF로우를 함께 포함, 없는 경우엔 null 삽입
joinExpression = df1["col"] == df2['col']
#왼쪽 외부조인
joinType = "left_outer"
#오른쪽 외부조인
joinType = "right_outer"
# 조인df.join(조인df2, 조인표현식, 조인타입)
df1.join(df2, joinExpression, joinType).show()
오른쪽 DF의 어떤 값도 포함하지 않음, 값이 존재하는지 확인만 함! => DF의 필터용도
joinExpression = df1["col"] == df2['col']
#왼쪽세미
joinType = "left_semi"
# 조인df.join(조인df2, 조인표현식, 조인타입)
df1.join(df2, joinExpression, joinType).show()
외쪽 세미조인의 반대 개념
오른쪽 DF에 존재하지 않은 왼쪽DF의 값을 유지함
SQL의 NOT IN같음
joinExpression = df1["col"] == df2['col']
#왼쪽 안티
joinType = "left_anti"
# 조인df.join(조인df2, 조인표현식, 조인타입)
df1.join(df2, joinExpression, joinType).show()
조인하려는 컬럼을 추정함 => 일치하는 컬럼을 찾고 결과 반환
왼쪽, 오른쪽, 외부 자연조인
서로 다른 의미를 지녔지만 동일한 컬럼명을 가진 데이터셋을 조인하는 경우, 부정확한 결과를 만들어낼 수 있음
조건절을 기술하지 않은 내부 조인
왼쪽 DF와오른쪽DF의 결합 -> 로우의 수 = 왼쪽DF 로우의 수 X 오른쪽DF 로우의 수
엄청난 수의 DF가 생성될 수 있음 -> '키워드'를 이용해 조인을 수행해야함!
joinExpression = df1["col"] == df2['col']
#교차조인
joinType = "cross"
# 1
# 조인df.join(조인df2, 조인표현식, 조인타입)
df1.join(df2, joinExpression, joinType).show()
#2 명시적 메서드 사용
df1.crossJoin(df2).show()
불리언을 반환하는 표현식은 모두 조인 표현식으로 사용 가능!
#sparkStatus라는 DF의 spark_status라는 리스트컬럼에 id값이 포함 있는 경우만 조인
person.withColumnReanmed("id", "personId")\
.join(sparkStatus, expr("array_contains(spark_status, id)")).show()
DF의 각 컬럼은 카탈리스트(스파크 SQL엔진)내에 고유 ID존재
고유 ID는 카탈리스트 내부에서만 사용가능, 직접 참조 불가 => 특정 컬럼을 참조하기 어렵
문제를 일으키는 상황 2가지
해결방법 1 : 다른 조인 표현식 사용
문자열이나 시퀀스 조인 표현식사용(불리언형태X) => 두 컬럼중 하나가 자동으로 제거됨
df1.join(df2, "co1name").select("colname").show()
해결방법2 : 조인 후 컬럼 제거
원본 DF를 사용해 컬럼을 참조해야함. 조인시 동일한 키 이름을 사용하거나 원본에...
스파크의 SQL분석 프로세스의 특성(명시적으로 참조된 컬럼을 검증할 필요가 없음) 활용
col메서드를 사용하여 컬럼 고유의 ID로 해당 컬럼을 암시적으로 지정 가능
df1.join(df2, joinExpr).drop(df1.col("colname")).select("colname").show()
해결방법3 : 조인 전 컬럼명 변경
문제를 완전히 회피할 수 있는 방법
val gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")
실행에 필요한 두 가지 핵심 전략 이해하기
스파크가 조인에 사용하는 클러스터 통신 방식
◾큰 테이블과 큰 테이블 조인 => 셔플조인
전체 노드간 통신이 발생 + 특정 키나 키 집합을 가진 노드와 데이터를 공유 => 네트워크 복잡, 많은 자원 사용
데이터 파티셔닝 없이 전체 조인 프로세스가 진행 => 모든 워커 노드와 모든 파티션에서 통신이 발생을 의미
◾큰 테이블과 작은 테이블 조인
브로드캐스트 조인 사용 --> 한번만 복제 수행하고 그 후는 다른 워커 노드의 통신 없이 작업 수행 가능!
작은 DF를 클러스터 전체 워커 노드에 복제(대규모 노드간 통신 발생) => 개별 워커가 작업 수행(조인 내내 노드 사이 추가적인 통신X)
단일 노드에서 개별적으로 조인이 수행되므로 CPU가 병목구간.
너무 큰 데이터를 브로드캐스트하면 고비용 수집 연산 발생 => 드라이버 노드 비정상적 종료
브로드캐스트 조인 힌트!
강제성X, 옵티마이저가 무시가능
//braodcast 함수를 이용한 힌트
//작은 크기의 DataFrame을 인수로 사용
import org.apache.spark.sql.functions.braodcast
val joinExpr = person.col("graduate_program") === graduateProgram.col("id")
person.join(broadcasts(graduateProgram), joinExpr).explaint()
-- SQL
-- MAPJOIN, BROADCAST, BROADCASTJOIN 등의 힌트 설정
SELECT /*+ MAPJOIN(graduateProgram)*/ * FROM person JOIN graduateProgram ON person.graduate_program = graduateProgram.id
◾아주 작은 테이블 사이의 조인
스파크가 조인 방식을 결정하는 것이 가장 좋음
필요한 경우 브로드캐스트 조인 강제 지정 가능
조인 전에 데이터를 적절하게 분할하면 셔플이 계획되어 있더라도 동일한 머신에 두 DataFrame의 데이터가 있을 수 있음
=> 셔플을 피할 수 있음, 효율적인 실행가능