First off... to cut straight to the conclusion: it did not work.
from pyspark.sql import SparkSession

# First attempt: resolving the dependencies via spark.jars.packages
# spark = SparkSession.builder \
#     .appName("KafkaConsumer") \
#     .config("spark.jars.packages",
#             "org.apache.kafka:kafka-clients:3.4.1,"
#             "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
#     .getOrCreate()

# Second attempt: pointing spark.jars at jars installed on the nodes
# (the list is comma-separated, with no spaces between entries)
spark = SparkSession.builder \
    .appName("KafkaConsumer") \
    .config("spark.jars",
            "/usr/lib/spark/jars/kafka-clients-3.4.1.jar,"
            "/usr/lib/spark/jars/kafka/spark-sql-kafka-0-10_2.12-3.3.0.jar,"
            "/usr/lib/spark/jars/spark-streaming-kafka-0-10_2.12-3.3.0.jar") \
    .getOrCreate()
kafkaParams = {
    "kafka.bootstrap.servers": "dns:9092",
    "subscribe": "apple_stock_info_topic_test",
    "startingOffsets": "earliest"
}
I installed the three jar files (on every node), and then:
table = spark.read \
    .format("kafka") \
    .options(**kafkaParams) \
    .load()
table
DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]
It even prints the table schema!!
An error was encountered:
An error occurred while calling o108.toJavaRDD.
: java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:645)
at org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:61)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:380)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
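A hedged guess at the cause, judging only from the missing class name: `org.apache.spark.kafka010.KafkaConfigUpdater` lives in the `spark-token-provider-kafka-0-10` jar, a transitive dependency of `spark-sql-kafka-0-10` that is easy to miss when installing jars by hand. A sketch of two possible fixes, assuming Spark 3.3.0 on Scala 2.12 (the jar path and the commons-pool2 version here are assumptions, and `consumer.py` is a hypothetical script name):

```shell
# Option 1: let spark-submit resolve the transitive dependencies
# (token provider, commons-pool2) instead of listing jars manually
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
  consumer.py

# Option 2: keep installing jars by hand, but add the missing ones
# on every node alongside the three already installed, e.g.:
#   /usr/lib/spark/jars/spark-token-provider-kafka-0-10_2.12-3.3.0.jar
#   /usr/lib/spark/jars/commons-pool2-2.11.1.jar
```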
Hmm.... I still need to fix this.. I'll pick it up again tomorrow..