CHAPTER 8. 조인

ack·2021년 7월 16일
0

Spark

목록 보기
6/6
post-thumbnail

8.1 조인 표현식

조인표현식이란?

  • 왼쪽과 오른쪽 데이터셋에 있는 하나 이상의 키값을 비교
  • 왼쪽 데이터셋과 오른쪽 데이터 셋의 결합 여부를 결정

동등조인(equi-join) : 왼쪽과 오른쪽 데이터 셋의 지정된 키가 동일한지 비교하여 데이터셋 결합

8.2 조인타입

  • 내부 조인 : 왼쪽과 오른쪽 데이터 셋에 키가 있는 로우를 유지
  • 외부 조인 : 왼쪽이나 오른쪽 데이터셋에 키가 있는 로우를 유자
  • 왼쪽 외부 조인 : 왼쪽 데이터 셋에 키가 있는 로우 유지
  • 오른쪽 외부 조인 : 오른쪽 데이터 셋에 키가 있는 로우를 유지
  • 왼쪽 세미 조인 : 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 있는 경우에는 키가 일치하는 왼쪽 데이터셋만 유지
  • 왼쪽 안티 조인 : 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 없는 경우에는 키가 일치하지 않는 왼쪽 데이터셋만 유지
  • 자연 조인 : 두 데이터셋에서 동일한 이름을 가진 컬럼을 암시적으로 결합하는 조인 수행
  • 교차조인/카테시안 조인: 왼쪽 데이터셋의 모든 로우와 오른쪽 데이터 셋의 모든 로우를 조합

8.3 내부조인

SQL조인과 유사하다고 생각하면 된다

joinExpression = df1["col"] == df2['col']
#내부조인
joinType = "inner"

# 조인df.join(조인df2, 조인표현식, 조인타입)
df1.join(df2, joinExpression, joinType).show()

8.4 외부조인

테이블의 키를 평가한 로우를 조인함

일치하는 로우가 없다면 해당 위치에 null을 삽입

joinExpression = df1["col"] == df2['col']
#외부조인
joinType = "outer"

# 조인df.join(조인df2, 조인표현식, 조인타입)
df1.join(df2, joinExpression, joinType).show()

8.5 왼쪽/오른쪽 외부조인

왼쪽이나 오른쪽 DF의 모든 로우와 일치하는 오른쪽이나 왼쪽 DF로우를 함께 포함, 없는 경우엔 null 삽입

joinExpression = df1["col"] == df2['col']
#왼쪽 외부조인
joinType = "left_outer"
#오른쪽 외부조인
joinType = "right_outer"

# 조인df.join(조인df2, 조인표현식, 조인타입)
df1.join(df2, joinExpression, joinType).show()

8.7 왼쪽 세미조인

오른쪽 DF의 어떤 값도 포함하지 않음, 값이 존재하는지 확인만 함! => DF의 필터용도

joinExpression = df1["col"] == df2['col']
#왼쪽세미
joinType = "left_semi"

# 조인df.join(조인df2, 조인표현식, 조인타입)
df1.join(df2, joinExpression, joinType).show()

8.8 왼쪽 안티 조인

외쪽 세미조인의 반대 개념

오른쪽 DF에 존재하지 않은 왼쪽DF의 값을 유지함

SQL의 NOT IN같음

joinExpression = df1["col"] == df2['col']
#왼쪽 안티
joinType = "left_anti"

# 조인df.join(조인df2, 조인표현식, 조인타입)
df1.join(df2, joinExpression, joinType).show()

8.9 자연 조인

조인하려는 컬럼을 추정함 => 일치하는 컬럼을 찾고 결과 반환

왼쪽, 오른쪽, 외부 자연조인

서로 다른 의미를 지녔지만 동일한 컬럼명을 가진 데이터셋을 조인하는 경우, 부정확한 결과를 만들어낼 수 있음

8.10 교차조인(카테시안 조인)

조건절을 기술하지 않은 내부 조인

왼쪽 DF와오른쪽DF의 결합 -> 로우의 수 = 왼쪽DF 로우의 수 X 오른쪽DF 로우의 수

엄청난 수의 DF가 생성될 수 있음 -> '키워드'를 이용해 조인을 수행해야함!

joinExpression = df1["col"] == df2['col']
#교차조인
joinType = "cross"

# 1
# 조인df.join(조인df2, 조인표현식, 조인타입)
df1.join(df2, joinExpression, joinType).show()

#2 명시적 메서드 사용
df1.crossJoin(df2).show()

8.11 조인 사용시 문제점

8.11.1 복합 데이터 타입의 조인

불리언을 반환하는 표현식은 모두 조인 표현식으로 사용 가능!

#sparkStatus라는 DF의 spark_status라는 리스트컬럼에 id값이 포함 있는 경우만 조인
person.withColumnReanmed("id", "personId")\
	.join(sparkStatus, expr("array_contains(spark_status, id)")).show()

8.11.2 중복 컬럼명 처리

DF의 각 컬럼은 카탈리스트(스파크 SQL엔진)내에 고유 ID존재

고유 ID는 카탈리스트 내부에서만 사용가능, 직접 참조 불가 => 특정 컬럼을 참조하기 어렵

문제를 일으키는 상황 2가지

  • 조인에 사용할 DF의 특정 키가 동일한 이름을 가지며, 키가 제거되지 않도록 조인 표현식에 명시하는 경우
    • 동일한 컬럼명으로 조인을 수행하면 같은 컬럼명이 두 개 존재, 컬럼중 하나를 참조할 때 오류 발생함
  • 조인 대상이 아닌 두 개의 컬럼이 동일한 이름을 가진 경우

해결방법 1 : 다른 조인 표현식 사용

문자열이나 시퀀스 조인 표현식사용(불리언형태X) => 두 컬럼중 하나가 자동으로 제거됨

df1.join(df2, "co1name").select("colname").show()

해결방법2 : 조인 후 컬럼 제거

원본 DF를 사용해 컬럼을 참조해야함. 조인시 동일한 키 이름을 사용하거나 원본에...

스파크의 SQL분석 프로세스의 특성(명시적으로 참조된 컬럼을 검증할 필요가 없음) 활용

col메서드를 사용하여 컬럼 고유의 ID로 해당 컬럼을 암시적으로 지정 가능

df1.join(df2, joinExpr).drop(df1.col("colname")).select("colname").show()

해결방법3 : 조인 전 컬럼명 변경

문제를 완전히 회피할 수 있는 방법

val gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")

8.12 스파크의 조인 수행 방식

실행에 필요한 두 가지 핵심 전략 이해하기

  • 노드간 네트워크 통신 전략
  • 노드별 연산 전략

8.12.1 네트워크 통신 전략

스파크가 조인에 사용하는 클러스터 통신 방식

  1. 셔플 조인 (전체 노드간 통신)
  2. 브로드캐스트 조안

큰 테이블과 큰 테이블 조인 => 셔플조인

전체 노드간 통신이 발생 + 특정 키나 키 집합을 가진 노드와 데이터를 공유 => 네트워크 복잡, 많은 자원 사용

데이터 파티셔닝 없이 전체 조인 프로세스가 진행 => 모든 워커 노드와 모든 파티션에서 통신이 발생을 의미

◾큰 테이블과 작은 테이블 조인

브로드캐스트 조인 사용 --> 한번만 복제 수행하고 그 후는 다른 워커 노드의 통신 없이 작업 수행 가능!

작은 DF를 클러스터 전체 워커 노드에 복제(대규모 노드간 통신 발생) => 개별 워커가 작업 수행(조인 내내 노드 사이 추가적인 통신X)

단일 노드에서 개별적으로 조인이 수행되므로 CPU가 병목구간.

너무 큰 데이터를 브로드캐스트하면 고비용 수집 연산 발생 => 드라이버 노드 비정상적 종료

브로드캐스트 조인 힌트!

강제성X, 옵티마이저가 무시가능

//braodcast 함수를 이용한 힌트
//작은 크기의 DataFrame을 인수로 사용
import org.apache.spark.sql.functions.braodcast

val joinExpr = person.col("graduate_program") === graduateProgram.col("id")
person.join(broadcasts(graduateProgram), joinExpr).explaint()
-- SQL
-- MAPJOIN, BROADCAST, BROADCASTJOIN 등의 힌트 설정
SELECT /*+ MAPJOIN(graduateProgram)*/ * FROM person JOIN graduateProgram ON person.graduate_program = graduateProgram.id

아주 작은 테이블 사이의 조인

스파크가 조인 방식을 결정하는 것이 가장 좋음

필요한 경우 브로드캐스트 조인 강제 지정 가능

8.13 정리

조인 전에 데이터를 적절하게 분할하면 셔플이 계획되어 있더라도 동일한 머신에 두 DataFrame의 데이터가 있을 수 있음

=> 셔플을 피할 수 있음, 효율적인 실행가능

profile
아자 (*•̀ᴗ•́*)و

0개의 댓글