[데이터 플랫폼 운영 / 개발] - Spark 9 (Stream join)

Chan hae OH·2024년 3월 22일
0

Spark

목록 보기
12/12

1. 시작말


안녕하세요.

데이터 엔지니어링 & 운영 업무를 하는 중 알게 된 지식이나 의문점들을 시리즈 형식으로 계속해서 작성해나가며

새로 알게 된 점이나 잘 못 알고 있었던 점을 더욱 기억에 남기기 위해 글을 꾸준히 작성 할려고 합니다.

Spark의 경우 Spark 완벽 가이드 책을 많이 참고하여 운영을 하고 있습니다.

반드시 글을 읽어 주실 때 잘 못 말하고 있는 부분은 정정 요청 드립니다.

저의 지식에 큰 도움이 됩니다. :)



2. Stateful join 이란?


stateful join 은 스트림과 스트림 간의 이전 상태를 저장하여 join 하는 방식을 말합니다. 시간의 흐름에 따라 데이터는 달라지며, 이에 상태가 관리되면서 join 을 수행할 수 있어야 합니다.



3. Spark Structured Streaming 에서의 Stream 처리


기존의 스트림 데이터는 주로 처음 생성된 데이터(로그, 이미지 등) 들이 들어올 확률이 높습니다.

이 때 해당 데이터들을 조합하여 의미가 있는 데이터로 만들기 위해서는 기존에 가지고 있는 기준 정보나 기타 데이터들을 조합할 필요가 있습니다.

또는 stream 간의 데이터를 실시간으로 조합하여 분석할 필요가 있습니다.

이 때 활용할 수 있는 것은 join 연산자 입니다.

Spark Structured Streaming 은 stream - static, stream-stream DataFrame 간의 join 연산을 지원 합니다.

https://spark.apache.org/docs/3.2.3/structured-streaming-programming-guide.html#join-operations



4. stream - static join


아래의 코드를 보면 staticDFread 메소드를 통해 데이터를 불러오기 때문에 static 데이터 프레임 입니다. 그리고 streamingDFreadStream 메소드를 통해 데이터를 불러오기 때문에 stream 데이터 프레임 입니다.

staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF

이 두개의 데이터프레임 간의 join 수행을 spark 에서는 지원합니다. 하지만 몇가지 제약 사항을 명시하고 있습니다.

https://spark.apache.org/docs/3.2.3/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-queries



5. stream - stream join


Spark 에서는 두 개의 스트리밍 데이터 세트/DataFrame을 조인할 수 있습니다.

두 데이터 스트림 간에 조인 결과를 생성할 때의 과제는 어느 시점에서든 조인 양쪽에 대해 데이터 세트 보기가 불완전하여 입력 간에 일치하는 항목을 찾는 것이 훨씬 더 어렵다는 것입니다.

하나의 입력 스트림에서 수신된 행은 다른 입력 스트림에서 아직 수신되지 않은 향후 행과 일치할 수 있습니다. 따라서 두 입력 스트림 모두에 대해 과거 입력을 스트리밍 상태로 버퍼링하여 모든 미래 입력을 과거 입력과 일치시키고 그에 따라 결합된 결과를 생성할 수 있습니다. 또한 스트리밍 집계와 유사하게 지연되고 순서가 잘못된 데이터를 자동으로 처리하고 워터마크를 사용하여 상태를 제한할 수 있습니다.


5.1. inner join


모든 종류의 조인 조건과 함께 모든 종류의 열에 대한 내부 조인이 지원됩니다. 그러나 스트림이 실행됨에 따라 스트리밍 상태의 크기는 새로운 입력이 과거의 입력과 일치할 수 있으므로 모든 과거 입력을 저장해야 하므로 무한정 계속 커집니다. 제한되지 않은 상태를 방지하려면 무한정 오래된 입력이 향후 입력과 일치할 수 없으므로 상태에서 지워질 수 있도록 추가 조인 조건을 정의해야 합니다. 즉, 조인 시 다음과 같은 추가 단계를 수행해야 합니다.

엔진이 입력이 얼마나 지연될 수 있는지 알 수 있도록 두 입력 모두에 워터마크 지연을 정의합니다.

엔진이 한 입력의 이전 행이 다른 입력과 일치하는 데 필요하지 않은 시기(즉, 시간 제약 조건을 충족하지 않는 경우)를 파악할 수 있도록 두 입력에 대한 이벤트 시간에 대한 제약 조건을 정의합니다. 이 제약 조건은 두 가지 방법 중 하나로 정의할 수 있습니다.

  • 시간 범위 조인 조건 = (e.g. ...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR)

  • 이벤트 윈도우 조인 조건 = (e.g. ...JOIN ON leftTimeWindow = rightTimeWindow

광고 노출 스트림(광고가 표시될 때)을 광고에 대한 다른 사용자 클릭 스트림과 결합하여 노출이 수익 창출 가능한 클릭으로 이어질 때 상관 관계를 맺고 싶다고 가정해 보겠습니다. 이 스트림-스트림 조인에서 상태 정리를 허용하려면 다음과 같이 워터마킹 지연 및 시간 제약 조건을 지정해야 합니다.

워터마크 지연: 예를 들어 노출수와 해당 클릭수는 이벤트 시간에 각각 최대 2시간과 3시간까지 늦거나 순서가 뒤바뀌는 경우가 있습니다.

이벤트 시간 범위 조건: 해당 노출 후 0초~1시간 범위 내에서 클릭이 발생할 수 있다고 가정합니다.

from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

# Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)



5.2. outer join


워터마크 + 이벤트 시간 제약 조건은 내부 조인의 경우 선택 사항이지만 외부 조인의 경우 이를 지정해야 합니다. 이는 외부 조인에서 NULL 결과를 생성하기 위해 엔진이 입력 행이 향후 어떤 것과도 일치하지 않을 시기를 알아야 하기 때문입니다.

따라서 올바른 결과를 생성하려면 워터마크 + 이벤트 시간 제약 조건을 지정해야 합니다. 따라서 외부 조인을 사용하는 쿼리는 외부 조인이 되도록 지정하는 추가 매개변수가 있다는 점을 제외하면 이전의 광고 수익 창출 예시와 매우 비슷해 보입니다.

impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

테스트 시에는 계속 left outer join 을 명시해도 inner join 이 발생하여 한번 더 확인 해봐야 할 듯 합니다.

주의사항

외부 결과가 생성되는 방식과 관련하여 주목해야 할 몇 가지 중요한 특징이 있습니다.

  • 외부 NULL 결과는 지정된 워터마크 지연 및 시간 범위 조건에 따라 지연되어 생성됩니다. 이는 엔진이 일치하는 항목이 없고 앞으로 더 이상 일치하는 항목이 없는지 확인하기 위해 오랫동안 기다려야 하기 때문입니다.

  • 마이크로 배치 엔진의 현재 구현에서 워터마크는 마이크로 배치가 끝날 때 진행되고 다음 마이크로 배치는 업데이트된 워터마크를 사용하여 상태를 정리하고 외부 결과를 출력합니다. 처리할 새 데이터가 있는 경우에만 마이크로 배치를 트리거하므로 스트림에 새 데이터가 수신되지 않으면 외부 결과 생성이 지연될 수 있습니다. 즉, 조인되는 두 입력 스트림 중 하나라도 한동안 데이터를 수신하지 않으면 외부(두 경우 모두 왼쪽 또는 오른쪽) 출력이 지연될 수 있습니다.



profile
Data Engineer

0개의 댓글