Credentials
선택+CREATE CREDENTIALS
를 선택 후 Service Account
선택Service Account
선택Keys
메뉴 선택Private Key
생성 화면에서 JSON
을 선택하고 해당 파일을 다운로드Variable
로 등록 (실습에서는 google_sheet_access_token
으로 설정)Share
를 통해 Service Account
이메일 공유. (iam.gserviceaccount.com
이 포함된 이메일)Admin
-> Connection
conn id
: aws_conn_id
conn type
: S3나 AMAZON WEB SERVICE나 GENERIC 선택Extra
: `{ "region_name": "ap-northeast-2"}CREATE
해 준다.spreadsheet_copy_testing
로 생성한다. (실습 기준)CREATE TABLE SSONG_JI_HY.SPREADSHEET_COPY_TESTING(
COL1 INT
, COL2 INT
, COL3 INT
, COL4 INT
)
import gspread
를 통해 구글 스프레드 시트와 연동하기 위해 사용된다.DATA_DIR
밑에 생성하고 이를 통해 구글 시트를 조작할 수 있는 OBJECT를 생성해 준다.
sheets = [
{
"url": "https://docs.google.com/spreadsheets/d/1hW-_16OqgctX-_lXBa0VSmQAs98uUnmfOqvDYYjuE50/",
"tab": "SheetToRedshift",
"schema": "ssong_ji_hy",
"table": "spreadsheet_copy_testing"
}
]
for sheet in sheets:
download_tab_in_gsheet = PythonOperator(
task_id = 'download_{}_in_gsheet'.format(sheet["table"]),
python_callable = download_tab_in_gsheet,
params = sheet,
dag = dag)
s3_key = sheet["schema"] + "_" + sheet["table"]
copy_to_s3 = PythonOperator(
task_id = 'copy_{}_to_s3'.format(sheet["table"]),
python_callable = copy_to_s3,
params = {
"table": sheet["table"],
"s3_key": s3_key
},
dag = dag)
run_copy_sql = S3ToRedshiftOperator(
task_id = 'run_copy_sql_{}'.format(sheet["table"]),
s3_bucket = "grepp-data-engineering",
s3_key = s3_key,
schema = sheet["schema"],
table = sheet["table"],
copy_options=['csv', 'IGNOREHEADER 1'],
method = 'REPLACE',
redshift_conn_id = "redshift_dev_db",
aws_conn_id = 'aws_conn_id',
dag = dag
)
SELECT * FROM analytics.nps_summary
쿼리문을 update_gsheet에 넘겨 준다.gsheet.py
로 넘겨 주어 구글 시트에 연동될 수 있도록 처리해 준다.from airflow import DAG
from airflow.operators.python import PythonOperator
from plugins import gsheet
from datetime import datetime
def update_gsheet(**context):
sql = context["params"]["sql"]
sheetfilename = context["params"]["sheetfilename"]
sheetgid = context["params"]["sheetgid"]
gsheet.update_sheet(sheetfilename, sheetgid, sql, "redshift_dev_db")
with DAG(
dag_id = 'SQL_to_Sheet',
start_date = datetime(2022,6,18),
catchup=False,
tags=['example'],
schedule = '@once'
) as dag:
sheet_update = PythonOperator(
dag=dag,
task_id='update_sql_to_sheet1',
python_callable=update_gsheet,
params = {
"sql": "SELECT * FROM analytics.nps_summary",
"sheetfilename": "spreadsheet-copy-testing",
"sheetgid": "RedshiftToSheet"
}
)
PostgreHook
을 사용해 postgres
환경과 연동한다.pandas
를 통해 SELECT 한 내용을 DataFrame으로 생성하고 이를 스프레드 시트(worksheet)로 출력한다.def update_sheet(filename, sheetname, sql, conn_id):
client = get_gsheet_client()
hook = PostgresHook(postgres_conn_id=conn_id)
sh = client.open(filename)
df = hook.get_pandas_df(sql)
print(sh.worksheets())
sh.worksheet(sheetname).clear()
add_df_to_sheet_in_bulk(sh, sheetname, df.fillna(''))
airflow.cfg
의 API Section에서 auth_backend 값을 변경한다. docker-compose.yaml
에는 이미 설정되어 있다. (environments 밑의 `AIRFLOW__API_AUTH_BACKENDS: ``)AIRFLOW__Section__KEY
는 AIRFLOW
의 Section에서 KEY에 해당하는 내용을 오버라이딩 하는 것이다. 복수형인 경우는 KEY에 S
를 붙여 주며 ,
를 통해 이어 준다.docker exec -it scheduler_container_name airflow config get-value api auth_backend
해당 명령을 실행해 보면 위에서 설정한 값이 나온다.airflow
는 admin
계정이기 때문에 따로 유저 계정을 만드는 것이 좋다.)/health
API 호출metadatabase
의 status
와 scheduler
의 status
를 확인할 수 있다.특정 DAG를 API로 Trigger
curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d '{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
모든 DAG 리스트
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags
is_active
가 True인데 is_paused
가 True인 경우가 존재한다. is_active
는 코드가 DAGs 폴더에 존재한다면 True이고 is_paused
는 아직 활성화가 되어 있지 않은 경우 True이다.stats
가 queued
면 실행이 됐다는 뜻이다.DAGS를 조회할 수 있는 Python 코드
import requests
from requests.auth import HTTPBasicAuth
url = "http://localhost:8080/api/v1/dags"
dags = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow"))
print(dags.text) #dags.json
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables
total_entries
만큼의 Variable이 존재한다.curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
admin
계정으로 액세스해도 기본적으로 막혀 있기 때문에 403 Forbidden
오류가 발생한다.AIRFLOW
Section Key를 docker-compose.yaml
에서 오버라이딩 해 주어야 한다.airflow variables export variables.json
airflow variables import variables.json
airflow connections export connections.json
airflow connections import connections.json
📌 과제
- 활성화된 DAGS만 출력해 주는 Python 코드를 작성
-is_paused
가 False인 값만 ID 출력 (GITHUB로 PR)- config API의 경우 Access가 거부당했는데 이를 컨트롤 해 주는 KEY가 무엇인지와 그 키를 docker-compose.yaml에 어떻게 적용해야 하는지 (슬랙 DM 제출)
- connections API와 variables API는 환경 변수도 리턴하는지 아닌지 (슬랙 DM 제출)