[Python] Airflow DB Integration and Example

JunMyung Lee · May 13, 2022

This page covers switching an existing Airflow installation from the default SQLite metadata database to PostgreSQL.

Because EMR was installed, a MySQL instance dedicated to HUE was already available, and the first plan was to connect Airflow to it. However, even with the latest EMR release, the installed version is 5.5, and it is actually MariaDB rather than MySQL.
Some articles advise against MariaDB (the explicit_defaults_for_timestamp setting cannot be enabled).

When Airflow is installed with SQLite, tasks reportedly cannot be processed in parallel.
The executor setting is then limited to SequentialExecutor, and instead of that executor, LocalExecutor should be used. (The name is really easy to misread...)
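
For reference, after moving off SQLite the executor change itself is a one-line edit in airflow.cfg; a minimal sketch assuming Airflow 2.x, where the option lives under [core]:

vi $AIRFLOW_HOME/airflow.cfg
executor = LocalExecutor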

Enter the PostgreSQL information

Install psycopg2

pip3 install psycopg2
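
To quickly confirm that psycopg2 can actually reach the PostgreSQL server, a minimal check like the following can be used (HOST, DATABASE, USER, and PASSWORD are placeholders, matching the bracketed values used throughout this page):

import psycopg2

# placeholder connection values; replace with the actual host/port/credentials
conn = psycopg2.connect(host="HOST", port=5432, dbname="DATABASE",
                        user="USER", password="PASSWORD")
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone())  # prints the PostgreSQL server version string
conn.close()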

Add a PostgreSQL account

-- Connect
psql -h [HOST] -p [PORT] -U root --dbname=[DATABASE]

-- Create the user
CREATE USER [USER] with password '[PASSWORD]';

-- Create the database
CREATE DATABASE [DATABASE] TEMPLATE template0 LC_COLLATE 'C' LC_CTYPE 'ko_KR.UTF-8';

-- Set the database owner
ALTER DATABASE [DATABASE] OWNER TO [USER];

-- Grant access so that only the specific account can connect
GRANT CONNECT, TEMPORARY ON DATABASE [DATABASE] TO [USER];


-- Create a schema (requires ownership of the database)
-- This step is not performed on this page
CREATE SCHEMA [SCHEMA] AUTHORIZATION [USER];

Normally with PostgreSQL you would create a dedicated schema and use that instead of the public schema, but when the configuration specifying a schema is added, an exception is thrown during DB creation. I could not figure out what causes it, so I proceeded with the public schema.
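
For reference, airflow.cfg does have a sql_alchemy_schema option for pointing the metadata DB at a specific schema; it is not used on this page, which stays on the public schema:

# not set on this page; the public schema is used instead
sql_alchemy_schema = [SCHEMA]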

Change the configuration file

vi $AIRFLOW_HOME/airflow.cfg
sql_alchemy_conn = postgresql+psycopg2://[USER]:[PASSWORD]@[HOST]:[PORT]/[DATABASE]
sql_engine_encoding = utf-8
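
Before running airflow db init, the connection string can be sanity-checked with SQLAlchemy (which Airflow uses internally); this is just a sketch using the same placeholder values as sql_alchemy_conn above:

from sqlalchemy import create_engine, text

# placeholder USER/PASSWORD/HOST/PORT/DATABASE values
engine = create_engine("postgresql+psycopg2://USER:PASSWORD@HOST:5432/DATABASE")
with engine.connect() as conn:
    print(conn.execute(text("SELECT 1")).scalar())  # should print 1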

Connect Airflow to PostgreSQL

Create the DB tables

Running the following command creates the Airflow metadata tables in the specified database.

airflow db init
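
To confirm the tables were created in PostgreSQL (and not silently in the old SQLite file), the table list can be inspected, for example (placeholder credentials again):

import psycopg2

conn = psycopg2.connect(host="HOST", port=5432, dbname="DATABASE",
                        user="USER", password="PASSWORD")
with conn.cursor() as cur:
    cur.execute("SELECT tablename FROM pg_tables WHERE schemaname = 'public' ORDER BY tablename;")
    for (name,) in cur.fetchall():
        print(name)  # expect tables such as dag, dag_run, task_instance, ab_user, ...
conn.close()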

Create a user

Running the following command inserts the account into the ab_user table.

airflow users create --username [USER] --firstname Liam --lastname Liam --role Admin --email [EMAIL] -p [PASSWORD]

Register the Airflow webserver as a service

Register the service manually.

# Run as root
vi /usr/lib/systemd/system/airflow-webserver.service

[Unit]
Description=Airflow webserver daemon
After=network.target

[Service]
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/bin/python3 /home/airflow/.local/bin/airflow webserver --port 8082 --pid /home/airflow/airflow/webserver.pid
Restart=on-failure
RestartSec=5s
PrivateTmp=true

[Install]
WantedBy=multi-user.target

systemctl enable airflow-webserver.service
systemctl start airflow-webserver.service
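
Once started, the service state and recent logs can be checked with the usual systemd commands:

systemctl status airflow-webserver.service
journalctl -u airflow-webserver.service -f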

The scheduler and worker can be registered in the same way; only the start command differs, as shown below. (Note that airflow celery worker only applies when CeleryExecutor is used; with LocalExecutor, the webserver and scheduler are enough.)

# Worker
ExecStart=/usr/bin/python3 /home/airflow/.local/bin/airflow celery worker
# Scheduler
ExecStart=/usr/bin/python3 /home/airflow/.local/bin/airflow scheduler

Register a DAG in Airflow and test it

Write a .py file at the location below.
The code here was written with reference to the following blog post. (code - blog)

$AIRFLOW_HOME/dags/[file_name].py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
          'owner': 'airflow'
        , 'catchup': False  # note: catchup is normally a DAG-level argument, not a default_arg
        , 'execution_timeout': timedelta(hours=6)
        , 'depends_on_past': False
    }

dag = DAG(
          'sample'
        , default_args = default_args
        , description = "sample description"
        , schedule_interval = "@daily"
        , start_date = days_ago(3)
        , tags = ['daily']
        , max_active_runs=3
        , concurrency=1
    )

sample_a = BashOperator(
          task_id='sample_a'
        , bash_command='echo hello'
        , dag=dag
    )

sample_b = BashOperator(
          task_id='sample_b'
        , bash_command='echo hello'
        , dag=dag
    )

sample_a << sample_b  # sample_a depends on sample_b, so sample_b runs first
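
Once the file is under $AIRFLOW_HOME/dags, the DAG is picked up under its dag_id (sample), not the file name; this can be checked with:

airflow dags list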

Test the behavior

Note that you must specify the dag_id and task_id defined in the code, not the .py file name. Also, when testing, the last argument (the execution date) must be a past date; dates after the current date will not run.

# task test
airflow tasks test sample sample_a 2022-05-12
# out
[2022-05-13 16:11:29,813] {dagbag.py:507} INFO - Filling up the DagBag from /home/airflow/airflow/dags
/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py:128 DeprecationWarning: Calling `DAG.create_dagrun()` without an explicit data interval is deprecated
[2022-05-13 16:11:29,913] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample.sample_a __airflow_temporary_run_2022-05-13T07:11:29.836223+00:00__ [None]>
[2022-05-13 16:11:29,924] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample.sample_a __airflow_temporary_run_2022-05-13T07:11:29.836223+00:00__ [None]>
[2022-05-13 16:11:29,924] {taskinstance.py:1356} INFO -
--------------------------------------------------------------------------------
[2022-05-13 16:11:29,924] {taskinstance.py:1357} INFO - Starting attempt 1 of 1
[2022-05-13 16:11:29,925] {taskinstance.py:1358} INFO -
--------------------------------------------------------------------------------
[2022-05-13 16:11:29,929] {taskinstance.py:1377} INFO - Executing <Task(BashOperator): sample_a> on 2022-05-12T00:00:00+00:00
[2022-05-13 16:11:29,992] {taskinstance.py:1571} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=sample
AIRFLOW_CTX_TASK_ID=sample_a
AIRFLOW_CTX_EXECUTION_DATE=2022-05-12T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=__airflow_temporary_run_2022-05-13T07:11:29.836223+00:00__
[2022-05-13 16:11:29,992] {subprocess.py:62} INFO - Tmp dir root location:
 /tmp
[2022-05-13 16:11:29,992] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'echo hello']
[2022-05-13 16:11:29,997] {subprocess.py:85} INFO - Output:
[2022-05-13 16:11:29,998] {subprocess.py:92} INFO - hello # command output
[2022-05-13 16:11:29,998] {subprocess.py:96} INFO - Command exited with return code 0
[2022-05-13 16:11:30,102] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=sample, task_id=sample_a, execution_date=20220512T000000, start_date=, end_date=20220513T071130
# dag test
airflow dags test sample 2022-05-12
# out
[2022-05-13 16:39:09,505] {dagbag.py:507} INFO - Filling up the DagBag from /home/airflow/airflow/dags
[2022-05-13 16:39:09,700] {base_executor.py:91} INFO - Adding to queue: ['<TaskInstance: sample.sample_b backfill__2022-05-12T00:00:00+00:00 [queued]>']
[2022-05-13 16:39:14,665] {taskinstance.py:1571} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=sample
AIRFLOW_CTX_TASK_ID=sample_b # executed task
AIRFLOW_CTX_EXECUTION_DATE=2022-05-12T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=backfill__2022-05-12T00:00:00+00:00
[2022-05-13 16:39:14,666] {subprocess.py:62} INFO - Tmp dir root location:
 /tmp
[2022-05-13 16:39:14,666] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'echo hello']
[2022-05-13 16:39:14,672] {subprocess.py:85} INFO - Output:
[2022-05-13 16:39:14,672] {subprocess.py:92} INFO - hello
[2022-05-13 16:39:14,673] {subprocess.py:96} INFO - Command exited with return code 0
[2022-05-13 16:39:14,697] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=sample, task_id=sample_b, execution_date=20220512T000000, start_date=20220513T073909, end_date=20220513T073914
[2022-05-13 16:39:14,742] {backfill_job.py:378} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2022-05-13 16:39:14,776] {base_executor.py:91} INFO - Adding to queue: ['<TaskInstance: sample.sample_a backfill__2022-05-12T00:00:00+00:00 [queued]>']
[2022-05-13 16:39:19,631] {taskinstance.py:1571} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=sample
AIRFLOW_CTX_TASK_ID=sample_a # executed task
AIRFLOW_CTX_EXECUTION_DATE=2022-05-12T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=backfill__2022-05-12T00:00:00+00:00
[2022-05-13 16:39:19,631] {subprocess.py:62} INFO - Tmp dir root location:
 /tmp
[2022-05-13 16:39:19,632] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'echo hello']
[2022-05-13 16:39:19,637] {subprocess.py:85} INFO - Output:
[2022-05-13 16:39:19,638] {subprocess.py:92} INFO - hello
[2022-05-13 16:39:19,638] {subprocess.py:96} INFO - Command exited with return code 0
[2022-05-13 16:39:19,661] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=sample, task_id=sample_a, execution_date=20220512T000000, start_date=20220513T073909, end_date=20220513T073919
[2022-05-13 16:39:19,693] {dagrun.py:562} INFO - Marking run <DagRun sample @ 2022-05-12T00:00:00+00:00: backfill__2022-05-12T00:00:00+00:00, externally triggered: False> successful
[2022-05-13 16:39:19,693] {dagrun.py:622} INFO - DagRun Finished: dag_id=sample, execution_date=2022-05-12T00:00:00+00:00, run_id=backfill__2022-05-12T00:00:00+00:00, run_start_date=2022-05-13 07:39:09.601044+00:00, run_end_date=2022-05-13 07:39:19.693573+00:00, run_duration=10.092529, state=success, external_trigger=False, run_type=backfill, data_interval_start=2022-05-12T00:00:00+00:00, data_interval_end=2022-05-13T00:00:00+00:00, dag_hash=None
[2022-05-13 16:39:19,694] {backfill_job.py:378} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2022-05-13 16:39:19,701] {backfill_job.py:879} INFO - Backfill done. Exiting.

Because the code ends with sample_a << sample_b, sample_b was executed first.
If you are going to lay out a larger flow, it is probably clearer to write the dependencies so they read left to right in execution order:

sample_a >> sample_b
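
As a sketch of a slightly longer left-to-right flow (sample_c and sample_d are hypothetical tasks, not defined above):

sample_a >> sample_b >> [sample_c, sample_d]  # sample_c and sample_d run in parallel after sample_b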

Order as shown in the UI
