PySpark - Kafka Streaming(2)

Andrew·2022년 2월 23일
0

kafka-programming

목록 보기
6/6

스트리밍 시도 1편
https://velog.io/@statco19/pyspark-kafka-streaming

약 한 달 반 전에 작성했던 PySpark - Kafka 스트리밍 시도 결과에 대해 추가하는 기록을 남기려 한다.

1편에서 AWS t2.micro free-tier 인스턴스로 스트리밍을 시도했을 때, 메모리 초과로 애플리케이션이 제대로 실행되지 않았다(t2.micro 인스턴스의 RAM은 1GB이다).
그 이후 여러 가지 방법을 강구했고, 메모리가 큰 인스턴스를 사용하는 방법도 있겠지만 추후 계속 실습을 할 때마다 서버 사용비가 부과되는 것이 은근 부담이었다. 그 결과, 로컬에 Spark, Kafka를 설치하고 실습을 진행하였다.

필자는 MacOS 사용자로서 Homebrew를 사용하여 spark와 kafka를 비교적 손쉽게 설치할 수 있었다.

Zookeeper, Kafka 로컬 설치

brew install zookeeper
brew install kafka

// 설치 완료 후 주키퍼, 카프카 실행
brew services start zookeeper
brew services start kafka

// 카프카, 주키퍼 중단
brew services stop kafka
brew services stop zookeeper

설치하는 시점에 따라 버전이 상이하겠지만 2022.02.23 기준으로 3.1.0 버전의 카프카가 설치된다.

Spark 로컬 설치

spark 설치를 위해서는 scala를 설치해야 한다. 이 역시 Homebrew를 통해 진행했다.

brew install scala
brew install apache-spark

spark가 설치된 디렉토리 경로를 ~/.zshrc에 SPARK_HOME으로 저장한다.

vmi ~/.zshrc
export SPARK_HOME=/usr/local/Cellar/apache-spark/3.2.1/libexec

// pyspark 실행 시 jupyter notebook으로 실행
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'

source ~/.zshrc  // 변경된 .zshrc 파일 적요

pyspark 실행 시 jupyter notebook으로 실행 되게 설정해주었다(물론 jupyter가 설치되어 있어야 한다).
스파크 설정 관련 이슈는 다른 블로그를 참조하여 맞게 설정해주면 된다.

Spark - Kafka streaming

스파크가 카프카 브로커에서 실시간으로 데이터를 가져오기 위해서는 총 4가지의 디펜던시를 주입해주어야 한다.

  • org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0
  • org.apache.commons:commons-pool2:2.11.1
  • org.apache.kafka:kafka-clients:2.5.0
  • org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.2.1

마지막의 3.2.1 버전은 필자의 스파크 버전이다. 제일 처음 3.2.0 역시 스파크 버전으로 현재 필자의 스파크 최신 버전보다 낮아 사용이 가능했다. 각자 스파크 버전에 맞는 디펜던시를 추가해줘야 한다. 카프카의 경우 필자의 버전보다 낮지만 문제없이 사용 가능했다.
위의 네 가지 종류의 .jar 파일을 Maven repository에서 찾아 다운로드한다(방법은 1편 참고). 다운로드한 .jar 파일을 $SPARK_HOME/jars 폴더에 저장한다.

pyspark 실행 시 디펜던시를 주입 시키면서 실행해야 한다. 디펜던시를 주입하는 방법은 간단하다. --packages를 붙이고 , 로 네 가지를 모두 붙여주면 된다.

pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.apache.commons:commons-pool2:2.11.1,org.apache.kafka:kafka-clients:2.5.0,org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:3.2.1

readStream()

# 토픽 subscribe
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \  # 본인의 bootstrap server ip에 맞게 변경
    .option("subscribe", "topic_name") \
    .load() \

# 토픽의 partition 지정에 subscribe
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("assign", '{"topic_name":[0,2]}') \
    .load() \

df.isStreaming # df가 카프카 브로커의 토픽을 실시간으로 읽어오고 있는지 확인
# True

writeStream()

# 콘솔에 결과를 바로 출력
consoleSink = df \
    .writeStream \
    .queryName("kafka_spark_console")\
    .format("console") \
    .option("truncate", "false") \ # 메세지 길이를 줄이지 않고 모두 출력
    .start()

# 메모리에 저장
memorySink = df \
  .writeStream \
  .queryName("kafka_spark_memory")\
  .format("memory") \
  .start()
  
# 메모리에 저장한 결과 확인
spark.sql("SELECT * FROM kafka_spark_memory").show()
# 스트리밍 중단
consoleSink.stop()
memorySink.stop()

구조적 스트리밍 API를 사용하여 카프카 토픽의 레코드를 스파크를 통해 실시간으로 가져올 수 있었다.
DStream API와 비교하여 구조적 스트리밍 API의 가장 큰 장점은 DataFrame을 지원한다는 것이다. 따라서 우리에게 익숙한 SQL 구문을 사용하여 여러 연산 처리가 가능하다.

profile
조금씩 나아지는 중입니다!

0개의 댓글