230704 - 빅데이터 처리 시스템

김지석·2023년 7월 11일
0

Spark 데이터 처리

Spark 데이터 시스템 아키텍처

  • 모든 데이터들이 데이터 구조를 갖게 되는데 나눠져서 처리됨.

데이터 병렬처리가 가능하려면?

  • 데이터가 먼저 분산되어야함
    • 하둡 맵의 데이터 처리 단위는 디스크에 있는 데이터 블록 (128MB)
      • hdfs-site.xml에 있는 dfs.block.size 프로퍼티로 데이터 크기 결정 가능
    • Spark에서는 이를 파티션 (Partition)이라 부름. 파티션의 기본크기도 128MB
      • spark.sql.files.maxPartitionBytes: HDFS등에 있는 파일을 읽어올 때만 적용됨
  • 다음으로 나눠진 데이터를 각각 따로 동시 처리
    • 맵리듀스에서 N개의 데이터 블록으로 구성된 파일 처리시 N개의 Map 태스크 실행
      • 블록 하나당 하나의 레포 생성
    • Spark에서는 파티션 단위로 메모리로 로드되어 Executor가 배정됨
      • 한 Executor에서 여러개의 partition을 처리할 수 있음.

처리 데이터를 나누기 -> 파티션 -> 병렬처리

Spark 데이터 처리 흐름

  • 데이터프레임은 작은 파티션들로 구성됨
    • 데이터프레임은 한번 만들어지면 수정 불가 (Immutable)
  • 입력 데이터프레임을 원하는 결과 도출까지 다른 데이터 프레임으로 계속 변환
    • sort, group by, filter, map, join, …

셔플링: 파티션간에 데이터 이동이 필요한 경우 발생

  • 셔플링이 발생하는 경우는?
    • 명시적 파티션을 새롭게 하는(바꾸는) 경우 (예: 파티션 수를 줄이기)
    • 시스템에 의해 이뤄지는 셔플링
      • 예를 들면 그룹핑 등의 aggregation이나 sorting
  • 셔플링이 발생할 때 네트웍을 타고 데이터가 이동하게 됨
    • 몇 개의 파티션이 결과로 만들어질까?
      • spark.sql.shuffle.partitions이 결정
        • 기본값은 200이며 이는 최대 파티션 수
      • 오퍼레이션에 따라 파티션 수가 결정됨
        • random, hashing partition, range partition 등등
        • sorting의 경우 range partition을 사용함
    • 또한 셔플링 이후에 Data Skew 발생 가능!

셔플링: hashing partition

  • Aggregation 오퍼레이션

Data Skewness

❖ Data partitioning은 데이터 처리에 병렬성을 주지만 단점도 존재
● 이는 데이터가 균등하게 분포하지 않는 경우
▪ 주로 데이터 셔플링 후에 발생
● 셔플링을 최소화하는 것이 중요하고 파티션 최적화를 하는 것이 중요.

Spark 데이터 구조: RDD, DataFrame, Dataset

Spark 데이터 구조

  • RDD, DataFrame, Dataset (Immutable Distributed Data)
    • 2016년에 DataFrame과 Dataset은 하나의 API로 통합됨
    • 모두 파티션으로 나뉘어 Spark에서 처리됨

Spark 데이터 구조

  • RDD (Resilient Distributed Dataset)
    • 로우레벨 데이터로 클러스터내의 서버에 분산된 데이터를 지칭
    • 레코드별로 존재하지만 스키마가 존재하지 않음
      • 구조화된 데이터나 비구조화된 데이터 모두 지원
  • DataFrame과 Dataset
    • RDD위에 만들어지는 RDD와는 달리 필드 정보를 갖고 있음 (테이블)
    • Dataset은 타입 정보가 존재하며 컴파일 언어에서 사용가능
      • 컴파일 언어: Scala/Java에서 사용가능
    • PySpark에서는 DataFrame을 사용

Spark 데이터 구조

Spark 데이터 구조 - RDD

  • 변경이 불가능한 분산 저장된 데이터
    • RDD는 다수의 파티션으로 구성
    • 로우레벨의 함수형 변환 지원 (map, filter, flatMap 등등)
  • 일반 파이썬 데이터는 parallelize 함수로 RDD로 변환
    • RDD를 DataFrame으로 변환할 수 있으며 스키마를 지정해주어야 변환 가능.
    • 반대는 collect로 파이썬 데이터로 변환가능

Spark 데이터 구조 - 데이터 프레임

  • 변경이 불가한 분산 저장된 데이터
  • RDD와는 다르게 관계형 데이터베이스 테이블처럼 컬럼으로 나눠 저장
    • 판다스의 데이터 프레임 혹은 관계형 데이터베이스의 테이블과 거의 흡사
    • 다양한 데이터소스 지원: HDFS, Hive, 외부 데이터베이스, RDD 등등
  • 스칼라, 자바, 파이썬과 같은 언어에서 지원

Spark 프로그램 구조

Spark Session 생성과 설정, Spark 프로그램의 일반적인 구조

Spark Session 생성

  • Spark 프로그램의 시작은 SparkSession을 만드는 것
    • 프로그램마다 하나를 만들어 Spark Cluster와 통신: Singleton 객체
    • Spark 2.0에서 처음 소개됨
  • Spark Session을 통해 Spark이 제공해주는 다양한 기능을 사용
    • DataFrame, SQL, Streaming, ML API 모두 이 객체로 통신
    • config 메소드를 이용해 다양한 환경설정 가능
    • 단 RDD와 관련된 작업을 할때는 SparkSession 밑의 sparkContext 객체를 사용
  • Spark Session API 문서

  • .config()로 환경설정을 할 수도 있음

pyspark.sql 제공 주요 기능

  • pyspark.sql.SparkSession
  • pyspark.sql.DataFrame
  • pyspark.sql.Column
  • pyspark.sql.Row
  • pyspark.sql.functions
  • pyspark.sql.types
  • pyspark.sql.Window

Spark Session 환경 변수

  • Spark Session을 만들 때 다양한 환경 설정이 가능
  • 몇 가지 예
    • executor별 메모리: spark.executor.memory (기본값: 1GB)
    • executor별 CPU수: spark.executor.cores (YARN에서는 기본값 1)
    • driver 메모리: spark.driver.memory (기본값: 1GB)
    • Shuffle후 Partition의 수: spark.sql.shuffle.partitions (기본값: 최대 200개)
  • 가능한 모든 환경변수 옵션은 여기에서 찾을 수 있음
    • 사용하는 Resource Manager에 따라 환경변수가 많이 달라짐

Spark Session 환경 설정 방법 4가지

  • 환경변수
    1. $SPARK_HOME/conf/spark_defaults.conf
    • 보통 Spark Cluster 어드민이 관리
    1. spark-submit 명령의 커맨드라인 파라미터
    1. SparkSession 만들때 지정
    • SparkConf 클래스
  • 충돌 시 우선순위는 3 -> 2 -> 1

Spark 세션 환경 설정

  • SparkSession 생성시 일일히 지정
    -이 시점의 Spark Configuration은 앞서 언급한 환경변수와 spark_defaults.conf와
    spark-submit로 들어온 환경설정이 우선순위를 고려한 상태로 정리된 상태

  • SparkConf 객체에 환경 설정하고 SparkSession에 지정

전체적인 플로우

  • Spark 세션(SparkSession)을 만들기
    • 환경설정
  • 입력 데이터 로딩
  • 데이터 조작 작업 (판다스와 아주 흡사)
    • DataFrame API나 Spark SQL을 사용
    • 원하는 결과가 나올때까지 새로운 DataFrame을 생성
  • 최종 결과 저장

Spark Session이 지원하는 데이터 소스

  • spark.read(DataFrameReader)를 사용하여 데이터프레임으로 로드
  • spark.write(DataFrameWriter)을 사용하여 데이터프레임을 저장
  • 많이 사용되는 데이터 소스들
    • HDFS 파일
      • CSV, JSON, Parquet, ORC, Text, Avro
      • Parquet/ORC/Avro(binary file)에 대해서는 나중에 더 자세히 설명
      • Hive 테이블
    • JDBC 관계형 데이터베이스
    • 클라우드 기반 데이터 시스템
    • 스트리밍 시스템(키네시스, 카프카 등)

Spark 개발 환경

Spark 개발 환경 옵션

  • Local Standalone Spark + Spark Shell
  • Python IDE – PyCharm, Visual Studio
  • Databricks Cloud – 커뮤니티 에디션을 무료로 사용
  • 다른 노트북 – 주피터 노트북, 구글 Colab, 아나콘다 등등

◆Local Standalone Spark

  • Spark Cluster Manager로 local[n] 지정 (n=Thread의 개수)
    • master를 local[n]으로 지정
    • master는 클러스터 매니저를 지정하는데 사용
  • 주로 개발이나 간단한 테스트 용도
  • 하나의 JVM에서 모든 프로세스를 실행
    • 하나의 Driver와 하나의 Executor가 실행됨
    • 1+ 쓰레드가 Executor안에서 실행됨
  • Executor안에 생성되는 쓰레드 수
    • local:하나의 쓰레드만 생성
    • local[*]: 컴퓨터 CPU 수만큼 쓰레드를

구글 Colab에서 Spark 사용

  • PySpark + Py4J를 설치
    • 구글 Colab 가상서버 위에 로컬 모드 Spark을 실행
    • 개발 목적으로는 충분하지만 큰 데이터의 처리는 불가
    • Spark Web UI는 기본적으로는 접근 불가
      • ngrok을 통해 억지로 열 수는 있음
    • Py4J
      • 파이썬에서 JVM내에 있는 자바 객체를 사용가능하게 해줌

pyspark.sql.types 라이브러리

  • IntegerType
  • LongType
  • FloatType
  • StringType
  • BooleanType
  • TimestampType
  • DateType
  • ArrayType
  • StructType
  • StructField
  • MapType

DataFrame의 컬럼을 지칭하는 방식

from pyspark.sql.functions import col, column # 두개는 같음.
stationTemps = minTemps.select(
 "stationID",
 col("stationID"),
 column("stationID"),
 minTemps.stationID
)
profile
초짜에요...

0개의 댓글