[데이터 엔지니어링 데브코스 2기] TIL-14주차 빅데이터 처리 시스템, Hadoop, Spark (2)

이재호·2024년 1월 16일
0

1. Spark 데이터 구조


Spark 데이터 시스템 아키텍처.

  • 내부 데이터(HDFS, AWS S3, Azure Blob, GCP Cloud Storage) -> 리소스 매니저(YARN, K8s) -> Spark Core Engine -> Spark SQL, Spark Streaming, ...
  • 외부 데이터(주기적인 ETL) -> Spark Core Engine -> ...
  • 외부 데이터(바로 처리) -> Spark SQL, Spark Streaming, ...

데이터 병렬 처리가 되려면?

  • 데이터가 먼저 분산되어야 함.
    - 하둡 맵에서는 데이터 처리 단위를 데이터 블록(128MB)으로 나눔. (hdfs-site.xml의 dfs.block.size로 지정 가능.)
    • Spark에서는 이를 파티션(Partition, 128MB)이라고 부름. (spark.sql.files.maxPartitionBytes 라는 환경 변수로 지정 가능. HDFS에 있는 파일만 읽을 수 있음.)
  • 다음으로 나눠진 데이터를 각각 따로 동시 처리해야 함.
    - 맵리듀스에서는 N개의 데이터 블록으로 구성된 파일을 N개의 Map task로 실행함.
    • Spark에서는 파티션 단위로 메모리에 로드하여 Executor가 배정됨.
  • 처리 데이터 나누기 -> 파티션 -> 병렬 처리.

Spark 데이터 처리 흐름.

  • 데이터프레임(immutable, 수정 불가) -> 파티셔닝 -> 작은 파티션들로 나뉨.
  • 원하는 결과 데이터프레임이 나올 때까지 입력 데이터프레임을 계속 변환.
  • 셔플링: 파티션 간에 데이터 이동이 필요한 경우에 발생. (네트웤을 타고 데이터가 이동함.)
    - 파티션의 수를 조정할 때.
    - 같은 키 값을 갖는 레코드는 같은 파티션 들어가도록 지정할 때.
    • 데이터프레임의 operation 중 그룹핑 등의 aggregation이나 sorting 시.
  • 셔플링이 발생할 경우, 새로 생기는 데이터프레임의 파티션 수는?
    - spark.sql.suffle.partitions가 결정(default=200). - 최대 200개.
    • 오퍼레이션에 따라 파티션 수가 결정됨.
      • random, hashing partiton, range partion 등.
  • Data Skew 발생 가능.

Spark 데이터 구조.
1. RDD: low level.
2. DataFrame: high level. 파이썬.
3. Dataset: high level. 스칼라, 자바.

  • 공통 특징: Immutable Distributed Data.
  • 가장 좋은 방법: Spark SQL로만, 안 되면 위 데이터 사용.

RDD.

  • 로우 레벨 데이터로 클러스터 내의 서버에 분산된 데이터를 지칭.
  • 레코드별로 존재하지만 스키마가 존재하지 않아 구조화/비구조화 모두 지원.
  • 변경이 불가능한 분산 저장된 데이터.
  • map, filter, flatMap 등 함수형 변환 지원.
  • 일반 파이썬 자료구조를 parallelize 함수로 RDD로 변환 가능. 반대는 collect 함수.

DataFrame

  • RDD 위에 만들어지며, RDD와 달리 필드 정보르르 갖고 있음(테이블).
  • PySpark을 쓸 때는 DataFrame 사용.
  • 변경이 불가능한 분산 저장된 데이터.
  • RDD와 다르게 RDB 테이블처럼 칼럼으로 나눠서 저장.
  • 판다스의 데이터 프레임 or 관계형 디비의 테이블과 거의 흡사함.
  • 다양한 데이터소스 지원: HDFS, Hive, 외부 인터페이스, RDD, ..

데이터 구조.
RDD API -> Spark SQL Engine -> SPark SQL, DataFrame, Dataset

2. Spark 프로그램 구조


Spark Session 생성.

  • Spark 프로그램의 시작은 SparkSession을 만드는 것.
    - 프로그램마다 하나를 만들어 Spark Cluster와 통신: Singleton 객체.
    • Spark 2.0에서 처음 소개됨.
  • Spark Session을 통해 Spark의 여러 기능 사용 가능.
    - DataFrame, SQL, Streaming, ML API 모두 해당 객체로 통신함.
    • config 메서드로 다양한 환경설정 가능.
    • RDD 관련 작업 시에는 SparkSession 밑의 SparkContext 객체를 사용함.

Spark session 생성 - PySpark 예제.

from pyspark.sql import SparkSession

# singleton 객체로 생성함.
spark = SparkSession.builder\
	.master("local[*]*)\
    .appName('PySpark Tutorial')\
    .getOrCreate() # 1개만 생성 or 이미 만들어 진 것 갖고 오기.

...

spark.stop()

Spark Session 환경 변수

  • Spark Session을 만들 때 다양한 환경 설정이 가능함.
  • spark.executor.memory(default=1GB) : executor별 메모리.
  • spark.executor.cores(default=1 in YARN) : executor별 CPU 수.
  • spark.driver.memory(default=1GB) : driver 메모리.
  • spark.sql.shuffle.partitions(default=200) : Shuffling 후 최대 Partition 수.
  • 사용하는 리소스 매니저에 따라 환경 변수가 많이 달라짐.

Spark Session 환경 설정 방법.
1. 환경 변수 - 보통 Spark Cluster 어드민이 관리함.
2. &SPARK_HOME/conf/spark_defaults.conf - 보통 Spark Cluster 어드민이 관리함.
3. spark-submit 명령의 커맨드 파라미터.
4. SparkSession 만들 때 SparkConf라는 클래스로 지정.

  • 만약 환경 변수 충돌 시, 4->3->2->1 순으로 우선순위가 높음.

환경 설정 방법 1 - SparkSession 생성 시 빌더로 일일이 지정.

from pyspark.sql import SparkSession

# singleton 객체로 생성함.
spark = SparkSession.builder\
	.master("local[*]*)\
    .appName('PySpark Tutorial')\
    .config("spark.some.config.option1", "some-value")\
    .config("spark.some.config.option2", "some-value")\
    .getOrCreate() # 1개만 생성 or 이미 만들어 진 것 갖고 오기.

환경 설정 방법 2 - SparkConf 객체에 환경 설정하고 지정하기.

from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.app.name", "PySpark Tutorial")
conf.set("spark.master", "local[*]")

# singleton 객체로 생성함.
spark = SparkSession.builder\
	.config(config=conf)\
    .getOrCreate() # 1개만 생성 or 이미 만들어 진 것 갖고 오기.

전체적인 흐름.
1. SparkSession 만들기.
2. 입력 데이터 로딩.
3. SparkSession을 통해 DataFrame API나 Spark SQL을 사용하여 데이터 조작 작업 수행.
4. 최종 결과를 HDFS or RDB or Hive Table에 저장.

Spark Session이 지원하는 데이터 소스.

  • spark.read(DataFrameReader) : 데이터프레임으로 로드.
  • spark.write(DataFrameWriter) : 데이터프레임을 저장.
  • HDFS 파일 - csv, json, parquet, orc, test, avro, Hive 테이블...
  • JDBC RDB
  • 클라우드 기반 데이터 시스템
  • 스트리밍 시스템

3. Spark DataFrame 실습


소스 코드 : https://github.com/keeyong/beginner-spark-programming-with-pyspark/tree/main/chapter2

개발 환경.

  • local standalone Spark + Spark shell.
  • Python IDE.
  • Databricks Cloud.
  • Notebook.

Local Standalone Spark.

  • Spark Cluster Manager롤 local[n] 지정.
    - master를 local[n]으로 지정. - 한 executor 당 n개의 쓰레드 할당.
  • 개발/테스트 용도.
  • 하나의 JVM에서 모든 프로세스 실행.

google Colab.

Local Standalone Spark.

  • java가 설치된 상태에서 수행.
  • https://spark.apache.org/downloads.html 에서 Spark 설치.
  • spark dir 생성 후 이동.
  • 해당 dir에서 다운로드한 spark 파일 카피.mv ~/Downloads/spark-3.5.0-bin-hadoop3.tgz .
  • 해당 dir에서 spark 설치. tar xvf spark-3.5.0-bin-hadoop3.tgz
  • 현재 디렉토리 경로 + 설치된 spark 경로로 SPARK_HOME 설정하기 위해 경로 확인. pwd ls -tl
  • vi ~/.zshrc에서 export SPARK_HOME=/Users/jaeho/Desktop/programmers/spark_course/spark-3.5.0-bin-hadoop3 추가.
  • 마찬 가지로 위에서 export PATH=$PATH:$SPARK_HOME/bin 추가.
  • spark-shell로 쉘 오픈(언어: 스칼라).
  • http://192.168.190.96:4040 웹 UI 접속해 보기.
  • :q로 쉘 종료.
  • 파이썬 버전의 쉘을 위해 pip install py4j 설치.
  • pyspark으로 파이썬 쉘 오픈. exit()으로 쉘 종료.
  • spark-submit --master 'local[4]' ./spark-3.5.0-bin-hadoop3/examples/src/main/python/pi.py으로 spark-submit 실행. - pi 값 계산하는 예제. 쓰레드 4개.

실습 1 - 헤더가 없는 CSV 파일 처리하기.


  • 입력 데이터: https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv

  • 코드 : https://github.com/keeyong/beginner-spark-programming-with-pyspark/blob/main/chapter2/PySpark_DataFrame_1.ipynb

  • 판다스의 데이터프레임으로 헤더 네임 및 칼럼 지정.

    import pandas as pd 
    
    pd_df = pd.read_csv(
        "1800.csv",
        names=["stationID", "date", "measure_type", "temperature"],
        usecols=[0, 1, 2, 3]
    )
  • Spark으로 칼럼 지정.

    from pyspark.sql import SparkSession
    from pyspark import SparkConf
    
    conf = SparkConf()
    conf.set("spark.app.name", "PySpark DataFrame #1")
    conf.set("spark.master", "local[*]")
    
    spark = SparkSession.builder\
            .config(conf=conf)\
            .getOrCreate()
    
    df = spark.read.format("csv")\
    .option("inferSchema", "true")\ # column 타입을 알아서 정해 줌.
    .load("1800.csv")\
    .toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
  • Spark으로 칼럼 및 데이터 타입 지정.

    from pyspark.sql.types import StringType, IntegerType, FloatType
    from pyspark.sql.types import StructType, StructField
    
    schema = StructType([ \
                         StructField("stationID", StringType(), True), \ # 칼럼명, 데이터타입, NULL 가능 여부
                         StructField("date", IntegerType(), True), \
                         StructField("measure_type", StringType(), True), \
                         StructField("temperature", FloatType(), True)])
    	
    # df = spark.read.schema(schema).format("csv").load("1800.csv")
    df = spark.read.schema(schema).csv("1800.csv")
  • 기타 join, aggregation 등 가능.

    # Spark SQL 방식.
    df.createOrReplaceTempView("station1800")
    results = spark.sql("""SELECT stationID, MIN(temperature)
    FROM station1800
    WHERE measure_type = 'TMIN'
    GROUP BY 1""").collect()

실습 2 - 데이터에 스키마 정의하여 cust_id를 기준으로 amount_spent의 합을 계산하기.


  • 코드: https://github.com/keeyong/beginner-spark-programming-with-pyspark/blob/main/chapter2/PySpark_DataFrame_2.ipynb

  • SparkSession 생성.

    from pyspark.sql import SparkSession
    from pyspark import SparkConf
    
    conf = SparkConf()
    conf.set("spark.app.name", "PySpark DataFrame #2")
    conf.set("spark.master", "local[*]")
    
    spark = SparkSession.builder\
            .config(conf=conf)\
            .getOrCreate()
    
  • 스키마 생성.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as func
    from pyspark.sql.types import StructType, StructField, StringType, FloatType
    
    schema = StructType([ \
                         StructField("cust_id", StringType(), True), \
                         StructField("item_id", StringType(), True), \
                         StructField("amount_spent", FloatType(), True)])
    
  • 스키마 지정.

    df = spark.read.schema(schema).csv("customer-orders.csv")
  • cust_id를 기준으로 amount_spent 합 구하기.

    # 방법 1.
    df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum") # 새로 생긴 데이터프레임의 칼럼을 개명하기.
    
    # 방법 2.
    import pyspark.sql.functions as f
    
    df_ca = df.groupBy("cust_id") \
       .agg(f.sum('amount_spent').alias('sum'))
    	
    # 방법 3.
    df.createOrReplaceTempView("customer_orders")
    
    spark.sql("""SELECT cust_id, SUM(amount_spent) sum, MAX(amount_spent) max, AVG(amount_spent) avg
    FROM customer_orders
    GROUP BY 1""").head(5)

실습 3 - 텍스트를 파싱해서 구조화된 데이터로 변환하기.


  • 코드: https://github.com/keeyong/beginner-spark-programming-with-pyspark/blob/main/chapter2/PySpark_DataFrame_3.ipynb

  • 정규식 표현으로 추출. regex 패턴.

  • SparkSession 생성.

    from pyspark.sql import SparkSession
    from pyspark import SparkConf
    
    conf = SparkConf()
    conf.set("spark.app.name", "PySpark DataFrame #3")
    conf.set("spark.master", "local[*]")
    
    spark = SparkSession.builder\
            .config(conf=conf)\
            .getOrCreate()
    
  • 스키마 생성 및 지정.

    import pyspark.sql.functions as F
    from pyspark.sql.types import *
    
    schema = StructType([ StructField("text", StringType(), True)])
    transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt")
    
  • regex 적용.

    from pyspark.sql.functions import *
    regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'
    	
    # withColumn으로 새로운 칼럼 생성 or 이름 변경.
    df_with_new_columns = transfer_cost_df\
        .withColumn('week', regexp_extract('text', regex_str, 1))\ # withColumn(칼럼 이름, 저장할 값) # regexp_extract(칼럼명, 정규식 패턴, 몇 번째 매칭된 문자열을 지정할 지)
        .withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
        .withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
        .withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
        .withColumn('vendor', regexp_extract(col('text'), regex_str, 5))
    

실습 4 - StackOverflow 서베이 기반 인기 언어 찾기.


  • 코드: https://github.com/keeyong/beginner-spark-programming-with-pyspark/blob/main/chapter2/PySpark_DataFrame_4.ipynb

  • SparkSession 생성.

    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.jars", "/usr/local/lib/python3.7/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \
        .getOrCreate()
  • csv 파일을 읽은 df로부터 원하는 df로 추출하기.

    df = spark.read.csv("survey_results_public.csv", header=True).select('ResponseId', 'LanguageHaveWorkedWith', 'LanguageWantToWorkWith')
    
    import pyspark.sql.functions as F
    
    # LanguageHaveWorkedWith 값을 트림하고 ;를 가지고 나눠서 리스트의 형태로 language_have 필드로 설정
    df2 = df.withColumn(
        "language_have",
        F.split(F.trim(F.col("LanguageHaveWorkedWith")), ";")
    )
    # LanguageWantToWorkWith 값을 트림하고 ;를 가지고 나눠서 리스트의 형태로 language_want 필드로 설정
    df3 = df2.withColumn(
        "language_want",
        F.split(F.trim(F.col("LanguageWantToWorkWith")), ";")
    )
  • df로부터 인기 언어 찾기.

    df_language_have = df3.select(
        df3.ResponseId,
        F.explode(df3.language_have).alias("language_have")
    )
    df_language_have.groupby("language_have").count().show(10)
    df_language_have.groupby("language_have").count().sort(F.desc("count")).collect()
    		df_language_have.groupby("language_have").count().orderBy('count', ascending=False).collect()
    
    		df_language50_have = df_language_have.groupby("language_have")\
    .count()\
    .orderBy('count', ascending=False)\
    .limit(50)
    	
    df_language50_have.write.mode('overwrite').csv("language50_have") # mode로 HDFS에 기록.
    

실습 5 - Redsfhit 연결해 보기.


내용.

  • Monthly Active User 계산.
  • 두 개의 테이블을 Redsfhit에서 Spark으로 로드. (JDBC 연결)
  • DataFrame과 SparkSQL으로 조인.
  • DataFrame JOIN
    - left_DF.join(right_DF, join condition, join type)
profile
천천히, 그리고 꾸준히.

0개의 댓글