default_timezone
, default_ui_timezone
이 있다.start_date (시작일)
, end_date(종료일)
, schedule
- default_timezone에 지정된 Timezone을 따른다.execution_date
와 로그 시간
UTC
를 따르게 되어 있음UTC
를 일관되게 사용하는 것이 변환이 필요 없어서 선호됨dag_dir_list_interval
의 설정 타임에 따라 주기적으로 스캔되고, default로는 5 분에 한 번씩 스캔된다.Primary Key
를 모두 리스트에 올려 놓고 Look up을 해 주어야 한다.Primary Key
를 보장해 주어야 하는 것은 ETL을 할 때는 데이터 엔지니어가, ELT를 할 때는 데이터 분석가가 작업해 주어야 한다.CREATE TABLE keeyong.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
해당 테이블을 예시로 설명을 한다면 Full Refresh
가 아닌 Incremental Update
로 구현을 하게 된다면 Primary key Uniqueness
를 보장할 수 있다.
이때 created_date
필드가 필요해진다.
default GETDATE()
를 통해 적재된 날짜가 저장되게 되는데 해당 테이블은 날씨 정보이기 때문에 최근 정보가 더 신뢰할 수 있는 정보이고 신뢰도에 따라 중복 데이터가 발생하였을 때 더 중요한 데이터를 적재해 줄 수 있다.
즉, created_date
가 더 최근인 정보를 적재하는 것이다.
그렇다면 created_date
가 더 최근인 정보임을 어떻게 비교하고 적재할 수 있을까?
ROW_NUMBER()
를 사용해 준다.CREATED_DATE
가 큰 것(더 최근의 것)부터 정렬될 수 있도록 ORDER BY
를 해 준다.DATE
로 해서 중복이 발생하지 않도록 해 준다.ROW_NUMBER() OVER(PARTITION BY DATE ORDER BY CREATED_DATE DESC) SEQ
Primary Key
를 기준으로 존재하는 레코드라면 새 정보로 수정한다.backfill
이 필요하게 되는데 airflow
는 backfill
이 쉽게 된다.backfill
은 Incremental Update
를 할 때만 사용할 수 있다.Incremental Update
는 효율성이 좋지만 유지보수와 운영의 난이도가 올라간다.execution_date
이 지정되어 있는데 airflow에서 execution_date
를 읽어온 후 이 날짜를 바탕으로 데이터 갱신이 되도록 코드를 작성해 준다. backfill
이 쉬워진다.start_date
값으로 설정해 준다.start_date
는 처음 읽어와야 하는 날짜라고 생각하면 된다. data의 start_date가 된다. 만약 @daily DAG라면 2020-11-08 데이터를 읽어와야 한다면 2020-11-07로 설정해 주어야 다음 날 DAG가 처음 실행된다. (주기에 따라 지정해 준 날짜를 기점으로 다음에 실행되기 때문)<내 생각>
catchup이 True라는 것은 실행되지 않은 이전 시간에 대한 작업이 실행된다는 것이고, 시작일자를 2020-08-10 02:00:00로 두었다면 그 다음 일자인 2020-08-11 02:00:00부터 작업이 실행될 것이다. 현재 시간은 2020-08-13 20:00:00이기 때문에 13 일의 작업까지 실행되어야 맞다.
그렇다면,
<풀이>
📌 과제 - airflow.cfg (풀이)
1. DAGs 폴더는 어디에 지정되는가?
- Airflow가 설치된 디렉토리 밑의 dags가 폴더가 되며 dags_folder 키에 저장된다.
2. DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가?- dag_dir_list_interval이 키가 되고, 기본 값은 300 (초) 즉, 5 분이다.
3. 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야 하는가?- api 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경해야 한다. airflow.api.auth.backend.basic_auth란 아이디와 패스워드로 인증하는 방법이다.
4. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까?- 이 부분은 이해를 잘못했었는데 진짜로 암호화를 하는 게 아니라 variable에 값을 *로 표시되게 만들어 주는 방법을 찾는 게 해당 문제였다.
password, secret, passwd, autorization, api_key, apikey, access_token과 같은 단어를 써 주어야 한다.
5. 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?- sudo systemctl restart airflow-webserver 혹은 sudo systemctl restart airflow-scheduler 같은 명령으로 재실행을 해 준다.
6. Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?- fernet key를 사용해 준다.
📌 과제 - UpdateSymbol_v2의 Incremental Update 방식을 수정하기
- ROW_NUMBER 방식을 사용해서 Primary Key가 동일한 레코드들을 처리하도록 수정