Spark를 이용한 데이터 병렬 분산 처리

qwer·2022년 3월 10일
0

Apache Spark

목록 보기
2/4

본 내용은 Fast Campus의 올인원 패키지: 실시간 빅데이터 처리를 위한 Spark & Flink Online 강의를 듣고 정리한 내용입니다.

Data parallel과 Distributed Data parallel

  1. Data Parallel?

    • 데이터를 여러개로 쪼갬
    • 여러 쓰레드에서 각자 task를 적용
    • 최종적으로 각각의 쓰레드에서 만든 결과값을 합침
  2. Distributed Data Parallel?

    • 데이터를 여러개로 쪼갬
    • 여러 노드로 보낸 후 여러 노드에서 각자 독립적으로 task를 적용
    • 최종적으로 각각의 노드에서 만든 결과값을 합침
    • 노드간 통신과 같이 처리해야할 것들이 늘어남
    • Spark 를 이용하면 분산된 환경에서도 일반적인 병렬처리를 하듯 코딩이 가능
    • 단, 노드간 통신 속도를 신경쓰면서 코딩해야 성능을 끌어낼 수 있음

분산처리 문제

  • 분산처리로 넘어가면서 신경써야될 문제가 많아짐
    • 속도: 많은 네트워크 통신을 필요로 하는 작업은 속도가 저하
    • 부분실패: 일부 노드가 프로그램과 상관 없는 이유로 인해 실패
    • 필터링을 한 후에 노드간 통신이 이루어져야 성능이 좋음

Structured Data와 RDD

Key-Value RDD

  1. 정의

    • Key와 Value 쌍을 갖는 Key-Value RDD
    • (Key, Value) 쌍을 갖기 때문에 Pairs RDD라고도 불림
    • 간단한 데이터베이스처럼 다룰 수 있음
    • 예) 쇼핑몰에서 상품당 평점 구하기
      • Key: 상품 ID
      • Value: 평점
  2. Key Value RDD로 할 수 있는 것들

    • Reduction
      • reduceByKey() - 키값을 기준으로 task 처리
      • groupByKey() - 키값을 기준으로 Value를 grouping
      • sortByKey() - 키값을 기준으로 정렬
      • keys() - key 값 추출
      • values() - value 값 추출
      • rdd.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
    • Join
      • join
      • rightOuterJoin
      • leftOuterJoin
      • subtractByKey
  3. Mapping values

    • Key Value RDD 에서 Key를 바꾸지 않는 경우 map() 함수 대신 value만 다루는 mapValues() 함수를 사용하자 (Spark 내부에서 파티션을 유지할 수 있어 효율적)
    • mapValues(), flatMapValues()

Transformations & Actions

Transformations

  • 결과값으로 새로운 RDD를 반환
  • 지연실행(Lazy Execution)
  • map(), flatMap(), filter(), distinct(), reduceByKey(), groupByKey(), sortByKey() 등
  • Wide Transformation과 Narrow Transformation가 존재
  • Wide Transformation
    • 1:1 변환
    • filter(), map(), flatMap(), union(), sample()
    • 1열의 데이터를 조작하기 위해 다른 열/파티션의 데이터를 사용할 필요가 없음
    • 정렬이 필요하지 않은 경우
  • Narrow Transformation
    • shuffling
    • intersection, join, distinct, reduceByKey(), GroupByKey()
    • output RDD의 파티션에 다른 파티션의 데이터가 사용될 수 있음
    • 많은 네트워크 통신과 리소스가 사용됨(성능을 위해 최적화가 중요)

Actions

  • 결과값을 연산하여 출력하거나 저장
  • 즉시실행(Eager Execution)
  • collect(), count(), countByValue(), take(), top(), reduce(), fold(), foreach()

왜 지연실행 필요한가?

  • 머신러닝과 같은 작업은 task에서 처리한 데이터를 disk에 저장하고 다시 task에서 disk의 데이터를 읽는 작업이 반복되는 비효율성이 존재
  • 이때 메모리를 활용하면 더욱 효율적인 작업이 가능한데 이때 어떤 데이터를 메모리에 남겨야 할지를 알아야 함
  • Transformations는 지연실행 되기 때문에 메모리에 저장 가능

Cache & Persist

  • cache()와 persist()로 데이터를 메모리에 저장해 놓고 사용 가능
  • 다음과 같은 storage level이 존재
    • MEMORY_ONLY: 메모리만 사용
    • MEMORY_AND_DISK: 메모리에 더이상 저장할 수 없는 경우 디스크에 저장
    • MEMORY_ONLY_SER: 직렬화 된 데이터를 메모리에 저장
    • MEMORY_AND_DISK_SER: 직렬화 된 데이터를 메모리와 디스크에 저장
    • DISK_ONLY: 디스크만 사용
    • _SER 이 포함된 storage level의 경우 직렬화 함으로써 메모리나 디스크의 공간을 절약할 수는 있지만 데이터를 읽어올 때 추가적인 역직렬화 연산이 필요함
  • Cache
    • default storage level을 사용
    • RDD's default storage level: MEMORY_ONLY
    • DF's default storage level: MEMORY_AND_DISK
  • Persist
    • 사용자가 원하는 storage level을 지정 가능

0개의 댓글