Spark Job을 실행시켰는데 offset의 sink가 맞지 않아서 실행되지 않은 문제

Q·2023년 4월 2일
0

✅ 사전 지식

⚡ spark의 checkpoint

스파크 스트리밍에서 Checkpoint스트리밍 애플리케이션의 상태를 주기적으로 저장하여 고장 복구를 도울 수 있는 메커니즘이다. 체크포인트는 다음 두 가지 정보를 저장한다.

1. 메타데이터: 스트리밍 애플리케이션의 설정, DStream 연산, 그리고 작업 진행 상황과 같은 메타데이터가 포함된다. 메타데이터는 애플리케이션 복구를 위해 필요한 정보를 제공한다.

2. 데이터: 스트리밍 애플리케이션에서 사용되는 RDD의 데이터를 저장한다. 이 데이터는 스트리밍 애플리케이션 복구에 필요한 중간 연산 결과를 포함하며, 스트리밍 애플리케이션 복구 시 이전 상태를 복원하는 데 사용된다.
체크포인트 디렉터리를 설정하려면, 다음과 같이 SparkConf 객체에서 spark.checkpoint.dir 속성을 지정한다.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("CheckpointExample")
sc = SparkContext(conf=conf)

# Set the checkpoint directory
checkpoint_dir = "hdfs://localhost:9000/checkpoints"
sc.setCheckpointDir(checkpoint_dir)

# Create a streaming context with a batch interval of 5 seconds
ssc = StreamingContext(sc, 5)

체크포인트 디렉터리HDFS와 같은 분산 파일 시스템에 있어야 한다. 이렇게 하면 스트리밍 애플리케이션이 실패한 경우 체크포인트 데이터를 사용하여 애플리케이션을 복구할 수 있다.

체크포인트를 사용하면 스트리밍 애플리케이션의 내구성과 안정성이 향상됩니다. 그러나 체크포인트 저장에 따른 오버헤드가 있으므로, 애플리케이션의 성능과 요구 사항을 고려하여 적절한 체크포인트 간격을 설정해야 한다.

✅ 문제 발생

회사에서 Hadoop Cluster를 재시작하고 Yarn을 cluster manager로 사용하는 Spark Job을 재시작 했을때 Job이 실행되지 않은 문제가 발생했다.

✅ 발생한 에러

org.apache.spark.sql.streaming.StreamingQueryException: Cannot fetch records for the given partition and offset

위의 에러가 발생한 곳을 찾아야 했는데 (나는 어디에 로그가 찍히는지 몰라서 offset sink 문제인지 몰랐다ㅠㅠ)

✅ 문제 발생 이유

Kafka에서 메시지가 삭제되거나, Spark 애플리케이션이 체크포인트를 완료하기 전에 실패하거나 중지된 경우, 오프셋 불일치가 발생할 수 있다.

Kafka 오프셋과 Spark 스트리밍 애플리케이션의 싱크가 맞지 않는 것은 Spark 애플리케이션이 Kafka에서 처리해야 할 오프셋과 실제로 처리한 오프셋 사이에 차이가 있다는 것을 의미한다.

  • Kafka 오프셋 : Kafka에서 관리되는 각 파티션의 메시지 위치를 나타낸다.
  • Spark 스트리밍 애플리케이션의 오프셋 : Spark가 처리한 메시지의 위치를 나타낸다. ( 체크포인트에 저장된 오프셋은 Spark 스트리밍 애플리케이션에 의해 처리된 Kafka 오프셋, 체크포인트를 사용하면 장애 발생 시 Spark 애플리케이션이 이전 실행 상태를 복구하는데 사용 )

따라서 체크포인트에 저장된 오프셋과 Kafka에서 관리되는 오프셋 사이에 차이가 없어야 올바른 처리가 가능하다.

✅ 해결 방법

⚡ 체크포인트 재설정

  • 스파크 스트리밍 잡에서 체크포인트를 사용하는 경우, 오프셋 문제가 발생할 수 있다. 이 문제를 해결하려면 체크포인트 디렉토리를 제거하고 잡을 다시 시작해야한다. 체크포인트를 삭제하면 스트리밍 잡이 처음부터 다시 시작된다.

✅ 더 나아간 해결 방법

⚡ 오프셋 관리 개선

  • Kafka의 auto.offset.reset 옵션을 사용하여, 오프셋이 없거나 더 이상 유효하지 않은 경우 처리 방식을 지정할 수 있다. 이 옵션에는 "earliest" (가장 이전 오프셋부터 시작) 또는 "latest" (가장 최근 오프셋부터 시작)를 설정할 수 있다.
  • 만약 설정하지 않았으면 기본값은 "latest" 이다.( 체크포인트가 없을 때 애플리케이션의 시작 시점부터 처리되는 데이터만 고려한다는 의미 )

예를 들어, Spark 스트리밍 애플리케이션에서 다음과 같이 옵션을 설정할 수 있다.

kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "your_topic_name") \
    .option("startingOffsets", "latest") \
    .option("auto.offset.reset", "latest") \
    .load()

⚡ 실패한 작업 다시 시도

  • "Retry" 기능을 사용하여 실패한 작업을 다시 시도할 수 있다. 이렇게 하면 일시적인 오류가 발생할 때 작업이 자동으로 다시 시작되어 오프셋 불일치 문제를 해결할 수 있다.

⚡ 체크포인트와 메시지 보존 정책 조정

  • Kafka의 메시지 보존 정책과 체크포인트 간격을 조정하여 오프셋 불일치를 줄일 수 있다. 메시지 보존 기간을 늘리고 체크포인트 간격을 줄이면, 오프셋 불일치의 가능성을 줄일 수 있다.
profile
Data Engineer

0개의 댓글