Consuming Kafka on EMR (1)

록스블로그 · October 28, 2023

First off... to start with the conclusion: I did not succeed.

from pyspark.sql import SparkSession

# Alternative: let Spark resolve the connector from Maven
# spark = SparkSession.builder \
#     .appName("KafkaConsumer") \
#     .config("spark.jars.packages", "org.apache.kafka:kafka-clients:3.4.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
#     .getOrCreate()

# Point spark.jars at the jars installed on the nodes
# (comma-separated list, no spaces between entries)
spark = SparkSession.builder \
    .appName("KafkaConsumer") \
    .config("spark.jars", "/usr/lib/spark/jars/kafka-clients-3.4.1.jar,/usr/lib/spark/jars/kafka/spark-sql-kafka-0-10_2.12-3.3.0.jar,/usr/lib/spark/jars/spark-streaming-kafka-0-10_2.12-3.3.0.jar") \
    .getOrCreate()

kafkaParams = {
    "kafka.bootstrap.servers": "dns:9092", 
    "subscribe": "apple_stock_info_topic_test",
    "startingOffsets": "earliest" 
}

I installed the three jar files (on every node), and then:
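For reference, placing the jars by hand could be scripted like this, a sketch that assumes the standard Maven Central layout and the same paths used in spark.jars above (it would have to run on every node, e.g. from a bootstrap action):

```shell
# Sketch: fetch the three jars to the paths referenced in spark.jars.
# Assumes network access to Maven Central and root privileges.
sudo mkdir -p /usr/lib/spark/jars/kafka
sudo curl -fSL -o /usr/lib/spark/jars/kafka-clients-3.4.1.jar \
  https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.1/kafka-clients-3.4.1.jar
sudo curl -fSL -o /usr/lib/spark/jars/kafka/spark-sql-kafka-0-10_2.12-3.3.0.jar \
  https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.0/spark-sql-kafka-0-10_2.12-3.3.0.jar
sudo curl -fSL -o /usr/lib/spark/jars/spark-streaming-kafka-0-10_2.12-3.3.0.jar \
  https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.3.0/spark-streaming-kafka-0-10_2.12-3.3.0.jar
```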

table = spark.read\
    .format("kafka") \
    .options(**kafkaParams) \
    .load()
    
table
DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

It even prints the table schema!!

An error was encountered:
An error occurred while calling o108.toJavaRDD.
: java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
	at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:645)
	at org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:61)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:380)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:72)
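The missing class, org.apache.spark.kafka010.KafkaConfigUpdater, lives in spark-token-provider-kafka-0-10, a transitive dependency of spark-sql-kafka-0-10 that is easy to miss when copying jars by hand (commons-pool2 is another one). If that guess is right, either those jars also need to go onto every node, or dependency resolution can be left to Spark; a sketch (script name hypothetical, versions assumed to match Spark 3.3.0 / Scala 2.12):

```shell
# Sketch: let Spark pull the connector plus its transitive deps
# (spark-token-provider-kafka-0-10, commons-pool2, kafka-clients)
# from Maven Central instead of listing local jars.
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
  kafka_consumer.py  # hypothetical script name
```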

Hmm... I still need to fix this... I'll pick it up again tomorrow.
