드디어 해결했다....
spark = SparkSession.builder \
.appName("KafkaConsumer") \
.config("spark.jars", """
/usr/lib/spark/jars/kafka-clients-3.4.1.jar,
/usr/lib/spark/jars/kafka/spark-sql-kafka-0-10_2.12-3.3.0.jar,
/usr/lib/spark/jars/spark-streaming-kafka-0-10_2.12-3.3.0.jar,
/usr/lib/spark/jars/spark-token-provider-kafka-0-10_2.12-3.3.0.jar,
/usr/lib/spark/jars/commons-pool2-2.12.0.jar
""")\
.getOrCreate()
kafkaParams = {
"kafka.bootstrap.servers": "dns:9092",
"subscribe": "apple_stock_info_topic_test",
"startingOffsets": "earliest"
}
table = spark.read\
.format("kafka") \
.options(**kafkaParams) \
.load()
table = table.selectExpr("cast(value as string) as value")
table.show()
+--------------------+
| value|
+--------------------+
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
+--------------------+
only showing top 20 rows
우와.. 결국 크리티컬한 문제는 commons-pool2-2.12.0.jar 해당 jar 파일이 없었다는 것이다.
찾아보니 객체의 생성, 저장 및 재사용을 관리하는 기술로, 객체 생성과 가비지 컬렉션이 비용이 많이 드는 시나리오에서 성능과 리소스 관리를 향상시키는 데 주로 사용되는 자르 파일 하지만 기존에 분명 commons-pool-1.5.4.jar 해당 자르 파일이 있었는데.. 이걸로는 해결이 안되나보다.. jar 파일은 항상 최신 버전을 유지해야하는 문제인 걸까?
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.1/kafka-clients-3.4.1.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.0/spark-sql-kafka-0-10_2.12-3.3.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.3.0/spark-streaming-kafka-0-10_2.12-3.3.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.3.0/spark-token-provider-kafka-0-10_2.12-3.3.0.jar
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.12.0/commons-pool2-2.12.0.jar
maven repo에서 다운로드 코드이다.