다음 프로젝트는

정화·2024년 5월 28일

파케이로 저장해보기. (처리속도가 얼마나 증가하는지 궁금)
트랜잭션(transaction)이란 "쪼갤 수 없는 업무 처리의 최소 단위"를 말한다.
begin, end= commit으로 하나의 트랜젝션 만들어보기

만약 roll back - 비긴 이전으로 돌아감


이건 개인 선호도 차이


try execpt 할 때 except 뒤에 슬랙으로 알리든 하는 기능을 넣어야 함.
아니면 계속 모르고 넘어가게 됨.
따라서 raise를 써주면 fail하기 때문에 airflow가 알 수 있게됨.
fail했을때 이메일을 보낸다거나 슬랙으로 알린다거나 하도록 하게 하면 됨.








에어플로 설치

가장 좋은 방법 : 사양이 좋은 EC2서버를 만들고 그안에 다커 엔진을 설치한 다음에 다커 컨테이너 형태로 에어플로를 실행하는게 가장 좋은 방법(팀프로젝트 할 땐 이렇게 해야 할 것)

리눅스 이해하기
● 우분투 (ubuntu): 리눅스 타입 중의 하나. 다른 타입은 데비안, 레드햇, 페도라,
...
● ssh: 리눅스 혹은 유닉스 서버에 로그인해주는 프로그램 (터미널)
○ private key와 public key를 사용
● sudo: 보통 슈퍼유저로서 프로그램을 구동할 수 있도록 하는 프로그램이다.
● apt-get: 우분투/데비안 계열의 리눅스에서 프로그램 설치/삭제를 관리해주는
프로그램
○ apt-get update, apt-get install
● su: substitue user의 약자로 현재 사용 중인 사용자 계정을 로그아웃하지 않고
다른 사용자의 권한을 얻을 때 사용한다
● vi: 텍스트 에디터. https://withcoding.com/112






1. 도커 엔진 맥북에 설치 : 맥에서는 4GB가 필요

  1. airflow 도커 이미지 다운
    ●먼저 터미널 프로그램을 실행하고 적당한 폴더로 이동
    ●airflow-setup Github repo를 클론
    ○ git clone https://github.com/keeyong/airflow-setup.git
    ● airflow-setup 폴더로 이동하고
    ○ cd airflow-setup
    ●2.5.1 이미지 관련 yml 파일 다운로드
    ○ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
    ● 이미지 다운로드
    ○docker-compose -f docker-compose.yaml pull
  1. 그 이미지를 도커 컨테이너 안으로 실행을 시킨 다음
    ● 컨테이너 실행
    ○ docker-compose -f docker-compose.yaml up

  2. 그게 다 잘되면 airflow 웹서버가 뜰거고, 그거를 웹 브라우저를 통해 access 해보는 형태로 데모를 실행해 볼것.
    http://localhost:8080으로 웹 UI 로그인
    ○airflow:airflow 사용

아래는 DAG객체 만든 예시
(한기용)

from airflow import DAG
dag = DAG(
"dag_v1", #DAG이름
start_date = datetime(2020,8,7,hour = 0 ,minute = 00),
schedule = "0****", #얘도 한시간에 한번임.모든 0분마다 한번씩 시작이 됨.
tags = ["example"],
catchup = False ,
#아까 공통 세팅 정해준거 넣기
defalut_args = defalut_args
)

(다른 블로그)

def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")

def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")

with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), #시작날짜
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=dag_success_alert, #성공했을 때
    on_failure_callback=task_failure_alert, #실패했을 때
    tags=["example"],
)

schedule = None : 나는 주기적으로 실행 안할겨. 다른 DAG가 끝났을 때 trigger 할겨
schedule = @once : 나는 주기적으로 실행 안할겨. 다른 DAG가 끝났을 때 trigger 할거
schedule = @hourly : 매 시 0분
schedule = @daily : 매일 0시 0분
schedule = @weekly : 매 주 ...
schedule = @monthly
schedule = @yearly

catchup
실행 안된 날짜에 대해서 매 시간마다 이 dag를 실행하려고 함.
catchup=False 하면
start date과 지금 날짜의 갭에 대해서 아무런 행동을 하지 않음
특히 dag가 Full refresh라면 과거 2년치에 대해 실행하나
지금 한번 실행하나 그 결과가 똑같기 때문에 꼭 False로 설정해줘야 함
incremental한거 아니면 다 False로 설정 할 것.

profile
개발자를 꿈꾸는..

0개의 댓글