Spark 완벽 가이드 ch23. 운영 환경에서의 구조적 스트리밍

Q·2023년 2월 1일
0

Spark 완벽 가이드

목록 보기
24/24

내고장성과 체크포인팅

  • 장애 복구는 스트리밍 애플리케이션을 운영할 때 매우 신경써야하는 부분
  • 장애 원인 예시
    • 클러스터 머신 문제
    • 마이그레이션 실수로 스키마 변경 문제
  • 구조적 스트리밍 애플리케이션은 단순한 재시작만으로 장애 상황 극복 가능
    • 체크포인트와 WAL을 사용하기 때문
    • 장애 상황 발생 시 단순히 애플리케이션을 다시 시작해서 중간 상탯값을 저장한 체크포인트 경로를 참조하도록 설정 가능
      • 체크포인팅은 현재까지 처리한 스트림과 모든 중간 상태를 저장함
from pyspark.sql import functions as F
from pyspark.sql import types as T
path = '/FileStore/tables/bin/activity-data'
static = spark.read.json(path)
display(static.limit(5))
Arrival_TimeCreation_TimeDeviceIndexModelUsergtxyz
14246867350901424686733090638193nexus4_118nexus4gstand3.356934E-4-5.645752E-4-0.018814087
14246867352921424688581345918092nexus4_266nexus4gstand-0.0057220460.0290832520.005569458
14246867355001424686733498505625nexus4_199nexus4gstand0.0078125-0.0176544190.010025024
14246867356911424688581745026978nexus4_2145nexus4gstand-3.814697E-40.0184021-0.013656616
14246867358901424688581945252808nexus4_2185nexus4gstand-3.814697E-4-0.031799316-0.00831604
streaming = spark.readStream.schema(static.schema)\
.option("maxFilePerTrigger",10).json(path).groupby('gt').count()
query = streaming.writeStream.outputMode('complete')\
.option("checkpointLocation", "/FileStore/tables/bin/location/")\
.queryName("test_python_stream").format("memory").start()
%fs ls /FileStore/tables/bin/location
pathnamesize
dbfs:/FileStore/tables/bin/location/commits/commits/0
dbfs:/FileStore/tables/bin/location/metadatametadata45
dbfs:/FileStore/tables/bin/location/offsets/offsets/0
dbfs:/FileStore/tables/bin/location/sources/sources/0
dbfs:/FileStore/tables/bin/location/state/state/0

애플리케이션 변경하기

  • 채크포인팅은 운영 환경에서 애플리케이션을 실행하는 데 가장 중요한 기능임
  • 따라서 스트리밍 애플리케이션을 업뎃할때 이전 체크포인트 데이터를 고려해야함
    • 이전과 비교했을 때, 큰 변화사항이 있는지 고려해야함
  • 이와 관련된 두 가지 유형의 업뎃
    • 스트리밍 애플리케이션 코드 업뎃
    • 스파크 버전 업뎃

스트리밍 애플리케이션 코드 업데이트하기

  • 변화 정도에 따라 처리가 다름
  • 새로운 컬럼을 추가하거나 사용자 정의 함수를 변경하는 등의 작은 업뎃이 발생한 경우
    • 기존 체크포인트 디렉터리 사용 가능하고, 그냥 재시작하면 됨
  • 새로운 집계 키를 추가하거나 쿼리를 완전히 변경하는 등의 큰 업뎃이 발생한 경우
    • 스파크는 이전 체크포인트 디렉터리에 저장된 정보에서 새로운 쿼리에 필요한 상태정보를 만들어내지 못함
    • 따라서 반드시 비어있는 신규 체크포인트 디렉터리를 새로 지정하고 처음부터 다시 처리해야함

스파크 버전 업데이트하기

  • 구조적 스트리밍 애플리케이션은 스파크의 패치 버전 업뎃에 상관없이 이전 체크포인트 디렉터리를 사용해 재시작 가능
    • 체크포인트 포맷이 상위 버전과 호환되도록 대부분 설계되어 있음
  • 정확하게는 릴리스 노트를 보고 새로운 스파크 버전이 이전 체크포인트 정보를 사용할 수 있는지 확인해야함

애플리케이션의 초기 규모 산정과 재조정하기

  • 클러스터는 평균 데이터 발생량 이상으로 데이터가 급증하는 상황에서도 안정적으로 처리할 수 있는 크기를 가져야함
    • 전반적으로 유입률이 처리율보다 훨씬 크다면 클러스터나 애플리케이션의 크기를 늘려야함
  • 리소스 매니저와 배포 방식에 따라 애플리케이션의 익스큐터를 동적으로 추가 가능
  • 익스큐터를 제거하거나 애플리케이션에 설정된 자원을 줄인 다음 재시작하는 방식으로 애플리케이션의 크기를 줄일 수도 있음

메트릭과 모니터링

  • 스트리밍 애플리케이션의 메트릭과 모니터링은 일반 스파크 애플리케이션과 거의 같으나 추가 기능을 제공함
  • 스트리밍 애플리케이션의 상태를 자세히 파악할 수 있는 두 가지 API 제공
    • 쿼리 상태 모니터링 API
    • 쿼리 진행 상황 모니터링 API

쿼리 상태

  • 가장 기본적인 모니터링 API
    • '지금 스트림에서 어떤 처리를 하고 있지?'에 대한 답을 얻을 수 있음
    • 이 정보는 startStream메서드에서 반환한 쿼리 객체의 status 속성으로 확인할 수 있음
query.status
Out[6]: {'message': 'Getting offsets from FileStreamSource[dbfs:/FileStore/tables/bin/activity-data]', 
'isDataAvailable': False, 
'isTriggerActive': True}

최근 진행 상황

  • 진행 상황 API
    • '튜플을 얼마나 처리하고 있지?','소스에서 이벤트가 얼마나 빠르게 들어오지?'와 같은 질문에 대한 답을 얻을 수 있음
    • 쿼리 객체의 recentProgress 속성으로 처리율과 배치 주기 등 시간 기반의 정보를 얻을 수 있음
query.recentProgress
Out[7]: [{'id': '38971f4c-afad-413c-a6fb-1a49063ee0c0', 
'runId': '61331994-6b93-4e1e-a1c8-191d7fce953a', 
'name': 'test_python_stream', 
'timestamp': '2021-10-02T12:29:53.024Z', 
'batchId': 1, 'numInputRows': 0, 
'processedRowsPerSecond': 0.0, 
'durationMs': {'getOffset': 298, 'triggerExecution': 1163}, 
'stateOperators': [], 
'sources': [{'description': 'FileStreamSource[dbfs:/FileStore/tables/bin/activity-data]', 
'startOffset': {'logOffset': 0}, 
'endOffset': {'logOffset': 0}, 
'numInputRows': 0, 
'processedRowsPerSecond': 0.0}], 
'sink': {'description': 'MemorySink'}}, 
{'id': '38971f4c-afad-413c-a6fb-1a49063ee0c0', 
'runId': '61331994-6b93-4e1e-a1c8-191d7fce953a', 
'name': 'test_python_stream', 
'timestamp': '2021-10-02T12:30:04.192Z', 
'batchId': 1, 
'numInputRows': 0, 
'inputRowsPerSecond': 0.0, 
'processedRowsPerSecond': 0.0, 
'durationMs': {'getOffset': 93, 
'triggerExecution': 94}, 
'stateOperators': [], 
'sources': [{'description': 'FileStreamSource[dbfs:/FileStore/tables/bin/activity-data]', 
'startOffset': {'logOffset': 0}, 
'endOffset': {'logOffset': 0}, 
'numInputRows': 0, 
'inputRowsPerSecond': 0.0, 
'processedRowsPerSecond': 0.0}], 
'sink': {'description': 'MemorySink'}}, 
{'id': '38971f4c-afad-413c-a6fb-1a49063ee0c0', 
'runId': '61331994-6b93-4e1e-a1c8-191d7fce953a', 
'name': 'test_python_stream', 
'timestamp': '2021-10-02T12:30:14.289Z', 
'batchId': 1, 
'numInputRows': 0, 
'inputRowsPerSecond': 0.0, 
'processedRowsPerSecond': 0.0, 
'durationMs': {'getOffset': 90, 
'triggerExecution': 90}, 
'stateOperators': [], 
'sources': [{'description': 'FileStreamSource[dbfs:/FileStore/tables/bin/activity-data]', 
'startOffset': {'logOffset': 0}, 
'endOffset': {'logOffset': 0}, 
'numInputRows': 0, 
'inputRowsPerSecond': 0.0, 
'processedRowsPerSecond': 0.0}], 
'sink': {'description': 'MemorySink'}}, 
{'id': '38971f4c-afad-413c-a6fb-1a49063ee0c0', 
'runId': '61331994-6b93-4e1e-a1c8-191d7fce953a', 
'name': 'test_python_stream', 
'timestamp': '2021-10-02T12:30:24.377Z', 
'batchId': 1, 
'numInputRows': 0, 
'inputRowsPerSecond': 0.0, 
'processedRowsPerSecond': 0.0, 
'durationMs': {'getOffset': 97, 
'triggerExecution': 97}, 
'stateOperators': [], 
'sources': [{'description': 'FileStreamSource[dbfs:/FileStore/tables/bin/activity-data]', 
'startOffset': {'logOffset': 0}, 
'endOffset': {'logOffset': 0}, 
'numInputRows': 0, 
'inputRowsPerSecond': 0.0, 
'processedRowsPerSecond': 0.0}], 
'sink': {'description': 'MemorySink'}}, 
{'id': '38971f4c-afad-413c-a6fb-1a49063ee0c0', 
'runId': '61331994-6b93-4e1e-a1c8-191d7fce953a', 
'name': 'test_python_stream', 
'timestamp': '2021-10-02T12:30:34.467Z', 
'batchId': 1, 
'numInputRows': 0, 
'inputRowsPerSecond': 0.0, 
'processedRowsPerSecond': 0.0, 
'durationMs': {'getOffset': 99, 
'triggerExecution': 99}, 
'stateOperators': [], 
'sources': [{'description': 'FileStreamSource[dbfs:/FileStore/tables/bin/activity-data]', 
'startOffset': {'logOffset': 0}, 
'endOffset': {'logOffset': 0}, 
'numInputRows': 0, 
'inputRowsPerSecond': 0.0, 
'processedRowsPerSecond': 0.0}], 
'sink': {'description': 'MemorySink'}}]

주요 필드

  • 유입률(input rate)
    • 입력소스에서 구조적 스트리밍 내부로 데이터가 유입되는 양
  • 처리율(processing rate)
    • 유입된 데이터를 처리하는 속도
  • 배치주기
    • 대부분 스트리밍 시스템은 적정한 처리량을 얻기 위해 배치 방식으로 처리함
    • 일부 스트리밍 시스템은 처리량을 줄이는 대신 느리게 응답하는 옵션 제공

스파크 UI

  • 잡, 태스크, 처리 메트릭 확인 가능
  • DStream API와 달리 구조적 스트리밍은 Streaming 탭을 사용하지 않음

알림

  • 사용자는 대시보드의 메트릭을 계속 확인해서 잠재적인 문제를 발견하는 과정이 필요함
  • 따라서 잡이 실패하거나 유입률보다 처리율이 떨어지는 경우 등을 자동으로 알려주는 기능이 필요
  • 스파크는 위의 상태 및 진행 상황 API를 기반으로 알림 시스템을 구축할 수 있도록 함

스트리밍 리스너를 사용한 고급 모니터링

  • StreamingQueryListener 클래스를 이용하여 비동기 방식으로 스트리밍 쿼리 정보를 수신하며 더 강력하게 애플리케이션을 관찰할 수 있음
    • 해당 클래스를 상속해서 자체 로직을 구현하면됨
profile
Data Engineer

0개의 댓글