내고장성과 체크포인팅
- 장애 복구는 스트리밍 애플리케이션을 운영할 때 매우 신경써야하는 부분
- 장애 원인 예시
- 클러스터 머신 문제
- 마이그레이션 실수로 스키마 변경 문제
- 구조적 스트리밍 애플리케이션은 단순한 재시작만으로 장애 상황 극복 가능
- 체크포인트와 WAL을 사용하기 때문
- 장애 상황 발생 시 단순히 애플리케이션을 다시 시작해서 중간 상탯값을 저장한 체크포인트 경로를 참조하도록 설정 가능
- 체크포인팅은 현재까지 처리한 스트림과 모든 중간 상태를 저장함
from pyspark.sql import functions as F
from pyspark.sql import types as T
path = '/FileStore/tables/bin/activity-data'
static = spark.read.json(path)
display(static.limit(5))
Arrival_Time | Creation_Time | Device | Index | Model | User | gt | x | y | z |
---|
1424686735090 | 1424686733090638193 | nexus4_1 | 18 | nexus4 | g | stand | 3.356934E-4 | -5.645752E-4 | -0.018814087 |
1424686735292 | 1424688581345918092 | nexus4_2 | 66 | nexus4 | g | stand | -0.005722046 | 0.029083252 | 0.005569458 |
1424686735500 | 1424686733498505625 | nexus4_1 | 99 | nexus4 | g | stand | 0.0078125 | -0.017654419 | 0.010025024 |
1424686735691 | 1424688581745026978 | nexus4_2 | 145 | nexus4 | g | stand | -3.814697E-4 | 0.0184021 | -0.013656616 |
1424686735890 | 1424688581945252808 | nexus4_2 | 185 | nexus4 | g | stand | -3.814697E-4 | -0.031799316 | -0.00831604 |
streaming = spark.readStream.schema(static.schema)\
.option("maxFilePerTrigger",10).json(path).groupby('gt').count()
query = streaming.writeStream.outputMode('complete')\
.option("checkpointLocation", "/FileStore/tables/bin/location/")\
.queryName("test_python_stream").format("memory").start()
%fs ls /FileStore/tables/bin/location
path | name | size |
---|
dbfs:/FileStore/tables/bin/location/commits/ | commits/ | 0 |
dbfs:/FileStore/tables/bin/location/metadata | metadata | 45 |
dbfs:/FileStore/tables/bin/location/offsets/ | offsets/ | 0 |
dbfs:/FileStore/tables/bin/location/sources/ | sources/ | 0 |
dbfs:/FileStore/tables/bin/location/state/ | state/ | 0 |
애플리케이션 변경하기
- 채크포인팅은 운영 환경에서 애플리케이션을 실행하는 데 가장 중요한 기능임
- 따라서 스트리밍 애플리케이션을 업뎃할때 이전 체크포인트 데이터를 고려해야함
- 이전과 비교했을 때, 큰 변화사항이 있는지 고려해야함
- 이와 관련된 두 가지 유형의 업뎃
- 스트리밍 애플리케이션 코드 업뎃
- 스파크 버전 업뎃
스트리밍 애플리케이션 코드 업데이트하기
- 변화 정도에 따라 처리가 다름
- 새로운 컬럼을 추가하거나 사용자 정의 함수를 변경하는 등의 작은 업뎃이 발생한 경우
- 기존 체크포인트 디렉터리 사용 가능하고, 그냥 재시작하면 됨
- 새로운 집계 키를 추가하거나 쿼리를 완전히 변경하는 등의 큰 업뎃이 발생한 경우
- 스파크는 이전 체크포인트 디렉터리에 저장된 정보에서 새로운 쿼리에 필요한 상태정보를 만들어내지 못함
- 따라서 반드시 비어있는 신규 체크포인트 디렉터리를 새로 지정하고 처음부터 다시 처리해야함
스파크 버전 업데이트하기
- 구조적 스트리밍 애플리케이션은 스파크의 패치 버전 업뎃에 상관없이 이전 체크포인트 디렉터리를 사용해 재시작 가능
- 체크포인트 포맷이 상위 버전과 호환되도록 대부분 설계되어 있음
- 정확하게는 릴리스 노트를 보고 새로운 스파크 버전이 이전 체크포인트 정보를 사용할 수 있는지 확인해야함
애플리케이션의 초기 규모 산정과 재조정하기
- 클러스터는 평균 데이터 발생량 이상으로 데이터가 급증하는 상황에서도 안정적으로 처리할 수 있는 크기를 가져야함
- 전반적으로 유입률이 처리율보다 훨씬 크다면 클러스터나 애플리케이션의 크기를 늘려야함
- 리소스 매니저와 배포 방식에 따라 애플리케이션의 익스큐터를 동적으로 추가 가능
- 익스큐터를 제거하거나 애플리케이션에 설정된 자원을 줄인 다음 재시작하는 방식으로 애플리케이션의 크기를 줄일 수도 있음
메트릭과 모니터링
- 스트리밍 애플리케이션의 메트릭과 모니터링은 일반 스파크 애플리케이션과 거의 같으나 추가 기능을 제공함
- 스트리밍 애플리케이션의 상태를 자세히 파악할 수 있는 두 가지 API 제공
- 쿼리 상태 모니터링 API
- 쿼리 진행 상황 모니터링 API
쿼리 상태
- 가장 기본적인 모니터링 API
- '지금 스트림에서 어떤 처리를 하고 있지?'에 대한 답을 얻을 수 있음
- 이 정보는 startStream메서드에서 반환한 쿼리 객체의 status 속성으로 확인할 수 있음
query.status
Out[6]: {'message': 'Getting offsets from FileStreamSource[dbfs:/FileStore/tables/bin/activity-data]',
'isDataAvailable': False,
'isTriggerActive': True}
최근 진행 상황
- 진행 상황 API
- '튜플을 얼마나 처리하고 있지?','소스에서 이벤트가 얼마나 빠르게 들어오지?'와 같은 질문에 대한 답을 얻을 수 있음
- 쿼리 객체의 recentProgress 속성으로 처리율과 배치 주기 등 시간 기반의 정보를 얻을 수 있음
query.recentProgress
Out[7]: [{'id': '38971f4c-afad-413c-a6fb-1a49063ee0c0',
'runId': '61331994-6b93-4e1e-a1c8-191d7fce953a',
'name': 'test_python_stream',
'timestamp': '2021-10-02T12:29:53.024Z',
'batchId': 1, 'numInputRows': 0,
'processedRowsPerSecond': 0.0,
'durationMs': {'getOffset': 298, 'triggerExecution': 1163},
'stateOperators': [],
'sources': [{'description': 'FileStreamSource[dbfs:/FileStore/tables/bin/activity-data]',
'startOffset': {'logOffset': 0},
'endOffset': {'logOffset': 0},
'numInputRows': 0,
'processedRowsPerSecond': 0.0}],
'sink': {'description': 'MemorySink'}},
{'id': '38971f4c-afad-413c-a6fb-1a49063ee0c0',
'runId': '61331994-6b93-4e1e-a1c8-191d7fce953a',
'name': 'test_python_stream',
'timestamp': '2021-10-02T12:30:04.192Z',
'batchId': 1,
'numInputRows': 0,
'inputRowsPerSecond': 0.0,
'processedRowsPerSecond': 0.0,
'durationMs': {'getOffset': 93,
'triggerExecution': 94},
'stateOperators': [],
'sources': [{'description': 'FileStreamSource[dbfs:/FileStore/tables/bin/activity-data]',
'startOffset': {'logOffset': 0},
'endOffset': {'logOffset': 0},
'numInputRows': 0,
'inputRowsPerSecond': 0.0,
'processedRowsPerSecond': 0.0}],
'sink': {'description': 'MemorySink'}},
{'id': '38971f4c-afad-413c-a6fb-1a49063ee0c0',
'runId': '61331994-6b93-4e1e-a1c8-191d7fce953a',
'name': 'test_python_stream',
'timestamp': '2021-10-02T12:30:14.289Z',
'batchId': 1,
'numInputRows': 0,
'inputRowsPerSecond': 0.0,
'processedRowsPerSecond': 0.0,
'durationMs': {'getOffset': 90,
'triggerExecution': 90},
'stateOperators': [],
'sources': [{'description': 'FileStreamSource[dbfs:/FileStore/tables/bin/activity-data]',
'startOffset': {'logOffset': 0},
'endOffset': {'logOffset': 0},
'numInputRows': 0,
'inputRowsPerSecond': 0.0,
'processedRowsPerSecond': 0.0}],
'sink': {'description': 'MemorySink'}},
{'id': '38971f4c-afad-413c-a6fb-1a49063ee0c0',
'runId': '61331994-6b93-4e1e-a1c8-191d7fce953a',
'name': 'test_python_stream',
'timestamp': '2021-10-02T12:30:24.377Z',
'batchId': 1,
'numInputRows': 0,
'inputRowsPerSecond': 0.0,
'processedRowsPerSecond': 0.0,
'durationMs': {'getOffset': 97,
'triggerExecution': 97},
'stateOperators': [],
'sources': [{'description': 'FileStreamSource[dbfs:/FileStore/tables/bin/activity-data]',
'startOffset': {'logOffset': 0},
'endOffset': {'logOffset': 0},
'numInputRows': 0,
'inputRowsPerSecond': 0.0,
'processedRowsPerSecond': 0.0}],
'sink': {'description': 'MemorySink'}},
{'id': '38971f4c-afad-413c-a6fb-1a49063ee0c0',
'runId': '61331994-6b93-4e1e-a1c8-191d7fce953a',
'name': 'test_python_stream',
'timestamp': '2021-10-02T12:30:34.467Z',
'batchId': 1,
'numInputRows': 0,
'inputRowsPerSecond': 0.0,
'processedRowsPerSecond': 0.0,
'durationMs': {'getOffset': 99,
'triggerExecution': 99},
'stateOperators': [],
'sources': [{'description': 'FileStreamSource[dbfs:/FileStore/tables/bin/activity-data]',
'startOffset': {'logOffset': 0},
'endOffset': {'logOffset': 0},
'numInputRows': 0,
'inputRowsPerSecond': 0.0,
'processedRowsPerSecond': 0.0}],
'sink': {'description': 'MemorySink'}}]
주요 필드
- 유입률(input rate)
- 입력소스에서 구조적 스트리밍 내부로 데이터가 유입되는 양
- 처리율(processing rate)
- 배치주기
- 대부분 스트리밍 시스템은 적정한 처리량을 얻기 위해 배치 방식으로 처리함
- 일부 스트리밍 시스템은 처리량을 줄이는 대신 느리게 응답하는 옵션 제공
스파크 UI
- 잡, 태스크, 처리 메트릭 확인 가능
- DStream API와 달리 구조적 스트리밍은 Streaming 탭을 사용하지 않음
알림
- 사용자는 대시보드의 메트릭을 계속 확인해서 잠재적인 문제를 발견하는 과정이 필요함
- 따라서 잡이 실패하거나 유입률보다 처리율이 떨어지는 경우 등을 자동으로 알려주는 기능이 필요
- 스파크는 위의 상태 및 진행 상황 API를 기반으로 알림 시스템을 구축할 수 있도록 함
스트리밍 리스너를 사용한 고급 모니터링
- StreamingQueryListener 클래스를 이용하여 비동기 방식으로 스트리밍 쿼리 정보를 수신하며 더 강력하게 애플리케이션을 관찰할 수 있음
- 해당 클래스를 상속해서 자체 로직을 구현하면됨