1. Why Flow Dependency??
- 왜 필요한지 이해하기 위해 예시를 들어보자.
- One flow
- 비트코인데이터를 RDS로 적재한 후 RDS에 쌓인 데이터로 주식 가격 예측을 위한 feature를 생성한다고 가정해보자.
- 이를 위해서 비트코인데이터를 RDS로 적재하는 task A를 만들었다.
- 또한 RDS에 있는 데이터를 가공하여 feature를 만든 후 RDS로 재적재하는 task B를 만들었다.
- New Feature test
- 근데 주식 예측을 위한 feature로 아닌 기존 알고리즘 B가 아닌 새로운 알고리즘C로 feature C를 만들고 싶어진 것이다.
- 그래서 feature C를 위해 task-c를 만들었다.
- 또한 과거 시간의 feature C가 필요하기 때문에 Feature C를 만들기 위해 과거 모든 기록에 대해 Flow를 돌려줄 필요가 생겼다(backfill)
- 이때 task a는 이미 성공해서 데이터를 잘 쌓았지만, task c 때문에 다시 돌려야하는 일이 생긴다.
- 새로운 feature 알고리즘 추가 및 교체는 자주 있는 일이기 때문에 의미 없는 task a의 재실행이 많아졌다.
- Flow dependency
- 위처럼 구성하면 평소에는 flow C를 돌리다가 task b의 backfill이 필요하면 Flow B만 다시 돌리면 된다.
- 위 예시말고도 아마 다양한 상황에서 다양한 이유로 flow dependecy 기능을 쓰고 싶은 경우가 있을 것이다.
2. Basic
2-1 공식문서 튜토리얼
from prefect import Flow
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import StartFlowRun
weekday_schedule = CronSchedule(
"30 9 * * 1-5", start_date=pendulum.now(tz="US/Eastern")
)
flow_a = StartFlowRun(flow_name="A", project_name="examples", wait=True)
flow_b = StartFlowRun(flow_name="B", project_name="examples", wait=True)
flow_c = StartFlowRun(flow_name="C", project_name="examples", wait=True)
flow_d = StartFlowRun(flow_name="D", project_name="examples", wait=True)
with Flow("parent-flow", schedule=weekday_schedule) as flow:
b = flow_b(upstream_tasks=[flow_a])
c = flow_c(upstream_tasks=[flow_a])
d = flow_d(upstream_tasks=[b, c])
3. 한계점
- 하지만 상황은 위 처럼 단순하지 않고 다양한 기능에 대한 요구가 필요해지게 된다.
3-1 Flow끼리 결과 주고 받기
- Flow A가 Flow B를 trigger 한다고 해보자.
- 근데 Flow B는 Flow A가 가지고 있는 특정 task의 결과를 필요로 한다.
- 예를 들면 위 예제에서 task a에서 코인데이터를 쌓을 때 코인데이터를 쌓은 데이터 베이스 테이블의 이름을 task b에 넘겨주고 싶다고 해보자.
- task a, task b가 한 flow에 있으면 쉬웠던 일이 flow a, flow b로 쪼개지면 어려워진다.
3-2 log가 보고 싶다..
- 위 방법을 StartFlowRun 방법을 사용하면 단지 flow를 trigger만 해줄뿐이다.
- trigger된 flow의 로그를 보고 싶다면..?
- trigger 기록을 보고 해당 flow 화면으로 넘어가 다시 로그를 봐야한다.
- 이게 운영하다보면 정~~말 귀찮고 불편하다.
4. 여러 기능을 가진 Prefect 0.15.X
4-1 Flow끼리 결과 주고 받기
-
prefect release 노트를 보면 아래 처럼 0.15.0에서 flow run 결과를 간단하게 넘겨줄 수 있는 task가 추가 되었다고 한다.
-
-
pr을 살펴보니 아래와 같은 task가 추가 되었다.
-
flow간 task를 주고 받기 위해서는 get_task_run_result
를 쓰면 된다.
-
StartFlowRun
을 결과로 flow_run_id를 넘겨준다.
-
이 아이디를 get_task_run_result(flow_run_id, task_slug=task-slug)
으로 넘겨주면된다.
-
또한 flow안에는 여러 task가 존재한다. 어떤 task의 결과를 넘겨줄지 정해야하기 때문에 task이 별명인 task_slug를 넘겨 줘야 한다.
- 이 task가 안에서 실제로 하는 일은 database에 저장된 task의 결과를 쿼리해서 가져오는 것을 대신 해주는 것이다. 우리는 쿼리할 때 쓰일 인자를 잘 넘겨주기만 하면 된다.
4-1-1 신기능이 완벽할리가 없었어.. ㅜ_ㅜ
- 나는 task_slug를 분명 잘 지정해서 넘겼는데 이상하게 prefect는 task 결과를 못 받아오고 있었다.
- 결국 db의 task 관련 모든 테이블을 까보게 되었고 내가 넘긴 task_slug 뒤에 이상한
-copy
문자열이 달려있는 것을 확인했다.
- 글을 쓰는 시점에 알게된 사실인데 관련 버그는 issue에도 등록이 되어있었다.
- 결국 나는 그냥 끝에 -copy 문자열을 달아주는 방식으로 쓰고 있다. 🙁
4-2 log가 보고 싶다..
- 로그는
wait_for_flow_run
를 통해서 볼 수 있다.
StartFlowRun
을 wait=False로 주게 되면 바로 trigger만 하고 넘어간다.
- 그 뒤에
wait_for_flow_run
task를 통해 해당 flow 가 끝나기를 기다리면 StartFlowRun
을 wait=True로 쓴 것과 동일한 효과를 얻을 수 있다.
wait_for_flow_run
의 인자 중에 stream_logs=True
로 바꿔주면 로그가 상위 flow에서도 잘 보이게 된다.
4-2-1 역시나..신기능이 완벽할리가 없었어.. ㅜ_ㅜ
- 이 기능은 근데 막상 써보니..... 하위 flow가 실패해도 상위 flow의 wait 부분은 성공으로 생각하고 그냥 넘어가 버렸다.
- 원래는 아래같은 flow 구조에서 flowA가 성공을 못하고 어떤 이유로 실패하면 flowB 실행을 못 하고 Flow C도 같이 실패 했다.(flow a 실패 -> flow C 실패, B실행 못함)
- 근데 stream logs를 쓰기 위해
wait_for_flow_run
을 쓰니 Flow A가 실패해도 FLow B를 실행하고 Flow C는 성공으로 처리된다...😡
- 결국 이 기능은 못 쓰고 그냥 prefect ui 페이지를 헤메며 로그를 보고 있다.
4-2-2 0.15.8 부터 status 관련 기능 보안
- 글을 쓰는 시점으로 부터 6일전에 raise_final_state라는 옵션이 wait_for_flow_run 테스크에 추가 되었다!!
- 이렇게 되면 trigger한 flow의 상태를 모니터링해서 원하던 대로 실패 처리를 할 수 있을 것 처럼 보였다!!
- 두근 거리는 마음으로 Agent 버전업을 한 후 적용을 해봤지만 UI서버 버전이 낮을 경우 하위 버전과 호환 되지 않았다.
- task 관련 업데이트라 agent만 버전업하면 될 것 같았지만, 그 동안 개발된 다른 기능들로 인해서 ui 서버 보다 높은 버전의 agent에서 flow 등록 자체가 불가능했다.. 🥲
- ui 서버를 언제 한번 버전업을 진행한 후 flow 새 기능을 써봐야겠다..
5. Colusion
- flow dependency를 걸 수 있는 방법을 알아봄
- dependency와 관련해서 겪을 수 있는 문제 사항들과 해결책들을 알아봄.