scala> val flights = sc.parallelize(List(
| ("SEA", "JFK", "DL", "418", "7:00"),
| ("SFO", "LAX", "AA", "1250", "7:05"),
| ("SFO", "JFK", "VX", "12", "7:05"),
| ("JFK", "LAX", "DL", "424", "7:10"),
| ("LAX", "SEA", "DL", "5737", "7:10")))
flights: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:23
scala> val airports = sc.makeRDD(List(
| ("JFK", "John F. Kennedy International Airport", "New York", "NY"),
| ("LAX", "Los Angeles International Airport", "Los Angeles", "CA"),
| ("SEA", "Seattle-Tacoma International Airport", "Seattle", "WA"),
| ("SFO", "San Francisco International Airport", "San Francisco", "CA")))
airports: org.apache.spark.rdd.RDD[(String, String, String, String)] = ParallelCollectionRDD[1] at makeRDD at <console>:23
scala> val airlines = sc.parallelize(List(
| ("AA", "American Airlines"),
| ("DL", "Delta Airlines"),
| ("VX", "Virgin America")))
airlines: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[2] at parallelize at <console>:23
An existing RDD can be transformed into a DataFrame with toDF; the Spark shell imports spark.implicits._ automatically, which provides this method.
scala> val flightsDF = flights.toDF("origin", "destination", "airline", "flight_num", "flight_time")
flightsDF: org.apache.spark.sql.DataFrame = [origin: string, destination: string ... 3 more fields]
scala> val airportsDF = airports.toDF("code", "name", "city", "state")
airportsDF: org.apache.spark.sql.DataFrame = [code: string, name: string ... 2 more fields]
scala> val airlinesDF = airlines.toDF("code", "name")
airlinesDF: org.apache.spark.sql.DataFrame = [code: string, name: string]
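As an alternative to passing column names positionally, the schema can be derived by reflection from a case class; a minimal sketch (the Flight class below is an illustrative assumption, not from the session above):

// Hypothetical case class; field names become the DataFrame column names.
// Relies on spark.implicits._, which the Spark shell imports by default.
case class Flight(origin: String, destination: String,
                  airline: String, flight_num: String, flight_time: String)

val typedFlightsDF = flights
  .map { case (o, d, a, num, t) => Flight(o, d, a, num, t) }
  .toDF()  // columns are named after the case class fields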
scala> import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.functions.broadcast

scala> flightsDF.as("a").
| join(broadcast(airportsDF.as("b")), $"a.origin" === $"b.code").
| join(broadcast(airportsDF.as("c")), $"a.destination" === $"c.code").
| join(broadcast(airlinesDF.as("d")), $"a.airline" === $"d.code").
| select($"b.city".as("origin"), $"c.city".as("destination"), $"d.name".as("airline"), $"a.flight_num", $"a.flight_time").
| show()
+-------------+-----------+-----------------+----------+-----------+
| origin|destination| airline|flight_num|flight_time|
+-------------+-----------+-----------------+----------+-----------+
| Seattle| New York| Delta Airlines| 418| 7:00|
|San Francisco|Los Angeles|American Airlines| 1250| 7:05|
|San Francisco| New York| Virgin America| 12| 7:05|
| New York|Los Angeles| Delta Airlines| 424| 7:10|
| Los Angeles| Seattle| Delta Airlines| 5737| 7:10|
+-------------+-----------+-----------------+----------+-----------+
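The same three-way join can also be expressed in SQL by registering the DataFrames as temporary views; a sketch (the view names are illustrative assumptions):

// Register the DataFrames as temp views so they can be queried with SQL.
flightsDF.createOrReplaceTempView("flights")
airportsDF.createOrReplaceTempView("airports")
airlinesDF.createOrReplaceTempView("airlines")

spark.sql("""
  SELECT b.city AS origin, c.city AS destination, d.name AS airline,
         a.flight_num, a.flight_time
  FROM flights a
  JOIN airports b ON a.origin = b.code
  JOIN airports c ON a.destination = c.code
  JOIN airlines d ON a.airline = d.code
""").show()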
broadcast: hints Spark to ship the smaller DataFrame to every executor, so the join can be performed without a shuffle.
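Whether the hint took effect can be confirmed from the physical plan: a broadcast join appears as a BroadcastHashJoin node instead of a SortMergeJoin. A minimal check (plan output omitted):

// Print the physical plan for one of the joins above.
flightsDF.as("a")
  .join(broadcast(airportsDF.as("b")), $"a.origin" === $"b.code")
  .explain()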
Spark shell web UI: the execution of the query can be inspected on the SQL tab at http://spark-master-01:4040/.
Spark SQL has spark.sql.adaptive.enabled set to true here (the default in recent Spark versions), so it adaptively rewrites the execution plan in whatever direction it judges optimal; it may perform a broadcast join even when broadcast is not explicitly requested.
It can be turned off with spark.conf.set("spark.sql.adaptive.enabled", false).
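To observe the non-broadcast behavior, both adaptive execution and the automatic broadcast threshold can be disabled before re-running the join; a sketch under those settings:

// Disable adaptive query execution and automatic broadcasting;
// the join should then fall back to a shuffle-based SortMergeJoin.
spark.conf.set("spark.sql.adaptive.enabled", false)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

flightsDF.as("a")
  .join(airportsDF.as("b"), $"a.origin" === $"b.code")  // no broadcast() hint
  .explain()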