구글 코랩에서 spark 실습을 위해 pyspark와 py4j를 설치해야한다.
sparksession은 SparkSession.builder를 호출하여 생성하며, 다양한 함수들을 통해 세부 설정이 가능하다.
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark_Tutorial')\
.getOrCreate()
master의 인자는 내가 사용하고 싶은 호스트 이름이 들어감.
[*]의 의미는 서버에 있는 모든 CPU를 쓰겠다는 의미.
파이썬 리스트를 RDD로 변환.
변환되는 순간 Spark 클러스터의 서버들에 데이터가 나눠 저장됨(파티션). 또한 Lazy Execution이 된다는 점 기억.
name_list_json = [ '{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}' ]
rdd = spark.sparkContext.parallelize(name_list_json)
위 코드를 실행한다고 해서 parallelize가 실행되지 않고 아래와 같이 의미있는 작업을 진행해야 실행됨(Lazy Execution)
rdd.count()
rdd에 함수 적용하기
parsed_rdd = rdd.map(lambda el:json.loads(el))
collect에서 출력하기 == 데이터를 파이썬 프로그래밍으로 전달. 데이터가 너무 클 경우 주의하자
parsed_rdd.collect()
파이썬 리스트를 데이터프레임으로 변환하기
from pyspark.sql.types import StringType
df = spark.createDataFrame(name_list_json, StringType())
df.count()
df.printSchema()
필드이름을 주지 않으면 기본적으로 필드 이름이 value임
df.select('*').collect()
df.select('value').collect()
필드이름을 주면서 데이터 프레임으로 변환
from pyspark.sql import Row
row = Row("name") # Or some other column name
df_name = parsed_name_rdd.map(row).toDF()
df_name.printSchema()
df_name.select('name').collect()