스파크 애플리케이션 작성하기
- 스파크 애플리케이션은 스파크 클러스터와 사용자 코드 두 가지 조합으로 구성됨
from __future__ import print_function
if __name__ == '__main__':
from pyspark.sql import SparkSession
spark= SparkSession.builder.master('local').appName('Word Count')\
.config("spark.some.config.option", 'some-value').getOrCreate()
print(spark.range(5000).where('id>500').selectExpr('sum(id)').collect())
[Row(sum(id)=12372250)]
- 위 코드가 실행되면 애플리케이션에서 활용할 수 있는 SparkSession객체가 생성됨
spark.conf.get('spark.master')
Out[5]: 'local'
spark.conf.get('spark.app.name')
Out[6]: 'Word Count'
스파크 애플리케이션 테스트
- 애플리케이션을 테스트하려면 애플리케이션을 작성할 때 몇 가지 핵심 원칙과 구성 전략을 고려해야함
전략적 원칙
- 데이터 파이프라인과 애플리케이션에 대한 테스트 코드 개발은 실제 애플리케이션 개발만큼이나 중요
- 테스트 코드는 미래에 발생할 수 있는 데이터, 로직, 결과 변화에 유연하게 대처할 수 있게 도와줌
입력 데이터에 대한 유연성
- 비즈니스 요구사항이 변하면 데이터도 변함
- 따라서 애플리케이션과 파이프라인은 입력 데이터 중 일부가 변하더라도 유연하게 대처할 수 있어야함
비즈니스 로직 변경에 대한 유연성
- 입력 데이터뿐만 아니라 파이프라인 내부의 비즈니스 로직이 바뀔수도 있음
- 비즈니스 로직을 테스트해 복잡한 비즈니스 파이프라인이 의도한 대로 동작하는지 확인해야함
- 이 유형에서는 스파크가 가진 기능을 테스트하는 '스파크 단위 테스트'를 작성하지 않도록 조심해야함
- 예상했던 원형 데이터의 형태가 실제 원형 데이터와 같은지 확인
결과의 유연성과 원자성
- 결과를 의도한 대로 반환하는지 확인
- 데이터가 스키마에 맞는 적절한 형태로 반환될 수 있도록 제어해야함
테스트 코드 작성 시 고려사항
- 애플리케이션 테스트를 쉽게 만들어주는 테스트 구성 전략도 중요
- 적절한 단위 테스트를 작성해서 입력 데이터나 구조가 변경되어도 비즈니스 로직이 정상적으로 동작하는지 확인해야함
- 단위 테스트를 하면 스키마가 변경되는 상황에 쉽게 대응할 수 있음
- 단위 테스트 구성 방법은 비즈니스 도메인과 도메인 경험에 따라 다양할 수 있음
SparkSession 관리하기
- 스파크 로컬 모드 덕분에 JUnit이나 ScalaTest 같은 단위 테스트용 프레임워크로 비교적 쉽게 스파크 코드를 테스트할 수 있음
- 테스트 하네스(테스트를 지원하기 위해 생성된 코드와 데이터)의 일부로 로컬 모드의 SparkSession을 만들어 사용하기만 하면 됨
- SparkSession을 한 번만 초기화하고 런타임 환경에서 함수와 클래스에 전달하는 방식을 사용하면 테스트 중에 SoarkSession을 쉽게 교체 가능
테스트 코드용 스파크 API 선정하기
- 스파크는 SQL, DataFrame, DataSet 등 다양한 API를 제공
- 각 API는 사용자 애플리케이션의 유지 보수성과 테스트 용이성 측면에서 서로 다른 영향을 미칠 수 있음
- 적합한 API는 사용자가 속한 팀과 팀에서 무엇을 필요로 하는지에 따라 달라질 수 있음
- 개발 속도를 올리기 위해서는 덜 엄격한 SQL, DataFrame을 사용할 수 있고
- 타입 안정성을 위해서는 Dataset과 RDD를 사용할 수 있음
단위 테스트 프레임워크에 연결하기
- 코드를 단위 테스트하려면 각 언어의 표준 프레임워크(JUnit, ScalaTest 등)를 사용하고,
- 테스트 하네스에서 테스트마다 SparkSession을 생성하고 제거하도록 설정하는 것이 좋음
데이터소스 연결하기
- 가능하면 테스트 코드에서는 운영 환경의 데이터소스에 접속하지 말아야함
- 그래야 데이터소스가 변경되더라도 고립된 환경에서 개발자가 쉽게 테스트 코드를 실행할 수 있음
- 데이터소스에 직접 접근하지 말고 Dataframe이나 Dataset을 넘겨받게 만들어야함
개발 프로세스
- 초기화된 작업 공간 마련
- 핵심 컴포넌트와 알고리즘 개발
- 라이브러리나 패키지 같은 영구적인 영역으로 코드 옮김
애플리케이션 시작하기
- 대부분의 운영용 애플리케이션은 spark-submit 명령으로 실행
- spark-submit 명령으로 스파크 잡을 제출할 때는 클라이언트 모드(default)나 클러스터 모드 중 하나를 선택해야함
- 근데 드라이버와 익스큐터 간의 지연 시간을 줄이기 위해 클러스터 모드 추천
애플리케이션 환경 설정하기
-
스파크는 다양한 환경 설정을 제공함
-
설정 목록
- 애플리케이션 속성
- 런타임 환경
- 셔플 동작 방식
- 스파크 UI
- 압축과 직렬화
- 메모리 관리
- 처리 방식
- 네트워크 설정
- 스케줄링
- 동적 할당
- 보안
- 암호화
- 스파크 SQL
- 스파크 스트리밍
- SparkR
SparkConf
- SparkConf객체로 스파크 속성을 설정할 수 있음
- 스파크 애플리케이션에서 생성된 SparkConf 객체는 불변성임
- 스파크 속성값은 스파크 애플리케이션의 동작 방식과 클러스터 구성 방식을 제어
from pyspark import SparkConf
conf = SparkConf().setMaster('local[2]').setAppName('DefinitiveGuide').set('some.conf','to.some.value')
애플리케이션에서 잡 스케줄링
- 스파크 애플리케이션에서 별도의 스레드를 사용해 여러 잡을 동시에 실행할 수 있음
- 여기서 '잡'은 해당 액션을 수행하기 위해 실행되어야 할 모든 태스크와 스파크 액션을 의미
- 스파크의 스케줄러는 스레드 안정성을 충분히 보장하고, 여러 요청을 동시에 처리할 수 있는 애플리케이션을 만들 수 있게함
- 스파크 스케줄러
- FIFO(default)
- 큐의 전단에 있는 잡이 많은 자원을 사용하지만 않으면 이후 잡을 바로 실행 가능
- 만약 전단 잡이 너무 크면 이후 잡은 매우 늦게 실행될 것임
- 라운드로빈
- 여러 잡이 자원을 공평하게 나눠쓰도록 함
- 장시간 수행되는 잡의 종료를 기다리지 않고 빠르게 실행 가능
- 사용자가 많은 환경에 적합
- 페어(FAIR)
- 여러 개의 잡을 pool로 그룹화
- 개별 pool에 다른 스케줄링 옵션이나 우선 순위 부여 가능