Prefect란? - Task, Flow, Parameter

hsh·2021년 8월 16일
1
  • airflow와 같은 workflow orchestration tool인 prefect에 대해 알아보자.

1. Why Prefect?

  • prefect는 데이터 파이프라인을 구축하는데 쓰인다.
  • 데이터는 만들어질 때 작업들 간의 dependency가 있는데 이것을 관리해준다.
    • 예를 들어 매출 데이터, 유저 통계를 만든 후 매출통계와 유저 통계를 합쳐 유저별 매출 데이터를 만든다면 데이터를 만드는 순서가 생긴다.
  • 데이터는 정해진 시간마다 자동으로 주기적으로 만들어져야 하는데, prefect는 스케줄러를 통해 이를 도와준다.
  • Flow, Task, Parameter는 prefect의 핵심개념으로 꼭 이해해야한다.

2. task

  • 하나의 가장 작은 작업 단위라고 생각할 수 있다.

  • 배치 잡 한개 한개가 곧 Task가 될 수 있다

  • code 예시

    from prefect import task
    
    @task
    def say_hello():
    	print("Hello, world!")
    
    @task
    def add(x, y=1):
        return x + y
  • 위 예제에서는 테스크를 만들 때 데코레이터를 사용하지만 Task classes를 통해 Custom class를 만들 수 있다.

    from prefect import Task
    
    class AddTask(Task):
    
        def __init__(self, default: int, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.default = default
    
        def run(self, x: int, y: int=None) -> int:
            if y is None:
                y = self.default
            return x + y
    
    # initialize the task instance
    add = AddTask(default=1)

3. Flow

  • task간의 관계를 정의한 전체 pipeline

    from prefect import Flow
    
    with Flow("My first flow!") as flow:
        first_result = add(1, y=2)
        second_result = add(x=first_result, y=100)
  • with 문 안에서 task를 호출하여 flow를 구성할 수 있음

  • 주의할 점은 flow를 정의하는 단계에서 task가 실행되는 것은 아니다.
    - flow 정의 부분에서는 task 간의 dependency를 파악하는 것이다.

    • 정의된 flow 실행 방법은 뒷부분 참고

4. Parameter

  • flow를 실행할 때 여러가지 인자를 넘겨주고 싶을 수 있다.

  • flow 실행에 필요한 인자들을 Parameter를 통해 넘겨 줄 수 있다.

  • example code

    from prefect import Parameter
    
    with Flow("Say hi!") as flow:
        name = Parameter("name", default='Hi')
        say_hello(name)
  • Paraemter default값은 정해진 상수 값만 넣을 수 있다.

  • Parameter는 flow 실행시 넘길 수 있는 값으로 flow run의 인자로 넘기거나 web UI flow 실행 화면에서 설정할 수 있다.
    - flow_name이라는 이름의 parameter를 설정했다면 아래처럼 실행할 때 값을 적어 넘길 수 있다.

5. run, register flow

  • flow를 정의한다고 해서 서버에 flow가 등록되거나, 실행되는 것이 아니다

  • 아래와 같은 flow를 만들었을 때

    from prefect import Flow
    with Flow("My first flow!") as flow:
        first_result = add(1, y=2)
        second_result = add(x=first_result, y=100)
    
    flow.register()
    • 실행은 다음과 같이 할 수 있다

      **flow.run()**

      - run() 으로 실행할 경우 prefect 서버를 통해 실행되는 것이 아니라 flow.run을 호출한 장비에서 실행된다. Run 명령어로는 로컬 머신에서 실행된다는 점을 주의해야한다.

    • 등록은 다음과 같이 할 수 있다.

      **flow.register()**

profile
Machine Learning Engineer: recsys, mlops

0개의 댓글