Spark Streaming Kafka 연동(feat. ClassNotFoundException)

유알·2024년 5월 12일
0

Spark Stream을 통해 Kafka에서 메시지를 받아서 hdfs 에 적재하는 코드를 작성하였다.

코드

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

packages = [
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1',
    'org.apache.kafka:kafka-clients:2.8.1'
]

# SparkSession 초기화
spark = SparkSession.builder \
    .master("yarn") \
    .appName("apiUsageKafkaConsumer") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.jars.packages", ",".join(packages)) \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Kafka value 에 있는 json구조
api_usage_trace_schema = StructType([
    StructField("eventId", StringType(), False),
    StructField("requestTime", TimestampType(), False),
    StructField("responseTime", TimestampType(), False),
    StructField("requestProtocol", StringType(), True),
    StructField("requestMethod", StringType(), True),
    StructField("requestUri", StringType(), False),
    StructField("responseStatus", IntegerType(), True),
    StructField("clientIp", StringType(), True),
    StructField("clientAgent", StringType(), True),
    StructField("apiKey", StringType(), True),
    StructField("traceId", StringType(), False)
])

# 각종 설정 변수
kafka_topic = "api-usage-trace" # 카프카 토픽
kafka_servers = "카프카 부트스트랩 서버" 
offset_reset = "earliest"
time_zone = "Asia/Seoul" # 나의 경우, 한국 기준으로 date를 뽑아내야해서

# Kafka에서 가져오는 df
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", kafka_topic) \
    .option("includeHeaders", "true") \
    .option("startingOffsets", offset_reset) \
    .load()

transformed_df = df.select(
    from_json(col("value").cast("string"), api_usage_trace_schema).alias("parsed_value"), # json 파싱
    col("offset"),
    col("partition"),
    col("timestamp")
).select(
    col("parsed_value.*"), # flat하게
    col("offset").alias("kafka_offset"),
    col("partition").alias("kafka_partition"),
    col("timestamp").alias("kafka_timestamp")
).withColumn(
    "date", to_date(from_utc_timestamp(col("requestTime"), time_zone))
)

# Date별로 파티셔닝 해서, parquet으로 저장
query = transformed_df \
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .partitionBy("date") \
    .trigger(processingTime='5 seconds') \
    .option("path", "/tracking/api_usage") \
    .option("checkpointLocation", "/tracking/api_usage/check_point") \
    .start()

query.awaitTermination()

하면서 알게된 점들

Spark Stream 개념

생각보다 SparkStream이라고 특별한 것이 없다는 생각이 들었다.
보니까 Stream이라는 것은 일정 시간마다 polling을 하는 구조 인것 같고, 이것은 Daemon 쓰레드에서 돌아간다.

// 약간 자바 코드로 보면 이런것 같다.
Thread th = new Thread(() -> {대충 폴링하는 코드})
th.setDaemon(true);
th.start()

그래서 일정 주기마다 가져온 데이터를 dataframe에 매핑하고(구조적 스트리밍 기준) 이것을 우리가 만든 dag를 따라 돌리는 것 같다.

맨 마지막에 awaitTermination()의 경우는 join과 같은 개념인것 같다.

생각보다 특별할게 없는데, outputMode랑 trigger부분만 좀 조심해서 쓰면 되겠다.
그리고 kafka의 경우, 커밋이 중요한데, 이는 어떻게 처리하는지 좀더 알아봐야겠다.

참고로, daemon이기 때문에 main쓰레드가 종료되면 종료된다. pid로 kill을 하거나, resourceManager web ui에서 kill application을 할 수 있다.

Spark On Yarn

처음에는 Spring Batch랑 spark랑 뭐가 다르지? 싶었다.
지금 보니까 배치 작업을 할 수 있다는 것 빼고는 다 다르다.

Spark의 경우, rdd라는 개념을 통해 추상화된 데이터 구조를 제공한다. 우리는 그 api를 통해 데이터를 조작하지만, 실제로는 분산된 클러스터에 데이터가 나눠져서 분산 처리된다.(정말 엄청난 기술이다.)

처음에는 모든 개념이 낯설었지만, 이제 조금 알겠다.
Spark는 클러스터를 관리하는 방법을 여러개 제공하는데, Spark가 자체적으로 제어하고 관리하는 모드가 있고, Hadoop의 ResourceManager인 YARN을 활용하는 모드가 있다.(다른것도 있다.)
어쨌든 핵심은 클러스터를 관리하고 스케쥴링을 해야한다.

Spark가 실행될때, 이 YARN에게 필요한 리소스를 요청하고, 각 클러스터의 nodemanager가 생성한 컨테이너를 배정한다. 그리고 Spark가 이 노드에서 분산처리를 하는 것이다.
(정말, 멋진 프레임워크다)

기본포트를 사용할때,
8088 에서 리소스 매니저 정보를 볼 수 있고, 스파크가 실행 중일 때는, 4040포트에서 스파크 잡을 모니터링 할 수 있다.

Parquet

파케이? 라고 읽는 것 같다. 아직 이해도가 부족하지만, 내가 느낀 특징은 다음과 같다

  • 열기반 압축을 한다.(그래서 더 효율적이다.)
  • 압축률이 높다.
  • 일부 컬럼을 가져올 때 전체 레코드를 로드하지 않는다.
  • 스키마 정보를 유지한다.(read해서 스키마를 인식할 수 있다.)

아무래도 스파크에 확실히 최적화된 포맷이 아닌가 싶다.

삽질 기록

진짜... 제대로 된 정보가 없었다.
일단 spark에서 kafka 를 활용하기 위해서는 추가적인 jar가 필요하다.
공식문서에서는 spark-submit과 함께 --packages옵션을 활용하라고 안내하고 있다.(그러면 제대로좀 만들어놓던가..ㅜㅜ)
https://spark.apache.org/docs/3.3.1/structured-streaming-kafka-integration.html

--packages 옵션은 자동으로 디펜던시를 탐색해서 ~/.ivy폴더에 캐싱한다.
근데, 잘 작동하지도 않고, 매우 느리다. wget을 통하면, 단숨에 끝나는 다운로드가 엄청 느리게 동작한다.(왜 그런지는 모르겠다.)
옵션을 붙여서 실행을 시키면, ClassNotFoundException이나, ClassDef를 못찾겠다고, 에러가 뜨면서 job 이 fail상태가 된다.

나의 예상으로는 Java의 classloader가 런타임에 패키지를 로드하도록 설계되어 있는 것 같다. 자바 기반 psuedo 코드를 작성하자면

Class.forName("$SPARK_HOME/jars/{}")

이렇게 불러오는것 같다.(예상에는)

그래서 해결책이 뭐냐? 나의 경우 그냥 jar를 다운받아서 저 경로에 넣어주었다.
카프카를 사용할 때 필요한 dependency는

  • spark-sql-kafka-0-10_2.12-3.2.1.jar
  • kafka-clients-2.1.1.jar
  • spark-streaming-kafka-0-10-assembly_2.12-3.2.1.jar
  • commons-pool2-2.8.0.jar
  • spark-token-provider-kafka-0-10_2.12-3.2.1.jar
    나의 경우 이렇게 넣어주었다. (버전은 당연히 각자 상황에 맞게, scalar버전-spark버전 형식이다.)

다운로드 받는 법은, 메이븐 레포에서(https://mvnrepository.com/) 원하는 종속성을 검색하고 버전을 선택한다(스칼라 버전 주의해서). 그러면, 다운로드 받는 링크가 있다.

Files에 Jar버튼을 누르면, 다운로드 된다. 나의 경우 링크 경로를 복사해서, wget을 통해 다운받고, $SPARK_HOME/jars 위치에 옮겨주었다.

spark-submit --master yarn --deploy-mode client --name api_usage_consumer /home/on5949/Project/repo/tracking/api_usage_consumer.py

실행은 단순하게 이렇게 실행하면 된다.

profile
더 좋은 구조를 고민하는 개발자 입니다

0개의 댓글