EMR Kafka Consuming 하기(2)

록스블로그·2023년 10월 29일
0
post-thumbnail

드디어 해결했다....

spark = SparkSession.builder \
    .appName("KafkaConsumer") \
    .config("spark.jars", """
                        /usr/lib/spark/jars/kafka-clients-3.4.1.jar,
                        /usr/lib/spark/jars/kafka/spark-sql-kafka-0-10_2.12-3.3.0.jar,
                        /usr/lib/spark/jars/spark-streaming-kafka-0-10_2.12-3.3.0.jar,
                        /usr/lib/spark/jars/spark-token-provider-kafka-0-10_2.12-3.3.0.jar,
                        /usr/lib/spark/jars/commons-pool2-2.12.0.jar
                        """)\
    .getOrCreate()

kafkaParams = {
    "kafka.bootstrap.servers": "dns:9092", 
    "subscribe": "apple_stock_info_topic_test",
    "startingOffsets": "earliest" 
}
table = spark.read\
    .format("kafka") \
    .options(**kafkaParams) \
    .load()
    
table = table.selectExpr("cast(value as string) as value")
table.show()
+--------------------+
|               value|
+--------------------+
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
|{"timestamp": "20...|
+--------------------+
only showing top 20 rows

우와.. 결국 크리티컬한 문제는 commons-pool2-2.12.0.jar 해당 jar 파일이 없었다는 것이다.

찾아보니 객체의 생성, 저장 및 재사용을 관리하는 기술로, 객체 생성과 가비지 컬렉션이 비용이 많이 드는 시나리오에서 성능과 리소스 관리를 향상시키는 데 주로 사용되는 자르 파일 하지만 기존에 분명 commons-pool-1.5.4.jar 해당 자르 파일이 있었는데.. 이걸로는 해결이 안되나보다.. jar 파일은 항상 최신 버전을 유지해야하는 문제인 걸까?


wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.1/kafka-clients-3.4.1.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.0/spark-sql-kafka-0-10_2.12-3.3.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.3.0/spark-streaming-kafka-0-10_2.12-3.3.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.3.0/spark-token-provider-kafka-0-10_2.12-3.3.0.jar
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.12.0/commons-pool2-2.12.0.jar

maven repo에서 다운로드 코드이다.

profile
어려움에 성장하는 데이터 엔지니어

0개의 댓글