DevCourse TIL Day3 Week12 - Airflow & slack 연동

김태준·2023년 6월 24일
0

Data Enginnering DevCourse

목록 보기
53/93
post-thumbnail

k8s에 이어 Airflow 고급 기능들에 대해 학습하고자 한다.
이어서 ELT 과정에서 핵심이 되는 dbt, data catalog, discovery등을 학습할 예정.

✅ airflow 환경설정

🎈 docker-compose.yaml 수정

_PIP_ADDITIONAL_REQUIREMENTS 수정
data 폴더를 호스트 폴더에서 생성하고 볼륨으로 공유하기.
-> 이를 docker volume으로 지정해 추후 디버킹에 활용

기존 docker-compose.yaml에 아래 코드 추가

environment:
 AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
 _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- yfinance pandas numpy oauth2client gspread}
volumes:
 ...
 - $ {AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data
airflow-init:
 ...
 mkdir -p /sources/logs /sources/dags /sources/plugins /sources/data
 chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins,data}

🎈 Tip

  1. 기타 환경설정값들(variables, connections 등 yaml파일 아래에서 관리 및 배포

-> 아래코드로 수정

environment:
 &airflow-common-env
 AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
 AIRFLOW_CONN_TEST_ID: test_connection
  1. IMAGE로 관리하는 부분과 yaml에서 무엇을 관리할 지 Partition 필요.
  2. Dag 작성 시 IMAGE로 Dag코드 복사해 만드는 것이 더 깔끔. (또는 docker-compose에서 host volume형태로 설정)
  3. airflow내 dag 스캔 시 dags_folder 폴더 내 서브폴더들까지 다 실행되므로 의도적으로 무시해야 하는 DAG_FOLDER 내 디렉토리, 파일을 airflowignore로 지정해 줄 필요 있음.

✅ airflow 리눅스 환경에서 실행

이전까지 진행해왔던 거지만, 한번 더 복습차원에서 진행.

  1. github repo를 다운 받아 원하는 폴더에서 yml 파일 다운 진행하기
    -> git clone https://github.com/TAEJUN1293/Programmers_Dag.git
    -> 해당 폴더로 이동 후 curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
  2. ls -tl docker-compose.test.yaml 확인 후 vi docker-docmpose.test.yaml로 yml 파일 확인
    2-1. 커서 위치에서 i눌러 수정모드 변경 후 esc키 누르고 : 입력 후 wq입력, enter 치면 변경내용 저장
  3. docker compose -f docker-compose.test.yaml up -d 로 dettach 실행.
    -> 이때 docker-compose.test.yaml은 기존 yaml파일에서 앞선 환경변수 변경해준 것.
  4. docker exec -it dags-airflow-scheduler-1 airflow variables get DATA_DIR

-> 항상 yaml up이전 실행. echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
-> 윈도우에선 어쩔 수 없다...

✅ Airflow + Redshift 구현

구현에 앞서 git repo 정리하기. (dag copy)
SQL문 중 CTAS 구문을 Dag로 만들어보기
< mau_summary 예제 >

{
          'table': 'mau_summary',
          'schema': 'taejun3305',
          'main_sql': """
SELECT LEFT(created_at, 7) AS Month,
  ROUND(SUM(CASE
    WHEN score >= 9 THEN 1 
    WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2)
FROM raw_data.nps
GROUP BY 1
ORDER BY 1;""",
          'input_check':
          [
            {
              'sql': 'SELECT COUNT(1) FROM raw_data.nps',
              'count': 150000
            },
          ],
          'output_check':
          [
            {
              'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}',
              'count': 12
            }
          ],
}

✅ dag - slack connection

dag 실행 중 에러 발생 시 슬랙 연동 위해 아래 작업 실행
from plugins import slack
default_args= {
'on_failure_callback': slack.on_failure_callback,
}

  1. slack에서 채널 임의로 생성
  2. https://api.slack.com/messaging/webhooks 사이트 들어가서 create app 이후 Incoming Webhooks App 생성 - 1에서 만든 채널과 연결
  3. curl 다운 받아 리눅스 터미널로 이동해 아래 코드 paste 후 실행
    -> curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!!!!!"}' https://hooks.slack.com/services/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
    -> 리눅스 창에서는 문제 X
    결과는 다음과 같음. (dic 형태 내 text 안에 내용 출력)
  4. airflow 서버 접속 후 admin-variable slack_url로 key 지정 후 Val에는 T04T8V1DKG9/B05DW6SCZK6/yyxfWl8F4L1uHabs2QgD7NfC 해당 코드 입력 후 save
  5. redshift connection 확인

흠... 연동 과정에서 다음과 같은 에러가 계속해서 발생한다..

AWS access KEY, S3 connection 등은 3번 이상 확인했고 문제가 없었지만 Max retries exceeded with url인 것으로 보아 주어진 csv_url로 정해진 횟수 이상 처리가 되어 작동을 멈춘 것으로 보인다..

해결 필요해 보임.

profile
To be a DataScientist

0개의 댓글