Spark Streaming에서 offset을 수동으로 조정하여 스트리밍 처리 [디버깅]

털난 개발자·2024년 3월 27일
0

Kubernetes에서 운영중인 Spark Streaming에서 Kafka의 메시지를 인코딩할 수 없는 에러가 발생했습니다.

실시간 데이터 처리에 끊김이 발생했고 원인을 파악하기 위해서 driver와 executor 파드의 로그를 확인합니다.

작업 복구 후 kafka 데이터를 확인해본 결과 value 값에 Inf 이 있어서 인코딩이 되지 않았습니다.
kafka broker로 데이터를 publish하는 곳에 AI모델 API인데, 추론 결과값에서 exception처리가 되지 않는 것 같습니다.


우선 Spark Driver와 Executor의 로그를 확인합니다. Pod 로그나 Spark history서버로 로그를 확인합니다.

=== Driver 로그===

......

TypeError: encoding without a string argument



=== Streaming Query ====

Identifier : [id = xxxxxxx--xxxx-xxxx, runId = xxxxx-xxxx-xxxx]

Current Commited Offsets: (KafkaV2[Subscribe[my-topic]]: {"my-topic":{"2":1625, "1":80,"0":0}}) # -> Commit(작업이 끝난) 지점

Current Available Offsets: (KafkaV2[Subscribe[my-topic]]: {"my-topic":{"2":1626, "1":80,"0":0}}) # -> 이제 작업을 할 지점



Current State: ACTIVE

Thread State: RUNNABLE

......

==== Executor 로그===


.....

KafkaConsumer: [Consumer clientId=consumer-spark-kafka-source-xxxxx-xxxx-xxxx....-executor-7, groupId=spark-kafka-source-xxxxx-xxxx-xxxx....-executor] Subscribed to partition(s): my-topic-2

KafkaConsumer: [Consumer clientId=consumer-spark-kafka-source-xxxxx-xxxx-xxxx....-executor-7, groupId=spark-kafka-source-xxxxx-xxxx-xxxx....-executor] Seeking to offset 1625 for partition my-topic-2

...... 중략.....

SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-xxxxx-xxxx-xxxx....-executor-7, groupId=spark-kafka-source-xxxxx-xxxx-xxxx....-executor] Seeking to EARLIEST offset of partition my-topic-2

SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-xxxxx-xxxx-xxxx....-executor-7, groupId=spark-kafka-source-xxxxx-xxxx-xxxx....-executor] Resetting offset for partition my-topic-2 to offset 1610

SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-xxxxx-xxxx-xxxx....-executor-7, groupId=spark-kafka-source-xxxxx-xxxx-xxxx....-executor] Seeking to LATEST offset of partition my-topic-2

SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-xxxxx-xxxx-xxxx....-executor-7, groupId=spark-kafka-source-xxxxx-xxxx-xxxx....-executor] Resetting offset for partition my-topic-2 to offset 1829

Traceback : 

....중략.... 

pyspark/serializers.py", line 211 in dump_stream

 ....중략....

TypeError: encoding without a string argument

...후략....

원인 파악

kafka에서 데이터를 read stream할때 offset 1625번 message가 직렬화(역직렬화)에 실패한 것으로 보입니다.

멈춰있는 파이프라인을 빠르게 정상화시킬 필요가 있어, 해당 오프셋의 메시지를 건너뛰고 정상데이터부터 데이터를 처리하도록 합니다. 2번 파티션에서 1626번 데이터부터 읽을 수 있도록 변경합니다.

Spark Streaming에서 실시간 데이터를 처리할떄 checkpoint를 통해 데이터를 어디까지 처리했는지에 대한 정보를 담고 있습니다.

장애발생시 Spark Streaming은 checkpoint의 정보를 읽어 스스로 장애 복구를 위해 장애가 발생한 시점에서 데이터 처리를 다시 시작합니다.


Checkpoint 구성 정보

checkpoint는 4개로 구성되어 있습니다

  1. metadata : spark의 run id 정보

  2. source : spark streaming 시작했을 때 topic의 시작 offset 정보를 담고 있음

    {“Spark_checkpoint_test”:{“2”:11,”1":11,”0":12}}
  3. offsets : 파티션별로 spark batch들이 읽고 처리한 데이터를 기록

    {"my-topic": {"2":1626, "1":80, "0":0}}

    주의
    가장 최근 offset파일은 다음 마이크로배치에 데이터를 읽을 offset입니다.
    그리고 바로 전 offset 파일은 현재 배치에서 읽을 offset입니다.

  4. commits : 처리가 완료된 내용을 저장


내 Checkpoint 디렉터리 확인

checkpoint 디렉터리에 들어가서 offset 확인

가장 최근 오프셋인 13974을 cat 13974 :{"my-topic": {"2":1626, "1":80, "0":0}}

최근 오프셋은 다음 batch에 실행할 오프셋 정보를 담고 있습니다. 위에 로그에서 'Current Available Offsets'한 오프셋입니다.

바로 전 오프셋인 13973를 확인해보면 {"my-topic": {"2":1625, "1":80, "0":0}}으로 batch가 성공적으로 끝나고 커밋된 오프셋정보입니다. 위 로그에서 'Current Commited Offsets'입니다.

=> 1625오프셋을 이번 배치에 읽어야 하는데 오류가 발생했습니다.

오프셋 수정

2번 파티션에서 1626데이터를 읽어오도록 강제로 변경하기 위해서는 Current Available Offsets을 "2":1627로 변경하고 커밋된 최종 오프셋은 "2":1626으로 변경하여 1625 데이터를 읽은 것처럼 수정합니다.

13974은 {"my-topic": {"2":1627, "1":80, "0":0}}

13973은 {"my-topic": {"2":1626, "1":80, "0":0}}

변경 후 job을 다시 실행하면 1626부터 데이터 처리를 시작합니다.

정리

1개 데이터의 유실이 있었지만, 실시간으로 바로 데이터를 처리해야하는 상황이라 강제로 offset을 변경하여 파이프라인을 정상화했습니다. 특히 Kafka로 데이터를 Publish할때는 데이터 타입을 검사해서 보내는 과정이 꼭 필요하다고 생각합니다.

0개의 댓글