Spark 완벽 가이드 ch21. 구조적 스트리밍의 기초

Q·2023년 1월 31일
0

Spark 완벽 가이드

목록 보기
22/24
  • 구조적 스트리밍을 구성하는 핵심 개념과 구조적 스트리밍이 얼마나 사용하기 쉬운지 살펴볼것임

구조적 스트리밍의 기초

- 스파크의 구조적 API(DataFrame, Dataset, SQL)를 사용함
  • 스트림 데이터를 데이터가 계속해서 추가되는 테이블처럼 다룬다는 것이 핵심 아이디어
  • 스트리밍 잡은 계속해서 신규 입력 데이터를 확인 및 처리
  • 내고장성을 보장하면서 신규 데이터가 유입될 때마다 효율적으로 처리 결과 갱신
    • 스트림 처리와 관련된 쿼리 구문이 따로 있지 않음(배치와 동일)
      • 그냥 쿼리 실행 유형만 스트리밍으로 지정하면 됨
      • 일부 제약은 있음
    • 내부적으로 사용자의 쿼리를 어떻게 증분할지 자동으로 파악함
  • 스파크의 다른 기능들과 통합해서 데이터에 실시간으로 반응하는 통합 빅데이터 처리 애플리케이션을 만들 수 있음
    • 이로 인해 하나의 프레임워크를 이용해서 전체 애플리케이션을 쉽게 정의하고 여러 처리 영역에서 일관된 결과를 얻을 수 있도록 함
    • ex) 사용자가 스파크 SQL을 통해 대화형으로 조회하는 테이블을 계속해서 갱신
    • ex) MLlib을 사용해 학습한 머신러닝 모델을 적용한 결과를 제공
    • ex) 스파크 데이터소스의 오프라인 데이터와 스트림을 조인

핵심 개념

트랜스포메이션과 액션

  • 기존 트랜스포메이션 개념과 거의 같음

  • 근데 몇 가지 제약사항은 있음

    • 엔진에서 증분 처리를 할 수 없는 일부 쿼리 유형
  • 액션은 단 한 가지

    • 스트림 처리를 시작한 뒤 연속적으로 처리해 결과를 출력하는 액션만 있음

입력 소스

  • 스트리밍 방식으로 데이터를 읽을 수 있는 몇 가지 입력 소스(버전 3.1)
    • HDFS나 S3 등 분산 파일 시스템의 파일
    • 카프카
    • 테스팅용 소켓 소스
    • 테스팅용 레이트 소스
      • 특정 개수의 row를 매초마다 생성

싱크

  • 싱크로 스트림의 결과를 저장할 목적지를 명시함

  • 싱크와 실행 엔진은 데이터 처리의 진행 상황을 신뢰도 있고 정확하게 추적하는 역할을 함

  • 지원하는 출력용 싱크(버전 3.1)

    • 거의 모든 파일 포맷
    • 카프카
    • 출력 레코드에 임의 연산을 실행하는 foreach 싱크
    • 디버깅용 콘솔 싱크
    • 디버깅용 메모리 싱크

출력 모드

  • 싱크를 정의하기 위해서는 데이터를 출력하는 방법도 정의해야함

  • 지원하는 출력 모드(버전 3.1)

    • append: 싱크에 신규 레코드만 추가(default)
    • update: 변경 대상 레코드 자체를 갱신
    • complete: 전체 출력 내용 재작성
  • 특정 쿼리와 싱크는 일부 출력모드만 지원함

    • ex) 스트림에 map연산만 하는 잡
      • 매번 전체 데이터를 신규 파일로 저장하는 complete 모드는 적합하지 않음

트리거

  • 트리거는 데이터 출력 시점을 정의함

    • 언제 신규 데이터를 확인하고 결과를 갱신할지 정의함
  • 기본적으로는 마지막 입력 데이터를 처리한 직후에 신규 데이터를 확인하고 결과를 갱신함

    • 근데 파일 싱크를 사용하는 경우, 작은 크기의 파일이 여러 개 생길 수 있음
  • 처리 시간(고정된 주기로만 신규 데이터 탐색)기반의 트리거도 지원

이벤트 시간 처리

  • 이벤트 시간 기준의 처리도 지원함

  • 이 처리 방식은 무작위로 도착한 레코드 내부에 기록된 타임스탬프를 기준으로함

  • 두 가지 핵심 아이디어

    • 이벤트 시간 데이터
      • 이벤트 시간은 데이터에 기록된 시간 필드를 의미
      • 표준 SQL 연산자를 이용해 그룹화, 집계, 윈도우 처리를 하여 이벤트 시간 필드를 인식하도록 해서 이벤트 시간 처리 가능
    • 워터마크
      • 시간 제한을 설정할 수 있는 스트리밍 시스템 기능
      • 늦게 들어온 이벤트를 어디까지 처리할지 시간을 제한할 수 있음

구조적 스트리밍 활용

  • 데이터셋: 인간 행동 인지를 위한 이기종(Heterogeneity Human Activity Recognition) 데이터셋
    • 스마트폰, 스마트워치 사용자가 자전거 타기, 앉기, 일어서기, 걷기 등의 활동을 하는 동안 기록된 센서 데이터로 구성
      • 기기는 여러 종류로 구성
      • 참여자는 9명
path = 'FileStore/tables/bin/activity-data'

정적 Dataframe

static = spark.read.json(path)
dataSchema = static.schema
dataSchema
Out[3]: StructType(List(StructField(Arrival_Time,LongType,true),StructField(Creation_Time,LongType,true),StructField(Device,StringType,true),StructField(Index,LongType,true),StructField(Model,StringType,true),StructField(User,StringType,true),StructField(gt,StringType,true),StructField(x,DoubleType,true),StructField(y,DoubleType,true),StructField(z,DoubleType,true)))
display(static.take(5))
Arrival_TimeCreation_TimeDeviceIndexModelUsergtxyz
14246867350901424686733090638193nexus4_118nexus4gstand3.356934E-4-5.645752E-4-0.018814087
14246867352921424688581345918092nexus4_266nexus4gstand-0.0057220460.0290832520.005569458
14246867355001424686733498505625nexus4_199nexus4gstand0.0078125-0.0176544190.010025024
14246867356911424688581745026978nexus4_2145nexus4gstand-3.814697E-40.0184021-0.013656616
14246867358901424688581945252808nexus4_2185nexus4gstand-3.814697E-4-0.031799316-0.00831604
  • 타임스탬프, 장비 및 모델 정보, 사용자 정보가 있음
  • gt필드는 해당 시점의 사용자 행동 유형임

스트리밍 Dataframe

#read 대신 readStream 메서드
streaming = spark.readStream.schema(dataSchema).option('maxFilesPerTrigger',1).json('/'+path)
#트랜스포메이션 정의
activityCounts = streaming.groupBy('gt').count()
#액션 정의
#쿼리 결과를 내보낼 목적지나 싱크를 지정해야함

activityQuery = activityCounts.writeStream.queryName("activity_counts")\
.format('memory').outputMode('complete').start()#.awaitTermination()
  • 위 예제에서는 스트림 처리에 사용하는 쿼리의 이름을 'activity_counts'로 정의함
  • 결과를 메모리에 저장하는 메모리 싱크를 사용함
  • 출력모드는 트리거를 실행한 후 모든 키와 데이터 수를 다시 저장하는 complete 출력 모드 사용
  • 운영용 애플리케이션에서는 반드시 awaitTermination()을 지정해야함
    • 실행 중에 드라이버 프로세스가 종료되는 상황을 막아줌
#실행중인 스트림 목록 확인
spark.streams.active
Out[8]: [<pyspark.sql.streaming.StreamingQuery at 0x7f503b3e3c88>]

결과 조회

from time import sleep

#스트리밍 쿼리의 결과를 1초마다 출력
for _ in range(3):
  spark.sql('select * from activity_counts').show()
  sleep(1)
+----------+-----+ 
gt|count| 
+----------+-----+ 
stairsup|41829| 
sit|49238| 
stand|45539| 
walk|53019| 
bike|43185| 
stairsdown|37455| 
null|41785| 
+----------+-----+ 

+----------+-----+ 
gt|count| 
+----------+-----+ 
stairsup|41829| 
sit|49238| 
stand|45539| 
walk|53019| 
bike|43185| 
stairsdown|37455| 
null|41785| 
+----------+-----+ 

+----------+-----+ 
gt|count| 
+----------+-----+ 
stairsup|41829| 
sit|49238| 
stand|45539| 
walk|53019| 
bike|43185| 
stairsdown|37455| 
null|41785| 
+----------+-----+

스트림 트랜스포메이션

선택과 필터링

  • 구조적 스트리밍은 DataFrame의 모든 함수와 개별 컬럼을 처리하는 선택과 필터링, 단순 트랜스포메이션을 지원함
from pyspark.sql import functions as F
simpleTransform = streaming.withColumn('stairs', F.expr("gt like '%stairs%'"))\
.where('stairs')\
.where('gt is not null')\
.select('gt', 'model', 'arrival_time', 'creation_time')\
.writeStream.queryName('simple_transform')\
.format('memory')\
.outputMode('append').start()
#이제 두 개의 스트림이 실행 중
spark.streams.active
Out[11]: [<pyspark.sql.streaming.StreamingQuery at 0x7f503b3a8240>,
<pyspark.sql.streaming.StreamingQuery at 0x7f503b3a8be0>]

결과 조회

spark.sql('select distinct(gt) from simple_transform').show()
+----------+ 
gt| 
+----------+ 
stairsup| 
stairsdown| 
+----------+
for _ in range(3):
  spark.sql('select * from simple_transform').limit(3).show()
  sleep(1)
+--------+------+-------------+-------------------+ 
gt| model| arrival_time| creation_time| 
+--------+------+-------------+-------------------+ 
stairsup|nexus4|1424785980959|1424787826278414112| 
stairsup|nexus4|1424785981157|1424785984637538373| 
stairsup|nexus4|1424785981366|1424785984843806684| 
+--------+------+-------------+-------------------+ 
+--------+------+-------------+-------------------+ 
gt| model| arrival_time| creation_time| 
+--------+------+-------------+-------------------+ 
stairsup|nexus4|1424697948398|1424699794447729066| 
stairsup|nexus4|1424697948595|1424699794648961976| 
stairsup|nexus4|1424697948810|1424697946816230086| 
+--------+------+-------------+-------------------+ 
+----------+------+-------------+-------------------+ 
gt| model| arrival_time| creation_time| 
+----------+------+-------------+-------------------+ 
stairsdown|nexus4|1424700667634|1424702513688330618| 
stairsdown|nexus4|1424700667837|1424702513889631513| 
stairsdown|nexus4|1424700668035|1424702514090864423| 
+----------+------+-------------+-------------------+

집계

  • 구조적 API처럼 임의의 집계 연산을 지정할 수 있음
deviceModelStats = streaming.cube('gt', 'model').avg()\
.drop("avg(Arrival_Time)")\
.drop("avg(Creation_Time)")\
.drop('avg(Index)')\
.writeStream.queryName('device_counts').format('memory')\
.outputMode('complete').start()
spark.streams.active
Out[15]: [<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b6898>,
<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b67f0>,
<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b6748>]

결과 조회

for _ in range(3):
  spark.sql('select * from device_counts').limit(5).show()
  sleep(1)
+----------+------+--------------------+--------------------+--------------------+ 
gt| model| avg(x)| avg(y)| avg(z)| 
+----------+------+--------------------+--------------------+--------------------+ 
null| null|-9.03767393822733...|-0.00478809579938...|-0.00803015271059...| 
stairsdown| null| 0.02425847993757609|-0.03760274457414847| 0.12621213764276576| 
stairsdown|nexus4| 0.02425847993757609|-0.03760274457414847| 0.12621213764276576| 
null|nexus4|-0.01085041855216...|0.003269388401703...|0.007623247528790201| 
stand| null|-4.85009687984190...|7.843164314448882E-5| 6.53799922292491E-4| 
+----------+------+--------------------+--------------------+--------------------+ 
+-----+------+--------------------+--------------------+--------------------+ 
gt| model| avg(x)| avg(y)| avg(z)| 
+-----+------+--------------------+--------------------+--------------------+ 
null|nexus4|-0.01085041855216...|0.003269388401703...|0.007623247528790201| 
null|nexus4|-9.03767393822733...|-0.00478809579938...|-0.00803015271059...| 
null| null|-9.03767393822733...|-0.00478809579938...|-0.00803015271059...| 
bike|nexus4|0.023557159293850122| -0.0103710797474206|-0.08150749488259697| 
stand| null|-4.85009687984190...|7.843164314448882E-5| 6.53799922292491E-4| 
+-----+------+--------------------+--------------------+--------------------+ 
+-----+------+--------------------+--------------------+--------------------+ 
gt| model| avg(x)| avg(y)| avg(z)| 
+-----+------+--------------------+--------------------+--------------------+ 
stand| null|-4.85009687984190...|7.843164314448882E-5| 6.53799922292491E-4| 
sit| null|-6.03518642992934...|3.315789799252582...|1.607174773661537E-4| 
walk|nexus4|-0.00477321663226...|0.007647625398144233|1.359592299788926E-4| 
null|nexus4|-0.01085041855216...|0.003269388401703...|0.007623247528790201| 
null| null|-9.03767393822733...|-0.00478809579938...|-0.00803015271059...| 
+-----+------+--------------------+--------------------+--------------------+

조인

historicalAgg = static.groupBy('gt', 'model').avg()
deviceModelStats = streaming.drop('Arrival_Time', 'Creation_Time','Index')\
.cube('gt', 'model').avg()\
.join(historicalAgg, ['gt','model'])\
.writeStream.queryName('device_counts2').format('memory')\
.outputMode('complete').start()
spark.streams.active
Out[18]: [<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b6fd0>,
<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b6eb8>, 
<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b6128>,
<pyspark.sql.streaming.StreamingQuery at 0x7f503b3b6e48>]

결과 조회

for _ in range(3):
  spark.sql('select * from device_counts2').show()
  sleep(1)
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+ 
gt| model| avg(x)| avg(y)| avg(z)| avg(Arrival_Time)| avg(Creation_Time)| avg(Index)| avg(x)| avg(y)| avg(z)| 
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+ 
bike|nexus4|0.021949482812608244|-0.00749751739872...|-0.08270261150639441|1.424751134339993...|1.424752127369581...| 326459.6867328154| 0.02268875955086689|-0.00877912156368...|-0.08251001663412375| 
walk|nexus4|-0.00285732139643...|0.003373111639646...|-0.00155088920916...|1.424746420641788...|1.424747351060679...|149760.09974990616|-0.00390116006094...|0.001052508689953713|-6.95435553042992...| 
stairsdown|nexus4| 0.02262820081168685|-0.03079508201793...| 0.12088580480524012|1.424744591412858...|1.424745503635642...|230452.44623187225|0.021613908669165325|-0.03249018824752615| 0.12035922691504057| 
sit|nexus4|-6.44713381746548...|2.614244124546443E-4|-1.55316137749796...|1.424741207868231...|1.424742112220357...| 74577.84690275553|-5.49433244039586...|2.791446281700070...|-2.33994461689889...| 
stand|nexus4|-3.89372639367013...|3.238874567024346...|2.323141750435506...|1.424743637921212...|1.424744579547465...|31317.877585550017|-3.11082189691723...| 3.21846166597532E-4|2.141300040636485E-4| 
null|nexus4|-0.00981882283597...|-0.00189753526302...|0.003903477859879...|1.424749002876348E12|1.424749919482132...| 219276.9663669269|-0.00847688860109...|-7.30455258739180...|0.003090601491419...| 
stairsup|nexus4|-0.02424235941716...|-0.00542728576322...|-0.10011072208887009|1.424745996101166E12|1.424746915892742...|227912.96550673083|-0.02479965287771...|-0.00800392344379...| -0.100340884150604| 
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+ 
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+ 
gt| model| avg(x)| avg(y)| avg(z)| avg(Arrival_Time)| avg(Creation_Time)| avg(Index)| avg(x)| avg(y)| avg(z)| 
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+ 
bike|nexus4|0.021949482812608244|-0.00749751739872...|-0.08270261150639441|1.424751134339993...|1.424752127369581...| 326459.6867328154| 0.02268875955086689|-0.00877912156368...|-0.08251001663412375| 
walk|nexus4|-0.00285732139643...|0.003373111639646...|-0.00155088920916...|1.424746420641788...|1.424747351060679...|149760.09974990616|-0.00390116006094...|0.001052508689953713|-6.95435553042992...| 
stairsdown|nexus4| 0.02262820081168685|-0.03079508201793...| 0.12088580480524012|1.424744591412858...|1.424745503635642...|230452.44623187225|0.021613908669165325|-0.03249018824752615| 0.12035922691504057| 
sit|nexus4|-6.44713381746548...|2.614244124546443E-4|-1.55316137749796...|1.424741207868231...|1.424742112220357...| 74577.84690275553|-5.49433244039586...|2.791446281700070...|-2.33994461689889...| 
stand|nexus4|-3.89372639367013...|3.238874567024346...|2.323141750435506...|1.424743637921212...|1.424744579547465...|31317.877585550017|-3.11082189691723...| 3.21846166597532E-4|2.141300040636485E-4| 
null|nexus4|-0.00981882283597...|-0.00189753526302...|0.003903477859879...|1.424749002876348E12|1.424749919482132...| 219276.9663669269|-0.00847688860109...|-7.30455258739180...|0.003090601491419...| 
stairsup|nexus4|-0.02424235941716...|-0.00542728576322...|-0.10011072208887009|1.424745996101166E12|1.424746915892742...|227912.96550673083|-0.02479965287771...|-0.00800392344379...| -0.100340884150604| 
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+ 
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+ 
gt| model| avg(x)| avg(y)| avg(z)| avg(Arrival_Time)| avg(Creation_Time)| avg(Index)| avg(x)| avg(y)| avg(z)| 
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+ 
bike|nexus4|0.021949482812608244|-0.00749751739872...|-0.08270261150639441|1.424751134339993...|1.424752127369581...| 326459.6867328154| 0.02268875955086689|-0.00877912156368...|-0.08251001663412375| 
walk|nexus4|-0.00285732139643...|0.003373111639646...|-0.00155088920916...|1.424746420641788...|1.424747351060679...|149760.09974990616|-0.00390116006094...|0.001052508689953713|-6.95435553042992...| 
stairsdown|nexus4| 0.02262820081168685|-0.03079508201793...| 0.12088580480524012|1.424744591412858...|1.424745503635642...|230452.44623187225|0.021613908669165325|-0.03249018824752615| 0.12035922691504057| 
sit|nexus4|-6.44713381746548...|2.614244124546443E-4|-1.55316137749796...|1.424741207868231...|1.424742112220357...| 74577.84690275553|-5.49433244039586...|2.791446281700070...|-2.33994461689889...| 
stand|nexus4|-3.89372639367013...|3.238874567024346...|2.323141750435506...|1.424743637921212...|1.424744579547465...|31317.877585550017|-3.11082189691723...| 3.21846166597532E-4|2.141300040636485E-4| 
null|nexus4|-0.00981882283597...|-0.00189753526302...|0.003903477859879...|1.424749002876348E12|1.424749919482132...| 219276.9663669269|-0.00847688860109...|-7.30455258739180...|0.003090601491419...| 
stairsup|nexus4|-0.02424235941716...|-0.00542728576322...|-0.10011072208887009|1.424745996101166E12|1.424746915892742...|227912.96550673083|-0.02479965287771...|-0.00800392344379...| -0.100340884150604| 
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+

입력과 출력

  • 소스, 싱크, 출력모드가 구조적 스트리밍에서 어떻게 동작하는지 볼 것임
  • 언제, 어디서, 어떻게 데이터가 유입되고 외부로 나가는지 설명할 것임

데이터를 읽고 쓰는 장소(소스와 싱크)

  • 구조적 스트리밍은 몇 가지 실전용 소스와 싱크(파일과 아파치 카프카), 디버깅용 메모리 테이블 싱크 등을 지원함

파일 소스와 싱크

  • 가장 간단한 소스는 파일 소스

    • 파일 소스의 동작 방식은 쉽게 추측하고 이해할 수 있음
  • 실전에서는 파케이, 텍스트, json, csv를 자주 사용

  • 스트리밍에서 파일 소스/싱크와 정적 파일 소스를 사용할 때 유일한 차이점

    • 트리거 시 읽을 파일 수를 결정할 수 있음(maxFilePerTrigger 옵션)

카프카 소스와 싱크

  • 아파치 카프카는 데이터 스트림을 위한 발행-구독(Publish-Subscribe) 방식의 분산형 시스템

    • 발행: 데이터를 쓰는 동작 (프로듀서가 씀)
    • 구독: 데이터를 읽는 동작 (컨슈머가 읽음)
  • 발행된 메시지는 내고장성을 보장하는 저장소에 저장됨

  • 레코드의 스트림은 토픽이라 불리는 카테고리에 저장

    • 각 레코드는 키, 값, 타임스탬프로 구성됨
    • 토픽은 순서를 바꿀 수 없는 레코드로 구성되며, 레코드의 위치를 오프셋이라고 부름

카프카 소스에서 메시지 읽기

  • 메시지를 읽기 위해 첫 번째로 해야 할 일은, 다음 옵션 중 하나를 선택하는 것

    • assign: 토픽뿐만 아니라 파티션까지 세밀하게 지정하여 구독
    • subscribe: 토픽 목록을 지정하여 구독
    • subscribePattern: 토픽 패턴을 지정하여 구독
  • 두 번째로 해야 할 일은, 카프카 서비스에 접속할 수 있도록 kafka.bootstrap.servers 값을 지정하는 것

  • 카프카에서 데이터를 읽거나 쓸 땐 json이나 avro를 자주 사용함

#topic1 수신
df1 = spark.readStream.format('kafka')\
.option('kafka.bootstrap.servers', 'host1:port1, host2:port2')\
.option('subscribe', 'topic1').load()

#여러 토픽 수신
df2 = spark.readStream.format('kafka')\
.option('kafka.bootstrap.servers', 'host1:port1, host2:port2')\
.option('subscribe', 'topic1,topic2').load()

#패턴에 맞는 토픽 수신
df3 = spark.readStream.format('kafka')\
.option('kafka.bootstrap.servers', 'host1:port1, host2:port2')\
.option('subscribePattern', 'topic.*').load()
카프카 소스 스키마
  • 키: binary
  • 값: binary
  • 토픽: string
  • 패턴: int
  • 오프셋: long
  • 타임스탬프: long

카프카 싱크에 메시지 쓰기

path
Out[32]: 'FileStore/tables/bin/activity-data'
#selecExpr에서 토픽 지정하는 방법

#'topic'은 df1의 컬럼명을 의미함 즉, topic 컬럼의 값은 위에서 설정한대로 'topic1'이므로 topic1을 읽게 되는 것임
df1.selectExpr('topic', 'CAST(key as STRING)', 'CAST(value as STRING)')\
.writeStream.format('kafka').option('kafka.bootstrap.servers', 'host1:port1, host2:port2')\
.option('checkpointLocation', '/FileStore/tables/bin/to/HDFS-compatible/dir').start()
Out[34]: <pyspark.sql.streaming.StreamingQuery at 0x7f503b3c3d30>
#option()에서 토픽 지정하는 방법

df1.selectExpr('CAST(key as STRING)', 'CAST(value as STRING)')\
.writeStream.format('kafka').option('kafka.bootstrap.servers', 'host1:port1, host2:port2')\
.option('checkpointLocation', '/FileStore/tables/bin/to/HDFS-compatible/dir')\
.option('topic','topic1').start()
Out[35]: <pyspark.sql.streaming.StreamingQuery at 0x7f5058b1e518>
foreach 싱크
  • 각 파티션에서 임의의 연산을 병렬로 수행함
  • foreach싱크를 사용하려면 ForeachWriter 인터페이스를 구현해야함
  • 인터페이스 구현 시 알아야할 내용
    • UDF나 Dataset맵 함수처럼 반드시 Serializable 인터페이스를 구현해야함
    • 해당 인터페이스가 가지는 세 가지 메서드(open, process, close)는 각 익스큐터에서 실행됨
    • 연결을 맺거나 트랜잭션을 시작하는 등의 모든 초기화 작업은 반드시 open 메서드에서 수행해야함
소켓 소스
  • TCP 소켓을 통해 스트림 데이터를 전송할 수 있게함
  • 종단 간 내고장성을 지원하지 않으므로 다음 기능들은 운영 환경에서는 절대 사용하면 안됨
    • 소켓이 드라이버에 있어 종단 간 내고장성을 보장할 수 없음
#데이터를 읽기 위한 호스트와 포트 지정
socketDF = spark.readStream.format('socket').option('host', 'localhost').option('port',9999).load()
콘솔 싱크
  • 스트리밍 쿼리의 처리 결과를 콘솔로 출력할 때 사용
  • 기본적으로 append와 complete 출력 모드를 지원
  • 내고장성을 지원하지 않으므로 운영 환경에선 절대 사용하면 안됨
메모리 싱크
  • 스트리밍 시스템을 테스트하는 데 사용하는 소스
  • 드라이버에 데이터를 모은 후 대화형 쿼리가 가능한 메모리 테이블에 저장함
  • 기본적으로 append와 complete 출력 모드 지원
  • 이 또한 내고장성을 제공하지 않음

데이터 출력 방법(출력 모드)

append 모드

  • 기본 동작 방식
  • 새로운 로우가 결과 테이블에 추가되면 사용자가 명시한 트리거에 맞춰 싱크로 출력됨
  • 해당 모드는 내고장성을 보장하는 싱크를 사용한다는 가정 하에 모든 로우를 한 번만 출력

complete 모드

  • 결과 테이블의 전체 상태를 싱크로 출력
  • 모든 데이터가 계속 변경될 수 있는 일부 상태 기반 데이터를 다룰 때 유용
  • 저수준 업뎃을 지원하지 않을 때 유용

update 모드

  • 이전 출력 결과에서 변경된 로우만 싱크로 출력
  • 이 모드를 지원하는 싱크는 저수준 업뎃을 지원해야함

데이터 출력 시점(트리거)

  • 데이터를 싱크로 출력하는 시점을 제어하려면 트리거를 설정해야함
    • 원래 기본적으로는, 직전 처리를 마치자마자 즉시 데이터를 출력함
  • 싱크에 큰 부하가 발생하거나 출력 파일의 크기를 제어하는 용도로 사용
  • 너무 많은 수정이 발생할 때 트리거를 사용

처리 시간 기반 트리거

  • 처리 주기를 지정함
#5초 주기
activityCounts.writeStream.trigger(processingTime='5 seconds')\
.format('console').outputMode('complete').start()
Out[8]: <pyspark.sql.streaming.StreamingQuery at 0x7f634b17fcc0>

일회성 트리거

  • 스트리밍 잡을 일회성으로 실행하는 트리거 설정 가능
activityCounts.writeStream.trigger(once=True)\
.format('console').outputMode('complete').start()
Out[9]: <pyspark.sql.streaming.StreamingQuery at 0x7f634b2ede48>
profile
Data Engineer

0개의 댓글