💡 Windows기반 Docker 사용 시 airflow.cfg 파일 접근, 수정 방법
1. Docker GUI 실행
2. docker-webserver 컨테이너 클릭
3. Files 탭 클릭
4. opt > airflow > airflow.cfg
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /opt/airflow/dags
DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가? : 주기적으로 스캔하게 되며 dags 폴더와 동일하게 core 섹션의 scheduler_dag_dir_list_interval 키가 스캔 주기를 결정한다. 300이면 5분, 5분에 한 번씩 "core 섹션의 dags_folder가 가리키는 디렉토리를 스캔한다. 또한 서브 디렉토리까지도" -> 파이썬 파일들을 모두 실행해보면서 수행한다.
이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가? : api 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경 -> airflow 로그인할 때 사용했던 정보를 이용해서 api를 외부에서 호출하고 특정 대그를 실행하거나 어떤 대그들이 있는지 리스트화하는 것들이 가능해진다.
Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까? : password, secret, passwd, authorization, api_key, apikey, access_token -> 여기서 한 단계 더 나아가면 prefix의 형태로 변수 이름 앞에 특정 시퀀스가 있다면, 변수의 값을 엔크립션해서 저장하는 것까지 되기도 한다.
이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은? : 스케줄러와 웹서버를 재시작해야함. (도커 컴포즈를 사용하는 환경이라면, airflow.cfg의 내용이 보관이 되지 않기 때문에 원래 값으로 돌아가게 된다. docker-compose.yaml 내부에 변경하고 싶은 내용을 지정해야 변경이 유지된다. compose down -> up 하면 변경사항이 유지된다.) Docker라면 docker-compose.yaml 파일이 변경되고 docker compose down과 docker compose up을 차례로 실행
Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가? : fernet_key 값을 세팅해주면 암호화되어 저장된다. (airflow는 자기 정보를 postgres에 적는다. 이를 보통 메타 데이터 DB라고 부른다. 이 내용이 외부에 노출되면 보안상 이슈 발생) 설정 안하면 plain text로 저장된다. 단, fernet key를 잃어버리면 DB 세팅 불가
# 웹서버로 진입
docker exec -it airflow-setup-airflow-webserver-1 sh
# 현재 경로 확인
pwd
'''/opt/airflow'''
# 서브 폴더 및 파일 보기
ls -tl
'''airflow.cfg 파일이 있는 것을 확인할 수 있다.'''
# airflow.cfg 확인
cat airflow.cfg | grep dags
업데이트되는 디렉토리는 /opt/airflow/dags이다. dags 폴더 내부로 진입해본다. 또한, 이 디렉토리는 로컬 컴퓨터의 특정 폴더와 매칭(싱크)이 되어있다. 로컬 컴퓨터의 터미널에서 수정을 하면 바로 반영이 된다.
그러나, airflow.cfg는 바로 반영되도록 설정되어 있지 않은 상태이다. 따라서, cd .. 을 통해 dags 폴더를 나오고 이곳에 있는 airflow.cfg는 수정하고 나가서 도커 컴포즈를 다운했다가 업한다고 해서 변경 내용이 반영되지 않는다.
따라서, exit을 통해 host system으로 돌아오고 docker-compose.yaml 파일을 열어보면 environment 부분을 볼 수 있다.
AIRFLOW -> airflow.cfg의 내용을 덮어쓰라는 의미
AIRFLOWCORE__FERNET_KEY : ' ' -> aiflow.cfg의 core 섹션에 fernet key의 값을 ''로 세팅하라. 값이 없으므로 암호화를 안 한 것
따라서, docker-compose로 airflow의 내용을 변경하고 싶을 때 yaml 파일을 변경하면 된다.
docker-compose를 통해 실행한 airflow는 api가 이미 활성화되어 있다. 따라서, airflow와 관계된 사용자 ip, pd만 알면 api를 호출할 수 있다.
즉, airflow.cfg의 내용을 docker compose로 실행하는 airflow의 경우 airflow.cfg의 내용을 직접 건드리는 것이 아니라 docker-compose.yaml 파일의 environment 밑에 키 입력 패턴을 따라 작성해주면 된다.
AIRFLOW(언더바 2개)섹션이름(언더바 2개)키이름: 키의 값
위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스
무료 계정으로 api key를 받아서 이를 호출시에 사용
API Key를 open_weather_api_key라는 Variable로 저장
변수명에 api_key라는 단어가 포함되었기 때문에 암호화된 것처럼 보이지만, 실제로는 fernet key 설정을 안 했기 때문에 내용 자체는 암호화되지 않은 상태로 DB에 저장됨
서울의 위도와 경도를 찾을 것
# API 엔드포인트 형태
https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={API
key}&units=metric
{lat} : 위도
{lon} : 경도
units=metric : 섭씨 사용
API 호출에 대한 응답 형태
일별, 시간별, 분별 날씨 정보 등, dt는 리눅스에서 시간을 나타내는 방식. 1970년 1월 1일 이후로 몇 초가 지났는지 나타내준다. dt는 어떤 날인지. 그 날 태양, 달이 언제 뜨고 지는지. 달의 모양이 어떤지, temp아래에 그 날의 온도 정보. 유료 api를 사용하면 과거, 8일 이후 등 더 광범위한 날씨 정보를 불러올 수 있다.
CREATE TABLE keeyong.weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
GETDATE()는 레코드가 생성될 때 시간을 넣는다는 것
해당 DAG를 full refresh로 생성한 뒤에 incremental update로 변경해볼 것이다. GETDATE 정보는 full refresh에서는 쓰이지 않지만 incremental update시 사용된다.
One-Call API는 결과를 JSON 형태로 리턴해줌
결과 JSON에서 daily라는 필드에 앞으로 8일간 날씨 정보가 들어감 있음
epoch로 주어진 값이 d["dt"] 와 같은 딕셔너리 아이템에 있다면 사람이 읽을 수 있는 형태로 변환하고 싶을 때 위의 코드를 사용한다.
Airflow Connections를 통해 만들어진 Redshift connection
두 가지 방식의 Full Refresh 구현 방식
API Key는 어디에 저장해야할까? -> airflow > variables > 키 생성
Full Refresh : 매번 테이블을 지우고 다시 빌드
DW상의 테이블은 아래처럼 정의
CREATE TABLE keeyong.weather_forecast (
date date primary key,
temp float, -- 낮 온도
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
@task
def etl(schema, table):
api_key = Variable.get("open_weather_api_key") #airflow variables에 저장되었던 키를 읽어와서
# 서울의 위도/경도 설정 (하드코딩)
lat = 37.5665
lon = 126.9780
# https://openweathermap.org/api/one-call-api ->API URL
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts" #daily 정보만 필요하기 때문에 안 쓰는 정보들은 exclude로 제외
response = requests.get(url)
data = json.loads(response.text) #response.json()과 동일
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
ret = []
for d in data["daily"]: #data["daily"]에 8일치의 날씨 정보가 들어있다.
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table}; #full refresh 형태
CREATE TABLE {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
"""
insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret) #정보 insert
logging.info(drop_recreate_sql)
logging.info(insert_sql)
try:
cur.execute(drop_recreate_sql)
cur.execute(insert_sql)
cur.execute("Commit;")
except Exception as e:
cur.execute("Rollback;")
raise
with DAG(
dag_id = 'Weather_to_Redshift',
start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("keeyong", "weather_forecast")
task가 한 개, task명은 etl이다.
하드코딩된 서울의 위도, 경도 부분을 etl 함수의 파라미터로 전달하는 방식으로 수정 추천
오늘 DAG를 실행하면 내일부터 8일간의 레코드를 불러온다. 내일 또 실행하면 다음 날부터 8일간의 레코드를 불러온다. -> 7일 정보가 중복될 것이다. 겹치는 날짜는 언제 불러온 것을 우선시 할 것인가? 일기예보는 미래의 날짜일수록 더 정확할 것이다. 따라서, 우선 순위를 정할 때 created_date timestamp default GETDATE() 정보를 활용한다.
-> ROW_NUMBER 문법 사용
보통 데이터 웨어하우스가 지정해주는 업데이트 방법이 있다.(Upsert) 이것을 사용하는 것이 가장 좋지만, 직접 한다면 ROW_NUMBER 사용
CREATE TABLE products (
product_id INT PRIMARY KEY,
name VARCHAR(50),
price decimal(7, 2)
);
CREATE TABLE orders (
order_id INT,
product_id INT,
PRIMARY KEY (order_id, product_id),
FOREIGN KEY (product_id) REFERENCES products (product_id)
);
데이터 웨어하우스가 프라이머리 키 유니크를 보장하지 않는다. 기본키 컬럼이 유니크함을 보장하는 것은 개발자의 책임.
CREATE TABLE keeyong.test (
date date primary key,
value bigint
);
Primary Key Uniqueness 보장하기
INSERT INTO keeyong.test VALUES ('2023-05-10', 100);
INSERT INTO keeyong.test VALUES ('2023-05-10', 150); -- 이 작업이
성공함!
CREATE TABLE keeyong.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
Primary Key Uniqueness 보장하기
💡 Primary Key 보장 방법
- 임시 테이블(스테이징 테이블)을 만들고 거기로 현재 모든 레코드를 복사
- 임시 테이블에 새로 데이터소스에서 읽어들인 레코드들을 복사 -> 이 때 중복 존재 가능
- 중복을 걸러주는 SQL 작성:
- 최신 레코드를 우선 순위로 선택
- ROW_NUMBER를 이용해서 primary key로 partition을 잡고 적당한 다른 필드(보통 타임스탬프 필드)로 ordering(역순 DESC)을 수행해 primary key별로 하나의 레코드를 잡아냄
- 위의 SQL을 바탕으로 최종 원본 테이블로 복사
- 이때 원본 테이블에서 레코드들을 삭제
- 임시 temp 테이블을 원본 테이블로 복사 (일련번호가 1번인 것들만 선택)
💡 Primary Key 보장 방법 코드
# 1. 원래 테이블의 내용을 임시 테이블 t로 복사 CREATE TEMP TABLE t AS SELECT * FROM keeyong.weather_forecast; # 2. DAG는 임시 테이블(스테이징 테이블)에 레코드를 추가. 이때 중복 데이터가 들어갈 수 있음 # 3. 원본 테이블 내용 삭제 DELETE FROM keeyong.weather_forecast; # 4. 중복을 없앤 형태로 새로운 테이블 생성 INSERT INTO keeyong.weather_forecast SELECT date, temp, min_temp, max_temp, created_date FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq FROM t ) WHERE seq = 1; # 일련번호가 1인 레코드들만 원본 테이블로 복사한다.
위의 코드는 매번 새로 덮어쓰는 형식의 업데이트를 가정한다.
매일 날짜가 하나씩 늘어가는 형태로 레코드가 유지될 것이다.
3번과 4번 과정은 Transaction으로 묶여야 한다. 그렇지 않으면 delete하고 커밋된 것을 사용자가 엑세스하면 내용이 없을 수 있는 등 문제 발생 (autocommit 이 True인 경우)
(autocommit이 false인 경우도) delete와 insert가 끝난 후 커밋을 해야 한다. delete만 하고 커밋하면 다른 사용자가 접근했을 때 레코드가 아무것도 보이지 않게 된다. insert 하다가 에러가 발생해도 delete된 상태가 이미 커밋되었기 때문에 문제가 있을 것이다. autocommit이 True, False에 상관없이 3번과 4번은 트랜잭션으로 묶여야 한다.
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
@task
def etl(schema, table, lat, lon, api_key):
# https://openweathermap.org/api/one-call-api
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text)
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
ret = [] #API 호출 결과 저장
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
# 원본 테이블이 없다면 생성
create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);"""
logging.info(create_table_sql)
# 임시 테이블 생성
create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};""" #TEMP 테이블 생성 (원본 테이블의 내용을 읽어와서 CTAS 함)
logging.info(create_t_sql)
try:
cur.execute(create_table_sql)
cur.execute(create_t_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# 임시 테이블 데이터 입력
insert_sql = f"INSERT INTO t VALUES " + ",".join(ret) #8일치 날씨 데이터가 저장된 ret리스트를 temp테이블에 insert 한다.
logging.info(insert_sql)
try:
cur.execute(insert_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# 기존 테이블 대체
alter_sql = f"""DELETE FROM {schema}.{table}; #원본 테이블의 레코드를 다 날리고
INSERT INTO {schema}.{table} #원본 테이블에 temp 테이블의 내용을 insert한다.
SELECT date, temp, min_temp, max_temp FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;""" #일련번호가 1번인 것만 읽어서, 날짜 낮 온도, 최저 최고 온도 저장 -> primary key uniqueness를 직접 보장하는 방법
logging.info(alter_sql)
try:
cur.execute(alter_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
with DAG(
dag_id = 'Weather_to_Redshift_v2',
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 4 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("keeyong", "weather_forecast_v2", 37.5665, 126.9780, Variable.get("open_weather_api_key"))
ROW_NUMBER로 일련번호가 1번인 것만 읽어오는 Primary Key Uniqueness를 직접 보장하는 방법이 비효율적으로 보일 수 있지만, 이것을 효율적으로 처리할 수 있게 만들어진 것이 데이터 웨엏우스이다. 각 데이터 웨어하우스에서 지정하는 방식(Upsert)은 6주차에 진행
Redshift는 copy sql을 쓸 때 지원해준다.
Bigquery, Snowflake 등 빅데이터 기반 데이터 웨어하우스는 PK Uniqueness를 보장하지 않지만, 전부 Upsert 문법이 존재한다.
Upsert는 레코드 단위로 동작하지 않고 bulk단위로 동작한다. 즉, 레코드 단위로 insert 될 때 동작하지 않는다. S3와 같은 클라우드 데이터 스토리지에 적재하고 싶은 레코드를 파일 형태로 로딩해놓고 파일에서 한큐에 테이블로 bulk insert를 하게 되는데 그때 copy라는 것이 있고, copy를 쓸 때 Upsert를 쓸지 말지 결정할 수 있다.
데이터의 크기가 작다면 오류가 발생한 지점으로 돌아가서 다시 실행할 필요가 없다. 오늘 실행한 것이 오류가 없었다면 (오늘 실행이 이전의 오류가 발생했던 부분까지 포함하고 있기 때문에)
그러나 incremental update의 경우, 특히 시간 단위로 업데이트하는 경우, 과거의 실패가 구멍이 되어버린다.
Backfill에 일관된 방법이 없다면, 코드를 이해하고 특정 날짜의 레코드를 재실행하는 것이 불가능하게 된다. 이러 부분을 일관되게 만들어주는 것이 Airflow가 가진 큰 강점
start_date과 execution_date을 이해해야 한다.
● 지금 시간을 기준으로 어제 날짜를 계산하고 그 날짜에 해당하는 데이터를 읽어옴
from datetime import datetime, timedelta
# 지금 시간 기준으로 어제 날짜를 계산
y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d') #어제 날짜 epoch를 날짜 포맷으로 변경
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
위에서 본 코드는 어제의 레코드를 읽어올 뿐 1년치의 데이터를 읽어오지는 못한다.
from datetime import datetime, timedelta
~~y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d')~~
yesterday = '2023-01-01' # 이 부분을 365번 바꿔서 실행하던지 루프를 돌리는
걸로 변경
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
여기에서는 하드코딩으로 고쳐서 돌렸지만 실제 현업에서는 코드가 훨씬 복잡하기 때문에 위와 같은 방법을 적용하기 어렵다. 상당한 시간을 써서 봐야 이해가 된다. 개발한 사람이 없거나 바쁠때, 내가 온콜인데 밤에 상황이 발생하면 시간소모
정리하면, 시간을 따지는 것이 아니라 내가 만든 DAG의 실행주기를 Airflow가 이미 알고 있기 때문에 하루에 한 번 실행하는 DAG라면 전날 날짜를 주고, 한 시간에 한 번 도는 것이라면 전 시간의 날짜와 시간을 주도록 Airflow를 만든다.
# 지금 시간 기준으로 어제 날짜를 계산
# 기존 코드
y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d') #어제 날짜 epoch를 날짜 포맷으로 변경
# 개선 코드 (python operator의 경우)
yesterday = context["execution_date”]
execution_date로 받아오기 때문에 코드를 단순화할 수 있다.
Airflow에서는 기존 코드 부분이 개선 코드로 바뀐다.
Airflow의 접근방식
운영과 Backfill이 동일한 코드로 가능해진다. 그것이 execution_date. 이것이 가능한 이유는 Airflow가 DAG의 실행주기를 알고, 성공 실패 여부를 알기 때문이다. 이런 정보들이 메타 데이터 디비에 기록이 된다.
from datetime import datetime, timedelta
yesterday = context["execution_date”]
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 4 * * *', # 적당히 조절
2022년 8월 24일 4시 0분에 DAG가 처음 실행될 것으로 생각되지만, 실제로는 2022년 8월 25일 오전 4시 0분에 처음 실행된다. start_date는 DAG가 incremental update를 한다는 가정 하에 처음 읽어와야 하는 데이터의 날짜이다.
즉, 2022년 8월 24일 데이터를 읽어오려면 그 다음날인 2022년 8월 25일에 가능하다.
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '30 * * * *', # 적당히 조절
1시간에 1번씩 4시 30분에 실행되는 DAG(hourly) 2022년 8월 24일 오전 0시 30분에 처음 실행되는 것이 아니라 이 날의 데이터를 읽어오는 것이므로, 2022년 8월 24일 오전 1시 30분에 DAG가 처음 실행될 것이다.
예를 들어 2020년 11월 7일의 데이터부터 매일매일 하루치 데이터를 읽어온다고 가정한다.
이 경우 언제부터 해당 ETL이 동작해야하나? -> 2020년 11월 8일
다르게 이야기하면 2020년 11월 8일날 동작하지만 읽어와야 하는 데이터의 날짜는? -> 2020년 11월 7일: 이것이 start_date이 된다.
Airflow의 start_date은 시작 날짜라기는 보다는 DAG가 처음 읽어와야하는 데이터의 날짜
execution_date은 읽어와야하는 데이터의 날짜로 설정된다.
Airflow의 start_date는
- DAG 관점 X
- DATA 관점 O
DAG가 처음 실행되기를 바라는 날짜와 시간이 아니라, 처음 읽어오고 싶은 데이터의 날짜와 시간이다.
Q : 이 경우 이 job은 몇번 실행될까? (execution_date)
1. 2020-08-10 02:00:00
2. 2020-08-11 02:00:00
3. 2020-08-12 02:00:00
4. 2020-08-13 02:00:00
2, 3, 4번이 실행된다.
2020-08-10의 데이터를 읽어오기 위해 이 날짜를 execution_date로 설정한다.
즉, 08-10(execution_date)을 읽기 위해 08-11 02:00에 실행되고, 08-11(execution_date)을 읽기 위해 08-12 02:00에 실행되고, 08-12(execution_date)를 읽기 위해 08-13 02:00에 실행된다.
각각 그 전날의 날짜와 시간이 execution_date로 들어가게 된다.
위의 사례는 @once를 통해 필요 시 한 번만 실행하면 되었으나, 습관적으로 날짜를 부여해서 daily로 반복되게 했고, 매 실행마다 돈이 부과되었다.
또 다른 문제점은 catchup 파라미터의 몰랐고, start_date와의 관계를 몰랐기 때문이다.
8월 6일을 start_date로 설정했다.
catchup은 디폴트가 True이다. True이면 Airflow에서 DAG를 활성화했을 때, start_date부터 지금까지 실행이 되지 않았던 회차가 연달아 실행된다.
따라서, 한 번 돌면 2000불인 쿼리가 8번 돌게 되었다.
쓴 만큼 돈이 부과되는 웨어하우스의 최대 단점. redshift 내가 산 용량 내에서 똑같은 비용을 낸다. 즉, scalable한 시스템이 꼭 좋은 것만은 아니다.
snowflake, bigquery를 사용하는 팀에게 매일 오전 혹은 전날밤에 sql을 비용 순으로 sorting해서 10개를 전체 팀에 뿌리게 했다. 경각심을 갖도록. 탓하기 위함이 아니라 어떻게 최적화할 수 있는지 세션을 연다. 비용에 대해 고민하도록. limit을 걸수도 있고, sql explain을 통해 비용을 미리 산정해볼 수 있다. by. Max
Airflow에서 하나의 DAG는 다수의 ()로 구성된다. ()에 들어갈 말은?
매일 동작하는 DAG의 Start date이 2021-02-05라면 이 DAG의 첫 실행 날짜는?
위 DAG의 경우 이때 execution_date으로 들어오는 날짜는?
Schedule interval이 "30 * * * *"으로 설정된 DAG에 대한 올바른 설명은?
Schedule interval이 "0 * * * *"으로 설정된 DAG의 start date이 "2021-02-04 00:00:00"으로 잡혀있다면 이 DAG의 첫 번째 실행 날짜와 시간은 언제인가?
Airflow의 DAG가 처음 ON이 되었을 때 start_date과 현재 날짜 사이에 실행이 안된 run들이 있을 경우 이를 실행한다. 이는 (??) 파라미터에 의해 결정된다. 이 파라미터를 False로 세팅하면 과거 실행이 안된 run을 무시한다.
다음 중 Redshift에서 큰 데이터를 테이블로 복사하는 방식을 제대로 설명한 것은?
- 하나씩 INSERT INTO를 실행하여 복사해준다.
- 복사할 레코드들을 파일로 저장해서 한번 Redshift로 올린다.
- 복사할 레코드들을 파일로 저장해서 S3로 올린 후에 거기서 Redshift로 벌크 복사한다.
ROW_NUMBER 사용을 위해 테이블에 아래 필드 추가
created_date timestamp default GETDATE()
그럼 만약 start_date가 2023.07.01이라고 했을 때 첫 데이터의 execution_date도 2023.07.01이고, 실제 dag가 실행되는 시간은 2023.07.02인 거죠? Daily 실행을 전제로 했을 때 -> YES
dag_dir_list_interval 이 값으로 인해 300초마다 dags들을 스캔하면서 실제 수행도 된다고 하셨는데, 최초로 올라갈 때만 실행을 하는건가요? -> NO. 무조건 실행이다.
300초마다 코드들을 다 돌려보는 것. 코드를 돌려보면 무엇이 없어졌고, 무엇이 생겼는지 알 수 있다. 그런데 레코드를 삭제하는 파이썬 파일 등을 간편하게 하려고 작성하면 에어플로우가 DAG가 있는지 확인하려고
에어플로우 관점에서 제대로 작성된 DAG란 DAG가 실행되는 것이 아니라 어떤 대그가 있고 테스크가 있는지 파악하고 끝나는 것이다. 그런데 일반 파이썬 파일(ex:레코드 삭제)이 있으면 실행하게 된다. 이를 막고 싶으면 .airflowignore를 만들어서 test로 시작하는 모든 파이썬 파일을 실행하지 말라 와 같은 조건을 설정하거나
특정 파일 이름을 지정할 수 있고, 그냥 test를 쓰면 test를 포함하는 스크립트는 실행되지 않을 것이다.
즉, DAG를 수행한다는 것이 아니라 파이썬 스크립트의 메인 함수가 실행된다는 것
Elasticsearch 같은 NoSql을 DW로 사용하는 경우도 있나요? -> 못 봄. 왜냐하면, 쿼리를 날려 데이터 분석을 하거나 하기는 어렵다. 문법도 다르고, 데분이 쉽게 쓸 수 없다. 그러나 엘라스틱서치 자체는 굉장히 강력한 툴이다.
글 잘 봤습니다, 감사합니다.