Used when joining a large table with a relatively small table.
Large table <=> Fact table
Small table <=> Dimension table
Starting the Standalone Cluster
$ ./sbin/start-master.sh
$ ./sbin/start-worker.sh spark://spark-master-01:7177
Starting the Spark Shell
$ ./bin/spark-shell --master spark://spark-master-01:7177
- Fact table (Large table)....
scala> val flights = sc.parallelize(List(
| ("SEA", "JFK", "DL", "418", "7:00"),
| ("SFO", "LAX", "AA", "1250", "7:05"),
| ("SFO", "JFK", "VX", "12", "7:05"),
| ("JFK", "LAX", "DL", "424", "7:10"),
| ("LAX", "SEA", "DL", "5737", "7:10")))
flights: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:23
- Dimension table #1 (Small table)....
scala> val airports = sc.makeRDD(List(
| ("JFK", "John F. Kennedy International Airport", "New York", "NY"),
| ("LAX", "Los Angeles International Airport", "Los Angeles", "CA"),
| ("SEA", "Seattle-Tacoma International Airport", "Seattle", "WA"),
| ("SFO", "San Francisco International Airport", "San Francisco", "CA")))
airports: org.apache.spark.rdd.RDD[(String, String, String, String)] = ParallelCollectionRDD[1] at makeRDD at <console>:23
- Dimension table #2 (Small table)....
scala> val airlines = sc.parallelize(List(
| ("AA", "American Airlines"),
| ("DL", "Delta Airlines"),
| ("VX", "Virgin America")))
airlines: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[2] at parallelize at <console>:23
The REPL output shows that these tables are RDDs, i.e., their data lives on the executors. A broadcast is sent from the driver to the executors, so the table first has to be collected back to the driver.
// ("JFK", "John F. Kennedy International Airport", "New York", "NY")
scala> val airportsMap = sc.broadcast(airports.map{case (a, b, c, d) => (a, c)}.collectAsMap)
airportsMap: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(1)
scala> val airlinesMap = sc.broadcast(airlines.collectAsMap)
airlinesMap: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(3)
collect returns the data as an Array, while collectAsMap returns it as a Map of (key, value) pairs. Unlike collect, collectAsMap is therefore only available on pair RDDs and cannot always be used.
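An illustrative sketch of the difference, using the small airlines RDD defined above:

airlines.collect        // Array[(String, String)]: Array((AA,American Airlines), (DL,Delta Airlines), ...)
airlines.collectAsMap   // scala.collection.Map[String,String]: Map(AA -> American Airlines, ...)

// collectAsMap is only defined on pair RDDs (RDD[(K, V)]), which is why the
// 4-tuple airports RDD is first mapped down to (code, city) pairs above.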
// ("SEA", "JFK", "DL", "418", "7:00")
scala> flights.map{case (a, b, c, d, e) => (
| airportsMap.value.get(a).get,
| airportsMap.value.get(b).get,
| airlinesMap.value.get(c).get,
| d,
| e)}.collect.foreach(println)
(Seattle,New York,Delta Airlines,418,7:00)
(San Francisco,Los Angeles,American Airlines,1250,7:05)
(San Francisco,New York,Virgin America,12,7:05)
(New York,Los Angeles,Delta Airlines,424,7:10)
(Los Angeles,Seattle,Delta Airlines,5737,7:10)
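One caveat: airportsMap.value.get(a).get throws a NoSuchElementException if a code is missing from the dimension table. A more defensive variant of the same map-side join, as a sketch using the broadcast variables above, falls back to the raw code:

flights.map{case (a, b, c, d, e) => (
  airportsMap.value.getOrElse(a, a),   // keep the original code if no city is found
  airportsMap.value.getOrElse(b, b),
  airlinesMap.value.getOrElse(c, c),
  d,
  e)}.collect.foreach(println)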
flights:  ("SEA", "JFK", "DL", "418", "7:00")
airports: ("JFK", "John F. Kennedy International Airport", "New York", "NY")
scala> val flightsJoin = flights.keyBy(_._1).join(airports.keyBy(_._1))
flightsJoin: org.apache.spark.rdd.RDD[(String, ((String, String, String, String, String), (String, String, String, String)))] = MapPartitionsRDD[9] at join at <console>:24
flights.keyBy(_._1)  -> ("SEA", ("SEA", "JFK", "DL", "418", "7:00"))
airports.keyBy(_._1) -> ("JFK", ("JFK", "John F. Kennedy International Airport", "New York", "NY"))
scala> flightsJoin.collect.foreach(println)
(LAX,((LAX,SEA,DL,5737,7:10),(LAX,Los Angeles International Airport,Los Angeles,CA)))
(SFO,((SFO,LAX,AA,1250,7:05),(SFO,San Francisco International Airport,San Francisco,CA)))
(SFO,((SFO,JFK,VX,12,7:05),(SFO,San Francisco International Airport,San Francisco,CA)))
(SEA,((SEA,JFK,DL,418,7:00),(SEA,Seattle-Tacoma International Airport,Seattle,WA)))
(JFK,((JFK,LAX,DL,424,7:10),(JFK,John F. Kennedy International Airport,New York,NY)))
scala> val flights2 = flightsJoin.map(x => (x._2._2._3, x._2._1._2, x._2._1._3, x._2._1._4, x._2._1._5))
flights2: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = MapPartitionsRDD[10] at map at <console>:23
scala> flights2.collect.foreach(println)
(Los Angeles,SEA,DL,5737,7:10)
(San Francisco,LAX,AA,1250,7:05)
(San Francisco,JFK,VX,12,7:05)
(Seattle,JFK,DL,418,7:00)
(New York,LAX,DL,424,7:10)
scala> val flightsJoin2 = flights2.keyBy(_._2).join(airports.keyBy(_._1))
flightsJoin2: org.apache.spark.rdd.RDD[(String, ((String, String, String, String, String), (String, String, String, String)))] = MapPartitionsRDD[15] at join at <console>:24
scala> flightsJoin2.collect.foreach(println)
(LAX,((San Francisco,LAX,AA,1250,7:05),(LAX,Los Angeles International Airport,Los Angeles,CA)))
(LAX,((New York,LAX,DL,424,7:10),(LAX,Los Angeles International Airport,Los Angeles,CA)))
(SEA,((Los Angeles,SEA,DL,5737,7:10),(SEA,Seattle-Tacoma International Airport,Seattle,WA)))
(JFK,((San Francisco,JFK,VX,12,7:05),(JFK,John F. Kennedy International Airport,New York,NY)))
(JFK,((Seattle,JFK,DL,418,7:00),(JFK,John F. Kennedy International Airport,New York,NY)))
scala> val flights3 = flightsJoin2.map(x => (x._2._1._1, x._2._2._3, x._2._1._3, x._2._1._4, x._2._1._5))
flights3: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = MapPartitionsRDD[16] at map at <console>:23
scala> flights3.collect.foreach(println)
(San Francisco,Los Angeles,AA,1250,7:05)
(New York,Los Angeles,DL,424,7:10)
(Los Angeles,Seattle,DL,5737,7:10)
(San Francisco,New York,VX,12,7:05)
(Seattle,New York,DL,418,7:00)
scala> val flightsJoin3 = flights3.keyBy(_._3).join(airlines.keyBy(_._1))
flightsJoin3: org.apache.spark.rdd.RDD[(String, ((String, String, String, String, String), (String, String)))] = MapPartitionsRDD[21] at join at <console>:24
scala> flightsJoin3.collect.foreach(println)
(DL,((New York,Los Angeles,DL,424,7:10),(DL,Delta Airlines)))
(DL,((Los Angeles,Seattle,DL,5737,7:10),(DL,Delta Airlines)))
(DL,((Seattle,New York,DL,418,7:00),(DL,Delta Airlines)))
(AA,((San Francisco,Los Angeles,AA,1250,7:05),(AA,American Airlines)))
(VX,((San Francisco,New York,VX,12,7:05),(VX,Virgin America)))
scala> val flights4 = flightsJoin3.map(x => (x._2._1._1, x._2._1._2, x._2._2._2, x._2._1._4, x._2._1._5))
flights4: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = MapPartitionsRDD[22] at map at <console>:23
scala> flights4.collect.foreach(println)
(New York,Los Angeles,Delta Airlines,424,7:10)
(Los Angeles,Seattle,Delta Airlines,5737,7:10)
(Seattle,New York,Delta Airlines,418,7:00)
(San Francisco,Los Angeles,American Airlines,1250,7:05)
(San Francisco,New York,Virgin America,12,7:05)
scala> flights.
| keyBy(_._1).join(airports.keyBy(_._1)).
| map(x => (x._2._2._3, x._2._1._2, x._2._1._3, x._2._1._4, x._2._1._5)).
| keyBy(_._2).join(airports.keyBy(_._1)).
| map(x => (x._2._1._1, x._2._2._3, x._2._1._3, x._2._1._4, x._2._1._5)).
| keyBy(_._3).join(airlines.keyBy(_._1)).
| map(x => (x._2._1._1, x._2._1._2, x._2._2._2, x._2._1._4, x._2._1._5)).
| collect.foreach(println)
(New York,Los Angeles,Delta Airlines,424,7:10)
(Los Angeles,Seattle,Delta Airlines,5737,7:10)
(Seattle,New York,Delta Airlines,418,7:00)
(San Francisco,Los Angeles,American Airlines,1250,7:05)
(San Francisco,New York,Virgin America,12,7:05)
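The positional accessors (x._2._1._3, etc.) are hard to read; the following sketch is an equivalent rewrite of the same three-step join chain using pattern matching in each map:

flights.keyBy(_._1).join(airports.keyBy(_._1)).
  map { case (_, ((_, dest, carrier, num, time), (_, _, originCity, _))) =>
    (originCity, dest, carrier, num, time) }.
  keyBy(_._2).join(airports.keyBy(_._1)).
  map { case (_, ((originCity, _, carrier, num, time), (_, _, destCity, _))) =>
    (originCity, destCity, carrier, num, time) }.
  keyBy(_._3).join(airlines.keyBy(_._1)).
  map { case (_, ((originCity, destCity, _, num, time), (_, airlineName))) =>
    (originCity, destCity, airlineName, num, time) }.
  collect.foreach(println)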
A map-side join, which does not trigger a shuffle, runs faster than a join that does.
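One way to check this is the RDD lineage (toDebugString); a sketch assuming the RDDs and broadcast variables defined above:

// The lineage of the RDD join contains a CoGroupedRDD (a shuffle boundary),
// while the broadcast version is just a map over the fact RDD.
val shuffleJoined = flights.keyBy(_._1).join(airports.keyBy(_._1))
val mapSideJoined = flights.map { case (a, b, c, d, e) =>
  (airportsMap.value.getOrElse(a, a), b, c, d, e) }

println(shuffleJoined.toDebugString)  // MapPartitionsRDD <- CoGroupedRDD <- parent RDDs
println(mapSideJoined.toDebugString)  // MapPartitionsRDD <- ParallelCollectionRDD only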
Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2008,1,3,4,1343,1325,1451,1435,WN,588,N240WN,68,70,55,16,18,HOU,LIT,393,4,9,0,,0,16,0,0,0,0
2008,1,3,4,1125,1120,1247,1245,WN,1343,N523SW,82,85,71,2,5,HOU,MAF,441,3,8,0,,0,NA,NA,NA,NA,NA
...
scala> val flights_air = sc.textFile("/skybluelee/data/airline_on_time/2008.csv").
| filter(!_.startsWith("Year")).
| map{x =>
| val arr = x.split(",")
| val origin = arr(16)
| val dest = arr(17)
| val uniquecarrier = arr(8)
| val flightnum = arr(9)
| val deptime = arr(4)
| (origin, dest, uniquecarrier, flightnum, deptime)
| }
flights_air: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = MapPartitionsRDD[37] at map at <console>:25
filter(!_.startsWith("Year")) keeps only the lines that do not start with "Year", i.e., it filters out the header line.
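If only the single header line needs to be dropped (rather than checking the prefix of every record), an alternative sketch uses mapPartitionsWithIndex, assuming the same file path (variable name flights_air_alt is illustrative):

val flights_air_alt = sc.textFile("/skybluelee/data/airline_on_time/2008.csv").
  mapPartitionsWithIndex { (idx, iter) =>
    if (idx == 0) iter.drop(1) else iter  // the header line only appears in the first partition
  }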
"iata","airport","city","state","country","lat","long"
"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472
"00R","Livingston Municipal","Livingston","TX","USA",30.68586111,-95.01792778
...
scala> val airports_air = sc.textFile("/skybluelee/data/airline_on_time_ext/airports.csv").
| map(_.replaceAll("\"", "")).
| filter(!_.startsWith("iata")).
| map{x =>
| val arr = x.split(",")
| val iata = arr(0)
| val airport = arr(1)
| val city = arr(2)
| val state = arr(3)
| (iata, airport, city, state)
| }
airports_air: org.apache.spark.rdd.RDD[(String, String, String, String)] = MapPartitionsRDD[42] at map at <console>:26
"Thigpen " 의 경우 str임을 나타내기 위해 "를 사용하였지만 사용하는 입장에서는 불필요하다. 따라서 "를 제거하기 위해 replace 함수를 사용한다.
Code,Description
"02Q","Titan Airways"
"04Q","Tradewind Aviation"
"05Q","Comlux Aviation, AG"
...
scala> val airlines_air = sc.textFile("/skybluelee/data/airline_on_time_ext/carriers.csv").
| map(_.replaceAll("\"", "")).
| filter(!_.startsWith("Code")).
| map{x =>
| val arr = x.split(",")
| val code = arr(0)
| val description = arr(1)
| (code, description)
| }
airlines_air: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[47] at map at <console>:26
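Note that some descriptions in carriers.csv contain a comma inside the quotes (e.g. "Comlux Aviation, AG"), so stripping the quotes first and then splitting on "," truncates such values. A possible workaround sketch (the variable name airlines_air_quoted is illustrative) splits only on commas that fall outside quoted fields:

val airlines_air_quoted = sc.textFile("/skybluelee/data/airline_on_time_ext/carriers.csv").
  filter(!_.startsWith("Code")).
  map { x =>
    // split on commas followed by an even number of double quotes
    // (i.e. commas outside quoted fields), then strip the quotes
    val arr = x.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)").map(_.replaceAll("\"", ""))
    (arr(0), arr(1))
  }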
scala> val airportsMap_air = sc.broadcast(airports_air.map{case (a, b, c, d) => (a, c)}.collectAsMap)
airportsMap_air: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(19)
scala> val airlinesMap_air = sc.broadcast(airlines_air.collectAsMap)
airlinesMap_air: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(21)
scala> flights_air.map{case (a, b, c, d, e) => (
| airportsMap_air.value.get(a).get,
| airportsMap_air.value.get(b).get,
| airlinesMap_air.value.get(c).get,
| d,
| e)}.count()
res2: Long = 2389217
scala> flights_air.map{case (a, b, c, d, e) => (
| airportsMap_air.value.get(a).get,
| airportsMap_air.value.get(b).get,
| airlinesMap_air.value.get(c).get,
| d,
| e)}.take(5).foreach(println)
(Houston,Little Rock,Southwest Airlines Co.,588,1343)
(Houston,Midland,Southwest Airlines Co.,1343,1125)
(Houston,Midland,Southwest Airlines Co.,3841,2009)
(Houston,Orlando,Southwest Airlines Co.,3,903)
(Houston,Orlando,Southwest Airlines Co.,25,1423)
A map-side join is faster than an RDD join. Unless an RDD join is specifically required, prefer SQL/DataFrame joins or map-side joins.
scala> val flightsDF_air = flights_air.toDF("origin", "destination", "airline", "flight_num", "flight_time")
flightsDF_air: org.apache.spark.sql.DataFrame = [origin: string, destination: string ... 3 more fields]
scala> val airportsDF_air = airports_air.toDF("code", "name", "city", "state")
airportsDF_air: org.apache.spark.sql.DataFrame = [code: string, name: string ... 2 more fields]
scala> val airlinesDF_air = airlines_air.toDF("code", "name")
airlinesDF_air: org.apache.spark.sql.DataFrame = [code: string, name: string]
scala> flightsDF_air.as("a").
| join(broadcast(airportsDF_air.as("b")), $"a.origin" === $"b.code").
| join(broadcast(airportsDF_air.as("c")), $"a.destination" === $"c.code").
| join(broadcast(airlinesDF_air.as("d")), $"a.airline" === $"d.code").
| select($"b.city".as("origin"), $"c.city".as("destination"), $"d.name".as("airline"), $"a.flight_num", $"a.flight_time").
| show(false)
+-------+-----------+----------------------+----------+-----------+
|origin |destination|airline |flight_num|flight_time|
+-------+-----------+----------------------+----------+-----------+
|Houston|Little Rock|Southwest Airlines Co.|588 |1343 |
|Houston|Midland |Southwest Airlines Co.|1343 |1125 |
|Houston|Midland |Southwest Airlines Co.|3841 |2009 |
|Houston|Orlando |Southwest Airlines Co.|3 |903 |
|Houston|Orlando |Southwest Airlines Co.|25 |1423 |
|Houston|Orlando |Southwest Airlines Co.|51 |2024 |
|Houston|Orlando |Southwest Airlines Co.|940 |1753 |
|Houston|Orlando |Southwest Airlines Co.|2621 |622 |
|Houston|Chicago |Southwest Airlines Co.|389 |1944 |
|Houston|Chicago |Southwest Airlines Co.|519 |1453 |
|Houston|Chicago |Southwest Airlines Co.|894 |2030 |
|Houston|Chicago |Southwest Airlines Co.|969 |708 |
|Houston|Chicago |Southwest Airlines Co.|2174 |1749 |
|Houston|Chicago |Southwest Airlines Co.|2445 |1217 |
|Houston|Chicago |Southwest Airlines Co.|2974 |954 |
|Houston|New Orleans|Southwest Airlines Co.|41 |1758 |
.show() displays only the first 20 rows by default; pass a row count, e.g. show(5), to change this.
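Spark SQL also broadcasts tables on its own when their estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default), so the explicit broadcast() hint above mainly forces that choice. A quick sketch, assuming the same session, to confirm which join strategy was picked:

// The physical plan should show BroadcastHashJoin rather than SortMergeJoin.
flightsDF_air.as("a").
  join(broadcast(airportsDF_air.as("b")), $"a.origin" === $"b.code").
  explain()

// Optionally adjust the automatic broadcast threshold (value in bytes):
// spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)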