아주 간단한 작업 같지만 사실 권한 문제 때문에 생각보다 쉽지 않다.
코드는 아주 간결하다
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("KafkaConsumer") \
.config("spark.jars", """
/usr/lib/spark/jars/commons-pool2-2.12.0.jar
""")\
.getOrCreate()
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000),
("Michael","Rose","","40288","M",4000),
("Robert","","Williams","42114","M",4000),
("Maria","Anne","Jones","39192","F",4000),
("Jen","Mary","Brown","","F",-1)
]
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("middlename",StringType(),True), \
StructField("lastname",StringType(),True), \
StructField("id", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("salary", IntegerType(), True) \
])
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.write.mode("overwrite").parquet("hdfs://ip-172-31-59-39.ap-northeast-2.compute.internal:8020/usr/livy/testdir1/testtable1.parquet")
df2 = spark.read.parquet("hdfs://ip-172-31-59-39.ap-northeast-2.compute.internal:8020/usr/livy/testdir1/testtable1.parquet")
df2.show()
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname| id|gender|salary|
+---------+----------+--------+-----+------+------+
| Robert| |Williams|42114| M| 4000|
| Maria| Anne| Jones|39192| F| 4000|
| Jen| Mary| Brown| | F| -1|
| James| | Smith|36636| M| 3000|
| Michael| Rose| |40288| M| 4000|
+---------+----------+--------+-----+------+------+
하지만 user livy 에 대한 권한 문제로 livy에게 hdfs admin 권한을 부여하였지만 사실 이게 맞는지는 잘 모르겠으며 livy 에 대해서 스터디가 필요해 보인다.
파일은 잘 보이고 추후에는 parquet으로 하이브 익스터널 테이블로 올릴 계획이다