[Airflow] Connection , Hook 그리고 Variable

식빵·2025년 6월 25일
0

Airflow

목록 보기
8/9
post-thumbnail

🔥 Airflow 의 전역 설정값

개발하시는 분들은 공감하시겠지만,
애플리케이션을 개발하다 보면 외부 서비스를 활용하는 경우가 많습니다.

그리고 이 외부 서비스에 대한 설정 정보를 한 곳에 모아놓고,
전역변수 처럼 애플리케이션 코드에서 활용할 때가 많습니다.

이런 설정 정보의 예로는 다음과 같은 것들이 있습니다.

  • open api 호출을 위한 http url, port 등의 정보
  • database 통신을 위한 host, port, username, password

이런 정보들을 한곳에서 관리하게 때문에 개발의 편의성이 많이 올라갑니다.

Airflow 에서는 이와 비슷하게 ConnectionHook 라는 것을 통해서
연결 파라미터 정보들을 저장/조회할 수 있습니다.

추가로 복잡한 연결 정보가 아닌 단순한 전역변수 를 저장/조회할 수 있는
Variable 기능도 제공합니다.

지금부터 각각의 용어와 사용법을 알아보겠습니다.




📡 Connection 과 Hook

1. 용어 정리

먼저 간단하게 Connection 과 Hook 이 뭐고, 특징이 뭔지 가볍게 알아봅시다.


  • Connection :
    • Airflow 에서 다른 시스템과 통신(또는 상호작용)을 하기 위한 파라미터 집합
    • conn_id 를 통해서 서로 다른 Connection 을 구분
    • 통신의 방법이 다양한 만큼 Connection 타입도 다양함
      • Http Connection, File (Path) Connection

  • Hook :
    • 커넥션의 conn_id 를 통해 시스템 통신에 필요한 정보와 기능을 제공하는 인터페이스
    • Hook 의 주요 기능 :
      • Connection 에서 설정한 파라미터 값 조회
      • get_conn 메소드 반환 인스턴스로 직접적인 통신 기능을 내장한 인스턴스 참조

혹여 당장 이해가 안되더라도 실습을 해보면 뭔말인지 금방 아실 수 있습니다!




2. 실습 해보기

저는 간단한 Http 통신을 통해 어떤 json 데이터를 가져오고, 파일로 저장하는
DAG 를 작성해볼 예정입니다. 이를 위해서 먼저 Connection 하나를 생성해보겠습니다.

2-1. Connection 생성

airflow ui 화면을 띄우고 위 그림과 같이 메뉴 버튼을 클릭해줍니다.


위 Connection 설정은 https://jsonplaceholder.typicode.com/ 라는 사이트에서
제공하는 더미 json 데이터를 다운로드 받기 위해 만든 겁니다.

connection_id 는 추후에 Hook 생성을 위해 사용되는 값이니,
제 나름대로 다른 Connection 과 구분하기 쉬운 id 를 부여했습니다.

Http 통신으로 json 을 다운로드 받아야 해서 Connection Type : HTTP 로 지정하고,
Host, Schema 정보를 적절히 지정했습니다. (맨 밑에 Extra 는 작성 안해도 됩니다)

이러고 나서 화면 하단에 있는 Save 버튼을 클릭해서 Connection 설정을 저장합니다.


목록화면에 앞서 생성한 Connection 정보가 보입니다 😀



2-2. DAG 작성 및 Hook 사용

이제 Connection 을 생성했으니 conn_id 를 통해서 Hook 을 생성하고,
Hook 을 기반으로 HttpRequest 를 전송하고,
반환된 json 을 파일로 저장하는 DAG 를 작성해보겠습니다.

from airflow import DAG
from airflow.decorators import task
from airflow.models import TaskInstance
import pendulum


with DAG(
    dag_id="dags_connection_hook_blog",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    catchup=False,
    schedule=None,
) as dag:
    
    @task(task_id='read_dummy_json_task')
    def read_dummy_json_task(**kwargs):
        from airflow.hooks.base import BaseHook
        from airflow.providers.http.hooks.http import HttpHook
        
        conn_id = 'jsonplaceholder.typicode'

        # Hook 의 classMethod (=get_connection) 으로 connection_id 와 매칭되는 정보를 갖는 Connection 인스턴스 생성
        conn = BaseHook.get_connection(conn_id)
        
        # 설정한 정보를 조회할 수 있습니다.
        print(f'Connection Id: {conn.conn_id}')
        print(f'Connection Type: {conn.conn_type}')
        print(f'Connection schema: {conn.schema}')
        print(f'Connection host: {conn.host}')
        
        # Hook 의 get_conn 메소드로 실제 통신 객체를 참조해보겠습니다.
        hook = HttpHook(http_conn_id=conn_id) # 첫번째 파라미터는 hook.run() 메소드 사용시 필요함.
        
        # Connection 정보 조회
        # hook.get_connection() # BaseHook.get_connection(conn_id) 와 동일
        
        # 실제 Http 통신 인스턴스(requests.Session) 참조
        session = hook.get_conn() 

        # 주의!
        # - hook.get_connection 메소드는 airflow ui 에서 생성한 Connection 반환
        # - hook.get_conn 메소드는 Connection 타입에 따라 실제 통신을 가능케하는 인스턴스를 반환
        # 헷갈리지 마세요!

        # requests.Session 인스턴스의 메소드를 사용해서 json 데이터 받아오기
        response = session.get(f'{hook.base_url}/posts') 

        # xcom_push 의 RETURN_VALUE 키의 value 로 return 값이 들어갑니다.
        return response.text
    

	# 반환받은 json 텍스트를 파일로 저장하는 태스크
    @task(task_id='save_json_text_to_file')
    def save_json_text_to_file(**kwargs):
        import os
        
        print(f'data_interval_end : {kwargs["data_interval_start"].in_timezone("Asia/Seoul")}')

        # read_dummy_json_task 메소드가 반환한 response.text 값을 받습니다.
        ti = kwargs['ti']
        result_json = ti.xcom_pull(task_ids='read_dummy_json_task')
        file_save_dir = '/opt/airflow/files'
        os.makedirs(file_save_dir, exist_ok=True)
        with open(f'{file_save_dir}/data.json', 'w', encoding='UTF-8') as f:
            f.write(result_json) # json 문자열을 파일로 저장!

    read_dummy_json_task() >> save_json_text_to_file()

핵심적인 부분 요약

  • hook.get_connection() 메소드를 통해서 설정한 파라미터 값 조회
  • hook.get_connection, hook.get_conn 은 비슷한 명칭 때문에 헷갈리니 주의!
  • hook.get_conn 메소드를 통해서 Http 통신이 가능한 requests.Session 인스턴스 참조
  • 이렇듯 hook 은 Connection 파라미터 정보뿐만 아니라,
    Connection 타입에 따라 실제 통신이 가능한 인스턴스를 생성해줍니다.

자~ 코드를 어느정도 숙지했다면 실행을 해보죠.


2-3. 실행결과:

Dag Trigger 를 버튼을 클릭하고 실행한 결과를 보면...

일단 2개의 Task 가 모두 성공한 것을 확인


read_dummy_json_task 태스크의 return 값이 Logs 에 찍히는 것을 확인할 수 있네요.


로그에 찍힌 return 값은 xcom 의 return_value 키의 value 로 들어가는 것도 확인!


마지막으로 실제 파일을 조회하여 json 이 정상적으로 저장된 것을 확인했습니다.
(참고로 저는 docker 사용 중이라서, container 내부로 들어가서 조회한 겁니다)




3. connection_id 를 사용하는 Operator 들

저희가 이전 목차에서 @task 데코레이터(=PythonOperator)와
HttpHook 을 통해서 http request 를 날리는 코드를 모두 작성했습니다.
그런데 이와 똑같은 기능을 제공하는 Operator 가 사실 이미 존재합니다.

바로 HttpOperator 이죠.
그런데 이 Operator 는 conn_id 를 요구합니다.
아래 코드를 한번 볼까요?

from airflow import DAG
from airflow.decorators import task
from airflow.providers.http.operators.http import HttpOperator
import pendulum

with DAG(
    #... 생략 ....
) as dag:
    
    conn_id = 'jsonplaceholder.typicode'
    
    read_dummy_json_task = HttpOperator(
        task_id='read_dummy_json_task',
        headers={ 'Accept': 'application/json' },
        method='GET', 		  # HTTP GET 으로 요청 날림
        http_conn_id=conn_id, # (중요) HttpConnection 의 connection_id !!
        endpoint='/posts',    # 나머지 경로 작성
    )
    
    @task(task_id='save_json_text_to_file')
    def save_json_text_to_file(**kwargs):
        # .......................................
		# ... 내용이 이전과 완전히 같으므로 생략! ....
        # .......................................

보면 알겠지만 connection_id 를 생성자에서 요구합니다.

이러한 conn_id 를 요구하는 Operator 들은 task 가 실행될 때
Hook.get_connection(), Hook.get_conn() 등의 메소드를 내부적으로
호출해서 사용하게 됩니다.

결과적으로 위 코드는 저희가 이전에 작성한 코드와 완전히 동일하게 동작합니다.

이런 Operator 들은 편하기는 하지만 내부적인 동작 방식을 어느정도 알아야 쓰기 좋습니다.
그러므로 가볍게라도 Operatorexecute 메소드 내용을 보는 걸 추천드립니다!



🌐 Variable

1. Airflow 의 전역변수

아~ 그런데 위에 작성한 코드에서 뭔가 마음에 안드는 부분이 있습니다.

# 반환받은 json 텍스트를 파일로 저장하는 태스크
@task(task_id='save_json_text_to_file')
def save_json_text_to_file(**kwargs):

	# 중간 내용 생략!!
    
    file_save_dir = '/opt/airflow/files' # 하드 코딩된 경로!!!!
    os.makedirs(file_save_dir, exist_ok=True)
    with open(f'{file_save_dir}/data.json', 'w', encoding='UTF-8') as f:
    	f.write(result_json) # json 문자열을 파일로 저장!

바로 하드코딩된 파일 저장 디렉토리 경로입니다. 불편합니다.

왜냐하면 이런 경로 정보가 똑같은 값으로 서로 다른 DAG 에
하드 코딩된 상태에서 중간에 이 저장 경로가 바뀌면 골치가 아프기 때문입니다.

이미 하드코딩된 경로를 여러 DAG 에서 사용 중이니
결국 모든 DAG 파일을 뒤져보면서 수정해줘야할 겁니다.

이런 경우를 방지하기 위해서 Airflow 에서는 전역변수 기능인
Variable 을 제공합니다.



2. 설정 및 사용법

airflow ui 를 실행시키고 위 그림처럼 클릭해줍니다.


이후에 위 그림처럼 적절하게 Key 값을 주고, Val 에는 아까 하드코딩한
디렉토리 경로를 작성준 후에 Save 버튼을 클릭해서 전역변수를 저장합니다.


# 반환받은 json 텍스트를 파일로 저장하는 태스크
@task(task_id='save_json_text_to_file')
def save_json_text_to_file(**kwargs):
	import os
	from airflow.models import Variable # import 추가!!!

	# ...중간 내용 생략...
    
    # file_save_dir = '/opt/airflow/files' # 하드코딩 주석
    file_save_dir = Variable.get("json_save_directory")
    os.makedirs(file_save_dir, exist_ok=True)
    with open(f'{file_save_dir}/data.json', 'w', encoding='UTF-8') as f:
    	f.write(result_json) # json 문자열을 파일로 저장!

DAG 코드를 위처럼 수정해줍니다.
이후 다시 실행하면 마찬가지로 성공하게 됩니다.

참고로 Jinja Template 을 사용할 수 있는 경우에는 아래처럼 조회할 수 있습니다.
{{ var.value.json_save_directory }}


이상으로 글을 마치겠습니다.
읽어주셔서 감사합니다~



🧾 참고한 링크

profile
백엔드 개발자로 일하고 있는 식빵(🍞)입니다.

0개의 댓글