Kafka, Spark 연동 확인
Apache Kafka, Apache Spark (Streaming or Structured Streaming)
# Local integration stack: single-broker Kafka (with ZooKeeper) plus a small
# standalone Spark cluster (master, one worker, history server).
version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      # Environment values are quoted so they are always passed as strings.
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT"
      # Two listeners:
      #   PLAINTEXT          - advertised on the Docker host's address (port 9092).
      #   PLAINTEXT_INTERNAL - advertised as kafka:29092 for containers on the
      #                        Compose network.
      # The host address can be overridden with KAFKA_HOST_IP (shell environment
      # or .env file); the default keeps the previously hard-coded value.
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://${KAFKA_HOST_IP:-172.31.153.48}:9092,PLAINTEXT_INTERNAL://kafka:29092"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
      # Single broker, so internal topics cannot be replicated more than once.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"

  spark-master:
    image: bitnami/spark:3.5.0
    container_name: spark-master
    hostname: spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
    environment:
      SPARK_MODE: master
    volumes:
      # Application JARs placed in ./app are visible at /opt/spark-apps/.
      - ./app:/opt/spark-apps/
    command: bin/spark-class org.apache.spark.deploy.master.Master --host spark-master

  spark-worker:
    image: bitnami/spark:3.5.0
    container_name: spark-worker
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_MASTER_URL: spark://spark-master:7077
    ports:
      # Worker web UI (container port 8081) is published on host port 8082.
      - "8082:8081"
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077

  spark-history:
    image: bitnami/spark:3.5.0
    container_name: spark-history
    environment:
      # The history server is started explicitly through `command` below;
      # SPARK_MODE in this image is only meant for master/worker
      # (NOTE(review): confirm against the Bitnami image documentation).
      SPARK_HISTORY_OPTS: "-Dspark.history.fs.logDirectory=file:///tmp/spark-events"
    ports:
      - "18080:18080"
    # The log directory must exist before the history server starts, so it is
    # created first. NOTE(review): the directory is local to this container;
    # applications must write their event logs somewhere this service can read
    # (e.g. a shared volume) for them to show up in the UI.
    command:
      - bash
      - "-c"
      - "mkdir -p /tmp/spark-events && exec bin/spark-class org.apache.spark.deploy.history.HistoryServer"
java -version
openjdk version "11.0.0.2" 2024-07-02
OpenJDK Runtime Environment 18.9 (build 11.0.0.2+2-2)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.0.2+2-2, mixed mode)
sbt -version
sbt version in this project: 1.11.4
sbt runner version: 1.10.11
[info] sbt runner (sbt-the-batch-script) is a runner to run any declared version of sbt.
[info] Actual version of the sbt is declared using project\build.properties for each build.
import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import scala.collection.JavaConverters._
/** Prints the name of every topic known to a Kafka cluster, one per line.
  *
  * Usage: `ListTopic [bootstrap-servers]`
  *
  * The optional first argument overrides the bootstrap address; when it is
  * absent or empty, `kafka:9092` is used.
  */
object ListTopic {

  /** Bootstrap address used when none is given on the command line. */
  private val DefaultBootstrapServers = "kafka:9092"

  /** Connects to the cluster, lists its topics and prints their names.
    *
    * @param args optional; `args(0)` is the `bootstrap.servers` value to use
    */
  def main(args: Array[String]): Unit = {
    val bootstrapServers =
      args.headOption.filter(_.nonEmpty).getOrElse(DefaultBootstrapServers)

    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)

    val adminClient = AdminClient.create(props)
    try {
      // get() blocks until the topic names are available (or the request fails).
      val topics = adminClient.listTopics().names().get()
      topics.asScala.foreach(println)
    } finally {
      // Always release the client's resources, even when the lookup throws.
      adminClient.close()
    }
  }
}
// Build definition for the listTopic project.
ThisBuild / scalaVersion := "2.12.18"
ThisBuild / version := "0.1.0-SNAPSHOT"

// One Spark version shared by every Spark artifact below.
lazy val sparkVersion = "3.5.0"

lazy val root = (project in file("."))
  .settings(
    name := "listTopic",
    libraryDependencies ++= Seq(
      // "provided": needed to compile, but not bundled into the packaged JAR.
      "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
      // Kafka connector for Spark; bundled with the application.
      "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
    )
  )
// Registers the sbt-assembly plugin, which provides the `assembly` task used to build the application JAR.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.2.0")
sbt assembly
listTopic\target\scala-2.12\listTopic-assembly-0.1.0-SNAPSHOT.jar
# Create test-topic from inside the kafka container.
# --if-not-exists makes the command safe to re-run: without it, a second run
# fails because the topic already exists.
docker exec -it kafka kafka-topics --create \
  --if-not-exists \
  --topic test-topic \
  --bootstrap-server kafka:9092
# Run the ListTopic main class from the assembled JAR with two local threads.
# 'local[2]' is quoted so the shell never treats the brackets as a glob pattern
# (zsh, for example, aborts with "no matches found" on the unquoted form).
spark-submit \
  --class ListTopic \
  --master 'local[2]' \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  /opt/spark-apps/listTopic-assembly-0.1.0-SNAPSHOT.jar