
개발하시는 분들은 공감하시겠지만,
애플리케이션을 개발하다 보면 외부 서비스를 활용하는 경우가 많습니다.
그리고 이 외부 서비스에 대한 설정 정보를 한 곳에 모아놓고,
전역변수 처럼 애플리케이션 코드에서 활용할 때가 많습니다.
이런 설정 정보의 예로는 다음과 같은 것들이 있습니다.
http url, port 등의 정보host, port, username, password이런 정보들을 한곳에서 관리하게 때문에 개발의 편의성이 많이 올라갑니다.
Airflow 에서는 이와 비슷하게 Connection 과 Hook 라는 것을 통해서
연결 파라미터 정보들을 저장/조회할 수 있습니다.
추가로 복잡한 연결 정보가 아닌 단순한 전역변수 를 저장/조회할 수 있는
Variable 기능도 제공합니다.
지금부터 각각의 용어와 사용법을 알아보겠습니다.
먼저 간단하게 Connection 과 Hook 이 뭐고, 특징이 뭔지 가볍게 알아봅시다.
Airflow 에서 다른 시스템과 통신(또는 상호작용)을 하기 위한 파라미터 집합conn_id 를 통해서 서로 다른 Connection 을 구분Http Connection, File (Path) Connection 등conn_id 를 통해 시스템 통신에 필요한 정보와 기능을 제공하는 인터페이스get_conn 메소드 반환 인스턴스로 직접적인 통신 기능을 내장한 인스턴스 참조혹여 당장 이해가 안되더라도 실습을 해보면 뭔말인지 금방 아실 수 있습니다!
저는 간단한 Http 통신을 통해 어떤 json 데이터를 가져오고, 파일로 저장하는
DAG 를 작성해볼 예정입니다. 이를 위해서 먼저 Connection 하나를 생성해보겠습니다.
airflow ui 화면을 띄우고 위 그림과 같이 메뉴 버튼을 클릭해줍니다.
위 Connection 설정은 https://jsonplaceholder.typicode.com/ 라는 사이트에서
제공하는 더미 json 데이터를 다운로드 받기 위해 만든 겁니다.
connection_id 는 추후에 Hook 생성을 위해 사용되는 값이니,
제 나름대로 다른 Connection 과 구분하기 쉬운 id 를 부여했습니다.
Http 통신으로 json 을 다운로드 받아야 해서 Connection Type : HTTP 로 지정하고,
Host, Schema 정보를 적절히 지정했습니다. (맨 밑에 Extra 는 작성 안해도 됩니다)
이러고 나서 화면 하단에 있는 Save 버튼을 클릭해서 Connection 설정을 저장합니다.
목록화면에 앞서 생성한 Connection 정보가 보입니다 😀
이제 Connection 을 생성했으니 conn_id 를 통해서 Hook 을 생성하고,
Hook 을 기반으로 HttpRequest 를 전송하고,
반환된 json 을 파일로 저장하는 DAG 를 작성해보겠습니다.
from airflow import DAG
from airflow.decorators import task
from airflow.models import TaskInstance
import pendulum
with DAG(
dag_id="dags_connection_hook_blog",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
catchup=False,
schedule=None,
) as dag:
@task(task_id='read_dummy_json_task')
def read_dummy_json_task(**kwargs):
from airflow.hooks.base import BaseHook
from airflow.providers.http.hooks.http import HttpHook
conn_id = 'jsonplaceholder.typicode'
# Hook 의 classMethod (=get_connection) 으로 connection_id 와 매칭되는 정보를 갖는 Connection 인스턴스 생성
conn = BaseHook.get_connection(conn_id)
# 설정한 정보를 조회할 수 있습니다.
print(f'Connection Id: {conn.conn_id}')
print(f'Connection Type: {conn.conn_type}')
print(f'Connection schema: {conn.schema}')
print(f'Connection host: {conn.host}')
# Hook 의 get_conn 메소드로 실제 통신 객체를 참조해보겠습니다.
hook = HttpHook(http_conn_id=conn_id) # 첫번째 파라미터는 hook.run() 메소드 사용시 필요함.
# Connection 정보 조회
# hook.get_connection() # BaseHook.get_connection(conn_id) 와 동일
# 실제 Http 통신 인스턴스(requests.Session) 참조
session = hook.get_conn()
# 주의!
# - hook.get_connection 메소드는 airflow ui 에서 생성한 Connection 반환
# - hook.get_conn 메소드는 Connection 타입에 따라 실제 통신을 가능케하는 인스턴스를 반환
# 헷갈리지 마세요!
# requests.Session 인스턴스의 메소드를 사용해서 json 데이터 받아오기
response = session.get(f'{hook.base_url}/posts')
# xcom_push 의 RETURN_VALUE 키의 value 로 return 값이 들어갑니다.
return response.text
# 반환받은 json 텍스트를 파일로 저장하는 태스크
@task(task_id='save_json_text_to_file')
def save_json_text_to_file(**kwargs):
import os
print(f'data_interval_end : {kwargs["data_interval_start"].in_timezone("Asia/Seoul")}')
# read_dummy_json_task 메소드가 반환한 response.text 값을 받습니다.
ti = kwargs['ti']
result_json = ti.xcom_pull(task_ids='read_dummy_json_task')
file_save_dir = '/opt/airflow/files'
os.makedirs(file_save_dir, exist_ok=True)
with open(f'{file_save_dir}/data.json', 'w', encoding='UTF-8') as f:
f.write(result_json) # json 문자열을 파일로 저장!
read_dummy_json_task() >> save_json_text_to_file()
핵심적인 부분 요약
hook.get_connection() 메소드를 통해서 설정한 파라미터 값 조회hook.get_connection, hook.get_conn 은 비슷한 명칭 때문에 헷갈리니 주의!hook.get_conn 메소드를 통해서 Http 통신이 가능한 requests.Session 인스턴스 참조자~ 코드를 어느정도 숙지했다면 실행을 해보죠.
Dag Trigger 를 버튼을 클릭하고 실행한 결과를 보면...
일단 2개의 Task 가 모두 성공한 것을 확인
read_dummy_json_task 태스크의 return 값이 Logs 에 찍히는 것을 확인할 수 있네요.
로그에 찍힌 return 값은 xcom 의 return_value 키의 value 로 들어가는 것도 확인!
마지막으로 실제 파일을 조회하여 json 이 정상적으로 저장된 것을 확인했습니다.
(참고로 저는 docker 사용 중이라서, container 내부로 들어가서 조회한 겁니다)
저희가 이전 목차에서 @task 데코레이터(=PythonOperator)와
HttpHook 을 통해서 http request 를 날리는 코드를 모두 작성했습니다.
그런데 이와 똑같은 기능을 제공하는 Operator 가 사실 이미 존재합니다.
바로 HttpOperator 이죠.
그런데 이 Operator 는 conn_id 를 요구합니다.
아래 코드를 한번 볼까요?
from airflow import DAG
from airflow.decorators import task
from airflow.providers.http.operators.http import HttpOperator
import pendulum
with DAG(
#... 생략 ....
) as dag:
conn_id = 'jsonplaceholder.typicode'
read_dummy_json_task = HttpOperator(
task_id='read_dummy_json_task',
headers={ 'Accept': 'application/json' },
method='GET', # HTTP GET 으로 요청 날림
http_conn_id=conn_id, # (중요) HttpConnection 의 connection_id !!
endpoint='/posts', # 나머지 경로 작성
)
@task(task_id='save_json_text_to_file')
def save_json_text_to_file(**kwargs):
# .......................................
# ... 내용이 이전과 완전히 같으므로 생략! ....
# .......................................
보면 알겠지만 connection_id 를 생성자에서 요구합니다.
이러한 conn_id 를 요구하는 Operator 들은 task 가 실행될 때
Hook.get_connection(), Hook.get_conn() 등의 메소드를 내부적으로
호출해서 사용하게 됩니다.
결과적으로 위 코드는 저희가 이전에 작성한 코드와 완전히 동일하게 동작합니다.
이런 Operator 들은 편하기는 하지만 내부적인 동작 방식을 어느정도 알아야 쓰기 좋습니다.
그러므로 가볍게라도Operator의execute메소드 내용을 보는 걸 추천드립니다!
아~ 그런데 위에 작성한 코드에서 뭔가 마음에 안드는 부분이 있습니다.
# 반환받은 json 텍스트를 파일로 저장하는 태스크
@task(task_id='save_json_text_to_file')
def save_json_text_to_file(**kwargs):
# 중간 내용 생략!!
file_save_dir = '/opt/airflow/files' # 하드 코딩된 경로!!!!
os.makedirs(file_save_dir, exist_ok=True)
with open(f'{file_save_dir}/data.json', 'w', encoding='UTF-8') as f:
f.write(result_json) # json 문자열을 파일로 저장!
바로 하드코딩된 파일 저장 디렉토리 경로입니다. 불편합니다.
왜냐하면 이런 경로 정보가 똑같은 값으로 서로 다른 DAG 에
하드 코딩된 상태에서 중간에 이 저장 경로가 바뀌면 골치가 아프기 때문입니다.
이미 하드코딩된 경로를 여러 DAG 에서 사용 중이니
결국 모든 DAG 파일을 뒤져보면서 수정해줘야할 겁니다.
이런 경우를 방지하기 위해서 Airflow 에서는 전역변수 기능인
Variable 을 제공합니다.
airflow ui 를 실행시키고 위 그림처럼 클릭해줍니다.
이후에 위 그림처럼 적절하게 Key 값을 주고, Val 에는 아까 하드코딩한
디렉토리 경로를 작성준 후에 Save 버튼을 클릭해서 전역변수를 저장합니다.
# 반환받은 json 텍스트를 파일로 저장하는 태스크
@task(task_id='save_json_text_to_file')
def save_json_text_to_file(**kwargs):
import os
from airflow.models import Variable # import 추가!!!
# ...중간 내용 생략...
# file_save_dir = '/opt/airflow/files' # 하드코딩 주석
file_save_dir = Variable.get("json_save_directory")
os.makedirs(file_save_dir, exist_ok=True)
with open(f'{file_save_dir}/data.json', 'w', encoding='UTF-8') as f:
f.write(result_json) # json 문자열을 파일로 저장!
DAG 코드를 위처럼 수정해줍니다.
이후 다시 실행하면 마찬가지로 성공하게 됩니다.
참고로 Jinja Template 을 사용할 수 있는 경우에는 아래처럼 조회할 수 있습니다.
{{ var.value.json_save_directory }}
이상으로 글을 마치겠습니다.
읽어주셔서 감사합니다~