어제는 slack 채널과의 연동으로 에러 사항을 출력해냈다면 오늘은 구글시트와 연동 학습 진행!
구글 시트를 테이블로 복사하는 예시
CREATE TABLE schema.table ( col1 int, col2 int, );
방법은 다음과 같다.
1. 시트 API 활성화 후 어카운트 생성, key JSON파일로 만들고 다운로드
1-1. airflow 서버에서 variables로 json 저장
2. Account에서 생성해준 이메일 조작해 시트에 공유
3. Dag에서 해당 json 파일 인증 후 시트 조작
Airflow API 활성화하는 법.
1. 터미널 환경-> airflow.cfg의 api 섹션에서 auth_backend값을 airflow.api.auth.backend.basic_auth로 setup
-> yaml파일에서 AIRFLOWAPIAUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
=> 아래 명령으로 체크
$ docker exec -it SCHEDULER_ID airflow config get-value api auth_backend
결과 : airflow.api.auth.backend.basic_auth,airflow.api.auth.backend_session
- airflow web UI에서 체크
Security -> List Users에서 사용자 정보 추가
- /health API 호출
-> curl -X GET --user "airflow:airflow" http://localhost:8080/health결과
{ "metadatabase": { "status": "healthy" }, "scheduler": { "status": "healthy", "latest_scheduler_heartbeat": "2022-03-12T06:02:38.067178+00:00" } }
🎈 - Airflow API로 외부에서 Airflow 조작
- 예시 ( 특정 DAG를 API로 트리거 (실행하기) )
- curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d '{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
- 모든 dag 리스트하기
- curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags
- 모든 variable 리스트하기
- curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables
- 모든 config 리스트하기
- curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
+) connections도 실행 가능variables/connections Import, Export
$ docker exec -it CONTAINERID airflow variables export variables.json
$ docker exec -it CONTAINERID airflow variables import variables.json
$ docker exec -it CONTAINERID airflow connections export connections.json
$ docker exec -it CONTAINERID airflow connections import connections.json
$ curl -X GET --user "airflow:airflow" http://localhost:8080/health
< 파이썬으로 requests.get에 Basic_auth 설정하기 - 활성화된 Dag에 한해 >
import requests
# Airflow 웹 서버의 URL과 인증 정보
airflow_url = 'http://airflow-url'
username = 'name'
password = 'password'
# Airflow API 엔드포인트
api_endpoint = f'{airflow_url}/api/v1/dags'
# 인증 정보를 담은 객체 생성
auth = requests.auth.HTTPBasicAuth(username, password)
# GET 요청 보내기
response = requests.get(api_endpoint, auth=auth)
# 응답 처리
if response.status_code == 200:
# 성공적인 응답인 경우
data = response.json()
active_dags = [dag for dag in data['dags'] if dag['is_active']]
for dag in active_dags:
print('DAG ID:', dag['dag_id'])
print('Description:', dag['description'])
print('Schedule Interval:', dag['schedule_interval'])
print('---')
else:
# 에러 응답인 경우
print('에러 발생. 상태 코드:', response.status_code)
print('에러 메시지:', response.text)
- config API 막혀있는 것 해결 ?
-> yaml파일 내 아래 코드 확인environment: AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'true'
이후 docker compose로 재시작해 config API(8080/api/v1/config) 호출해 결과 확인
- variables API 환경변수 지정도 리턴 ?
variables/connections API는 환경변수로 지정되었다면 리턴 X