spark streaming이란?
Spark Streaming은 Spark RDD를 기반으로 Micro-batch를 수행한다.
그림처럼 실시간으로 input data stream을 받아서 DStream(discretized stream)의 작은 배치 단위로 나눠 Spark Engine으로 보낸다. Spark engine에서는 가공한 데이터를 반환한다.
DStream은 이산화된 데이터로, 시간별로 도착한 데이터들의 RDD 집합이다.
소켓으로 부터 스트리밍을 받아 사용자 입력한 단어 카운트를 console로 뿌리는 예제
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
#스파크 인스턴스 생성
spark = SparkSession.builder.appName("stream-word-count").getOrCreate()
# readStream 포멧 설정 (socket) / 호스트설정(localhost) / 포트설정 ( 9999)
lines_df = spark.readStream.format("socket").option("host", "localhost").option("port", "9999").load()
# lines_df 데이터 프레임을 words_df 데이터 프레임으로 변환
words_df = lines_df.select(expr("explode(split(value, ' ')) as word"))
# 공백 단위로 잘린 word 기준으로 카운트
counts_df = words_df.groupBy("word").count()
# read -> write
word_count_query = counts_df.writeStream.format("console")\
.outputMode("complete")\
.option("checkpointLocation", ".checkpoint")\
.start()
word_count_query.awaitTermination()
nc -lk 9999 명령어로 local의 9999포트 오픈
위 streaming.py를 제출하게되면 아래와 같은 그림 출력
이제 스트리밍을 보내면 word, count가 증가하게 된다.
현재 주고받은 데이터는 streaming.py에서 설정한 .checkpoint 디렉토리에 저장되며
소켓이 끊겨도 재연결 시 체크포인트가 저장되어 있어 기존의 데이터를 불러올 수 있다.