스파크 스트리밍에서 Checkpoint는 스트리밍 애플리케이션의 상태를 주기적으로 저장하여 고장 복구를 도울 수 있는 메커니즘이다. 체크포인트는 다음 두 가지 정보를 저장한다.
1. 메타데이터: 스트리밍 애플리케이션의 설정, DStream 연산, 그리고 작업 진행 상황과 같은 메타데이터가 포함된다. 메타데이터는 애플리케이션 복구를 위해 필요한 정보를 제공한다.
2. 데이터: 스트리밍 애플리케이션에서 사용되는 RDD의 데이터를 저장한다. 이 데이터는 스트리밍 애플리케이션 복구에 필요한 중간 연산 결과를 포함하며, 스트리밍 애플리케이션 복구 시 이전 상태를 복원하는 데 사용된다.
체크포인트 디렉터리를 설정하려면, 다음과 같이 SparkConf 객체에서 spark.checkpoint.dir 속성을 지정한다.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
conf = SparkConf().setAppName("CheckpointExample")
sc = SparkContext(conf=conf)
# Set the checkpoint directory
checkpoint_dir = "hdfs://localhost:9000/checkpoints"
sc.setCheckpointDir(checkpoint_dir)
# Create a streaming context with a batch interval of 5 seconds
ssc = StreamingContext(sc, 5)
체크포인트 디렉터리는 HDFS와 같은 분산 파일 시스템에 있어야 한다. 이렇게 하면 스트리밍 애플리케이션이 실패한 경우 체크포인트 데이터를 사용하여 애플리케이션을 복구할 수 있다.
체크포인트를 사용하면 스트리밍 애플리케이션의 내구성과 안정성이 향상됩니다. 그러나 체크포인트 저장에 따른 오버헤드가 있으므로, 애플리케이션의 성능과 요구 사항을 고려하여 적절한 체크포인트 간격을 설정해야 한다.
회사에서 Hadoop Cluster를 재시작하고 Yarn을 cluster manager로 사용하는 Spark Job을 재시작 했을때 Job이 실행되지 않은 문제가 발생했다.
org.apache.spark.sql.streaming.StreamingQueryException: Cannot fetch records for the given partition and offset
위의 에러가 발생한 곳을 찾아야 했는데 (나는 어디에 로그가 찍히는지 몰라서 offset sink 문제인지 몰랐다ㅠㅠ)
Kafka에서 메시지가 삭제되거나, Spark 애플리케이션이 체크포인트를 완료하기 전에 실패하거나 중지된 경우, 오프셋 불일치가 발생할 수 있다.
Kafka 오프셋과 Spark 스트리밍 애플리케이션의 싱크가 맞지 않는 것은 Spark 애플리케이션이 Kafka에서 처리해야 할 오프셋과 실제로 처리한 오프셋 사이에 차이가 있다는 것을 의미한다.
따라서 체크포인트에 저장된 오프셋과 Kafka에서 관리되는 오프셋 사이에 차이가 없어야 올바른 처리가 가능하다.
예를 들어, Spark 스트리밍 애플리케이션에서 다음과 같이 옵션을 설정할 수 있다.
kafka_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "your_topic_name") \
.option("startingOffsets", "latest") \
.option("auto.offset.reset", "latest") \
.load()