Spark Concepts

What Is Spark?

  • Hadoop : HDFS + MapReduce

    • Parallel processing (parallelization)
    • Data distribution
    • Fault tolerance -> replication
    • Job --> MapReduce
      [Job 1] -------------> [Job2]
      -> intermediate results are written to HDFS
  • Spark :
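    • Parallel processing

    • Data distribution

    • In-memory data processing

    • Fault tolerance -> RDD (lost partitions are recomputed from their lineage)

    • Data exposed as distributed collections of JVM objects (like Map, Set, ...)

    • Functional-style APIs (Scala, Python, Java, ...)

    • Functional operators (map, filter, ...)

    • Job --> Spark (in-memory execution model): up to 100x faster than Hadoop MapReduce for some in-memory workloads
      [Job 1] -------------> [Job2]
      -> intermediate results stay in memory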

    • Spark shell (spark-shell) <-- REPL (Read-Eval-Print Loop)
      - Scala (spark-shell), Python (pyspark), R (sparkR), SQL (spark-sql); Java has an API but no interactive shell

    • Spark libraries
      - Spark SQL, Spark Streaming, MLlib, GraphX

    • Cluster managers
      - Standalone, Mesos, YARN

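  • Data abstractions (containers for processing): RDD, DataFrame, Dataset
    • RDD (Resilient Distributed Dataset)
      - Resilient (lost data can be recovered)
      - Lineage (the data's pedigree / family tree) --> the sequence of operations that produced the data
      - Jobs run in one direction only, never looping back
      - The RDD lineage is a DAG (Directed Acyclic Graph)
      - The core of how RDDs work is lazy evaluation: transformations are only recorded, and nothing runs until an action is called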

      RDD operations
      - transformation: creates a new RDD => filter, map
      - action: returns a computed result or performs an operation on the RDD's elements => count, foreach

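    • DataFrame
      - (like a pandas or R data frame, or an Excel sheet)
      - Row/column data
      - Has named columns

    • Dataset
      - Carries the data's schema (structure) plus a compile-time element type (Scala/Java); in Scala, DataFrame = Dataset[Row]
      - Row/column data
      - Has named columns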
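Lineage and lazy evaluation can be illustrated with a toy model in plain Python. This is only a sketch, not Spark's implementation: the hypothetical `LazyRDD` class records each transformation in a `lineage` list and replays that list only when an action (`collect` or `count`) is called.

```python
class LazyRDD:
    """Toy model of an RDD (hypothetical; not how Spark is implemented)."""

    def __init__(self, data, lineage=None):
        self._data = list(data)              # source data
        self.lineage = lineage or []         # recorded transformations, in order

    # Transformations: return a new LazyRDD with one more lineage step; nothing runs yet.
    def map(self, f):
        return LazyRDD(self._data, self.lineage + [("map", f)])

    def filter(self, pred):
        return LazyRDD(self._data, self.lineage + [("filter", pred)])

    # Actions: replay the whole lineage over the source data and return a result.
    def collect(self):
        items = self._data
        for kind, f in self.lineage:
            if kind == "map":
                items = [f(x) for x in items]
            else:
                items = [x for x in items if f(x)]
        return items

    def count(self):
        return len(self.collect())


calls = []

def square(x):
    calls.append(x)                          # records when the function really runs
    return x * x

rdd = LazyRDD(range(1, 6)).map(square).filter(lambda x: x % 2 == 1)
print(calls)                                 # [] : only the lineage was built
print(rdd.collect())                         # [1, 9, 25]
print([kind for kind, _ in rdd.lineage])     # ['map', 'filter']
```

Every action replays the full lineage, which mirrors how Spark recomputes an uncached RDD for each action; in Spark, `cache()`/`persist()` avoid that recomputation.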

Scala Practice

  • A functional language
	- HelloWorldObject test
	wget https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.tgz
	tar -xvzf scala-2.12.10.tgz
	cp -r scala-2.12.10 ../scala-2.12.10

	vi .bashrc
	export SCALA_HOME=/home/hadoop/scala-2.12.10
	export PATH=$PATH:~~~~~~:$SCALA_HOME/bin

	cat > HelloWorldObject.scala
	// Make the object runnable by defining a main method (as here) or by extending App
	object HelloWorldObject {
	    def main(args: Array[String]): Unit = {
	        println("Hello World main")
	    }
	}
	(press Ctrl+D to end the cat input)
	scalac HelloWorldObject.scala
	scala HelloWorldObject
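	- Objects
	- Data types
	- Strings
	- Variables	-> var (mutable), val (immutable)

	- Functions	-> def name(params): ReturnType = { ... }
		def add(x: Int, y: Int): Int = { x + y }   // the last expression is the value returned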

	- Lambda (anonymous) functions
		(x: Int, y: Int) => x + y

		// a higher-order function that takes a function as its first argument
		def exec(f: (Int, Int) => Int, x: Int, y: Int) = f(x, y)
		exec((x: Int, y: Int) => x + y, 2, 3)   // 5

	- Currying -
	(Quiz) How to check whether a number is a multiple of 6

	1) Multiple of 6 -> divisible by 6

	def modN(n:Int, x:Int) = ((x%n) ==0)       // uncurried: both arguments at once
	def modN(n:Int)(x:Int) = ((x%n) ==0)       // curried: one argument list at a time
	def modSix:Int => Boolean = modN(6)         // partial application fixes n = 6

	2) Multiple of 6 -> divisible by both 2 and 3
	def modN(n1:Int, n2:Int, x:Int) = ( ((x%n1)==0) &&((x%n2) ==0) )
	def modN(n1:Int, n2:Int) (x:Int) = ( ((x%n1)==0) &&((x%n2) ==0) )
	def modSix:Int => Boolean = modN(3,2)
	def modN(n1:Int) (n2:Int) (x:Int) = ( ((x%n1)==0) &&((x%n2) ==0) )
	def modSix:Int => Boolean = modN(3)(2)

	- 420 = 3 * 4 * 5 * 7
	(Quiz) How to check whether 420 is a multiple of both x and y
	def modN(n:Int, x:Int, y:Int) = ( ((n%x)==0) &&((n%y) ==0) )   // modN(420, 4, 7) == true
	- Classes -
	   - member variables (fields), methods, method overriding, constructors
	   - inheritance, abstract classes
	   - sealed classes
	- Collections

	   - array
	   - list, set, tuple
	   - map: key/value pairs

	- Loops: for (with if guards), do..while, while

	- Sorting, grouping, and filtering functions

	   - map
	   - reduce, fold
	   - groupBy
	   - filter
	   - zip
	   - mapValues
	   - sorted, sortBy, sortWith
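The currying quiz and the collection functions above have rough Python counterparts, sketched below as an illustration only. The names `mod_n`, `mod_n2`, `mod_six`, `mod_six_23`, and `divisible_by_both` are hypothetical, chosen to mirror the Scala `modN`/`modSix`; Python lists have no built-in `groupBy`/`mapValues`, so a dict stands in for them.

```python
from functools import reduce

# --- Currying: mod_n(n)(x) mirrors modN(n)(x) ---
def mod_n(n):
    return lambda x: x % n == 0

def mod_n2(n1):
    # three argument lists, like modN(n1)(n2)(x)
    return lambda n2: lambda x: x % n1 == 0 and x % n2 == 0

mod_six = mod_n(6)            # partial application, like modN(6)
mod_six_23 = mod_n2(3)(2)     # like modN(3)(2)

def divisible_by_both(n, x, y):
    # the 420 quiz: is n a multiple of both x and y?
    return n % x == 0 and n % y == 0

# --- Collection functions on a plain list ---
nums = [3, 1, 4, 1, 5, 9, 2, 6]
squares = [x * x for x in nums]                      # map
total = reduce(lambda a, b: a + b, nums)             # reduce
total_fold = reduce(lambda a, b: a + b, nums, 100)   # fold with zero value 100
evens = [x for x in nums if x % 2 == 0]              # filter
pairs = list(zip(nums, squares))                     # zip
groups = {}                                          # groupBy(x => x % 2 == 0)
for x in nums:
    groups.setdefault(x % 2 == 0, []).append(x)
sizes = {k: len(v) for k, v in groups.items()}       # mapValues(_.size)
ascending = sorted(nums)                             # sorted
descending = sorted(nums, key=lambda x: -x)          # sortBy(x => -x)

print([v for v in range(1, 20) if mod_six(v)], mod_six_23(18), divisible_by_both(420, 4, 7))
print(total, total_fold, evens, sizes, descending[:3])
```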

Checking the version
 >  scala.util.Properties.versionString
	- String = version 2.12.10

Spark RDD Practice

[scala : Exercise 01 : How many lines of the LICENSE file mention BSD? ]

/ spark-shell /

	-> org.apache.spark.SparkContext = org.apache.spark.SparkContext@64027866

val licLines = sc.textFile("/home/hadoop/spark-3.1.2/LICENSE")
val lineCnt = licLines.count

// keep only the lines that contain "BSD", then count them
val bsdLines = licLines.filter(line => line.contains("BSD"))
val bsdCnt = bsdLines.count

// the same filter with a named method (def) or a function value (val)
def isBSD(line: String) = { line.contains("BSD") }
val isBSD = (line: String) => line.contains("BSD")
val bsdLines1 = licLines.filter(isBSD)
bsdLines.foreach(bLine => println(bLine))

[scala : Exercise 02 : the map transformation ]

//# def map[B](f: (A) ⇒ B): Traversable[B]
//# def map[U](f: (T) => U): RDD[U]

val numbers = sc.parallelize(10 to 50 by 10)
numbers.foreach(x => println(x))
val numbersSquared = numbers.map(num => num * num)
numbersSquared.foreach(x => println(x))

val reversed = numbersSquared.map(x => x.toString.reverse)
reversed.foreach(x => println(x))

// _ (underscore): a placeholder that stands for each element
val alsoReversed = numbersSquared.map(_.toString.reverse)
alsoReversed.foreach(x => println(x))

[scala : Exercise 03 : transforming the client-ids.log file ]

$ echo "15,16,20,20,77,80,94,94,98,16,31,31,15,20" > client-ids.log

val lines = sc.textFile("/home/hadoop/data/client-ids.log")

val idsStr = lines.map(line => line.split(","))   // RDD[Array[String]]: one array per line



//# def flatMap[U](f: (T) => TraversableOnce[U]): RDD[U]
//  flatMap <-- flattens the per-line arrays into a single RDD of elements

val ids = lines.flatMap(_.split(","))



ids.collect.mkString("; ")

val intIds = ids.map(_.toInt)

--> How many distinct IDs connected during the period?

//# def distinct(): RDD[T]

val uniqueIds = intIds.distinct
val finalCount  = uniqueIds.count

val transactionCount = ids.count
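The same map, flatMap, and distinct steps can be traced on the one-line `client-ids.log` with plain Python lists standing in for RDDs. This is a sketch only; Spark does not guarantee the order of `distinct`, so the result is sorted here for readability.

```python
line = "15,16,20,20,77,80,94,94,98,16,31,31,15,20"   # contents of client-ids.log
lines = [line]                                       # one element per line, like sc.textFile

ids_str = [l.split(",") for l in lines]              # map: a list of lists
ids = [i for l in lines for i in l.split(",")]       # flatMap: one flat list
int_ids = [int(i) for i in ids]                      # map(_.toInt)

unique_ids = sorted(set(int_ids))                    # distinct
final_count = len(unique_ids)                        # number of distinct clients
transaction_count = len(ids)                         # number of transactions

print(len(ids_str), "; ".join(ids))
print(unique_ids, final_count, transaction_count)    # 8 distinct IDs, 14 transactions
```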

//# def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

val s = uniqueIds.sample(false, 0.3)

val swr = uniqueIds.sample(true, 0.5)

//# def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

val taken = uniqueIds.takeSample(false, 5)

// Implicit conversion: declare wrapper classes and implicit conversion functions
import scala.language.implicitConversions

class ClassOne[T](val input: T) { }
class ClassOneStr(val one: ClassOne[String]) {
    def duplicatedString() = one.input + one.input
}
class ClassOneInt(val one: ClassOne[Int]) {
    def duplicatedInt() = one.input.toString + one.input.toString
}
implicit def toStrMethods(one: ClassOne[String]) = new ClassOneStr(one)
implicit def toIntMethods(one: ClassOne[Int]) = new ClassOneInt(one)

val oneStrTest = new ClassOne("test")
val oneIntTest = new ClassOne(123)
oneStrTest.duplicatedString()   // "testtest"
oneIntTest.duplicatedInt()      // "123123"

Basic statistics with double RDD functions (added to numeric RDDs through implicit conversion)
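The code for this step is not in the notes. As a hedged sketch, the quantities that numeric RDD functions such as `mean`, `stdev`, and `sampleStdev` report can be computed by hand on the `intIds` values from `client-ids.log`. Spark documents `stdev`/`variance` as population statistics (divide by n) and `sampleStdev`/`sampleVariance` as sample statistics (divide by n - 1).

```python
import math

# intIds from client-ids.log (Exercise 03)
int_ids = [15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20]

n = len(int_ids)
mean = sum(int_ids) / n
squared_dev = sum((x - mean) ** 2 for x in int_ids)
pop_var = squared_dev / n            # population variance, as for variance()/stdev()
sample_var = squared_dev / (n - 1)   # sample variance, as for sampleVariance()/sampleStdev()

print(n, sum(int_ids), round(mean, 4))
print(round(math.sqrt(pop_var), 4), round(math.sqrt(sample_var), 4))
```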



Histogram (histogram)

// buckets [1.0, 50.0) and [50.0, 100.0]: the last bucket also includes its upper bound
intIds.histogram(Array(1.0, 50.0, 100.0))
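`histogram(buckets)` counts elements per bucket: every bucket is half-open `[b_i, b_(i+1))` except the last, which is closed, and values outside the range are ignored. The plain-Python `histogram` function below is a hypothetical re-implementation of that rule, run on the same 14 IDs.

```python
from bisect import bisect_right

def histogram(values, buckets):
    """Count values per bucket; all buckets are half-open except the last, which is closed."""
    counts = [0] * (len(buckets) - 1)
    for v in values:
        if v < buckets[0] or v > buckets[-1]:
            continue                                 # out-of-range values are ignored
        if v == buckets[-1]:
            counts[-1] += 1                          # the last bucket includes its upper bound
        else:
            counts[bisect_right(buckets, v) - 1] += 1
    return counts

int_ids = [15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20]
print(histogram(int_ids, [1.0, 50.0, 100.0]))        # [9, 5]
```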

Pair RDD: (key, value) pairs <- like a Map in Java/Scala or a dictionary in Python

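[File] data_products.txt
[File] data_transactions.txt
--> 2015-03-30#12:46 AM#51#50#6#7501.89
--> purchase date#purchase time#customer ID#product ID#quantity#purchase amount

==> 1) Give a free teddy bear to the customer who made the most purchases

--> Key each transaction by customer ID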

val tranFile = sc.textFile("/home/hadoop/data/data_transactions.txt")
val tranData = tranFile.map(_.split("#"))
var transByCust = tranData.map(tran => (tran(2).toInt, tran))


#. ID and purchase count of the customer with the most purchases

val (cid, purch) = transByCust.countByKey().toSeq.sortBy(_._2).last
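`countByKey()` returns a map from each key to its number of elements, and `.toSeq.sortBy(_._2).last` picks the pair with the largest count. The sketch below mimics those two steps with `collections.Counter`; the transaction rows are made up for illustration and are not taken from `data_transactions.txt`.

```python
from collections import Counter

# Hypothetical rows in the data_transactions.txt format
raw = [
    "2015-03-30#12:46 AM#51#50#6#7501.89",
    "2015-03-30#1:05 AM#53#4#1#12.50",
    "2015-03-30#2:10 AM#53#81#5#148.25",
    "2015-03-30#3:30 AM#51#25#2#875.00",
    "2015-03-30#4:00 AM#53#70#1#3.99",
]
tran_data = [line.split("#") for line in raw]
trans_by_cust = [(int(t[2]), t) for t in tran_data]             # key by customer ID

counts = Counter(cid for cid, _ in trans_by_cust)               # like countByKey()
cid, purch = sorted(counts.items(), key=lambda kv: kv[1])[-1]   # like .toSeq.sortBy(_._2).last
print(cid, purch)                                               # 53 3
```

If several customers tie for the highest count, which one `last` returns depends on the sort order, so a tie-break may be worth adding.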
#. Record the free teddy bear sent to that customer (as a zero-priced transaction)
var complTrans = Array(Array("2015-03-30", "11:59 PM", "53", "4", "1", "0.00"))

#. Print all purchases of customer 53


transByCust.lookup(53).foreach(tran => println(tran.mkString(", ")))

==> 2) Give a 5% discount to customers who bought two or more Barbie Shopping Mall Playsets
--> product 25#Barbie Shopping Mall Playset#437.5#9
--> transaction 2015-03-30#12:46 AM#51#50#6#7501.89

transByCust = transByCust.mapValues(tran => {
     if(tran(3).toInt == 25 && tran(4).toDouble > 1)
         tran(5) = (tran(5).toDouble * 0.95).toString
     tran })

==> 3) Send a toothbrush (product 70) to customers who bought five or more dictionaries (product 81)
--> product 81#Dictionary#29.65#4

transByCust = transByCust.flatMapValues(tran => {
    if(tran(3).toInt == 81 && tran(4).toInt >= 5) {
       // add a zero-priced copy of the transaction for one toothbrush
       val cloned = tran.clone()
       cloned(5) = "0.00"; cloned(3) = "70"; cloned(4) = "1";
       List(tran, cloned)
    }
    else
       List(tran)
    })

#. reduceByKey, foldByKey
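==> 3) Find the customer with the highest total purchase amount

--> transaction 2015-03-30#12:46 AM#51#50#6#7501.89

val amounts = transByCust.mapValues(t => t(5).toDouble)
val totals = amounts.foldByKey(0)((p1, p2) => p1 + p2).collect()
totals.toSeq.sortBy(_._2).last   // (customer ID, total) with the highest total
// the zero value is applied once per key in every partition, so 100000 can be added more than once
amounts.foldByKey(100000)((p1, p2) => p1 + p2).collect()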
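Why `foldByKey(100000)` does not simply add 100000 to each total: Spark seeds the fold with the zero value once per key in each partition, then merges the per-partition results with the same function. The toy `fold_by_key` below is hypothetical plain Python (partitions modeled as lists of `(key, value)` pairs) and reproduces that behavior for a key that appears in two partitions.

```python
def fold_by_key(partitions, zero, func):
    """Toy model of foldByKey over a list of partitions (each a list of (key, value))."""
    merged = {}
    for part in partitions:
        local = {}
        for k, v in part:
            # the zero value seeds each key once per partition
            local[k] = func(local.get(k, zero), v)
        for k, acc in local.items():
            # per-partition results are merged with the same function
            merged[k] = func(merged[k], acc) if k in merged else acc
    return merged

# Hypothetical amounts: customer 53 appears in both partitions
parts = [[(53, 10.0), (51, 5.0)], [(53, 20.0)]]
print(fold_by_key(parts, 0, lambda a, b: a + b))        # {53: 30.0, 51: 5.0}
print(fold_by_key(parts, 100000, lambda a, b: a + b))   # {53: 200030.0, 51: 100005.0}
```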

complTrans = complTrans :+ Array("2015-03-30", "11:59 PM", "76", "63", "1", "0.00")

transByCust = transByCust.union(sc.parallelize(complTrans).map(t => (t(2).toInt, t)))
transByCust.map(t => t._2.mkString("#")).saveAsTextFile("output-transByCust")

#. aggregateByKey

==> 4) List of products bought by each customer

val prods = transByCust.aggregateByKey(List[String]())(
   (prods, tran) => prods ::: List(tran(3)),
   (prods1, prods2) => prods1 ::: prods2)
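`aggregateByKey(zero)(seqOp, combOp)` generalizes `foldByKey`: the accumulator type may differ from the value type, `seqOp` adds one value to a partition-local accumulator, and `combOp` merges accumulators from different partitions. The hypothetical `aggregate_by_key` sketch below follows that outline; for brevity its values are product IDs rather than full transaction arrays, so `seq_op` appends the value itself instead of `tran(3)`.

```python
def aggregate_by_key(partitions, zero, seq_op, comb_op):
    """Toy model of aggregateByKey: seq_op folds values within a partition,
    comb_op merges the per-partition accumulators."""
    merged = {}
    for part in partitions:
        local = {}
        for k, v in part:
            local[k] = seq_op(local.get(k, zero), v)
        for k, acc in local.items():
            merged[k] = comb_op(merged[k], acc) if k in merged else acc
    return merged

# Hypothetical (customerId, productId) pairs split across two partitions
parts = [[(53, "4"), (51, "50")], [(53, "81"), (53, "70")]]
prods = aggregate_by_key(parts, [],
                         lambda acc, p: acc + [p],   # like prods ::: List(tran(3))
                         lambda a, b: a + b)         # like prods1 ::: prods2
print(prods)   # {53: ['4', '81', '70'], 51: ['50']}
```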

Practice with Wikipedia data

  1. Download the Korean Wikipedia title dump
    curl https://dumps.wikimedia.org/kowiki/latest/kowiki-latest-all-titles-in-ns0.gz | zcat > kowiki_titles
  2. grep for the word "히틀러" (Hitler)
    cat kowiki_titles | grep "히틀러"


Counting the word "히틀러" (Hitler) in the kowiki_titles file (Scala)

  1. val lines = sc.textFile("/home/hadoop/data/kowiki_titles")

  2. val words = lines.flatMap(x => x.split("_"))

  3. val filterTest = words.filter(word => word.contains("히틀러"))

  4. val wordMap = filterTest.map(w => (w, 1))
     val result = wordMap.reduceByKey((a, b) => a + b)   // sum the 1s per word
     result.collect().foreach(println)


Counting the word "히틀러" (Hitler) in the kowiki_titles file (PySpark)

text_file = sc.textFile("/home/hadoop/data/kowiki_titles", use_unicode=True)

counts = text_file.flatMap(lambda line: line.split("_")) \
             .filter(lambda x: "히틀러" in x) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.collect()
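Both versions implement the classic word count: split, filter, pair each word with 1, then sum per key. A plain-Python equivalent with `collections.Counter` is sketched below; the sample titles are hypothetical stand-ins for lines of `kowiki_titles`.

```python
from collections import Counter

# Hypothetical title lines; kowiki titles use "_" in place of spaces
titles = ["아돌프_히틀러", "히틀러_유겐트", "나의_투쟁", "아돌프_히틀러_암살_미수"]

words = [w for t in titles for w in t.split("_")]   # flatMap
matches = [w for w in words if "히틀러" in w]        # filter
counts = Counter(matches)                            # map(w => (w, 1)) + reduceByKey(_ + _)
print(counts)                                        # Counter({'히틀러': 3})
```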



-- (11) -------------------------------------------------------------------------------

import spark.implicits._

val itPostsRows = sc.textFile("/home/hadoop/data/italianPosts.csv")
val itPostsSplit = itPostsRows.map(x => x.split("~"))

val itPostsRDD = itPostsSplit.map(x => (x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9),x(10),x(11),x(12)))
val itPostsDFrame = itPostsRDD.toDF()

val itPostsDF = itPostsRDD.toDF("commentCount", "lastActivityDate", "ownerUserId", "body", "score", "creationDate", "viewCount", "title", "tags", "answerCount", "acceptedAnswerId", "postTypeId", "id")


import java.sql.Timestamp
case class Post (commentCount:Option[Int], lastActivityDate:Option[java.sql.Timestamp],
  ownerUserId:Option[Long], body:String, score:Option[Int], creationDate:Option[java.sql.Timestamp],
  viewCount:Option[Int], title:String, tags:String, answerCount:Option[Int],
  acceptedAnswerId:Option[Long], postTypeId:Option[Long], id:Long)

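object StringImplicits {
   implicit class StringImprovements(val s: String) {
      import scala.util.control.Exception.catching
      def toIntSafe = catching(classOf[NumberFormatException]) opt s.toInt
      def toLongSafe = catching(classOf[NumberFormatException]) opt s.toLong
      def toTimestampSafe = catching(classOf[IllegalArgumentException]) opt Timestamp.valueOf(s)
   }
}

import StringImplicits._
// convert one "~"-separated line into a Post; unparsable fields become None
def stringToPost(row:String):Post = {
  val r = row.split("~")
  Post(r(0).toIntSafe, r(1).toTimestampSafe, r(2).toLongSafe, r(3),
    r(4).toIntSafe, r(5).toTimestampSafe, r(6).toIntSafe, r(7), r(8),
    r(9).toIntSafe, r(10).toLongSafe, r(11).toLongSafe, r(12).toLong)
}
val itPostsDFCase = itPostsRows.map(x => stringToPost(x)).toDF()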

import org.apache.spark.sql.types._
val postSchema = StructType(Seq(
  StructField("commentCount", IntegerType, true),
  StructField("lastActivityDate", TimestampType, true),
  StructField("ownerUserId", LongType, true),
  StructField("body", StringType, true),
  StructField("score", IntegerType, true),
  StructField("creationDate", TimestampType, true),
  StructField("viewCount", IntegerType, true),
  StructField("title", StringType, true),
  StructField("tags", StringType, true),
  StructField("answerCount", IntegerType, true),
  StructField("acceptedAnswerId", LongType, true),
  StructField("postTypeId", LongType, true),
  StructField("id", LongType, false)))
import org.apache.spark.sql.Row
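// convert one "~"-separated line into a Row; unparsable fields become null
def stringToRow(row:String):Row = {
  val r = row.split("~")
  Row(r(0).toIntSafe.getOrElse(null), r(1).toTimestampSafe.getOrElse(null),
    r(2).toLongSafe.getOrElse(null), r(3), r(4).toIntSafe.getOrElse(null),
    r(5).toTimestampSafe.getOrElse(null), r(6).toIntSafe.getOrElse(null), r(7), r(8),
    r(9).toIntSafe.getOrElse(null), r(10).toLongSafe.getOrElse(null),
    r(11).toLongSafe.getOrElse(null), r(12).toLong)
}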
val rowRDD = itPostsRows.map(row => stringToRow(row))
val itPostsDFStruct = spark.createDataFrame(rowRDD, postSchema)

-- (12) ------------------------------------------------------------------------------------

val postsDf = itPostsDFStruct
val postsIdBody = postsDf.select("id", "body")

val postsIdBody = postsDf.select(postsDf.col("id"), postsDf.col("body"))
val postsIdBody = postsDf.select(Symbol("id"), Symbol("body"))
val postsIdBody = postsDf.select('id, 'body)
val postsIdBody = postsDf.select($"id", $"body")

val postIds = postsIdBody.drop("body")

postsIdBody.filter('body contains "Italiano").count()

val noAnswer = postsDf.filter(('postTypeId === 1) and ('acceptedAnswerId isNull))

val firstTenQs = postsDf.filter('postTypeId === 1).limit(10)
val firstTenQsRn = firstTenQs.withColumnRenamed("ownerUserId", "owner")

postsDf.filter('postTypeId === 1).withColumn("ratio", 'viewCount / 'score).where('ratio < 35).show()

//The 10 most recently modified questions:
postsDf.filter('postTypeId === 1).orderBy('lastActivityDate desc).limit(10).show

-- (13) ------------------------------------------------------------------------------------

import org.apache.spark.sql.functions._
postsDf.filter('postTypeId === 1).withColumn("activePeriod", datediff('lastActivityDate, 'creationDate)).orderBy('activePeriod desc).head.getString(3).replace("&lt;","<").replace("&gt;",">")
//res0: String = <p>The plural of <em>braccio</em> is <em>braccia</em>, and the plural of <em>avambraccio</em> is <em>avambracci</em>.</p><p>Why are the plural of those words so different, if they both are referring to parts of the human body, and <em>avambraccio</em> derives from <em>braccio</em>?</p>

postsDf.select(avg('score), max('score), count('score)).show

import org.apache.spark.sql.expressions.Window
postsDf.filter('postTypeId === 1).select('ownerUserId, 'acceptedAnswerId, 'score, max('score).over(Window.partitionBy('ownerUserId)) as "maxPerUser").withColumn("toMax", 'maxPerUser - 'score).show(10)

postsDf.filter('postTypeId === 1).select('ownerUserId, 'id, 'creationDate, lag('id, 1).over(Window.partitionBy('ownerUserId).orderBy('creationDate)) as "prev", lead('id, 1).over(Window.partitionBy('ownerUserId).orderBy('creationDate)) as "next").orderBy('ownerUserId, 'id).show()
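`lag('id, 1)` and `lead('id, 1)` over `Window.partitionBy('ownerUserId).orderBy('creationDate)` return the previous and next `id` among the same owner's questions, or null at the edges. The sketch below reproduces that in plain Python on a handful of rows copied from the lag/lead output in the Python section; with only this subset, owner 8's last question has no `next`, unlike in the full data. The `lag_lead` helper is hypothetical.

```python
from itertools import groupby

# (ownerUserId, id, creationDate) for a few questions
rows = [
    (12, 17, "2013-11-05 22:17"), (8, 112, "2013-11-08 13:14"), (8, 1, "2013-11-05 20:22"),
    (12, 11, "2013-11-05 21:30"), (8, 1192, "2013-11-11 21:01"),
]

def lag_lead(rows):
    """Attach the previous and next id within each owner, ordered by creationDate."""
    out = []
    ordered = sorted(rows, key=lambda r: (r[0], r[2]))       # partitionBy owner, orderBy date
    for _, group in groupby(ordered, key=lambda r: r[0]):
        g = list(group)
        for i, (owner, post_id, _date) in enumerate(g):
            prev_id = g[i - 1][1] if i > 0 else None           # lag(id, 1)
            next_id = g[i + 1][1] if i + 1 < len(g) else None  # lead(id, 1)
            out.append((owner, post_id, prev_id, next_id))
    return out

for row in lag_lead(rows):
    print(row)
```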

// UDF for the DataFrame DSL
val countTags = udf((tags: String) => "&lt;".r.findAllMatchIn(tags).length)
// registering the UDF also makes countTags callable from SQL queries
val countTags = spark.udf.register("countTags", (tags: String) => "&lt;".r.findAllMatchIn(tags).length)
postsDf.filter('postTypeId === 1).select('tags, countTags('tags) as "tagCnt").show(10, false)

-- (14) ------------------------------------------------------------------------------------
val cleanPosts = postsDf.na.drop()

postsDf.na.fill(Map("viewCount" -> 0))

val postsDfCorrected = postsDf.na.replace(Array("id", "acceptedAnswerId"), Map(1177 -> 3000))

-- (15) ------------------------------------------------------------------------------------
val postsRdd = postsDf.rdd

val postsMapped = postsDf.rdd.map(row => Row.fromSeq(
  row.toSeq.updated(3, row.getString(3).replace("&lt;","<").replace("&gt;",">")).
    updated(8, row.getString(8).replace("&lt;","<").replace("&gt;",">"))))
val postsDfNew = spark.createDataFrame(postsMapped, postsDf.schema)

-- (16) ------------------------------------------------------------------------------------
postsDfNew.groupBy('ownerUserId, 'tags, 'postTypeId).count.orderBy('ownerUserId desc).show(10)

postsDfNew.groupBy('ownerUserId).agg(max('lastActivityDate), max('score)).show(10)
postsDfNew.groupBy('ownerUserId).agg(Map("lastActivityDate" -> "max", "score" -> "max")).show(10)

postsDfNew.groupBy('ownerUserId).agg(max('lastActivityDate), max('score).gt(5)).show(10)

val smplDf = postsDfNew.where('ownerUserId >= 13 and 'ownerUserId <= 15)
smplDf.groupBy('ownerUserId, 'tags, 'postTypeId).count.show()

smplDf.rollup('ownerUserId, 'tags, 'postTypeId).count.show()

smplDf.cube('ownerUserId, 'tags, 'postTypeId).count.show()
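`rollup` and `cube` differ only in which grouping sets they aggregate over: `rollup` uses every prefix of the column list (down to the grand total), while `cube` uses every subset. Columns left out of a grouping set appear as null in the result. The helper functions below are hypothetical and just enumerate those sets for the three columns used above.

```python
from itertools import combinations

def rollup_sets(cols):
    """Grouping sets of rollup: every prefix of the column list, down to ()."""
    return [tuple(cols[:i]) for i in range(len(cols), -1, -1)]

def cube_sets(cols):
    """Grouping sets of cube: every subset of the columns."""
    return [c for r in range(len(cols), -1, -1) for c in combinations(cols, r)]

cols = ["ownerUserId", "tags", "postTypeId"]
print(rollup_sets(cols))
print(len(rollup_sets(cols)), len(cube_sets(cols)))   # 4 8
```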

spark.sql("SET spark.sql.caseSensitive=true")
spark.conf.set("spark.sql.caseSensitive", "true")

-- (17) ------------------------------------------------------------------------------------
val itVotesRaw = sc.textFile("/home/hadoop/data/italianVotes.csv").map(x => x.split("~"))
val itVotesRows = itVotesRaw.map(row => Row(row(0).toLong, row(1).toLong, row(2).toInt, Timestamp.valueOf(row(3))))
val votesSchema = StructType(Seq(
  StructField("id", LongType, false),
  StructField("postId", LongType, false),
  StructField("voteTypeId", IntegerType, false),
  StructField("creationDate", TimestampType, false)))
val votesDf = spark.createDataFrame(itVotesRows, votesSchema)

val postsVotes = postsDf.join(votesDf, postsDf("id") === votesDf("postId"))
val postsVotesOuter = postsDf.join(votesDf, postsDf("id") === votesDf("postId"), "outer")
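The default `join` keeps only matching rows (inner join), while `"outer"` also keeps unmatched rows from both sides and fills the missing side with nulls. The plain-Python `join` below is a simplified, hypothetical sketch of that difference on made-up posts and votes; unlike Spark, it returns a single key column instead of both `id` and `postId`.

```python
def join(left, right, how="inner"):
    """Join two lists of (key, value) pairs on the key; how is 'inner' or 'outer' (full outer)."""
    out = []
    matched_right = set()
    for lk, lv in left:
        hits = [(i, rv) for i, (rk, rv) in enumerate(right) if rk == lk]
        for i, rv in hits:
            matched_right.add(i)
            out.append((lk, lv, rv))
        if not hits and how == "outer":
            out.append((lk, lv, None))       # left row without a match
    if how == "outer":
        for i, (rk, rv) in enumerate(right):
            if i not in matched_right:
                out.append((rk, None, rv))   # right row without a match
    return out

posts = [(1165, "post A"), (1166, "post B")]                      # hypothetical (id, body)
votes = [(1166, "vote 1"), (1166, "vote 2"), (9999, "vote 3")]    # hypothetical (postId, vote)
print(join(posts, votes))
print(join(posts, votes, "outer"))
```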

-- (31) ------------------------------------------------------------------------------------


-- (32) ------------------------------------------------------------------------------------
val resultDf = sql("select * from posts")

spark-sql> select substring(title, 0, 70) from posts where postTypeId = 1 order by creationDate desc limit 3;
$ spark-sql -e "select substring(title, 0, 70) from posts where postTypeId = 1 order by creationDate desc limit 3"

-- (42) ------------------------------------------------------------------------------------

sql("select * from postsjson")

val props = new java.util.Properties()
props.setProperty("user", "user")
props.setProperty("password", "password")
postsDf.write.jdbc("jdbc:postgresql://postgresrv/mydb", "posts", props)

-- (43) ------------------------------------------------------------------------------------
val postsDf = spark.read.table("posts")
val postsDf = spark.table("posts")

val result = spark.read.jdbc("jdbc:postgresql://postgresrv/mydb", "posts", Array("viewCount > 3"), props)

sql("CREATE TEMPORARY VIEW postsjdbc "+
  "USING org.apache.spark.sql.jdbc "+
  "OPTIONS ("+
    "url 'jdbc:postgresql://postgresrv/mydb',"+
    "dbtable 'posts',"+
    "user 'user',"+
    "password 'password')")

sql("CREATE TEMPORARY VIEW postsParquet "+
  "USING org.apache.spark.sql.parquet "+
  "OPTIONS (path '/path/to/parquet_file')")
val resParq = sql("select * from postsParquet")

-- (55) ------------------------------------------------------------------------------------

val postsFiltered = postsDf.filter('postTypeId === 1).withColumn("ratio", 'viewCount / 'score).where('ratio < 35)



from __future__ import print_function

-- (11) ------------------------------------------------------------------------------------
itPostsRows = sc.textFile("/home/hadoop/data/italianPosts.csv")
itPostsSplit = itPostsRows.map(lambda x: x.split("~"))

itPostsRDD = itPostsSplit.map(lambda x: (x[0],x[1],x[2],x[3],x[4],x[5],x[6],x[7],x[8],x[9],x[10],x[11],x[12]))
itPostsDFrame = itPostsRDD.toDF()
# +---+--------------------+---+--------------------+---+--------------------+----+--------------------+--------------------+----+----+---+----+
# | _1|                  _2| _3|                  _4| _5|                  _6|  _7|                  _8|                  _9| _10| _11|_12| _13|
# +---+--------------------+---+--------------------+---+--------------------+----+--------------------+--------------------+----+----+---+----+
# |  4|2013-11-11 18:21:...| 17|&lt;p&gt;The infi...| 23|2013-11-10 19:37:...|null|                    |                    |null|null|  2|1165|
# |  5|2013-11-10 20:31:...| 12|&lt;p&gt;Come cre...|  1|2013-11-10 19:44:...|  61|Cosa sapreste dir...| &lt;word-choice&gt;|   1|null|  1|1166|
# |  2|2013-11-10 20:31:...| 17|&lt;p&gt;Il verbo...|  5|2013-11-10 19:58:...|null|                    |                    |null|null|  2|1167|
# |  1|2014-07-25 13:15:...|154|&lt;p&gt;As part ...| 11|2013-11-10 22:03:...| 187|Ironic constructi...|&lt;english-compa...|   4|1170|  1|1168|
# |  0|2013-11-10 22:15:...| 70|&lt;p&gt;&lt;em&g...|  3|2013-11-10 22:15:...|null|                    |                    |null|null|  2|1169|
# |  2|2013-11-10 22:17:...| 17|&lt;p&gt;There's ...|  8|2013-11-10 22:17:...|null|                    |                    |null|null|  2|1170|
# |  1|2013-11-11 09:51:...| 63|&lt;p&gt;As other...|  3|2013-11-11 09:51:...|null|                    |                    |null|null|  2|1171|
# |  1|2013-11-12 23:57:...| 63|&lt;p&gt;The expr...|  1|2013-11-11 10:09:...|null|                    |                    |null|null|  2|1172|
# |  9|2014-01-05 11:13:...| 63|&lt;p&gt;When I w...|  5|2013-11-11 10:28:...| 122|Is &quot;scancell...|&lt;usage&gt;&lt;...|   3|1181|  1|1173|
# |  0|2013-11-11 10:58:...| 18|&lt;p&gt;Wow, wha...|  5|2013-11-11 10:58:...|null|                    |                    |null|null|  2|1174|
# +---+--------------------+---+--------------------+---+--------------------+----+--------------------+--------------------+----+----+---+----+
# only showing top 10 rows

itPostsDF = itPostsRDD.toDF(["commentCount", "lastActivityDate", "ownerUserId", "body", "score", "creationDate", "viewCount", "title", "tags", "answerCount", "acceptedAnswerId", "postTypeId", "id"])

# root
#  |-- commentCount: string (nullable = true)
#  |-- lastActivityDate: string (nullable = true)
#  |-- ownerUserId: string (nullable = true)
#  |-- body: string (nullable = true)
#  |-- score: string (nullable = true)
#  |-- creationDate: string (nullable = true)
#  |-- viewCount: string (nullable = true)
#  |-- title: string (nullable = true)
#  |-- tags: string (nullable = true)
#  |-- answerCount: string (nullable = true)
#  |-- acceptedAnswerId: string (nullable = true)
#  |-- postTypeId: string (nullable = true)
#  |-- id: string (nullable = true)

from pyspark.sql import Row
from datetime import datetime
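def toIntSafe(inval):
  try:
    return int(inval)
  except ValueError:
    return None

def toTimeSafe(inval):
  try:
    return datetime.strptime(inval, "%Y-%m-%d %H:%M:%S.%f")
  except ValueError:
    return None

def toLongSafe(inval):
  # Python 3 has no separate long type; int is unbounded
  try:
    return int(inval)
  except ValueError:
    return None

# convert one "~"-separated line into a Row whose field order matches postSchema
def stringToPost(row):
  r = row.split("~")
  return Row(
    toIntSafe(r[0]), toTimeSafe(r[1]), toLongSafe(r[2]), r[3],
    toIntSafe(r[4]), toTimeSafe(r[5]), toIntSafe(r[6]), r[7], r[8],
    toIntSafe(r[9]), toLongSafe(r[10]), toLongSafe(r[11]), int(r[12]))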

from pyspark.sql.types import *
postSchema = StructType([
  StructField("commentCount", IntegerType(), True),
  StructField("lastActivityDate", TimestampType(), True),
  StructField("ownerUserId", LongType(), True),
  StructField("body", StringType(), True),
  StructField("score", IntegerType(), True),
  StructField("creationDate", TimestampType(), True),
  StructField("viewCount", IntegerType(), True),
  StructField("title", StringType(), True),
  StructField("tags", StringType(), True),
  StructField("answerCount", IntegerType(), True),
  StructField("acceptedAnswerId", LongType(), True),
  StructField("postTypeId", LongType(), True),
  StructField("id", LongType(), False)
])

rowRDD = itPostsRows.map(lambda x: stringToPost(x))
itPostsDFStruct = sqlContext.createDataFrame(rowRDD, postSchema)

-- (12) ------------------------------------------------------------------------------------

postsDf = itPostsDFStruct
postsIdBody = postsDf.select("id", "body")

postsIdBody = postsDf.select(postsDf["id"], postsDf["body"])

postIds = postsIdBody.drop("body")

from pyspark.sql.functions import *
postsIdBody.filter(instr(postsIdBody["body"], "Italiano") > 0).count()

noAnswer = postsDf.filter((postsDf["postTypeId"] == 1) & isnull(postsDf["acceptedAnswerId"]))

firstTenQs = postsDf.filter(postsDf["postTypeId"] == 1).limit(10)
firstTenQsRn = firstTenQs.withColumnRenamed("ownerUserId", "owner")

postsDf.filter(postsDf.postTypeId == 1).withColumn("ratio", postsDf.viewCount / postsDf.score).where("ratio < 35").show()
#|commentCount|    lastActivityDate|ownerUserId|                body|score|        creationDate|viewCount|title|                tags|answerCount|acceptedAnswerId|postTypeId|  id|              ratio|
#|           5|2013-11-21 14:04:...|          8|&lt;p&gt;The use ...|   13|2013-11-11 21:01:...|      142| null|&lt;prepositions&...|          2|            1212|         1|1192| 10.923076923076923|
#|           0|2013-11-12 09:26:...|         17|&lt;p&gt;When wri...|    5|2013-11-11 21:01:...|       70| null|&lt;punctuation&g...|          4|            1195|         1|1193|               14.0|
#|           1|2013-11-12 12:53:...|         99|&lt;p&gt;I can't ...|   -3|2013-11-12 10:57:...|       68| null|&lt;grammar&gt;&l...|          3|            1216|         1|1203|-22.666666666666668|
#|           3|2014-09-11 14:37:...|         63|&lt;p&gt;The plur...|    5|2013-11-12 13:34:...|       59| null|&lt;plural&gt;&lt...|          1|            1227|         1|1221|               11.8|
#|           1|2013-11-12 13:49:...|         63|&lt;p&gt;I rememb...|    6|2013-11-12 13:38:...|       53| null|&lt;usage&gt;&lt;...|          1|            1223|         1|1222|  8.833333333333334|
#|           5|2013-11-13 00:32:...|        159|&lt;p&gt;Girando ...|    6|2013-11-12 23:50:...|       88| null|&lt;grammar&gt;&l...|          1|            1247|         1|1246| 14.666666666666666|
#|           0|2013-11-14 00:54:...|        159|&lt;p&gt;Mi A?ca...|    7|2013-11-14 00:19:...|       70| null|       &lt;verbs&gt;|          1|            null|         1|1258|               10.0|
#|           1|2013-11-15 12:17:...|         18|&lt;p&gt;Clearly ...|    7|2013-11-14 01:21:...|       68| null|&lt;grammar&gt;&l...|          2|            null|         1|1262|  9.714285714285714|
#|           0|2013-11-14 21:14:...|         79|&lt;p&gt;Alle ele...|    8|2013-11-14 20:16:...|       96| null|&lt;grammar&gt;&l...|          1|            1271|         1|1270|               12.0|
#|           0|2013-11-15 17:12:...|         63|&lt;p&gt;In Itali...|    8|2013-11-15 14:54:...|       68| null|&lt;usage&gt;&lt;...|          1|            1277|         1|1275|                8.5|
#|           3|2013-11-19 18:08:...|          8|&lt;p&gt;The Ital...|    6|2013-11-15 16:09:...|       87| null|&lt;grammar&gt;&l...|          1|            null|         1|1276|               14.5|
#|           1|2014-08-14 13:13:...|         12|&lt;p&gt;When I s...|    5|2013-11-16 09:36:...|       74| null|&lt;regional&gt;&...|          3|            null|         1|1279|               14.8|
#|          10|2014-03-15 08:25:...|        176|&lt;p&gt;In Engli...|   12|2013-11-16 11:13:...|      148| null|&lt;punctuation&g...|          2|            1286|         1|1285| 12.333333333333334|
#|           2|2013-11-17 15:54:...|         79|&lt;p&gt;Al di fu...|    7|2013-11-16 13:16:...|       70| null|     &lt;accents&gt;|          2|            null|         1|1287|               10.0|
#|           1|2013-11-16 19:05:...|        176|&lt;p&gt;Often ti...|   12|2013-11-16 14:16:...|      106| null|&lt;verbs&gt;&lt;...|          1|            null|         1|1290|  8.833333333333334|
#|           4|2013-11-17 15:50:...|         22|&lt;p&gt;The verb...|    6|2013-11-17 14:30:...|       66| null|&lt;verbs&gt;&lt;...|          1|            null|         1|1298|               11.0|
#|           0|2014-09-12 10:55:...|          8|&lt;p&gt;Wikipedi...|   10|2013-11-20 16:42:...|      145| null|&lt;orthography&g...|          5|            1336|         1|1321|               14.5|
#|           2|2013-11-21 12:09:...|         22|&lt;p&gt;La parol...|    5|2013-11-20 20:48:...|       49| null|&lt;usage&gt;&lt;...|          1|            1338|         1|1324|                9.8|
#|           0|2013-11-22 13:34:...|        114|&lt;p&gt;There ar...|    7|2013-11-20 20:53:...|       69| null|   &lt;homograph&gt;|          2|            1330|         1|1325|  9.857142857142858|
#|           6|2013-11-26 19:12:...|         12|&lt;p&gt;Sento ch...|   -3|2013-11-21 21:12:...|       79| null|  &lt;word-usage&gt;|          2|            null|         1|1347|-26.333333333333332|
# only showing top 20 rows

#The 10 most recently modified questions:
postsDf.filter(postsDf.postTypeId == 1).orderBy(postsDf.lastActivityDate.desc()).limit(10).show()

-- (13) ------------------------------------------------------------------------------------

from pyspark.sql.functions import *
postsDf.filter(postsDf.postTypeId == 1).withColumn("activePeriod", datediff(postsDf.lastActivityDate, postsDf.creationDate)).orderBy(desc("activePeriod")).head().body.replace("&lt;","<").replace("&gt;",">")
#<p>The plural of <em>braccio</em> is <em>braccia</em>, and the plural of <em>avambraccio</em> is <em>avambracci</em>.</p><p>Why are the plural of those words so different, if they both are referring to parts of the human body, and <em>avambraccio</em> derives from <em>braccio</em>?</p>

postsDf.select(avg(postsDf.score), max(postsDf.score), count(postsDf.score)).show()

from pyspark.sql.window import Window
winDf = postsDf.filter(postsDf.postTypeId == 1).select(postsDf.ownerUserId, postsDf.acceptedAnswerId, postsDf.score, max(postsDf.score).over(Window.partitionBy(postsDf.ownerUserId)).alias("maxPerUser"))
winDf.withColumn("toMax", winDf.maxPerUser - winDf.score).show(10)
# +-----------+----------------+-----+----------+-----+
# |ownerUserId|acceptedAnswerId|score|maxPerUser|toMax|
# +-----------+----------------+-----+----------+-----+
# |        232|            2185|    6|         6|    0|
# |        833|            2277|    4|         4|    0|
# |        833|            null|    1|         4|    3|
# |        235|            2004|   10|        10|    0|
# |        835|            2280|    3|         3|    0|
# |         37|            null|    4|        13|    9|
# |         37|            null|   13|        13|    0|
# |         37|            2313|    8|        13|    5|
# |         37|              20|   13|        13|    0|
# |         37|            null|    4|        13|    9|
# +-----------+----------------+-----+----------+-----+

postsDf.filter(postsDf.postTypeId == 1).select(postsDf.ownerUserId, postsDf.id, postsDf.creationDate, lag(postsDf.id, 1).over(Window.partitionBy(postsDf.ownerUserId).orderBy(postsDf.creationDate)).alias("prev"), lead(postsDf.id, 1).over(Window.partitionBy(postsDf.ownerUserId).orderBy(postsDf.creationDate)).alias("next")).orderBy(postsDf.ownerUserId, postsDf.id).show()
# +-----------+----+--------------------+----+----+
# |ownerUserId|  id|        creationDate|prev|next|
# +-----------+----+--------------------+----+----+
# |          4|1637|2014-01-24 06:51:...|null|null|
# |          8|   1|2013-11-05 20:22:...|null| 112|
# |          8| 112|2013-11-08 13:14:...|   1|1192|
# |          8|1192|2013-11-11 21:01:...| 112|1276|
# |          8|1276|2013-11-15 16:09:...|1192|1321|
# |          8|1321|2013-11-20 16:42:...|1276|1365|
# |          8|1365|2013-11-23 09:09:...|1321|null|
# |         12|  11|2013-11-05 21:30:...|null|  17|
# |         12|  17|2013-11-05 22:17:...|  11|  18|
# |         12|  18|2013-11-05 22:34:...|  17|  19|
# |         12|  19|2013-11-05 22:38:...|  18|  63|
# |         12|  63|2013-11-06 17:54:...|  19|  65|
# |         12|  65|2013-11-06 18:07:...|  63|  69|
# |         12|  69|2013-11-06 19:41:...|  65|  70|
# |         12|  70|2013-11-06 20:35:...|  69|  89|
# |         12|  89|2013-11-07 19:22:...|  70|  94|
# |         12|  94|2013-11-07 20:42:...|  89| 107|
# |         12| 107|2013-11-08 08:27:...|  94| 122|
# |         12| 122|2013-11-08 20:55:...| 107|1141|
# |         12|1141|2013-11-09 20:50:...| 122|1142|
# +-----------+----+--------------------+----+----+

countTags = udf(lambda tags: tags.count("&lt;"), IntegerType())
postsDf.filter(postsDf.postTypeId == 1).select("tags", countTags(postsDf.tags).alias("tagCnt")).show(10, False)
# +-------------------------------------------------------------------+------+
# |tags                                                               |tagCnt|
# +-------------------------------------------------------------------+------+
# |&lt;word-choice&gt;                                                |1     |
# |&lt;english-comparison&gt;&lt;translation&gt;&lt;phrase-request&gt;|3     |
# |&lt;usage&gt;&lt;verbs&gt;                                         |2     |
# |&lt;usage&gt;&lt;tenses&gt;&lt;english-comparison&gt;              |3     |
# |&lt;usage&gt;&lt;punctuation&gt;                                   |2     |
# |&lt;usage&gt;&lt;tenses&gt;                                        |2     |
# |&lt;history&gt;&lt;english-comparison&gt;                          |2     |
# |&lt;idioms&gt;&lt;etymology&gt;                                    |2     |
# |&lt;idioms&gt;&lt;regional&gt;                                     |2     |
# |&lt;grammar&gt;                                                    |1     |
# +-------------------------------------------------------------------+------+

-- (14) ------------------------------------------------------------------------------------

cleanPosts = postsDf.na.drop()

postsDf.na.fill({"viewCount": 0}).show()

postsDf.na.replace(1177, 3000, ["id", "acceptedAnswerId"]).show()

-- (15) ------------------------------------------------------------------------------------

postsRdd = postsDf.rdd

def replaceLtGt(row):
    return Row(
        commentCount = row.commentCount,
        lastActivityDate = row.lastActivityDate,
        ownerUserId = row.ownerUserId,
        body = row.body.replace("&lt;","<").replace("&gt;",">"),
        score = row.score,
        creationDate = row.creationDate,
        viewCount = row.viewCount,
        title = row.title,
        tags = row.tags.replace("&lt;","<").replace("&gt;",">"),
        answerCount = row.answerCount,
        acceptedAnswerId = row.acceptedAnswerId,
        postTypeId = row.postTypeId,
        id = row.id)

postsMapped = postsRdd.map(replaceLtGt)

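from pyspark.sql.types import StructType

# Rebuild the schema with its fields sorted by name, so it matches
# (by position) Row objects whose fields are in alphabetical order
def sortSchema(schema):
    fields = {f.name: f for f in schema.fields}
    names = sorted(fields.keys())
    return StructType([fields[f] for f in names])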

postsDfNew = sqlContext.createDataFrame(postsMapped, sortSchema(postsDf.schema))
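`createDataFrame(rdd, schema)` pairs each Row value with a schema field by position, which is why `sortSchema` is needed when the Row fields come out in alphabetical order. The plain-Python sketch below mimics that positional pairing. The field names, values, and helper names are hypothetical and used only to show how an unsorted schema misaligns the values while a sorted one lines them up.

```python
def unescape_lt_gt(s):
    """Same replacement as replaceLtGt: only &lt; and &gt; are decoded."""
    return s.replace("&lt;", "<").replace("&gt;", ">")


def to_sorted_tuple(record):
    """Values in alphabetical key order, like a Row built with kwargs in Spark < 3.0."""
    return tuple(record[k] for k in sorted(record))


def pair_with_schema(values, field_names):
    """Positional pairing of values to field names."""
    return dict(zip(field_names, values))


# Hypothetical record
record = {"tags": "&lt;usage&gt;&lt;verbs&gt;", "score": 3, "id": 7}
record["tags"] = unescape_lt_gt(record["tags"])

schema_order = ["tags", "score", "id"]  # original, unsorted field order
print(pair_with_schema(to_sorted_tuple(record), schema_order))
# {'tags': 7, 'score': 3, 'id': '<usage><verbs>'}   (misaligned)
print(pair_with_schema(to_sorted_tuple(record), sorted(schema_order)))
# {'id': 7, 'score': 3, 'tags': '<usage><verbs>'}   (aligned)
```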

-- (16) ------------------------------------------------------------------------------------

postsDfNew.groupBy(postsDfNew.ownerUserId, postsDfNew.tags, postsDfNew.postTypeId).count().orderBy(postsDfNew.ownerUserId.desc()).show(10)
#|ownerUserId|                tags|postTypeId|count|
#|        862|                    |         2|    1|
#|        855|         <resources>|         1|    1|
#|        846|<translation><eng...|         1|    1|
#|        845|<word-meaning><tr...|         1|    1|
#|        842|  <verbs><resources>|         1|    1|
#|        835|    <grammar><verbs>|         1|    1|
#|        833|                    |         2|    1|
#|        833|           <meaning>|         1|    1|
#|        833|<meaning><article...|         1|    1|
#|        814|                    |         2|    1|
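`groupBy(...).count()` produces one row per distinct combination of the grouping columns, and `orderBy(... .desc())` then sorts those groups. A plain-Python equivalent uses `collections.Counter` on key tuples. The sample rows are hypothetical and do not reproduce the real counts shown above.

```python
from collections import Counter

# Hypothetical rows as (ownerUserId, tags, postTypeId)
rows = [
    (862, "", 2),
    (855, "<resources>", 1),
    (833, "", 2),
    (833, "<meaning>", 1),
    (833, "", 2),
]

# groupBy(ownerUserId, tags, postTypeId).count(): one count per distinct key tuple
counts = Counter(rows)

# orderBy(ownerUserId.desc()): sort the groups by ownerUserId, highest first.
# Ties keep Counter insertion order here; Spark gives no order guarantee for ties.
ordered = sorted(counts.items(), key=lambda kv: kv[0][0], reverse=True)
for (owner, tags, post_type), n in ordered:
    print(owner, repr(tags), post_type, n)
```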

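# Spark's max aggregate (shadows Python's built-in max); the dict form below needs no import
from pyspark.sql.functions import max
postsDfNew.groupBy(postsDfNew.ownerUserId).agg(max(postsDfNew.lastActivityDate), max(postsDfNew.score)).show(10)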
postsDfNew.groupBy(postsDfNew.ownerUserId).agg({"lastActivityDate": "max", "score": "max"}).show(10)
# +-----------+---------------------+----------+
# |ownerUserId|max(lastActivityDate)|max(score)|
# +-----------+---------------------+----------+
# |        431| 2014-02-16 14:16:...|         1|
# |        232| 2014-08-18 20:25:...|         6|
# |        833| 2014-09-03 19:53:...|         4|
# |        633| 2014-05-15 22:22:...|         1|
# |        634| 2014-05-27 09:22:...|         6|
# |        234| 2014-07-12 17:56:...|         5|
# |        235| 2014-08-28 19:30:...|        10|
# |        435| 2014-02-18 13:10:...|        -2|
# |        835| 2014-08-26 15:35:...|         3|
# |         37| 2014-09-13 13:29:...|        23|
# +-----------+---------------------+----------+
postsDfNew.groupBy(postsDfNew.ownerUserId).agg(max(postsDfNew.lastActivityDate), max(postsDfNew.score) > 5).show(10)
# +-----------+---------------------+----------------+
# |ownerUserId|max(lastActivityDate)|(max(score) > 5)|
# +-----------+---------------------+----------------+
# |        431| 2014-02-16 14:16:...|           false|
# |        232| 2014-08-18 20:25:...|            true|
# |        833| 2014-09-03 19:53:...|           false|
# |        633| 2014-05-15 22:22:...|           false|
# |        634| 2014-05-27 09:22:...|            true|
# |        234| 2014-07-12 17:56:...|           false|
# |        235| 2014-08-28 19:30:...|            true|
# |        435| 2014-02-18 13:10:...|           false|
# |        835| 2014-08-26 15:35:...|           false|
# |         37| 2014-09-13 13:29:...|            true|
# +-----------+---------------------+----------------+
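The aggregation above keeps, per `ownerUserId`, the latest `lastActivityDate` and the highest `score`, and the second query adds a boolean column `max(score) > 5`. The plain-Python sketch below does the same on hypothetical rows. Python's built-in `max` is correct here because the values are ordinary Python objects; inside `agg()` in PySpark, the column-level `pyspark.sql.functions.max` is required.

```python
from datetime import datetime

# Hypothetical rows
rows = [
    {"ownerUserId": 232, "lastActivityDate": datetime(2014, 8, 18, 20, 25), "score": 6},
    {"ownerUserId": 232, "lastActivityDate": datetime(2014, 3, 1, 10, 0), "score": 2},
    {"ownerUserId": 431, "lastActivityDate": datetime(2014, 2, 16, 14, 16), "score": 1},
]

# groupBy(ownerUserId).agg(max(lastActivityDate), max(score))
agg = {}
for r in rows:
    key = r["ownerUserId"]
    if key not in agg:
        agg[key] = {"max_date": r["lastActivityDate"], "max_score": r["score"]}
    else:
        agg[key]["max_date"] = max(agg[key]["max_date"], r["lastActivityDate"])
        agg[key]["max_score"] = max(agg[key]["max_score"], r["score"])

# The derived column (max(score) > 5) is a comparison on the aggregated value
for owner, a in agg.items():
    print(owner, a["max_date"], a["max_score"], a["max_score"] > 5)
```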

smplDf = postsDfNew.where((postsDfNew.ownerUserId >= 13) & (postsDfNew.ownerUserId <= 15))
smplDf.groupBy(smplDf.ownerUserId, smplDf.tags, smplDf.postTypeId).count().show()
# +-----------+----+----------+-----+
# |ownerUserId|tags|postTypeId|count|
# +-----------+----+----------+-----+
# |         15|    |         2|    2|
# |         14|    |         2|    2|
# |         13|    |         2|    1|
# +-----------+----+----------+-----+
smplDf.rollup(smplDf.ownerUserId, smplDf.tags, smplDf.postTypeId).count().show()
# +-----------+----+----------+-----+
# |ownerUserId|tags|postTypeId|count|
# +-----------+----+----------+-----+
# |         15|    |         2|    2|
# |         13|    |      null|    1|
# |         13|null|      null|    1|
# |         14|    |      null|    2|
# |         13|    |         2|    1|
# |         14|null|      null|    2|
# |         15|    |      null|    2|
# |         14|    |         2|    2|
# |         15|null|      null|    2|
# |       null|null|      null|    5|
# +-----------+----+----------+-----+
smplDf.cube(smplDf.ownerUserId, smplDf.tags, smplDf.postTypeId).count().show()
# +-----------+----+----------+-----+
# |ownerUserId|tags|postTypeId|count|
# +-----------+----+----------+-----+
# |         15|    |         2|    2|
# |       null|    |         2|    5|
# |         13|    |      null|    1|
# |         15|null|         2|    2|
# |       null|null|         2|    5|
# |         13|null|      null|    1|
# |         14|    |      null|    2|
# |         13|    |         2|    1|
# |         14|null|      null|    2|
# |         15|    |      null|    2|
# |         13|null|         2|    1|
# |       null|    |      null|    5|
# |         14|    |         2|    2|
# |         15|null|      null|    2|
# |       null|null|      null|    5|
# |         14|null|         2|    2|
# +-----------+----+----------+-----+
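`rollup(a, b, c)` aggregates over the prefixes `(a, b, c)`, `(a, b)`, `(a)` and `()`, while `cube(a, b, c)` aggregates over all 2³ = 8 subsets of the columns; a column left out of a grouping set shows up as `null`. The plain-Python sketch below reproduces the row counts of the two outputs above. It rebuilds the five `smplDf` rows from the groupBy output (15 twice, 14 twice, 13 once, empty tags, postTypeId 2). The helper names are hypothetical. As in Spark's output, a rolled-up `null` cannot be told apart from a real null value here; Spark offers `grouping()` / `grouping_id()` for that.

```python
from collections import Counter
from itertools import combinations


def grouping_counts(rows, grouping_sets):
    """Count rows per grouping set; columns outside the set become None (null)."""
    result = Counter()
    width = len(rows[0])
    for cols in grouping_sets:
        for row in rows:
            key = tuple(row[i] if i in cols else None for i in range(width))
            result[key] += 1
    return result


def rollup_sets(n):
    """rollup(c0..cn-1): the prefixes (c0..cn-1), (c0..cn-2), ..., ()."""
    return [set(range(k)) for k in range(n, -1, -1)]


def cube_sets(n):
    """cube(c0..cn-1): every subset of the n columns (2**n grouping sets)."""
    return [set(c) for k in range(n, -1, -1) for c in combinations(range(n), k)]


# smplDf rows as (ownerUserId, tags, postTypeId), taken from the groupBy output above
smpl = [(15, "", 2)] * 2 + [(14, "", 2)] * 2 + [(13, "", 2)]

print(len(grouping_counts(smpl, rollup_sets(3))))  # 10, as in the rollup output
print(len(grouping_counts(smpl, cube_sets(3))))    # 16, as in the cube output
```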

-- (17) ------------------------------------------------------------------------------------

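from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, TimestampType, LongType, IntegerType

itVotesRaw = sc.textFile("/~~~~~~~~/italianVotes.csv").map(lambda x: x.split("~"))
# int replaces Python 2's long (Python 3 ints are unbounded).
# Row fields are given in votesSchema order, because createDataFrame matches them by position.
itVotesRows = itVotesRaw.map(lambda row: Row(creationDate=datetime.strptime(row[3], "%Y-%m-%d %H:%M:%S.%f"), id=int(row[0]), postId=int(row[1]), voteTypeId=int(row[2])))
votesSchema = StructType([
  StructField("creationDate", TimestampType(), False),
  StructField("id", LongType(), False),
  StructField("postId", LongType(), False),
  StructField("voteTypeId", IntegerType(), False)
])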

votesDf = sqlContext.createDataFrame(itVotesRows, votesSchema)
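Each line of italianVotes.csv is split on `~` and converted field by field before it becomes a Row. The sketch below runs the same conversion in plain Python. Only the `~` delimiter, the field positions, and the timestamp format come from the code above; the sample line and the helper name `parse_vote` are hypothetical. Values are returned in `votesSchema` order.

```python
from datetime import datetime


def parse_vote(line):
    """Split a '~'-delimited vote line and convert fields to their schema types.

    Field positions follow the code above: 0=id, 1=postId, 2=voteTypeId,
    3=creationDate. Returns (creationDate, id, postId, voteTypeId).
    """
    fields = line.split("~")
    return (
        datetime.strptime(fields[3], "%Y-%m-%d %H:%M:%S.%f"),
        int(fields[0]),
        int(fields[1]),
        int(fields[2]),
    )


# Hypothetical sample line
print(parse_vote("1~2~2~2013-11-05 20:36:01.133"))
```

Note that `%f` pads on the right, so the `.133` fraction becomes 133000 microseconds.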

postsVotes = postsDf.join(votesDf, postsDf.id == votesDf.postId)
postsVotesOuter = postsDf.join(votesDf, postsDf.id == votesDf.postId, "outer")
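The two joins differ in what happens to unmatched rows. An inner join keeps only posts that have votes. The join type `"outer"` (a full outer join in PySpark) also keeps posts without votes and votes without a matching post, filling the missing side with null. The plain-Python sketch below shows the difference on hypothetical data. It is a nested-loop illustration, not how Spark executes joins.

```python
def inner_join(left, right):
    """Pairs where posts.id == votes.postId."""
    return [(l, r) for l in left for r in right if l["id"] == r["postId"]]


def full_outer_join(left, right):
    """Inner matches plus unmatched rows from either side, paired with None (null)."""
    pairs, used_l, used_r = [], set(), set()
    for i, l in enumerate(left):
        for j, r in enumerate(right):
            if l["id"] == r["postId"]:
                pairs.append((l, r))
                used_l.add(i)
                used_r.add(j)
    pairs += [(l, None) for i, l in enumerate(left) if i not in used_l]
    pairs += [(None, r) for j, r in enumerate(right) if j not in used_r]
    return pairs


# Hypothetical data: post 2 has no votes, the vote on post 3 has no post
posts = [{"id": 1}, {"id": 2}]
votes = [{"postId": 1, "voteTypeId": 2}, {"postId": 3, "voteTypeId": 2}]

print(len(inner_join(posts, votes)))       # 1
print(len(full_outer_join(posts, votes)))  # 3
```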

-- (18) ------------------------------------------------------------------------------------

# Two equivalent ways to make Spark SQL resolve identifiers case-sensitively
sqlContext.sql("SET spark.sql.caseSensitive=true")
sqlContext.setConf("spark.sql.caseSensitive", "true")

-- (21) ------------------------------------------------------------------------------------


-- (22) ------------------------------------------------------------------------------------

resultDf = sqlContext.sql("select * from posts")

spark-sql> select substring(title, 0, 70) from posts where postTypeId = 1 order by creationDate desc limit 3;
$ spark-sql -e "select substring(title, 0, 70) from posts where postTypeId = 1 order by creationDate desc limit 3"

-- (31) ------------------------------------------------------------------------------------


sqlContext.sql("select * from postsjson")

props = {"user": "user", "password": "password"}
postsDf.write.jdbc("jdbc:postgresql://postgresrv/mydb", "posts", properties=props)

-- (32) ------------------------------------------------------------------------------------

postsDf = sqlContext.read.table("posts")
postsDf = sqlContext.table("posts")

result = sqlContext.read.jdbc("jdbc:postgresql://postgresrv/mydb", "posts", predicates=["viewCount > 3"], properties=props)

sqlContext.sql("CREATE TEMPORARY TABLE postsjdbc "+
  "USING org.apache.spark.sql.jdbc "+
  "OPTIONS ("+
    "url 'jdbc:postgresql://postgresrv/mydb',"+
    "dbtable 'posts',"+
    "user 'user',"+
    "password 'password')")

sqlContext.sql("CREATE TEMPORARY TABLE postsParquet "+
  "USING org.apache.spark.sql.parquet "+
  "OPTIONS (path '/path/to/parquet_file')")
resParq = sqlContext.sql("select * from postsParquet")

-- (41) ------------------------------------------------------------------------------------

postsRatio = postsDf.filter(postsDf.postTypeId == 1).withColumn("ratio", postsDf.viewCount / postsDf.score)
postsFiltered = postsRatio.where(postsRatio.ratio < 35)
# Print the parsed, analyzed, optimized, and physical plans
postsFiltered.explain(True)
# == Parsed Logical Plan ==
# 'Filter (ratio#314 < 35)
# +- Project [commentCount#285,lastActivityDate#286,ownerUserId#287L,body#288,score#289,creationDate#290,viewCount#291,title#292,tags#293,answerCount#294,acceptedAnswerId#295L,postTypeId#296L,id#297L,(cast(viewCount#291 as double) / cast(score#289 as double)) AS ratio#314]
#    +- Filter (postTypeId#296L = cast(1 as bigint))
#       +- Subquery posts
#          +- Relation[commentCount#285,lastActivityDate#286,ownerUserId#287L,body#288,score#289,creationDate#290,viewCount#291,title#292,tags#293,answerCount#294,acceptedAnswerId#295L,postTypeId#296L,id#297L] ParquetRelation
# == Analyzed Logical Plan ==
# commentCount: int, lastActivityDate: timestamp, ownerUserId: bigint, body: string, score: int, creationDate: timestamp, viewCount: int, title: string, tags: string, answerCount: int, acceptedAnswerId: bigint, postTypeId: bigint, id: bigint, ratio: double
# Filter (ratio#314 < cast(35 as double))
# +- Project [commentCount#285,lastActivityDate#286,ownerUserId#287L,body#288,score#289,creationDate#290,viewCount#291,title#292,tags#293,answerCount#294,acceptedAnswerId#295L,postTypeId#296L,id#297L,(cast(viewCount#291 as double) / cast(score#289 as double)) AS ratio#314]
#    +- Filter (postTypeId#296L = cast(1 as bigint))
#       +- Subquery posts
#          +- Relation[commentCount#285,lastActivityDate#286,ownerUserId#287L,body#288,score#289,creationDate#290,viewCount#291,title#292,tags#293,answerCount#294,acceptedAnswerId#295L,postTypeId#296L,id#297L] ParquetRelation
# == Optimized Logical Plan ==
# Project [commentCount#285,lastActivityDate#286,ownerUserId#287L,body#288,score#289,creationDate#290,viewCount#291,title#292,tags#293,answerCount#294,acceptedAnswerId#295L,postTypeId#296L,id#297L,(cast(viewCount#291 as double) / cast(score#289 as double)) AS ratio#314]
# +- Filter ((postTypeId#296L = 1) && ((cast(viewCount#291 as double) / cast(score#289 as double)) < 35.0))
#    +- Relation[commentCount#285,lastActivityDate#286,ownerUserId#287L,body#288,score#289,creationDate#290,viewCount#291,title#292,tags#293,answerCount#294,acceptedAnswerId#295L,postTypeId#296L,id#297L] ParquetRelation
# == Physical Plan ==
# Project [commentCount#285,lastActivityDate#286,ownerUserId#287L,body#288,score#289,creationDate#290,viewCount#291,title#292,tags#293,answerCount#294,acceptedAnswerId#295L,postTypeId#296L,id#297L,(cast(viewCount#291 as double) / cast(score#289 as double)) AS ratio#314]
# +- Filter (((cast(viewCount#291 as double) / cast(score#289 as double)) < 35.0) && (postTypeId#296L = 1))
#    +- Scan ParquetRelation[lastActivityDate#286,commentCount#285,acceptedAnswerId#295L,title#292,id#297L,postTypeId#296L,ownerUserId#287L,score#289,tags#293,viewCount#291,body#288,creationDate#290,answerCount#294] InputPaths: file:/user/hive/warehouse/posts, PushedFilters: [EqualTo(postTypeId,1)]
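In the optimized plan, the two `Filter` nodes are merged into one conjunctive predicate (Catalyst has optimizer rules such as CombineFilters for this), and the physical plan lists `PushedFilters: [EqualTo(postTypeId,1)]` on the Parquet scan. The plain-Python sketch below only shows why merging is safe: filtering twice keeps the same rows as filtering once with both conditions joined by `and`. The rows and helper names are hypothetical. A zero `score` is mapped to `None` to mirror Spark with `spark.sql.ansi.enabled` off, where division by zero yields null and a null comparison drops the row.

```python
def ratio(row):
    """viewCount / score, or None when score is 0 (null, as in Spark with ANSI mode off)."""
    return row["viewCount"] / row["score"] if row["score"] != 0 else None


def is_question(row):
    return row["postTypeId"] == 1


def ratio_below_35(row):
    r = ratio(row)
    return r is not None and r < 35  # a null comparison drops the row


# Hypothetical rows
rows = [
    {"postTypeId": 1, "viewCount": 100, "score": 5},  # ratio 20.0 -> kept
    {"postTypeId": 1, "viewCount": 100, "score": 2},  # ratio 50.0 -> dropped
    {"postTypeId": 1, "viewCount": 100, "score": 0},  # ratio null -> dropped
    {"postTypeId": 2, "viewCount": 10, "score": 5},   # answer -> dropped
]

# Two consecutive filters, as in the parsed plan
two_filters = [r for r in rows if is_question(r)]
two_filters = [r for r in two_filters if ratio_below_35(r)]

# One combined filter, as in the optimized plan
one_filter = [r for r in rows if is_question(r) and ratio_below_35(r)]

print(two_filters == one_filter)  # True
```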
