Kafka, Spark Streaming

Jeonghak Cho·2025년 8월 10일

Kafka

목록 보기
2/3

1️⃣ 계획 및 정의 (Plan & Scope)

목표

Kafka, Spark 연동 확인

주요 기술

Apache Kafka, Apache Spark (Streaming or Structured Streaming)

2️⃣ 환경 구성 (Environment Setup)

도커 컴포즈

version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.31.153.48:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  spark-master:
    image: bitnami/spark:3.5.0
    container_name: spark-master
    hostname: spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
    environment:
      SPARK_MODE: master
    volumes:
      - ./app:/opt/spark-apps/
    command: bin/spark-class org.apache.spark.deploy.master.Master --host spark-master

  spark-worker:
    image: bitnami/spark:3.5.0
    container_name: spark-worker
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_MASTER_URL: spark://spark-master:7077
    ports:
      - "8082:8081"
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077

  spark-history:
    image: bitnami/spark:3.5.0
    container_name: spark-history
    environment:
      SPARK_MODE: history-server
      SPARK_HISTORY_SERVER_EVENTLOG_DIR: file:///tmp/spark-events
    ports:
      - "18080:18080"

자바, 스칼라, SBT, 도커

java -version

openjdk version "11.0.0.2" 2024-07-02
OpenJDK Runtime Environment 18.9 (build 11.0.0.2+2-2)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.0.2+2-2, mixed mode)

sbt -version
sbt version in this project: 1.11.4
sbt runner version: 1.10.11

[info] sbt runner (sbt-the-batch-script) is a runner to run any declared version of sbt.
[info] Actual version of the sbt is declared using project\build.properties for each build.

3️⃣ 개발

Scala 소스

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object KafkaStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("KafkaSparkStream")
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    // Kafka에서 읽기
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "test-topic")
      .option("startingOffsets", "earliest")
      .load()

    // value 컬럼을 String으로 변환
    val messages = df.selectExpr("CAST(value AS STRING)").withColumnRenamed("value", "message")

    // 콘솔 출력
    val query = messages.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}

build.sbt

name := "HelloWorldStream"

version := "0.1"

scalaVersion := "2.12.18"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0" // Kafka 안 써도 Structured Streaming 의존성 맞춰줌
)

Plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.2.0")

4️⃣ 실행

빌드

  • 빌드
sbt assembly
  • 빌드 결과
listTopic\target\scala-2.12\listTopic-assembly-0.1.0-SNAPSHOT.jar

토픽 생성

docker exec -it kafka kafka-topics --create \
  --topic test-topic \
  --bootstrap-server kafka:9092

스파크 실행

spark-submit   --class KafkaStream   --master local[2]   --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0   /opt/spark-apps/HelloWorldStream-assembly-0.1.jar

Spark Structured Streaming vs Kafka Consumer (CLI) 차이

항목Spark Structured StreamingKafka Console Consumer
주 목적대규모 데이터 처리, 변환, 집계, 저장 등 실시간 ETL단순 모니터링, 디버깅, 메시지 확인
데이터 처리DataFrame/Dataset API 사용 가능, SQL 쿼리 가능, 복잡한 변환 처리그냥 메시지를 그대로 출력
확장성클러스터 환경에서 병렬 처리 가능 (Spark Executors)단일 프로세스에서 직렬 소비
장애 복구Checkpointing과 WAL(Write Ahead Log)로 정확히 한 번 처리(Exactly-Once) 가능소비 위치(offset)는 기본적으로 휘발성 (옵션으로 커밋 가능)
트랜잭션Kafka의 트랜잭션 + Structured Streaming으로 end-to-end exactly-once 가능없음
출력 대상Kafka, DB, S3, HDFS, 콘솔 등 다양하게 연동 가능표준 출력(stdout)만
성능배치 단위(Micro-batch) 또는 Continuous 모드로 대량 처리 최적화단일 소비 속도에 의존
코드 활용Spark SQL / UDF / MLlib / GraphX 등 다양한 Spark 라이브러리 활용 가능불가능

0개의 댓글