[데이터 엔지니어링 데브코스] TIL 48일차 - Airflow 고급 기능 배우기(1) & (2)

박단이·2024년 1월 3일
0

데브코스 TIL

목록 보기
48/56

오늘 공부한 내용🤓

Airflow와 Slack 연동

자동으로 원하는 시간에 돌아가도록 만든 DAG는 에러가 발생해도 직접 Airflow에 접속해서 일일이 확인하지 않는다면 에러가 발생했는지 알 수가 없다. 에러가 발생했을 때 실시간으로 알 수 있는 방법으로 Slack과 연동하는 방법이 있다.

전반적인 순서는 아래와 같다.

  1. Slack의 workspace에 App를 만들어 해당 channel과 연결한다.
  2. 그 App을 이용해 Slack과 연동할 수 있도록 python 함수를 개발한다.
  3. 개발한 함수를 DAG의 default_args 인자에 {'on_failure_callback':slack연동함수} 를 설정한다.

자세한 내용 : https://velog.io/@qkr471/airflow-slack-연동하기

Airflow와 구글 스프레드시트 연동

Airflow는 전적으로 데이터 엔지니어의 영역이다. 하지만 Data는 비개발자와 개발자 모두 보기 편해야하고 데이터를 실시간으로 처리 되길 원하는 것도 모두다 원한다. 비개발자, 개발자 모두 편한 구글 스프레드시트의 데이터를 airflow와 연동하여 바로 DB에 적재하고 DB에 있는 내용을 바로 구글 스프레드시트로 뿌려준다면 사용자의 니즈를 충족시킬 수 있을 것이다.

전반적인 순서는 아래와 같다.

  1. GCP 에 가입을 한다.(해당 내용은 무료이기 때문에 과금 걱정 X)
  2. 새로운 프로젝트를 만들고, 그 안에 구글 시트 API와 구글 드라이브 API를 활성화한다.
  3. 구글 서비스 어카운트를 생성하여 이 내용을 담은 JSON 파일을 다운로드한다.
  4. JSON 파일에서 제시하는 이메일 주소를 원하는 구글 시트에 공유한다.
  5. Airflow DAG에서 해당 JSON 파일을 인증하고 시트를 조작한다.

자세한 내용 : https://velog.io/@qkr471/Airflow-구글-스프레드시트-연동

Airflow API 활성화

지금까지 Airlfow 조작을 CLI와 Web UI에서 진행했다. Airflow에도 API를 제공해서 이 기능을 사용할 수도 있다.
하지만 airflow의 내용은 기밀사항을 아주아주 많이 담고 있기 때문에 VPN 안에서 진행하길 바란다.

우선, API를 활성화 시켜야 한다. 방법은 2가지가 있다.

  1. airflow.cfg 수정하기
    • api 섹션에서 auth_backend 값을 변경한다.
    • [API]
      auth_backend=airflow.api.auth.backend.basic_auth
    • 위에서 설정한 basic_auth는 모든 사용자에게 열어주는 권한이기 때문에 꼬옥 VPN 안에서 설정하자.
  2. docker-compose.yml 수정하기
    • 위의 airflow.cfgdocker-compose 단계에서 override하는 방법이다.
    • evironment 에서 AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' 으로 설정한다.
    • docker-compose에서 airflow.cfg를 override하고 싶다면 AIRFLOW__section__key를 사용한다.

이렇게 API를 활성화 시키고 나면 Airflow web ui에 가서 사용자를 추가한다. Security > List Users > + 를 눌러서 권한으로 User을 주고 이 사용자를 통해 API를 사용한다.

API 종류

  1. Airflow의 상태 확인
    정말 말 그대로 Airflow가 잘 작동되고 있는지 확인한다.
curl -X GET --user "airflow:airflow" http://localhost:8080/health
  1. 특정 DAG를 Trigger 하기
    DAG의 excution date를 입력해야되기 때문에 POST Method를 사용한다.
curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d
'{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/dag-id/dagRuns"
  1. 모든 DAG 리스트하기
    DAG 리스트 중 is_active는 그 DAG의 존재 유무에 대한 내용이고 is_paused는 DAG의 활성화 유무에 대한 내용이다. 헷갈리지 말 것!
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags
  1. 모든 Variable/Connection 리스트하기
    Connection의 경우 비밀번호는 반환하지 않는다. 또한, docker-compose를 통해 등록한 환경 변수들은 보이지 않는다.
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/connections
  1. 모든 config 리스트하기
    airflow.cfg 내용을 모두 표출해주는 API이기 때문에 환경설정을 하나 더 해줘야한다.
    airflow.cfg에서 webserver 섹션을 찾아서 expose_config = True로 수정한다. 이 방법은 아주아주 보안에 취약하므로 되도록 사용 X
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config

Variables, Connections를 CLI로 내보내기/불러오기

이 정보들은 날라가면 안되는 정보들이기 때문에 주기적으로 Meta DB를 백업해주던지 CLI를 통해 만들어진 파일을 백업한다.

$ airflow variables export variables.json
$ airflow variables import variables.json

$ airflow connections export connections.json
$ airflow connections import connections.json

Python으로 API 부르기

이번 숙제가 Python을 사용하여 활성화된 dag만 확인할 수 있는 코드를 짜는 것이라서 내 TIL에 올려본다,

import requests
from requests.auth import HTTPBasicAuth

url = "http://localhost:8080/api/v1/dags"
dags = requests.get(url, auth=HTTPBasicAuth('airflow', 'airflow'))
dags = dags.json()['dags']

for dag in dags:
    if not dag['is_paused']:
        print(dag['dag_id'])

느낀 점😊

airflow와 slack을 연결하는 것은 과정 하나하나가 너무 신기했다. 이렇게 세상이 좋아졌다니! 하면서 박수를 혼자 짝짝 쳐대며 실습을 했고, 찾아보니 on_success_callback 함수도 적용할 수 있는 것 같다. 넘겨지는 context에 어떤 내용이 들어가는지 까보고 적용해봐야겠다.

구글 스프레드와 연동하는 것은 목차에서 보고 '엥? 필요한 기술이라고?' 라는 생각을 했다. 하지만 교육을 모두 듣고 나서는 이게 굉장히 유용한 기술이라는 것을 알게됐다. 비개발자들과 개발자들의 경계를 넘나드는 엑셀... 정말 누가 만들었는지 모르겠지만 이 분은 이렇게 잘 쓰는 우리들을 보며 행복하시지 않을까? 생각이 든다.

profile
데이터 엔지니어를 꿈꾸는 주니어 입니다!

0개의 댓글