03_DB and Executors

Copes·2021년 11월 30일
0

Airflow

목록 보기
3/5
post-thumbnail

수백 개의 tasks, DAGs는 동시에 어떻게 다룰 것인가?

  • Executor
    • LocalExecutor
    • CeleryExecutor
  • parameter(Tasks, DAGs 동시 수행 관련)
    • parallelism
    • concurrency(dag_concurrency, concurrency(해당 dag만))
    • max_active_run(max_active_runs_per_dag, max_active_runs(해당 dag만))

DAGs에서 여러 Task를 동시에 수행하기 위한 방법

  • SequentialExecutor (Sequential하게 task 하나씩 진행)
  • LocalExecutor (Single Machine에서 multiple tasks 수행 가능)
  • CeleryExecutor (Multiple Machines에서 multiple tasks 수행 가능)

SequentialExecutor

SequentialExecutor 내에서는
task1 >> [task2, task3] >> task4
으로 구성하는 경우,

task1 -> task2 or task3 -> task2, task3 중 앞에서 수행하지 않은 것 -> task4
순으로 동작한다.

즉, 순차적으로 진행된다.

task2, task3이 동시에, 병렬적으로 수행되게 하려면 어떻게 해야 하는가?


LocalExecutor

2가지 설정이 선행되어야 한다.

1. 순차 데이터베이스(SQLite)는 한번에 하나의 writer만 가지기 때문에, 데이터베이스를 변경해야 한다.(postgresql로 변경)
2. SequentialExecutorLocalExecutor로 변경해야 한다.

1. Postgresql 설치 및 MetaDB를 Postgresql로 변경

  • postgresql 설치
    $ sudo apt update
    $ sudo apt install postgresql
  • connect to postgresql
    $ sudo -u postgres psql
  • password 설정
    postgres=# ALTER USER postgres PASSWORD [PASSWORD]

postgresql과 연결 준비 완료

  • extra package
    $ pip install 'apache-airflow[postgres]'

postgres를 airflow의 metastore(metaDB)로 사용할 준비 완료

  • airflow.cfg의
    sql_alchemy_conn = ...
    sql_alchemy_conn = postgresql+psycopg2://postgres:[PASSWORD]@localhost/postgres로 변경

  • 저장 후 제대로 설정되었는지 metaDB 확인
    $ airflow db check로 확인할 수 있다.

2. LocalExecutor로 executor 변경

  • airflow.cfg의
    executor = SequentialExecutor
    executor = LocalExecutor로 변경

init, ID 생성 등 다시 수행

$ airflow db init

$ airflow users create \
 --username [username] \
 --firstname [firstname] \
 --lastname [lastname] \
 --role Admin \
 --password [password] \
 --email [email@email.com]

이후 적용시키려면 webserver를 재시작해야 한다.

Gantt를 확인하면 두 가지 작업이 동시에 수행되는 것을 확인할 수 있다.
(Single Machine에서도 자원에 따라 동시에 수행할 수 있음을 알 수 있다.)

  • 최대한 많이 수행하려면?
    • 예를 들어 Single Machine에서 100개 이상의 작업을 동시에 수행하는 것은 어려움이 있을 것이다.
    • 다른 executor를 선택(필요한 만큼 확장하기 위해서)
      • CeleryExecutor
      • KubernetesExecutor
  • 무한한 task를 동시에 수행 가능한가?

가장 중요한 것은 모든 executor가 순서에 맞게 실행되어야 한다는 점이다. > Queue를 사용.

  1. LocalExecutor에서는 Task를 Queue에 Push,
  2. Task가 실행될 준비가 되면 Queue에서 Pulled out, execute된다.

CeleryExecutor

  • LocalExecutor로 Single Machine에서 multiple tasks를 parallel하게 수행할 수 있지만, 이 역시 한계가 있을 수 있다. -> Multiple Machines(Workers)에서 Multiple Tasks를 수행해야 할때는? "CeleryExecutor"

Queue 역할로 Redis를 사용한다.

CeleryExecutor

  1. CeleryExecutor는 tasks를 multiple machines에서 수행 가능하도록 해준다.(반면, LocalExecutor에서는 tasks를 single machine에서만 수행할 수 있다.)
  2. CeleryExecutor를 사용하고 싶다면 external tool을 install and setup해야 한다.(Redis 같은)
  3. 각각의 Worker는 일치해야 한다.
    각 worker는 machine에 해당하므로, 각 machine에는 task가 실행되게 하는 인스턴스가 실행된다. 그러므로 모든 machines가 same dependencies를 공유해야 한다.(예를 들어 작업이 AWS와 상호작용한다고 하면, interact를 위한 python module을 모든 machines에 설치해야 한다. 그렇지 않으면 오류가 발생한다.)

CeleryExecutor 설정

Celery 설정

$ pip install 'apache-airflow[celery]'
airflow.cfg에서 executor = CeleryExecutor로 변경

Redis 설정

message broker 역할(Queue)로 Redis를 설치
$ sudo apt update
$ sudo apt install redis-server

  • redis 설정 변경
    $ sudo nano /etc/redis/redis.conf 들어가서
    supervised no -> `supervised systemd로 변경 후 저장(ctrl + x)
  • restart 해서 적용 및 확인
    $ sudo systemctl restart redis.service
    $ sudo systemctl status redis.service
    -> active : active(running) 상태 확인
celery를 세팅하기 위해 변경해야 할 parameter 설정.
  • broker_url : airflow.cfg의
    broker_url - celeryExecutor에서 task를 redis message로 push하기 위해서
    사용
    된다.

    broker_url = redis://redis:6379/0
    -> broker_url = redis://localhost:6379/0

6379는 포트이고 0은 데이터베이스 이름이 0이라는 뜻이다.

  • result_backend
    : celery에서 해당 작업의 실행과 관련된 일부 metadata가 저장된다.
    => result_backend 설정해야 하는 이유이다.

    result_backend = db+postgresql://postgres:airflow@postgres/airflow
    -> result_backend = postgresql+psycopg2://postgres:postgres@localhost/postgres

  • 추가 설정
    broker_url를 redis와 연결되게 설정했기 때문에 redis 패키지 관련 설치
    $ pip install 'apache-airflow[redis]'

Flower

  • flower란 airflow에서 CeleryExecutor에 default 로 제공하는 UI로
    tasks가 실행되는 workers, machines를 모니터링하게 해준다.

    2.1.0 version에서는 default가 아닌 것으로 보인다. -> Link를 이용해서 버전 변경 후

$ airflow celery flower로 동작한다.

  • worker 추가하는 방법?
    result_backend = postgresql+psycopg2://postgres:postgres@localhost/postgres에서
    result_backend = db+postgresql://postgres:postgres@localhost/postgres로 변경

$ airflow celery worker

  • flower UI에서 worker가 추가 된 것을 확인할 수 있다.(이제 해당 worker(machine)에 task를 추가할 수 있다.)
  • Trigger 시에 worker에서 성공, 실패 등 task의 상태를 점검할 수 있다.

Task, DAG의 수 제한

  • Airflow instance 내의 Task의 총 수를 제한하고 싶다면? (parallelism)
  • 하나의 worker에서 수행되는 task의 수를 제한하고 싶다면? (dag_concurrency, concurrency)
  • DAGrun의 수를 제한하고 싶다면?(max_active_runs_per_age, max_active_runs)

parallelism

  • parallelism : 전체 DAG instance에서 실행 가능한 최대 task 수를 정의

ex) LocalExecutor를 사용하더라도 parallelism = 1로 설정하면
SequentialExecutor와 동일하게 동작한다.


concurrency

  • dag_concurrency : 전체 DAG instance에서 동시에 수행할 수 있는 최대 task의 수
  • concurrency : 각각의 DAG에서 설정할 수 있으며 해당 DAG에서 동시에 수행할 수 있는 task의 수를 제한
# x는 해당 DAG에서 동시에 수행할 수 있는 최대 tasks 수
with DAG(..., concurrency = x)

max_active_runs

  • max_active_runs_per_dag : 전체 DAG instance에서 동시에 수행할 수 있는 최대 DAG의 수
  • max_active_runs : 해당 DAG에 허용되는 활성 DAG 실행의 최대 수
# x는 해당 DAG에 허용되는 활성 DAG 실행의 최대 수
with DAG(..., concurrency = x)

예를 들어) 하나의 DAGrun이 일부 데이터를 처리하며,
데이터다음 DAGrun에서 사용된다고 가정하자.
이전 Dagrun이 수행되고 다음 DAGrun이 수행되어야 하므로, 동시에 실행할 수 있는
DAGrun의 수를 제한하는 것이 필요
하다. -> max_active_runs 사용


Exercise)

1)

parallelism = 4이고
dags_concurrency = 6이면?

parallelism에 우선순위가 있기 때문에
한 dag에서 동시에 수행 가능한 task의 수는 4이다.

2)

2개의 DAG가 있다.
DAG A [T1, T2] >> T3
DAG B [T1, T2] >> T3

parallelism = 4
dags_concurrency = 2
max_active_runs_per_dag = 1
이면, DAG A, DAG B가 동시에 수행될 수 있는가?

parallelism, dags_concurrency만 봐서는 가능하지만,
한번에 최대 1개의 DAG만 동시에 수행 가능하기 때문에 불가능하다.

0개의 댓글