[Programmers]실리콘밸리에서 날아온 DE 스타터 키트4️⃣주차

포동동·2022년 10월 13일
0

🈯 숙제 해설

3주차 복습

  • start_date
    • 처음 DAG가 실행되는 날짜
    • 실제로 실행되는 것은 start_date + 실행주기
    • backfill을 쉽게 하기 위한 용도
  • execution_date
  • 멱등성
    • 데이터 파이프라인이 연속 실행되었을 때 소스에 있는 데이터가 그대로 DW로 저장되어야하는 성질
    • 멱등성을 지키기 위한 방법
      • Full Refresh의 경우 INSERT전에 항상 DELETE로 기존 테이블의 내용을 지워주고 INSERT 실행
      • Incremental Update의 경우 타임스탬프와 같은 필드 생성 후 execution_date 활용
  • Transaction
    • Atomic하게 실행되어야 하는 SQL들을 묶어서 하나의 작업처럼 처리하는 방법.
    • BEGIN;(queries);END; 와 같은 방법으로 구현.
    • 만약, 중간에 하나라도 실패하면 ROLLBACK이라는 sql 실행하면 임시상태의 모든 쿼리 삭제.
    • 임시 상태이기 때문에 쿼리를 너무 많이 Transaction으로 묶으면 안 됨. 최소로 하는 게 좋음.
    • AUTOCOMMIT을 True, False로 할 지는 팀 내에서 정해야함.
    • SQL에서 BEGIN;END;를 쓸지, Python에서 try/except를 쓸지는 선호 차이.


🧐 4주차

Airflow DAG

default_args = {
                'owner': 'ownerid',
                'retries': 0,
                'retry_delay': timedelta(seconds=20),
                'depends_on_past': False
                }
  • DAG 예시
dag = DAG(  
			"dag_v1", # DAG name
            'start_date': datetime(2020,8,7,hour=0,minute=00),  
            # schedule (same as cronjob) 
            schedule_interval="0 * * * *",   
            # common settings 
            default_args=default_args 
          )
  • Task 예시
task_start = DummyOperator(
                            task_id='start',
                            dag=dag_runAirflow_v1
                           )

t1 = BashOperator(
                  task_id='print_date',
                  bash_command='date',
              	 )
  • 중요한 DAG parameters
    • max_active_runs : 동시 실행 가능한 DAG 수(default=16)
    • max_active_tasks : 동시 실행 가능한 Task 수(default=16)
    • catchup : 밀린 거 자동으로 채울건지
    • 위와 같은 DAG parameters는 DAG레벨로 적용되는 것이고, default_args로 지정되는 건 Task parameters로 Task레벨로 적용되는 것!


🧐 4주차

Airflow DAG 개선

  • params를 통해 변수 넘기기
from airflow import DAG
from airflow.operators.python import PythonOperator

def func(**context) :
	link = context["params"]["url"]
    ...

dag_name = DAG(...)

task = PythonOperator(
                      task_id = 'func',
                      python_callable = func,
                      params = {
                                  "url" : "https://..."
                               },
                      dag = dag_name
    				 )
  • Xcom객체를 이용해 task의 결과값 받기 + Variable 사용하기
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

def func1(**context):
	link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']
    ...
    return something
 
def func2(**context):
	text = context["task_instance"].xcom_pull(key="return_value", task_ids="func1")
    ...

dag_name = DAG(...)

task1 = PythonOperator(
                      task_id = 'func1',
                      python_callable = func1,
                      params = {
                                  'url':  Variable.get("csv_url")
                               },
                      dag = dag_name
    				 )
 
 task2 = PythonOperator(
 					  task_id = 'func2',
                      python_callable = func2,
                      params = {
                               },
                      dag = dag_name
    				 )
  • Connection 사용하기
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook

def connection(autocommit=False):
	hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()

def func1(**context):
	link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']
    ...
    return something
 
def func2(**context):
	text = context["task_instance"].xcom_pull(key="return_value", task_ids="func1")
    ...

dag_name = DAG(...)

task1 = PythonOperator(
                      task_id = 'func1',
                      python_callable = func1,
                      params = {
                                  'url':  Variable.get("csv_url")
                               },
                      dag = dag_name
    				 )
 
 task2 = PythonOperator(
 					  task_id = 'func2',
                      python_callable = func2,
                      params = {
                               },
                      dag = dag_name
    				 )
  • VariablesConnections
    • Variables : 자주 사용되는 configuration info들을 미리 저장해 두는 것.
      • [‘password’, ‘secret’, ‘passwd’, ‘authorization’, ‘api_key’, ‘apikey’, ‘access_token’]와 같은 단어가 들어가면 별표(*)로 WEB UI에 표시된다.
    • Connections : 외부 서비스와 연결하기 위한 계정 정보들을 미리 저장해 두는 것.
  • DAG 실행시 Primary Key 유지하기
    • 임시 테이블을 활용하는 방법
        1. DAG가 실행될 때마다 임시 테이블 만들기
        1. 새로 읽어들인 레코드를 임시 테이블에 복사
        1. ROW_NUMBER()를 이용해서 primary key로 partition 나누고 ts로 order by 해서 하나의 레코드만 남기기
        1. 기존의 원본 테이블을 DROP 하고 임시 테이블을 원본 테이블로 복사
profile
완료주의

0개의 댓글