- JIRA - Slack 연동하여 협업 및 각자 진행사항 정리 (to do list)
dag 작성 시 요령.
- Task 순서도 작성
- library check
오늘 학습할 내용은 어제에 이어 GCS -> Bigquery 과정 Dag화이다.
어제 작성한 dag에서 특정 라이브러리 호출에 있어 문제가 존재해 이를 해결하고자 한다.
어제 발생한 error는 BigQueryCheckOperator를 호출할 때 발생한 것으로, BigQueryCheckOperator란, 빅쿼리 작업 결과를 확인하고자 사용하는 연산자로 특정 쿼리 결과 확인, 특정 조건 검사 기능을 통해 작업 성공여부를 확인할 수 있다.dag 작성에 있어 사용될 태스크들을 정의하기 전 주어진 상황을 살펴보면 다음과 같다.
1-1. GCS에 적재되는 데이터 소스 (3개) - 2개는 scheduling되어 적재 중.
1-2. 데이터 소스 1개는 스키마 수정 빈도 상당히 낮기에 1차 검증 후 바로 DWH로 적재
2. Bigquery로 적재할 table 1개
- 데이터 이동 간 validation 처리? (스키마 변경, 수정 등)
-> Raw data가 적재된 GCS에서 Bigquery로 이동하는 과정에서 staging이라는 테이블을 만들어
각 이동과정 별 validation 작업 진행- 만일 서버 과부화로 인해 다운되는 경우 이중화 작업 어떻게 처리?
-> failover 기능 구현 (추가 학습 필요)
- start pipeline (GCS -> Bigquery 작업 시작을 알리는 태스크)
- scheduling 되어 적재 중인 데이터에 한해 load 태스크 후 check task (loading 되었나)
- 체킹된 데이터들(check, 데이터소스 1개)에 한해 loaded data to staging task 처리
- 1차 검증 이후 2차 검증 진행 및 DWH적재를 위해 create & check 작업 진행.(2개 데이터 소스에 한해)
- 1차 검증과 2차 검증 비교하여 값이 다른 경우 dwh가 아닌 staging에 그대로 적재.
- GCS_to_Bigquery Dag 작성 완료.
- 각 태스크 별 성공, 실패 작업 slack 알람 및 default_args로 email 알람 기능 처리
- staging테이블 생성하여 모든 데이터들은 중간 단계를 거쳐 validation 진행
- external로 이동하는 경우에서 create 태스크, check task에 한해 sql 쿼리 작성 필요.
-> 어떤 db 연동하여 사용할 지 판단 중요. (MySQL or Postgres)- Dag 고도화 작업.
- failover 기능 여부 판단
-> 학습 필요