path="/FileStore/tables/2015_summary.json"
#DataFrame 생성
df= spark.read.format('json').load(path)
#스키마 출력
df.printSchema()
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)
spark.read.format('json').load(path).schema
Out[4]: StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))
from pyspark.sql import types as T
# Define a schema manually
myManualSchema = T.StructType([
    T.StructField('DEST_COUNTRY_NAME', T.StringType(), True),
    T.StructField('ORIGIN_COUNTRY_NAME', T.StringType(), True),
    T.StructField('count', T.LongType(), False, metadata={'hello': 'world'})
])
# Apply the schema when reading
df2 = spark.read.format('json').schema(myManualSchema).load(path)
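-> A quick way to confirm the manual schema was applied is to index the resulting StructType by field name — a minimal sketch using standard PySpark accessors (note that file sources may still relax nullability on read):
# The metadata dict attached above should round-trip through the read
df2.schema['count']
df2.schema['count'].metadata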
from pyspark.sql import functions as F
# Construct a Column with the col function
F.col('someColumnName')
Out[7]: Column<'someColumnName'>
F.expr("(((someCol + 5)*200) - 6) < otherCol")
Out[8]: Column<'((((someCol + 5) * 200) - 6) < otherCol)'>
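-> Both col and expr return an unevaluated Column; it only produces values when used inside a DataFrame transformation. A minimal sketch (the expression itself is just illustrative):
# Evaluate a Column expression against this DataFrame
df.select(F.expr("count + 5")).show(2)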
df.columns
Out[9]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
# Inspect the first row with the first method
df.first()
Out[10]: Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)
from pyspark.sql import Row
myRow = Row("hello", None, 1, False)
myRow[0]
Out[12]: 'hello'
myRow[2]
Out[13]: 1
myManualSchema = T.StructType([
    T.StructField("some", T.StringType(), True),
    T.StructField("col", T.StringType(), True),
    T.StructField("names", T.LongType(), True),
])
myRow = Row("hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()
+-----+----+-----+
| some| col|names|
+-----+----+-----+
|hello|null|    1|
+-----+----+-----+
# SQL equivalent: SELECT DEST_COUNTRY_NAME FROM table LIMIT 2
df.select("DEST_COUNTRY_NAME").show(2)
+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows
# SQL equivalent: SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM table LIMIT 2
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows
# Referencing columns with the expr function
# expr accepts either a simple column reference or a full string expression
df.select(F.expr("dest_country_name as destination")).show(2)
+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows
-> selectExpr is a shorthand method that does this more simply and efficiently
df.selectExpr("dest_country_name as destination").show(2)
+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows
# Example 1: add a withinCountry column indicating whether origin and destination are the same
df.selectExpr('*', '(dest_country_name = origin_country_name) as withinCountry').show(2)
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows
# Example 2: use aggregate functions over columns
df.selectExpr("avg(count)", "count(distinct(dest_country_name))").show(2)
+-----------+---------------------------------+
| avg(count)|count(DISTINCT dest_country_name)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+
df.selectExpr('*', '1 as one').show(5)
+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|one|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
|    United States|            Ireland|  344|  1|
|            Egypt|      United States|   15|  1|
|    United States|              India|   62|  1|
+-----------------+-------------------+-----+---+
only showing top 5 rows
df.select('*', F.lit(1).alias('one')).show(5)
+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|one|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
|    United States|            Ireland|  344|  1|
|            Egypt|      United States|   15|  1|
|    United States|              India|   62|  1|
+-----------------+-------------------+-----+---+
only showing top 5 rows
df.withColumn('withinCountry', F.expr('origin_country_name == dest_country_name')).show(5)
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
|            Egypt|      United States|   15|        false|
|    United States|              India|   62|        false|
+-----------------+-------------------+-----+-------------+
only showing top 5 rows
df.withColumnRenamed('dest_country_name', 'dest').columns
Out[31]: ['dest', 'ORIGIN_COUNTRY_NAME', 'count']
dfWithLongColName = df.withColumn(
    'This Long Column-Name',
    F.expr('origin_country_name')
)
dfWithLongColName.columns
Out[33]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count', 'This Long Column-Name']
dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`"
).show(2)
+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows
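-> Backticks are only needed inside parsed string expressions; a plain column reference handles the space and hyphen without escaping. A minimal sketch:
# col() does not parse its argument, so no backticks are required
dfWithLongColName.select(F.col("This Long Column-Name")).show(2)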
spark.sql('set spark.sql.caseSensitive=true')
Out[59]: DataFrame[key: string, value: string]
df.columns
Out[60]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
df.select('COUNT').show(1)
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<command-2616171458017162> in <module>
----> 1 df.select('COUNT').show(1)

/databricks/spark/python/pyspark/sql/dataframe.py in select(self, *cols)
   1690         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   1691         """
-> 1692         jdf = self._jdf.select(self._jcols(*cols))
   1693         return DataFrame(jdf, self.sql_ctx)
   1694

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121                 # Hide where the exception came from that shows a non-Pythonic
    122                 # JVM exception message.
--> 123                 raise converted from None
    124             else:
    125                 raise
AnalysisException: cannot resolve '`COUNT`' given input columns: [DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count];
'Project ['COUNT]
+- Relation[DEST_COUNTRY_NAME#13,ORIGIN_COUNTRY_NAME#14,count#15L] json
df.select('count').show(1)
+-----+
|count|
+-----+
|   15|
+-----+
only showing top 1 row
dfWithLongColName.drop('ORIGIN_COUNTRY_NAME', 'DEST_COUNTRY_NAME')
Out[69]: DataFrame[count: bigint, This Long Column-Name: string]
df.withColumn('count2', F.col('count').cast(T.StringType()))
Out[70]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: string]
df.filter(F.col('count') < 2).show(2)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows
df.where('count < 2').show(2)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows
df.select('ORIGIN_COUNTRY_NAME').distinct().count()
Out[75]: 125
df.sample(withReplacement=False, fraction=0.5, seed=1).count()
Out[79]: 135
tmp_df = df.randomSplit([0.25, 0.75], seed=1)
tmp_df[0].count()
Out[77]: 77
tmp_df[1].count()
Out[78]: 179
schema = df.schema
newRows = [
    Row('new_country', 'other_country', 5),
    Row('new_country_2', 'otheR_country_3', 1)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
df.union(newDF)\
.where('count=1')\
.where(F.col('ORIGIN_COUNTRY_NAME') != 'United States').show()
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    new_country_2|    otheR_country_3|    1|
+-----------------+-------------------+-----+
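-> Note that union matches columns by position, not by name, so the appended DataFrame's schema must line up in the same order. A minimal sketch of aligning first (select with a list of names is standard PySpark):
# Reorder newDF's columns to match df before the union
df.union(newDF.select(df.columns))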
df.sort('count').show(3)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|          Singapore|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 3 rows
df.orderBy(F.desc('count'), 'DEST_COUNTRY_NAME').show(3)
+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
+-----------------+-------------------+------+
only showing top 3 rows
df.limit(5).show()
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
The repartition method redistributes data across partitions (incurring a full shuffle).
The coalesce method merges partitions without shuffling the entire dataset.
df.rdd.getNumPartitions()
Out[100]: 1
# Specify the number of partitions
df.repartition(5)
Out[99]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]
# Repartition by a specific column
df.repartition(F.col('DEST_COUNTRY_NAME'))
Out[102]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]
# Shuffle on a specific column into 5 partitions,
# then merge down to 2 partitions without a full shuffle
df.repartition(5, F.col('DEST_COUNTRY_NAME')).coalesce(2)
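-> A minimal sketch to confirm the resulting partition counts (using getNumPartitions on the underlying RDD, as above):
repartitioned = df.repartition(5, F.col('DEST_COUNTRY_NAME'))
repartitioned.rdd.getNumPartitions()  # expected: 5
repartitioned.coalesce(2).rdd.getNumPartitions()  # expected: 2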
collectDF = df.limit(10)
collectDF.collect()
Out[108]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]
collectDF.take(5)
Out[105]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]
collectDF.show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows
# Return an iterator that brings partitions back to the driver one at a time
collectDF.toLocalIterator()
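-> toLocalIterator lets the driver hold only one partition in memory at a time instead of the whole result. A minimal usage sketch:
# Stream rows to the driver rather than collecting them all at once
for row in collectDF.toLocalIterator():
    print(row['DEST_COUNTRY_NAME'], row['count'])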