러닝 스파크 - 5장

Jajuna_99·2023년 5월 3일
1

러닝 스파크

목록 보기
8/8

5장 스파크 SQL과 데이터 프레임: 외부 데이터 소스와 소통하기

  • 이번 장에서는 스파크 SQL이 외부 구성요소와 상호 작용하는 방법에 중점을 둔다.

5장에서 배우는 것들

  • 아파치 하이브 및 스파크 모두에 사용자 정의 함수 사용
  • SQL과 여러 외부 요소의 데이터 원본과 연결
  • 여러 관계 연산가 사용 및 작업

스파크 SQL과 아파치 하이브

스파크 SQL은? 관계형 처리 + 함수형 프로그래밍 API

샤크 이전에 했던 작업을 기원으로 한다.

샤크, Shark : 아파치 위에 하이브 코드베이스를 기반으로 구축되었던, 하둡 시스템에서 최초의 대화영 SQL 쿼리 엔진
장점 : 엔터츠라이즈 DW만큼 빠르고, 하이브/맵리듀스 만큼 확장 가능

이러한 장점들을 상속 받아, 스파크는 더 빠른 성능, 관계형 프로그래밍이 가능하고,

(SparkSession을 통해) 복잡한 분석 라이브러리(MLlib) 호출도 가능하다.

사용자 정의 함수

내장 함수와는 별도로, 엔지니어와 과학자들이 기능을 정의할 수 있는 사용자 정의 함수(UDF)도 제공한다.

구현 유연성 up

스파크 SQL UDF

프로그래머는 모델의 내부를 이해하지 않고도 스파크 SQL에서 예측 결과를 쿼리할 수 있도록 해당 모델을 UDF로 생성할 수 있는 점이 스파크 SQL UDF의 강점이다.

from pyspark.sql import SparkSession

#Create SparkSession
spark = SparkSession.builder.appName("ch05").getOrCreate()

# 스파크 SQL UDF 파이썬 예제
from pyspark.sql.types import LongType

# 큐브 함수 생성
def cubed(s):
  return s * s * s

data = [(1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)]
df = spark.createDataFrame(data, schema=['id'])

# UDF로 등록
spark.udf.register("cubed", cubed, LongType())

# 임시 뷰 생성
df.createOrReplaceTempView('udf_test')
# 큐브 UDF를 사용하여 쿼리
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show() 

스파크 SQL에서 평가 순서 및 null 검사

스파크 SQL은 하위 표현식의 평가 순서를 보장하지 않는다.

spark.sql("SELECT s FROM test1 WHERE s IS NOT NULL AND strlen(s) > 1")

즉, 위 예시에서 AND를 사이에 둔 두 조건문들의 순차적 실행을 보장하지 않는다.

따라서 적절한 null 검사를 위해 다음을 수행해야 한다.

  1. UDF 자체가 null을 인식하도록 만들고 UDF 내부에서 null 검사를 수행한다.
  2. IF 또는 CASE WHEN 식을 사용하여 null 검사를 수행하고 조건 분기에서 UDF를 호출한다.

판다스 UDF로 PySpark UDF 속도 향상 및 배포

파이스파크 UDF의 문제점은 스칼라 UDF보다 성능이 느리다는 점이다.

파이스파크 UDF가 JVM과 파이썬 사이의 데이터 이동을 필요로 하기 때문!

이를 위해 판다스 UDF(벡터화된 UDF라고도 함)가 아파치 스파크 2.3부터 도입됐다.

판다스 UDF는 아파치 애로우(Arrow)를 사용하여 데이터를 전송, 판다스는 해당 데이터로 작업을 한다.

아파치 애로우 : 인메모리(In-Memory) 컬럼기반 데이터 포맷으로 스파크에서 JVM과 Python 프로세스 간에 데이터를 효율적으로 전송하기 위해 사용

아파치 애로우 형식에 포함된 데이터는 이미 파이썬 프로세스에서 사용할 수 있는 형식임으로 더 이상의 작업이 필요없다. (데이터 직렬화나 피클(pickle))

또, 행마다 개별 입력에 대해 작업하는 대신 판다스 시리즈 또는 데이터 프레임에서 작업한다.

아파치 스파크 3.0 부터 판다스 UDF와 판다스 함수 API로 분할되었다.

판다스 UDF
원래의 판다스 UDF는 UDF 유형을 수동적으로 정의하고 지정해야 했다.

스파크 3.0 부터 판다스 UDF는 Pandas.Series, Pandas.DataFrame, Tuple 및 Iterater와 같은 파이썬 유형 힌트로 판다스 UDF 유형을 유추한다.

판다스 함수 API
판단스 함수 API를 사용하면 입력과 출력이 모두 판다스 인스턴스인 파이스파크 데이터 프레임에 로컬 파이썬 함수를 직접 적용할 수 있다.

더 자세한 내용은 12장 '파이썬 타입 힌트를 사용한 판다스 UDF 재설계'에 나온다.

판다스 UDF 예제

# 판다스 가져오기
import pandas as pd

# 파이스파크 SQL 함수와 pandas_udf 가져오기
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# 큐브 함수 선언 (파이스파크 UDF랑 다른 점 확인)
def cubed(a: pd.Series) -> pd.Series:
  return a * a * a

# 큐브 함수에 대한 판다스 UDF 생성
cubed_udf = pandas_udf(cubed, returnType=LongType())
# 판다스 시리즈 생성
x = pd.Series([1,2,3])

# 로컬 판다스 데이터를 실행하는 pandas_udf에 대한 함수
print(cubed(x))

판다스 UDF를 스파크 데이터 프레임으로 전환

# 스파크 데이터 프레임 생성
df = spark.range(1, 4)

# 벡터화된 스파크 UDF(판다스 UDF)를 함수로 생성
df.select("id", cubed_udf(col('id'))).show()

스파크 UI 단계 넣을까 말까...

스파크 SQL 셸, 비라인 및 태블로로 쿼리하기

스파크 SQL 셀, 비라인(Beeline) CLI 유틸리티, 태블로 및 파워 BI와 같은 리포팅 툴을 푸홤하여 아파치 스파크를 뭐리하는 다양한 매커니즘이 있다.

스파크 SQL 셸 사용하기

스파크 SQL CLI를 시작하려면 "$SPARK_HOME" 폴더에서 다음 명령을 실행한다.

./bin/spark-sql
# 스파크 SQL 테이블 생성
CREATE TABLE people (name STRING, age INT);

# 테이블에 데이터 삽입
INSERT INTO people SELECET name, age FROM
# values 문 사용
INSERT INTO people SELECET name, age VALUES ("Michael", NULL);
INSERT INTO people SELECET name, age VALUES ("Andy", 30);
INSERT INTO people SELECET name, age VALUES ("Samantha", 19);

# 스파크 SQL 쿼리 실행하기
SHOW TABLES;
SELECT * FROM people WHERE age < 20;
SELECT name FROM people WHERE age IS NULL;

바리인 작업

비라인은 SQLLine CLI를 기반으로 하는 JDBC 클라이언트다.

이 유틸리티를 사용하여 스파크 스리프트 서버에 대해 스파크 SQL 쿼리를 실행할 수 있다.

스파크 쓰리프트 JDBC/ODBC 서버를 시작하려면 "$SPARK_HOME" 폴더에서 다음 명령을 실행한다.

./sbin/start-thriftserver.sh
# 쓰리프트 서버 테스트
./bin/beeline
# 비라인을 구성하여 로컬 쓰리프트 서버에 연결
!connect jdbc:hive2//localhost:10000
#비라인으로 스파크 sql 쿼리 실행
SHOW tables;
SELECT * FROM people;
# 서버 중지하기
./sbin/stop-thriftserver.sh

쿼리 결과문이 보기 좋게 표 형식으로 출력된다!

태블로로 작업하기

비라인과 유사하게 쓰리프트 서버를 통해 선호하는 BI 도구를 스파크 SQL에 연결할 수 있다.

는 생략하겠습니다...

외부 데이터 소스

JDBC 및 SQL 데이터베이스부터 시작하여 스파크 SQL을 사용하여 외부 데이터 소스에 연결하는 방법에 중점을 둘 것이다.

JDBC 및 SQL 데이터베이스

스파크 SQL에는 JDBC를 사용하여 다른 데이터베이스에서 데이터를 읽을 수 있는 데이터 소스 API가 포함되어 있다.

JDBC(Java Database Connectivity) : 자바에서 데이터베이스에 접속할 수 있도록 하는 자바 API

결과를 데이터 프레임으로 변환할 때 이러한 데이터 소스 쿼리를 단순화하므로 스파크 SQL의 모든 이점(성능 및 데이터 소스 끼리 조인 등)을 제공한다.

  • JDBC 데이터 소스에 대한 JDBC 드라이버를 지정해야 하며 스파크 클래스 경로에 있어야 한다.
# $SPARK_HOME 폴더에서 다음 실행
./bin.spark-shell --driver-class-path $ database.jar --jars $database.jar

데이터 소스 API를 사용하여 원격 데이터베이스의 테이블을 데이터 프레임 또는 스파크 SQL 임시 뷰로 로드할 수 있다.

  • 사용자는 데이터 소스 옵션에서 JDBC 연결 속성을 지정할 수 있다.

파티셔닝의 중요성

스파크 SQL과 JDBC 외부 소스 간에 많은 양의 데이터를 전송할 때 데이터 소스를 분할하는 것이 중요하다.

모든 데이터가 하나의 드라이버 연결을 통해 처리되면 추출 성능을 포화 상태로 만들어 성능을 저하시킨다.

대규모 작업의 경우 파티셔닝 연결 속성을 사용하는 것이 좋다.

속성들이 작동하는 방식 예제

다음 설정들을 사용한다고 가정하자

  • numPartitions:10
  • lowerBound:1000
  • upperBound:10000

=> 파티션 크기 = 1000, 파티션 갯수 = 10

모든 파티션에 쿼리 실행해보기 (10개의 쿼리와 동일)

SELECT FROM table WHERE partitionColumn BETWEEN 1000 and 2000
SELECT
FROM table WHERE partitionColumn BETWEEN 2000 and 3000
...
SELECT * FROM table WHERE partitionColumn BETWEEN 9000 and 10000

책에서 말하는 설정 가이드라인이 있다. (정답은 아니라고 한다.)

  • numPartitions의 좋은 시작점은 스파크 워커 수의 배수가 좋다.

    • ex) 노드 4개 => 파티션 4개 or 8개
  • 처음에는 최소 및 최대 partitionColumn의 실제 값을 기준으로 lowerBound, upperBound를 정해야 한다.

  • 데이터 스큐를 방지하기 위해 균일하게 분산될 수 있는 partitionColumn을 선택해야 한다.

    • 만약 대부분의 작업이 2000에서 3000 사이의 값을 요청하는 작업을 수행한다면, 기본 partitionColumn 대신 새로운 partitionColumn를 사용하거나
    • 균등한 분산을 위한 새 항목(ex. 여러 칼럽값의 해시)을 생성해야 한다.

PostgreSQL

PostgreSQL DB에 연결하려면 메이븐에서 JDBC jar를 빌드하거나 다운로드한 후에 클래스 경로에 추가한다.

그 다름 해당 jar를 지정하여 스파크 셸을 시작한다.

bin/spark-shell --jars postgresql-42.2.6.jar
# 읽기 방법 1 : 로드 함수를 사용하여 JDBC 소스로부터 데이터를 로드
jdbcDF1 = (spark
           .read
           .format("jdbc")
           .option("url", "jdbc:postgresql://[DBSERVER]")
           .option("dbtable", "[SCHEMA].[TABLENAME]")
           .option('user', "[USERNAME]")
           .option('password', "[PASSWORD]")
           .load())

# 읽기 방법 2 : jdbc 함수를 사용하여 JDBC 소스로부터 데이터를 로드
jdbcDF2 = (spark
           .read
           .jdbc("jdbc:postgresql://[DBSERVER]", "[SCHEMA].[TABLENAME]",
                 properties={'user':"[USERNAME]", 'password':"[PASSWORD]"}))

# 쓰기 방법 1 : 저장 함수를 사용하여 JDBC 소스에 데이터를 저장
(jdbcDF1
 .write
 .format("jdbc")
 .option("url", "jdbc:postgresql://[DBSERVER]")
 .option("dbtable", "[SCHEMA].[TABLENAME]")
 .option('user', "[USERNAME]")
 .option('password', "[PASSWORD]")
 .save())

# 쓰기 방법 2 : jdbc 함수를 사용하여 JDBC 소스에 데이터르 저장
(jdbcDF2
 .write
 .jdbc("jdbc:postgresql://[DBSERVER]", "[SCHEMA].[TABLENAME]",
       properties={'user':"[USERNAME]", 'password':"[PASSWORD]"}))

MySQL

MySQL DB에 연결하려면 메이븐 또는 MySQL에서 JDBC jar를 빌드하거나 다운로드 후에 클래스 경로에 추가...

스파크 셀로 실행...

# 로드 함수를 사용하여 JDBC 소스로부터 데이터를 로드
jdbcDF = (spark
          .read
          .format("jdbc")
          .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
          .option("driver", "com.myslql.jdbcDriver")
          .option("dbtable", "[TABLENAME]")
          .option('user', "[USERNAME]")
          .option('password', "[PASSWORD]")
          .load())

# 저장 함수를 사용하여 JDBC 소스에 데이터를 저장
(jdbcDF
 .write
 .format("jdbc")
 .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
 .option("driver", "com.myslql.jdbcDriver")
 .option("dbtable", "[TABLENAME]")
 .option('user', "[USERNAME]")
 .option('password', "[PASSWORD]")
 .save())

애저 코스모스

에저 코스모스를 연결하려면 메이븐 or 깃허브에서 JDBC jar를 빌드 or 다운로드하고 클래스 경로에 추가... 파이스파크 셸을 시작...

# 애저 코스모스 DB로부터 데이터 로드
## 설정 읽기
query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"
readConfig = {
    "Endpoint" : "https://[ACCOUNT].documents.azure.com:443",
    "MasterKey" : "[MASTER KEY]",
    "Database" : "[DATABASE]",
    "preferredRegions" : "Central US; East US2",
    "Collection" : "[COLLECTION]",
    "SamplingRatio" : "1.0"
    "schema_samplesize" : "1000",
    "query_pagesize" : "2147483647",
    "query_custom" : query
}

# azure-cosmosdb-spark를 통해 연결하여 스파크 데이터 프레임 생성
df = (spark
      .read
      .format("com.microsoft.azure.cosmosdb.spark")
      .option(**readConfig)
      .load())

# 비행 수 카운트
df.count()

# 애저 코스모스 DB에 데이터 저장
# 설정 쓰기
writeConfig = {
    "Endpoint" : "https://[ACCOUNT].documents.azure.com:443",
    "MasterKey" : "[MASTER KEY]",
    "Database" : "[DATABASE]",
    "Collection" : "[COLLECTION]",
    "Upsert" : "true"
}

# 애저 코스모스 DB에 데이터 프레임 업서트 하기
(df.write
 .format("com.microsoft.azure.cosmosdb.spark")
 .options(**writeConfig)
 .save())

MS SQL 서버

MS SQL 서버도 크게 다른 점이 없다~~

기타 외부 데이터 소스

위에서 소개한 데이터 소스말고 아파치 스프크에 연결 가능한 인기 있는 외부 데이터 소스로는

  • 아파치 카산드라
  • 스노우플레이크
  • 몽고DB

등이 있다.

데이터 프레임 및 스파크 SQL의 고차 함수

복잡한 데이터 유형은 단순한 데이터 유형의 결합이다.

여기에 복잡한 데이터 유형을 조작하는 두 가지의 일반적인 방법이 있다.

  1. 중첩된 구조를 개별 행으로 분해하고 일부 함수를 적용한 다음 중첩된 구조를 다시 만드는 방법
  2. 사용자 정의 함수 구축

이러한 접근 방식은 문제를 표 형식으로 생각할 수 있다는 이점이 있다.

방법 1: 분해 및 수집

SELECT id, collect_list(value + 1) AS values
FROM (SELECT id, EXPLODE(values) AS value
		FROM table) x
GROUP BY id
  • 이 중첩된 SQL문에서 explode(values)는 values 내의 각 요소에 대한 새로운 행(id 포함)을 만든다.

collect_list()가 중복된 개체 목록을 반환하지만 GROUP BY문에는 셔플 작업이 필요하다.
즉, 재수집된 배열의 순서가 원래 배열의 순서와 반드시 동일하지는 않다.
값은 여러 차원이 될 수도 있고, GROUP BY는 비싼 접근법이 될 수 있다.

방법 2: 사용자 정의 함수

동일한 작업을 수행하는 다른 방법으로 map()을 사용하여 각 요소를 반복하고 더하기 작업을 수행하는 UDF를 보자.

def addOne(values: Seq[Int]): Seq[Int] = {
	values.map(value => value + 1
}
val plusOneInt = spark.udf.register("plusOneInt", addOne(_:Seq[Int]): Seq[Int])   

이 다음 Spark SQL에서 이 UDF를 실행할 수 있다.

spark.sql("SELECT id, plusOneInt(values) AS values FROM table").show()
  • 첫 번째 방법에 비해 정렬 문제(explode(), collect_list())가 없어 사용하기는 간단할 수 있따.
  • 직렬화 역직렬화 프로세스는 비용이 많이 들 수 있다.
  • 또, UDF는 대용량 데이터에 대한 메모리 부족 문제 또한 완화할 수 있다.

복잡한 데이터 유형을 위한 내장 함수

아파치 스파크 2.4 부터 복잡한 데이터 유형에 대한 내장 함수를 포함한다.

를 참고하자... 짱 많다. 일반적으로 복잡한 데이터 유형에 사용되는 내장 함수로는 배열 유형, 맵 유형이 있다.

고차 함수

익명 람다 함수를 인수로 사용하는 고차 함수가 있다.

# 고차 함수 예제
transform(values, value -> lambda expression)

transform() 함수는 배열과 익명 함수(람다 표현식)를 입력으로 사용한다.

이 함수는 각 요소에 익명 함수를 적용한 다음, 결과를 출력 배열에 할당함으로써 새로운 배열을 투명하게 생성한다. (UDF와 유사하지만 더 효율적이다.)

데이터 세트를 생성하고 몇 가지 예를 실행해보자.

# DF 생성
from pyspark.sql.types import *

schema = StructType([StructField("celsius", ArrayType(IntegerType()))])

t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]
t_c = spark.createDataFraame(t_list, schema)
t_c.createOrReplaceTempView("tC")

# DF 출력
t_c.show()

위에 데이터 프레임을 사용하면 다음과 같은 고차 함수 쿼리 실행 가능

transform(array<T>, function<T, U>): array<U>
# 고차 함수 쿼리 (transform, map()과 유사)
## 온도의 배열에 대해 섭씨를 화씨로 계산
spark.sql("""
SELECT celsius, 
  transform(celsius, t -> ((t * 9) div 5) + 32)) AS fahrenheit
FROM tC
""").show()

filter() 함수로 배열의 요소 중 부울 함수가 인 요소만 배열 생성

filter(array<T>, function<T, Boolean>): array<T>
# 고차 함수 쿼리 (filter)
## 온도의 배열에 대해 섭씨 38도 이상을 필터
spark.sql("""
SELECT celsius, 
  filter(celsius, t -> t > 38) AS high
FROM tC
""").show()

exist()는 입력한 배열의 요소 중 불린 함수를 만족시키는 것이 존재하면 을 반환한다.

exists(array<T>, function<T, V, Boolean>): Boolean
# 고차 함수 쿼리 (exists)
## 온도의 배열에 섭씨 38도의 온도가 있는가?
spark.sql("""
SELECT celsius, 
  exists(celsius, t -> t = 38) as threshold
FROM tC
""").show()

reduce()는 function<B, T, B>를 사용하여 요소를 버퍼 B에 병합하고 최종 버퍼에 마무리 function<B, R>을 적용하여 배열의 요소를 단일값으로 줄인다.

reduce(array<T>, B, function<B, T, B>, function<B, R>)
# 고차 함수 쿼리 (reduce)
## 온도의 평균을 계산하고 화씨로 변환
spark.sql("""
SELECT celsius, 
  reduce(
    celsius,
    0,
    (t, acc) -> t + acc,
    acc -> (acc div size(celsius) * 9 div 5) + 32
  ) as avgFahrenheit
FROM tC
""").show()

일반적인 데이터 프레임 및 스파크 SQL 작업

스파크 SQL의 기능 중 일부는 데이터 프레임의 다양한 기능(비형식의 데이터세트 작업으로 알려진)에서 유래된다.

작업 목록

  • 집계 함수
  • 수집 함수
  • 날짜/시간 함수
  • 수학 함수
  • 기타 함수
  • 비집계 함수
  • 정렬 함수
  • 문자열 함수
  • UDF 함수
  • 윈도우 함수

책에서는 결합과 조인, 윈도우, 수정 함수에 집중한다.

데이터세트를 불러오는 작업부터 해보자...

# 파일 경로 설정
from pyspark.sql.functions import expr
tripdelaysFilePath = " "
airportsnaFilePath = " "

# 공항 데이터세트 읽어 오기
airportsna = (spark.read
              .format("csv")
              .options(header="true", inferSchema="true", sep="\t")
              .load(airportsnaFilePath))

airportsna.createOrReplaceTempView("airports_na")

# 출발 지연 데이터세트를 읽어 오기
departureDelays = (spark.read
                   .format("csv")
                   .optoins(header="true")
                   .load(tripdelaysFilePath))

departureDelays = (departureDelays
                   .withColumn("delay", expr("CAST(delay as INT) as delay"))
                   .withColumn("distance", expr("CAST(distance as INT) as distance")))

departureDelays.createOrReplaceTempView("departureDelays")

# 임시 작은 테이블(뷰) 생성
foo = (departureDaleays
       .filter(expr("""origin == 'SEA' AND destination == 'SFO' and date like '01010%' and delay > 0""")))
foo.createOrReplaceTempView("foo")
  
# SQL query 넣기
spark.sql("SELECT * FROM airports_na LIMIT 10").show()
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()
spark.sql("SELECT * FROm foo").show()

Union

동일한 스키마를 가진 두 개의 서로 다른 DF을 함께 결합하는 함수

# 두 테이블 결합
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")

# 결합된 결과 보기(특정 시간 범위에 대한 SEA와 SFO를 필터)
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO' AND date LIKE '01010%' AND delay > 0""")).show()
# bar DF는 foo와 delays의 결합입으로 중복됨을 확인할 수 있다.
spark.sql("""
  SELECT *
  FROM bar
  WHERE origin = 'SEA'
    AND destination = 'SFO'
    AND date LIKE '01010%'
    AND delay > 0
""").show()

Join

스파크 SQL 조인은 inner join이며, 옵션은 inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi 및 left_anti이다.

# 출발 지연 데이터(foo)와 공항 정보의 inner 조인
foo.join(
    airports,
    airports.IATA == foo.origin
).select("City", "State", "date", "delay", "distance", "destination").show()
# SQL 예제
spark.sql("""
SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination
  FROM foo f
  JOIN airports_na a
    ON a.IATA = f.origin
""").show()

윈도우

윈도우 함수는 일반적으로 윈도우(입력 행의 범위) 행의 값을 사용하여 다른 행의 형태로 값 집합을 반환한다.

모든 입력 행에 대해 단일값을 반환하면서 행 그룹에 대해 작업할 수 있다.

이 절에서는 dense_rank() 윈도우 함수를 사용한다.

문서 : window 함수 소개 블로그

# SEA, SFO, JFK에서 출발하여 특정 목적지 위치로 이동하는 항공편 검토
DROP TABLE IF EXISTS departureDelaysWindow;

CREATE TABLE departureDelaysWindow AS
SELECT origin, destination, SUM(delay) AS TotalDelays
FROM departureDelays
WHERE origin IN ('SEA', 'SFO', 'JFK')
AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
GROUP BY origin, distination;

SELECT * FROM departureDelaysWindow
# 각 출방 공항에 대해 가장 많은 지연이 발생한 3개의 목적지 찾기
SELECT origin, destination, SUM(TotalDelays) AS TotalDelays
FROM departureDelaysWindow
WHERE origin = '[ORIGIN]'
GROUP BY origin, destination
ORDER BY SUM(TotalDelays) DESC
LIMIT 3
# density_rank() 사용하여 위 쿼리 보완
spark.sql("""
SELECT origin, destination, TotalDelays, rank
FROM (
  SELECT origin, destination, TotalDelays,
  dense_rank() OVER (PARTITION BY origin ORDER BY TotalDelays DESC) AS rank
  FROM departureDelaysWindow
) t
WHERE rank <= 3
""").show()

각 윈도우 그룹은 단일 이그제큐터에서 실행될 수 있어야 하며 실행 중에는 단일 파티션으로 구성된다.

수정

데이터 프레임을 수정(modification) 하는 작업 또한 보자.

# 열 추가 
from pyspark.sql.functions import expr

foo.show()

foo2 = (foo.withColumn(
    "status",
    expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END" )
))

foo2.show()
# 열 삭제
foo3 = foo2.drop("delay")
foo3.show()
# 컬럼명 바꾸기
foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()
# SEA에서 출발하는 항공편 도착지, 월, 지연 컬럼으로 query
SELECT destination, CAST(SUBSTRING(date, 0, 2)AS int) AS month, delay
FROM departureDelays
WHERE origin = "SEA"
# 위 쿼리를 목적지 및 월별 지연(평균과 최대)에 대한 집계 계산하기
SELECT destination, CAST(SUBSTRING(date, 0, 2)AS int) AS month, delay
FROM departureDelays
WHERE origin = "SEA"
)
PIVOT (
    CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination
profile
Learning bunch, mostly computer and language

0개의 댓글