DevCourse TIL Day4 Week12 - Airflow - 구글시트 연동, 기타 기능

김태준·2023년 6월 24일
0

Data Enginnering DevCourse

목록 보기
54/93
post-thumbnail

어제는 slack 채널과의 연동으로 에러 사항을 출력해냈다면 오늘은 구글시트와 연동 학습 진행!

✅ gsheet & airflow

구글 시트를 테이블로 복사하는 예시

CREATE TABLE
	schema.table (
    	col1 int, 
        col2 int,
    );

방법은 다음과 같다.
1. 시트 API 활성화 후 어카운트 생성, key JSON파일로 만들고 다운로드
1-1. airflow 서버에서 variables로 json 저장
2. Account에서 생성해준 이메일 조작해 시트에 공유
3. Dag에서 해당 json 파일 인증 후 시트 조작

✅ API & Airflow 모니터링

Airflow API 활성화하는 법.
1. 터미널 환경

-> airflow.cfg의 api 섹션에서 auth_backend값을 airflow.api.auth.backend.basic_auth로 setup
-> yaml파일에서 AIRFLOWAPIAUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
=> 아래 명령으로 체크
$ docker exec -it SCHEDULER_ID airflow config get-value api auth_backend
결과 : airflow.api.auth.backend.basic_auth,airflow.api.auth.backend_session

  1. airflow web UI에서 체크
    Security -> List Users에서 사용자 정보 추가

🎈 - Airflow health check

  1. /health API 호출
    -> curl -X GET --user "airflow:airflow" http://localhost:8080/health

결과

{
 "metadatabase": {
 "status": "healthy"
 },
 "scheduler": {
 "status": "healthy",
 "latest_scheduler_heartbeat": "2022-03-12T06:02:38.067178+00:00"
 }
}

🎈 - Airflow API로 외부에서 Airflow 조작

  1. 예시 ( 특정 DAG를 API로 트리거 (실행하기) )
  1. 모든 dag 리스트하기
  1. 모든 variable 리스트하기
  1. 모든 config 리스트하기

variables/connections Import, Export

$ docker exec -it CONTAINERID airflow variables export variables.json
$ docker exec -it CONTAINERID airflow variables import variables.json
$ docker exec -it CONTAINERID airflow connections export connections.json
$ docker exec -it CONTAINERID airflow connections import connections.json

✅ Airflow & API 모니터링

$ curl -X GET --user "airflow:airflow" http://localhost:8080/health

< 파이썬으로 requests.get에 Basic_auth 설정하기 - 활성화된 Dag에 한해 >

import requests

# Airflow 웹 서버의 URL과 인증 정보
airflow_url = 'http://airflow-url'
username = 'name'
password = 'password'

# Airflow API 엔드포인트
api_endpoint = f'{airflow_url}/api/v1/dags'

# 인증 정보를 담은 객체 생성
auth = requests.auth.HTTPBasicAuth(username, password)

# GET 요청 보내기
response = requests.get(api_endpoint, auth=auth)

# 응답 처리
if response.status_code == 200:
    # 성공적인 응답인 경우
    data = response.json()
    active_dags = [dag for dag in data['dags'] if dag['is_active']]
    for dag in active_dags:
        print('DAG ID:', dag['dag_id'])
        print('Description:', dag['description'])
        print('Schedule Interval:', dag['schedule_interval'])
        print('---')
else:
    # 에러 응답인 경우
    print('에러 발생. 상태 코드:', response.status_code)
    print('에러 메시지:', response.text)
  • config API 막혀있는 것 해결 ?
    -> yaml파일 내 아래 코드 확인
environment:
	AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'true'

이후 docker compose로 재시작해 config API(8080/api/v1/config) 호출해 결과 확인

  • variables API 환경변수 지정도 리턴 ?

variables/connections API는 환경변수로 지정되었다면 리턴 X

profile
To be a DataScientist

0개의 댓글