Kafka and Spark Integration

Jeonghak Cho · August 10, 2025

Series: Kafka (1/3)

1️⃣ Plan & Scope

Goal

Confirm that Kafka and Spark work together

Key technologies

Apache Kafka, Apache Spark (Streaming or Structured Streaming)

2️⃣ Environment Setup

Docker Compose

version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
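      # 172.31.153.48 is the host machine's IP in this setup; replace it with an address that clients outside Docker can reach.
      # Containers on the Compose network should connect through the internal listener kafka:29092.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.31.153.48:9092,PLAINTEXT_INTERNAL://kafka:29092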
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  spark-master:
    image: bitnami/spark:3.5.0
    container_name: spark-master
    hostname: spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
    environment:
      SPARK_MODE: master
    volumes:
      - ./app:/opt/spark-apps/
    command: bin/spark-class org.apache.spark.deploy.master.Master --host spark-master

  spark-worker:
    image: bitnami/spark:3.5.0
    container_name: spark-worker
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_MASTER_URL: spark://spark-master:7077
    ports:
      - "8082:8081"
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077

  spark-history:
    image: bitnami/spark:3.5.0
    container_name: spark-history
    environment:
      SPARK_MODE: history-server
      SPARK_HISTORY_SERVER_EVENTLOG_DIR: file:///tmp/spark-events
    ports:
      - "18080:18080"

Java, Scala, SBT, Docker

java -version

openjdk version "11.0.0.2" 2024-07-02
OpenJDK Runtime Environment 18.9 (build 11.0.0.2+2-2)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.0.2+2-2, mixed mode)

sbt -version
sbt version in this project: 1.11.4
sbt runner version: 1.10.11

[info] sbt runner (sbt-the-batch-script) is a runner to run any declared version of sbt.
[info] Actual version of the sbt is declared using project\build.properties for each build.

3️⃣ Development

Scala source

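import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import scala.collection.JavaConverters._

// Lists the topics on the broker using the Kafka AdminClient.
object ListTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Inside the Compose network, use the internal listener (kafka:29092).
    // kafka:9092 answers with the advertised host address 172.31.153.48:9092 instead.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092")

    val adminClient = AdminClient.create(props)
    try {
      // names() returns a KafkaFuture; get() blocks until the broker responds.
      val topics = adminClient.listTopics().names().get()
      topics.asScala.foreach(println)
    } finally {
      // Release network resources even if the request fails.
      adminClient.close()
    }
  }
}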

build.sbt

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.12.18"

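lazy val root = (project in file("."))
  .settings(
    name := "listTopic",
    libraryDependencies ++= Seq(
      // Supplied at runtime by spark-submit, so it is left out of the assembly JAR.
      "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided",
      // Kafka source/sink for Spark; also pulls in kafka-clients (used for AdminClient).
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0"
    )
  )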

project/plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.2.0")
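If `sbt assembly` stops with a deduplicate error (several dependency JARs shipping the same file), a merge strategy can be added to build.sbt. This is a minimal sketch, assuming sbt-assembly 2.x; the specific cases below are assumptions to adjust to whatever conflict is actually reported. Files under META-INF/services should be merged rather than discarded, because Spark finds the `kafka` data source through the service registration files there.

```scala
// build.sbt (addition): sketch of an sbt-assembly merge strategy.
ThisBuild / assemblyMergeStrategy := {
  // Keep service registrations (e.g. Spark's DataSourceRegister entry for "kafka"),
  // merging the files line by line and dropping duplicate lines.
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  // Java 9+ module descriptors are not needed inside a fat JAR.
  case PathList(ps @ _*) if ps.last == "module-info.class" => MergeStrategy.discard
  // Everything else falls back to the plugin's default strategy.
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}
```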

4️⃣ Run

Build

  • Build command
sbt assembly
  • Build output
listTopic\target\scala-2.12\listTopic-assembly-0.1.0-SNAPSHOT.jar

Create a topic

docker exec -it kafka kafka-topics --create \
  --topic test-topic \
  --bootstrap-server kafka:9092
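Once the topic exists, it needs some messages before Spark has anything to read. The following is a minimal sketch of a Scala producer. It assumes the kafka-clients library that spark-sql-kafka-0-10 already pulls in, the internal listener kafka:29092, and a hypothetical object name `ProduceTest`. The keys and values are placeholders.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper: sends a few test messages to test-topic.
object ProduceTest {
  // Builds the records to send; kept separate so it can be checked without a broker.
  def testRecords(topic: String, n: Int): Seq[ProducerRecord[String, String]] =
    (1 to n).map(i => new ProducerRecord[String, String](topic, s"key-$i", s"message-$i"))

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Assumption: runs inside the Compose network, so the internal listener is used.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    try {
      testRecords("test-topic", 5).foreach { record =>
        // send() is asynchronous; get() blocks until the broker acknowledges the write.
        val meta = producer.send(record).get()
        println(s"sent ${record.value()} to partition ${meta.partition()} offset ${meta.offset()}")
      }
    } finally {
      producer.close()
    }
  }
}
```

Blocking on each `send` keeps the sketch easy to follow. For throughput testing, the futures would normally be collected and checked after `flush()`.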

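Run with Spark

The Compose file mounts ./app at /opt/spark-apps/ in the spark-master container. Copy the assembled JAR into ./app, then run spark-submit inside that container (for example after docker exec -it spark-master bash):

spark-submit \
  --class ListTopic \
  --master local[2] \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  /opt/spark-apps/listTopic-assembly-0.1.0-SNAPSHOT.jar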
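ListTopic only exercises the Kafka AdminClient. It shows that the broker is reachable, but it does not show that Spark's Kafka connector works. A batch read through spark-sql-kafka-0-10 is one way to check the connector. This is a minimal sketch. The object name `ReadTopicBatch` is hypothetical, kafka:29092 assumes the job runs inside the Compose network, and test-topic is assumed to already contain some messages.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical job: reads everything currently stored in test-topic as a batch.
object ReadTopicBatch {
  // Options for Spark's Kafka source; the bootstrap address is an assumption.
  def kafkaOptions(bootstrap: String, topic: String): Map[String, String] = Map(
    "kafka.bootstrap.servers" -> bootstrap,
    "subscribe" -> topic,
    "startingOffsets" -> "earliest",
    "endingOffsets" -> "latest"
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadTopicBatch").getOrCreate()
    try {
      val df = spark.read
        .format("kafka")
        .options(kafkaOptions("kafka:29092", "test-topic"))
        .load()
        // key and value arrive as binary columns; cast them to strings for display.
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "partition", "offset")

      df.show(truncate = false)
      println(s"records read: ${df.count()}")
    } finally {
      spark.stop()
    }
  }
}
```

It can be packaged into the same assembly and submitted with `--class ReadTopicBatch` instead of `--class ListTopic`. Because spark-sql is marked `provided`, SparkSession comes from the Spark installation at runtime.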

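5️⃣ Verification

  • Confirm that data is sent to and received from the Kafka topic correctly
  • Implement consumption of the Kafka stream and real-time processing in Spark
  • Verify the correctness and performance of the processing results
  • Check stability under errors and restarts
  • Kafka message consumption latency
  • Spark throughput and processing latency
  • Whether any data is lost after failure recovery and restart
  • Data consistency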
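A Structured Streaming query can exercise the checks on real-time processing and restarts. This is a minimal sketch under stated assumptions: the object name `StreamTopic`, the checkpoint path /tmp/checkpoints/test-topic, the 10-second trigger, and the console sink are all placeholders. The Kafka source records consumed offsets in the checkpoint, so a restart that reuses the same checkpoint location should resume where the previous run stopped. That is the behaviour the data-loss check after a restart examines.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Hypothetical streaming job: keeps a running count of messages per key in test-topic.
object StreamTopic {
  // Assumed checkpoint directory; reuse it across restarts to resume from stored offsets and state.
  val CheckpointDir = "/tmp/checkpoints/test-topic"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("StreamTopic").getOrCreate()

    val messages = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:29092") // internal listener (assumption)
      .option("subscribe", "test-topic")
      .option("startingOffsets", "earliest") // only applies when no checkpoint exists yet
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value", "timestamp")

    val counts = messages.groupBy("key").count()

    val query = counts.writeStream
      .outputMode("complete") // re-emit the full aggregate table on every trigger
      .format("console")
      .option("checkpointLocation", CheckpointDir)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination()
  }
}
```

To check restart behaviour, stop the job, produce more messages, and submit it again with the same checkpoint location. The counts should continue from the previous totals rather than start over.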
