[Prefect] Flow dependency

홍성환·2021년 11월 16일
1
post-thumbnail

1. Why Flow Dependency??

  • 왜 필요한지 이해하기 위해 예시를 들어보자.
  • One flow
    • 비트코인데이터를 RDS로 적재한 후 RDS에 쌓인 데이터로 주식 가격 예측을 위한 feature를 생성한다고 가정해보자.
    • 이를 위해서 비트코인데이터를 RDS로 적재하는 task A를 만들었다.
    • 또한 RDS에 있는 데이터를 가공하여 feature를 만든 후 RDS로 재적재하는 task B를 만들었다.
  • New Feature test
    • 근데 주식 예측을 위한 feature로 아닌 기존 알고리즘 B가 아닌 새로운 알고리즘C로 feature C를 만들고 싶어진 것이다.
    • 그래서 feature C를 위해 task-c를 만들었다.
    • 또한 과거 시간의 feature C가 필요하기 때문에 Feature C를 만들기 위해 과거 모든 기록에 대해 Flow를 돌려줄 필요가 생겼다(backfill)
    • 이때 task a는 이미 성공해서 데이터를 잘 쌓았지만, task c 때문에 다시 돌려야하는 일이 생긴다.
    • 새로운 feature 알고리즘 추가 및 교체는 자주 있는 일이기 때문에 의미 없는 task a의 재실행이 많아졌다.
  • Flow dependency
    • 위처럼 구성하면 평소에는 flow C를 돌리다가 task b의 backfill이 필요하면 Flow B만 다시 돌리면 된다.
  • 위 예시말고도 아마 다양한 상황에서 다양한 이유로 flow dependecy 기능을 쓰고 싶은 경우가 있을 것이다.

2. Basic

2-1 공식문서 튜토리얼

  • 공식문서에 소개된 튜토리얼 링크에서 어떻게 하면 dependency를 걸 수 있는지 알려준다.
    https://docs.prefect.io/core/idioms/flow-to-flow.html
  • StartFlowRun이라는 prefect 기본 task 중에 하나를 쓰는 방식이다.
  • StartFlowRun에 project이름과 flow이름을 넘겨주면 된다.
  • 코드 예시
  from prefect import Flow
  from prefect.schedules import CronSchedule
  from prefect.tasks.prefect import StartFlowRun


  weekday_schedule = CronSchedule(
      "30 9 * * 1-5", start_date=pendulum.now(tz="US/Eastern")
  )


  # assumes you have registered the following flows in a project named "examples"
  flow_a = StartFlowRun(flow_name="A", project_name="examples", wait=True)
  flow_b = StartFlowRun(flow_name="B", project_name="examples", wait=True)
  flow_c = StartFlowRun(flow_name="C", project_name="examples", wait=True)
  flow_d = StartFlowRun(flow_name="D", project_name="examples", wait=True)

  with Flow("parent-flow", schedule=weekday_schedule) as flow:
      b = flow_b(upstream_tasks=[flow_a])
      c = flow_c(upstream_tasks=[flow_a])
      d = flow_d(upstream_tasks=[b, c])

3. 한계점

  • 하지만 상황은 위 처럼 단순하지 않고 다양한 기능에 대한 요구가 필요해지게 된다.

3-1 Flow끼리 결과 주고 받기

  • Flow A가 Flow B를 trigger 한다고 해보자.
  • 근데 Flow B는 Flow A가 가지고 있는 특정 task의 결과를 필요로 한다.
  • 예를 들면 위 예제에서 task a에서 코인데이터를 쌓을 때 코인데이터를 쌓은 데이터 베이스 테이블의 이름을 task b에 넘겨주고 싶다고 해보자.
  • task a, task b가 한 flow에 있으면 쉬웠던 일이 flow a, flow b로 쪼개지면 어려워진다.

3-2 log가 보고 싶다..

  • 위 방법을 StartFlowRun 방법을 사용하면 단지 flow를 trigger만 해줄뿐이다.
  • trigger된 flow의 로그를 보고 싶다면..?
    • trigger 기록을 보고 해당 flow 화면으로 넘어가 다시 로그를 봐야한다.
    • 이게 운영하다보면 정~~말 귀찮고 불편하다.

4. 여러 기능을 가진 Prefect 0.15.X

4-1 Flow끼리 결과 주고 받기

  • prefect release 노트를 보면 아래 처럼 0.15.0에서 flow run 결과를 간단하게 넘겨줄 수 있는 task가 추가 되었다고 한다.

  • pr을 살펴보니 아래와 같은 task가 추가 되었다.

  • flow간 task를 주고 받기 위해서는 get_task_run_result를 쓰면 된다.

  • StartFlowRun을 결과로 flow_run_id를 넘겨준다.

  • 이 아이디를 get_task_run_result(flow_run_id, task_slug=task-slug) 으로 넘겨주면된다.

  • 또한 flow안에는 여러 task가 존재한다. 어떤 task의 결과를 넘겨줄지 정해야하기 때문에 task이 별명인 task_slug를 넘겨 줘야 한다.

    • 이 task가 안에서 실제로 하는 일은 database에 저장된 task의 결과를 쿼리해서 가져오는 것을 대신 해주는 것이다. 우리는 쿼리할 때 쓰일 인자를 잘 넘겨주기만 하면 된다.

4-1-1 신기능이 완벽할리가 없었어.. ㅜ_ㅜ

  • 나는 task_slug를 분명 잘 지정해서 넘겼는데 이상하게 prefect는 task 결과를 못 받아오고 있었다.
  • 결국 db의 task 관련 모든 테이블을 까보게 되었고 내가 넘긴 task_slug 뒤에 이상한 -copy 문자열이 달려있는 것을 확인했다.
  • 글을 쓰는 시점에 알게된 사실인데 관련 버그는 issue에도 등록이 되어있었다.
  • 결국 나는 그냥 끝에 -copy 문자열을 달아주는 방식으로 쓰고 있다. 🙁

4-2 log가 보고 싶다..

  • 로그는 wait_for_flow_run 를 통해서 볼 수 있다.
  • StartFlowRun을 wait=False로 주게 되면 바로 trigger만 하고 넘어간다.
  • 그 뒤에 wait_for_flow_run task를 통해 해당 flow 가 끝나기를 기다리면 StartFlowRun을 wait=True로 쓴 것과 동일한 효과를 얻을 수 있다.
  • wait_for_flow_run 의 인자 중에 stream_logs=True 로 바꿔주면 로그가 상위 flow에서도 잘 보이게 된다.

4-2-1 역시나..신기능이 완벽할리가 없었어.. ㅜ_ㅜ

  • 이 기능은 근데 막상 써보니..... 하위 flow가 실패해도 상위 flow의 wait 부분은 성공으로 생각하고 그냥 넘어가 버렸다.
  • 원래는 아래같은 flow 구조에서 flowA가 성공을 못하고 어떤 이유로 실패하면 flowB 실행을 못 하고 Flow C도 같이 실패 했다.(flow a 실패 -> flow C 실패, B실행 못함)
  • 근데 stream logs를 쓰기 위해 wait_for_flow_run을 쓰니 Flow A가 실패해도 FLow B를 실행하고 Flow C는 성공으로 처리된다...😡
  • 결국 이 기능은 못 쓰고 그냥 prefect ui 페이지를 헤메며 로그를 보고 있다.

4-2-2 0.15.8 부터 status 관련 기능 보안

  • 글을 쓰는 시점으로 부터 6일전에 raise_final_state라는 옵션이 wait_for_flow_run 테스크에 추가 되었다!!
  • 이렇게 되면 trigger한 flow의 상태를 모니터링해서 원하던 대로 실패 처리를 할 수 있을 것 처럼 보였다!!
  • 두근 거리는 마음으로 Agent 버전업을 한 후 적용을 해봤지만 UI서버 버전이 낮을 경우 하위 버전과 호환 되지 않았다.
  • task 관련 업데이트라 agent만 버전업하면 될 것 같았지만, 그 동안 개발된 다른 기능들로 인해서 ui 서버 보다 높은 버전의 agent에서 flow 등록 자체가 불가능했다.. 🥲
  • ui 서버를 언제 한번 버전업을 진행한 후 flow 새 기능을 써봐야겠다..

5. Colusion

  • flow dependency를 걸 수 있는 방법을 알아봄
  • dependency와 관련해서 겪을 수 있는 문제 사항들과 해결책들을 알아봄.
profile
Machine Learning Engineer: recsys, mlops

0개의 댓글