Spark SQL

이상민 · March 27, 2023
scala> val flights = sc.parallelize(List(
     |   ("SEA", "JFK", "DL", "418",  "7:00"),
     |   ("SFO", "LAX", "AA", "1250", "7:05"),
     |   ("SFO", "JFK", "VX", "12",   "7:05"),
     |   ("JFK", "LAX", "DL", "424",  "7:10"),
     |   ("LAX", "SEA", "DL", "5737", "7:10")))
flights: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val airports = sc.makeRDD(List(
     |   ("JFK", "John F. Kennedy International Airport", "New York", "NY"),
     |   ("LAX", "Los Angeles International Airport", "Los Angeles", "CA"),
     |   ("SEA", "Seattle-Tacoma International Airport", "Seattle", "WA"),
     |   ("SFO", "San Francisco International Airport", "San Francisco", "CA")))
airports: org.apache.spark.rdd.RDD[(String, String, String, String)] = ParallelCollectionRDD[1] at makeRDD at <console>:23

scala> val airlines = sc.parallelize(List(
     |   ("AA", "American Airlines"),
     |   ("DL", "Delta Airlines"),
     |   ("VX", "Virgin America")))
airlines: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[2] at parallelize at <console>:23

An existing RDD can be transformed into a DataFrame with toDF:

scala> val flightsDF = flights.toDF("origin", "destination", "airline", "flight_num", "flight_time")
flightsDF: org.apache.spark.sql.DataFrame = [origin: string, destination: string ... 3 more fields]

scala> val airportsDF = airports.toDF("code", "name", "city", "state")
airportsDF: org.apache.spark.sql.DataFrame = [code: string, name: string ... 2 more fields]

scala> val airlinesDF = airlines.toDF("code", "name")
airlinesDF: org.apache.spark.sql.DataFrame = [code: string, name: string]

scala> flightsDF.as("a").
     |   join(broadcast(airportsDF.as("b")), $"a.origin" === $"b.code").
     |   join(broadcast(airportsDF.as("c")), $"a.destination" === $"c.code").
     |   join(broadcast(airlinesDF.as("d")), $"a.airline" === $"d.code").
     |   select($"b.city".as("origin"), $"c.city".as("destination"), $"d.name".as("airline"), $"a.flight_num", $"a.flight_time").
     |   show()
+-------------+-----------+-----------------+----------+-----------+
|       origin|destination|          airline|flight_num|flight_time|
+-------------+-----------+-----------------+----------+-----------+
|      Seattle|   New York|   Delta Airlines|       418|       7:00|
|San Francisco|Los Angeles|American Airlines|      1250|       7:05|
|San Francisco|   New York|   Virgin America|        12|       7:05|
|     New York|Los Angeles|   Delta Airlines|       424|       7:10|
|  Los Angeles|    Seattle|   Delta Airlines|      5737|       7:10|
+-------------+-----------+-----------------+----------+-----------+
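
For reference, the same three-way join can also be written as literal SQL by registering the DataFrames as temporary views. A minimal sketch (the view names flights, airports, and airlines are my own choice; the query should return the same rows as the show() output above):

scala> flightsDF.createOrReplaceTempView("flights")

scala> airportsDF.createOrReplaceTempView("airports")

scala> airlinesDF.createOrReplaceTempView("airlines")

scala> spark.sql("""
     |   SELECT b.city AS origin, c.city AS destination, d.name AS airline,
     |          a.flight_num, a.flight_time
     |   FROM flights a
     |   JOIN airports b ON a.origin = b.code
     |   JOIN airports c ON a.destination = c.code
     |   JOIN airlines d ON a.airline = d.code
     |   """).show()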

broadcast: a hint that avoids a shuffle; the small table is copied to every executor, so the join can run locally against the large side.
The execution steps can be checked in the SQL tab of the Spark shell web UI: http://spark-master-01:4040/
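
The physical plan can also be printed directly with explain(); when the hint takes effect, a BroadcastHashJoin (fed by a BroadcastExchange) should appear instead of a shuffle-based SortMergeJoin. A sketch (plan output omitted, since it varies by Spark version):

scala> flightsDF.as("a").
     |   join(broadcast(airportsDF.as("b")), $"a.origin" === $"b.code").
     |   explain()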
Spark SQL's adaptive query execution is enabled by default (spark.sql.adaptive.enabled = true), so it adjusts the plan at runtime toward the optimal strategy: even when broadcast is not requested, Spark may perform a broadcast join on its own. It can be disabled with:

spark.conf.set("spark.sql.adaptive.enabled", false)
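
To actually see a shuffle join in the UI, the static planner's automatic broadcasting would need to be turned off as well. A sketch, assuming a spark-shell session (the default values in the comments are from the Spark documentation and may differ by version):

// Check the current value ("true" by default since Spark 3.2)
spark.conf.get("spark.sql.adaptive.enabled")

// Even with AQE off, the static planner still broadcasts tables smaller than
// spark.sql.autoBroadcastJoinThreshold (10 MB by default); -1 disables that too
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// Restore the defaults when done
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760L)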
