실시간 빅데이터 처리를 위한 Spark & Flink Online 3) RDD 함수

Bradley·2022년 3월 7일
0

병렬처리와 분산처리

병렬처리

  1. 데이터를 여러개로 쪼개고
  2. 여러 쓰레드에서 각자 task를 적용
  3. 각자 만든 결과값을 합치는 과정

분산된 환경에서의 병렬처리

  1. 데이터를 여러개로 쪼개서 여러 노드로 보낸다
  2. 여러 노드에서 각자 독립적으로 task를 적용
  3. 각자 만든 결과값을 합치는 과정
  • 노드간 통신같이 신경써야될 것이 늘어남
    Spark를 이용하면 분산된 환경에서도 일반적인 병렬처리를 하듯 코드를 짜는게 가능

  • Spark는 분산된 환경에서 데이터 병렬 모델을 구현해 추상화 시켜주기 때문

분산처리 문제

  • 부분 실패 - 노드 몇개가 프로그램과 상관없는 이유로 인해 실패
  • 속도 - 많은 네트워크 통신을 필요로 하는 작업은 속도가 저하

처리속도를 항상 생각하며 코드작업

메모리 > 디스크 > 네트워크

정형데이터와 RDD

Key-value RDD

  • Key와 Value 쌍을 갖음
  • Pairs RDD라고도 함
  • 간단한 DB처럼 다룰 수 있음
  • key를 기준으로 고차원적인 연산

Single-Value Rdd

  • 간단하게 개수를 세거나
  • 비정형 데이터를 다루거나
  • 1차원적인 연산

Key-value RDD - Reduction Operation

키값을 기준으로 데이터를 묶어서 처리
데이터를 줄이는데 쓰임

  • reduceByKey() - 키값을 기준으로 테스크 처리
  • groupByKey() - 키값을 기준으로 벨류를 묶는다
  • sortByKey() - 키값을 기준으로 정렬
  • keys() - 키값 추출
  • values() - 벨류값 추출

Key-value RDD - Join Operation

  • join
  • rightOuterJoin
  • leftOuterJoin
  • subtractByKey

Key value 데이터에서 key를 바꾸지 않는 경우 map 대신 mapValues를 써주면 Spark 내부 파티션 유지에 효율적

RDD Transformations-Actions

Transformations

  • 결과값으로 새로운 RDD를 반환
  • 지연실행 - Lazy Execution
    map()
    flatMap()
    filter()
    distinct() - 중복 제거 다른 RDD를 리턴
    reduceByKey()
    groupByKey()
    mapValues()
    flatMapValues()
    sortByKey()

Actions

  • 결과값을 연산하여 출력하거나 저장
  • 즉시실행 - Eager Execution
    collect() - RDD 안 value를 확인, 실제 운용 환경에서는 사용안함 (데이터를 모두가져오기 때문)
    count()
    countByValue()
    take() - value의 개수를 지정해 확인
    first() - 첫 번째 value를 확인
    top()
    reduce()
    fold()
    foreach() - 워커노드에 실행

Transformations = Narrow + Wide

Narrow Transformations

  • 1:1 변환
  • filter(), map(), flatMap(), sample(), union()
  • 1열을 조작하기 위해 다른 열/파티션의 데이터를 쓸 필요가 없다
  • 정렬이 필요하지 않은 경우

Wide

  • Shuffling
  • Intersection and join, distinct, cartesian, reduceByKey(), groupByKey()
  • 아웃풋 RDD의 파티션에 다른 파티션의 데이터가 들어갈 수 있음

Lazy Execution이 유용한 경우

  • 메모리를 최대한 활용할 수 있다.
  • 디스크, 네트워크 연산을 최소화 할 수 있다
  • 데이터를 다루는 task는 반복되는 경우가 많다
    task->disk->task->disk 오래걸림
    task -> task 인메모리방식

인메모리방식

어떤 데이터를 메모리에 남겨야할지 알아야 가능하다.
Transformations는 지연실행되기 때문에 메모리에 저장해둘 수 있다

Cache() & Persist()

데이터를 메모리에 남겨둘때 사용하는 함수

Cache

  • 디폴트 Storage Level 사용
  • RDD: MEMORY_ONLY
  • DF: MEMORY_AND_DISK

Persist

  • Storage Level을 사용자가 원하는대로 지정 가능

Storage Level

  • MEMORY_ONLY: 메모리에만 데이터를 저장하는 것
  • MEMORY_AND_DISK: 메모리와 디스크 두 곳에 저장, 메모리에 데이터가 없을 경우 디스크까지 보겠다
  • MEMORY_ONLY_SER: 메모리의 용량을 아끼기 위해서 구조화된 데이터를 SERIALIZED하는 과정을 거침
    -SER: SERIALIZATION 용량을 아낄 수 있으나 다시 데이터를 읽어올 때 DESERIALIZATION과정을 거쳐야하므로 약간의 연산이 추가되는 트레이드 오프가 있음
  • MEOMORY_AND_DISK_SER: 메모리와 디스크의 용량을 아끼기 위해서 구조화된 데이터를 SERIALIZED하는 과정을 거침
  • DISK_ONLY: 디스크에만 저장

Spark 클러스터의 내부구조 (Cluster Topology)

Master Worker Topology

Spark는 Master Worker Topology로 구성되어있다.

Spark를 쓰면서 잊지 말아야할 점

  • 항상 데이터가 여러곳에 분산되어 있다는 것
  • 같은 연산이어도 여러 노드에 걸쳐서 실행된다는 점

기본적인 구조

  • Driver Program
    스파크의 메인프로그램
    개발자나 유저가 프로그램과 상호작용
    Transformation 과 Action 을 저장 또는 워커노드에 전달
    • SparkContext: 새로운 RDD를 생성
  • Worker Node
    워커
  • Cluster Manager
    Driver Program과 Worker Node의 소통을 담당
    클러스터 전반에 대한 작업을 담당

일반적으로 Aciton은 워커노드로부터 데이터를 받는 것까지 포함
모두 함수들로만 이루어져있으므로 함수들이 어디서 작동하는지를 항상 생각하면서 작성

궁금한 점
wide transformations
아웃풋 RDD의 파티션에 다른 파티션의 데이터가 들어갈 수 있음 이란 말이 여러 파티션의 데이터에서 불러올때만 해당되는 것인가?

profile
2022년부턴 후회없이

0개의 댓글

관련 채용 정보