Airflow 변수 관리와 로컬 환경 테스트

이한형·2022년 9월 26일
0

Airflow 변수 관리와 로컬 환경 테스트

Airflow 에서 직면한 문제

Airflow 환경과 Local 환경에서의 환경 변수 관리

airflow에서 제공하는 variable 관리 시스템을 이용하면 소스코드상에서 Variable.get(’변수 이름’)으로 가져와야 하는데 로컬 환경에서는 적용이 되지 않았습니다.

Airflow 환경과 local 환경의 절대 경로 차이

airflow 환경에서 최상위 루트 폴더는 dags 폴더로 되어있고 로컬 환경에서는 프로젝트 폴더가 루트 폴더로 되어 있기 때문에 import 경로에서 차이가 존재하였습니다.

로컬 환경에서의 테스트

airflow의 dag의 함수들의 로직을 테스트하기 위해서는 mwaa의 dev 서버 푸시 하거나 로컬 환경에서 docker를 이용하여 airflow를 이용하여 테스트를 해야 했습니다.

작업 ↔ 작업 간의 트리거를 테스트하거나 스케줄을 테스트하는 게 아니라면 airflow 위에서 테스트는 불필요하다고 느껴졌습니다.

이전에 해결했던 방법

환경 변수

로컬에서 간단한 소스코드를 테스트하기 위해서는 Variable.get(’변수 이름’)코드를 주석 처리를 하고 변수에 직접 값을 할당하여 테스트를 진행하였습니다.


from airflow.models import Variable

env_name = Variable.get('env_name')
# env_name = '~~~'

테스트

airflow의 dag을 작성하기 전에 플로우에 맞춰 파이썬 코드를 작성하여 테스트를 진행한 뒤 airflow 환경 위에서 테스트를 진행하였습니다.

해결 방법

airflow와 로컬에서의 경로 차이가 존재했는데 pycharm의 source route를 이용하여 경로 차이를 없애고 동일하게 import 할 수 있었습니다.

또한 airflow 환경과 로컬 환경에서의 환경 변수를 테스트를 할 때마다 코드를 수정하지 않고 진행하기 위해 airflow 환경 위에서는 airflow variable을 가져오고 로컬 환경에서는 dotenv를 이용하여 변수를 가져오는 함수를 작성을 하였습니다.

from airflow.models import Variable
from dotenv import load_dotenv
import os

def variable(name: str) -> any:
    try:
        return Variable.get(name)
    except:
        load_dotenv()
        env = os.environ.get(name)
        if env:
            return env
        else:
            raise Exception('Variable Error')

위의 코드를 이용하여 환경 변수 부분은 해결이 되었으나 airflow dag 안의 함수들을 테스트하기에는 다른 과정이 필요하다고 생각이 들었습니다.

airflow 작업을 실행시킬 때 필요한 정보들을 airflow dag 안에 전달시키고 실행된 dag 함수들은 keyword arguments를 이용하여 정보들을 이용하고 있습니다.


def job(**kwargs):
       conf = kwargs['dag_run'].conf

이러한 과정을 해결하기 위해 작업을 실행시키기 위한 정보들을 conf라는 변수에 저장하는 class를 생성해 주고 순차적으로 작업들에게 클래스를 전달 시켜주고 실행 해주는 함수를 작성하여 테스트를 진행하였습니다.

class Configuration:
    def __init__(self, params: dict):
        self.conf = params


def run_test(jobs: list, params: dict):
    conf = Configuration(params)
    for job in jobs:
        job(dag_run=conf)

from src.test import run_test
from test_job import job1, job2, job3

if __name__ == "__main__":
    params = { "variable1": "1", "variable2": "2" }

    jobs = [job1, job2, job3]
    run_test(jobs=jobs, params=params)

테스트를 진행하였을 때 작업들이 순차적으로 진행이 되었고, params 또한 별 문제 없이 전달이 되었습니다.

결론

로컬 환경에서 dag 함수들의 플로우에 맞는지 간단하고 빠르게 테스트를 하는 용도로 사용할 수 있는 것을 확인할 수 있었습니다.

아직까지는 dag들 간의 trigger를 걸어 유기적으로 연결되는 작업들 같은 경우에는 테스트가 불가능하기 때문에 추후에 고도화를 진행해 볼 수 있을 것 같습니다.

profile
풀스택 개발자를 지향하는 개발자

0개의 댓글