[TIL]Day 132

이재희·2021년 4월 10일
0

TIL

목록 보기
132/312

구글 코랩에서 spark 실습을 위해 pyspark와 py4j를 설치해야한다.

sparksession은 SparkSession.builder를 호출하여 생성하며, 다양한 함수들을 통해 세부 설정이 가능하다.

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .getOrCreate()

master의 인자는 내가 사용하고 싶은 호스트 이름이 들어감.
[*]의 의미는 서버에 있는 모든 CPU를 쓰겠다는 의미.

파이썬 리스트를 RDD로 변환.
변환되는 순간 Spark 클러스터의 서버들에 데이터가 나눠 저장됨(파티션). 또한 Lazy Execution이 된다는 점 기억.

name_list_json = [ '{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}' ]
rdd = spark.sparkContext.parallelize(name_list_json)

위 코드를 실행한다고 해서 parallelize가 실행되지 않고 아래와 같이 의미있는 작업을 진행해야 실행됨(Lazy Execution)

rdd.count()

rdd에 함수 적용하기

parsed_rdd = rdd.map(lambda el:json.loads(el))

collect에서 출력하기 == 데이터를 파이썬 프로그래밍으로 전달. 데이터가 너무 클 경우 주의하자

parsed_rdd.collect()

파이썬 리스트를 데이터프레임으로 변환하기

from pyspark.sql.types import StringType
df = spark.createDataFrame(name_list_json, StringType())
df.count()
df.printSchema()

필드이름을 주지 않으면 기본적으로 필드 이름이 value임

df.select('*').collect()
df.select('value').collect()

필드이름을 주면서 데이터 프레임으로 변환

from pyspark.sql import Row

row = Row("name") # Or some other column name
df_name = parsed_name_rdd.map(row).toDF()
df_name.printSchema()
df_name.select('name').collect()
profile
오늘부터 열심히 산다

0개의 댓글