Why Prefect?
Flow
, Task
, Parameter
는 prefect의 핵심개념으로 꼭 이해해야한다.task
하나의 가장 작은 작업 단위라고 생각할 수 있다.
배치 잡 한개 한개가 곧 Task가 될 수 있다
code 예시
from prefect import task
@task
def say_hello():
print("Hello, world!")
@task
def add(x, y=1):
return x + y
위 예제에서는 테스크를 만들 때 데코레이터를 사용하지만 Task classes를 통해 Custom class를 만들 수 있다.
from prefect import Task
class AddTask(Task):
def __init__(self, default: int, *args, **kwargs):
super().__init__(*args, **kwargs)
self.default = default
def run(self, x: int, y: int=None) -> int:
if y is None:
y = self.default
return x + y
# initialize the task instance
add = AddTask(default=1)
Flow
task간의 관계를 정의한 전체 pipeline
from prefect import Flow
with Flow("My first flow!") as flow:
first_result = add(1, y=2)
second_result = add(x=first_result, y=100)
with 문 안에서 task를 호출하여 flow를 구성할 수 있음
주의할 점은 flow를 정의하는 단계에서 task가 실행되는 것은 아니다.
- flow 정의 부분에서는 task 간의 dependency를 파악하는 것이다.
Parameter
flow를 실행할 때 여러가지 인자를 넘겨주고 싶을 수 있다.
flow 실행에 필요한 인자들을 Parameter를 통해 넘겨 줄 수 있다.
example code
from prefect import Parameter
with Flow("Say hi!") as flow:
name = Parameter("name", default='Hi')
say_hello(name)
Paraemter default값은 정해진 상수 값만 넣을 수 있다.
Parameter는 flow 실행시 넘길 수 있는 값으로 flow run의 인자로 넘기거나 web UI flow 실행 화면에서 설정할 수 있다.
- flow_name이라는 이름의 parameter를 설정했다면 아래처럼 실행할 때 값을 적어 넘길 수 있다.
run, register flow
flow를 정의한다고 해서 서버에 flow가 등록되거나, 실행되는 것이 아니다
아래와 같은 flow를 만들었을 때
from prefect import Flow
with Flow("My first flow!") as flow:
first_result = add(1, y=2)
second_result = add(x=first_result, y=100)
flow.register()
실행은 다음과 같이 할 수 있다
**flow.run()**
- run() 으로 실행할 경우 prefect 서버를 통해 실행되는 것이 아니라 flow.run을 호출한 장비에서 실행된다. Run 명령어로는 로컬 머신에서 실행된다는 점을 주의해야한다.
등록은 다음과 같이 할 수 있다.
**flow.register()**