학습주제
Backfill
학습내용
window 함수를 써서(업써트라고 부름) pk uniqueness 보장해봄
마지막에 퀴즈
1. 꼭 고정이 아님. dags_folder에 지정해주면 됨.
2. dag_dir_list_interval = 300초.
3. api 섹션의 auth_backend = airflow.api.auth.backend.basic_auth로 변경
4. password, secret, passwd, authorization, api_key, apikey, access_token
5. sudo systemctl restart airflow-webserver
sudo systemctl restart airflow-scheduler
6. fernet_key
명시적으로 다시 시작해주면 된다. 웹서버, 스케줄러. ec2에서 실습해본적 있다.
도커의 경우 해당 서비스들을 중단했다 다시 시작해주면 됨.
airflow db init을 답으로 많이 선택했는데, 이는 메타데이터 DB를 다시 초기화 시켜주는 역할을 함.
fernet_key 스트링 값 주면, 에어플로우가 메타 DB에 쓸때 암호화 하고, 읽을때 가져다 씀. 디폴트는 세팅되어 있지 않음.
현재로 가장 좋은 방법은 UTC를 일관되게 사용하는 것으로 보임.
두개의 키가 있음.
ui 타임존 - 기본은 UTC. -> 이걸 바꾸면 한국시간 등등 적용 가능
웹서버 단의 시간만 결정해줌. -> 우측 상단의 시계를 어떤 타임존 할지
default_timezone -> 스케줄, start_date, end_date
굉장히 혼동스럽게도
execution_date 가 중요 변수가 될텐데 (백필)
로그를 보면 시간이 표시됨. 언제 기록이 되었는지.
얘는 항상 UTC를 따름.
뭘 따라가는지 헷갈리게됨. -> 맥스님의 경우 무조건 UTC를 따라가게 함. 섞어쓰게 되면 체크해야함.
코어 섹션에서
dag_dir_list_interval = 300
무슨파일이 Dag, 무슨 파일이 dag 가 아닌지 확인하기 위해, 파이썬 파일들을 다 실행시켜봄. 실행이 안될꺼라 생각하고 살려둘 수 있는데 이게 5분마다 실행이 되어버림.
이걸 모르면 귀신이 곡할 노릇. 5분마다 테이블이 계속 날라가는 등.
에어플로우는 dags_folder = /var/lib/airflow/dags의 모든 파일을 순회함.
맥스님도 이것때문에 크게 고생하신 적 있음.
대그 모듈이 들어있는 모든 파일들의 메인 함수가 실행이 됨. -> 테스트 코드도 실행되어 버릴수도 있음.
예를들어
cur.execute("DELETE FROM.." 같이 DB 날리는 것.
숙제 리뷰.
별도의 API key가 없음.
750개 나라의 정보 중에 특정 값만 추출하였음.
JSON 형태였지만, 제일 최초로 감싸고 있는 형태는 LIST였음.
나의 경우 WorldInfo로 만들었었음.
스키마 이름은 worldinfo
가능하면 개인 깃험에 repo 만들어서 제출.
애플 주식 업데이트와 거의 비슷
나의 경우 도커에 requests가 깔려있지 않다는 오류가 떠서 docker.compose.yml env 파일에 저 리퀘스트를 별도로 설치하는 명령문을 썼었음.
import requests
@task
def extract_transform():
response = requests.get("https://restcountries.com/v3/all")
countries = response.json()
records = []
for country in countries:
name = country['name']['common']
population = country['population']
area = country['area']
records.append([name, population, area])
retrun records
레코드는 나라들의 집합인 것을 알 수 있음.
load함수에서 적재
schedule에서 마지막 파라미터 5번째 파라미터를 사용함. 0~6사이의 값을 가짐. 0은 일요일. 1은 월요일 6은 토요일. '30 6 * * 6' 매주 토요일 6시 30에 실행하는 대그가 됨.
name 필드를 PK로 세팅함. 나라 이름이기 때문. 데이터 웨어하우스의 경우 PK를 지정해준다고 해서 유일성을 보장해주지 않음. 일종의 힌트라고 생각하면 됨.
도커 환경에 로그인해서 해당 코드를 본다.
실행 데모 아님.
코드 리뷰.
airflow.cfg가 도커에 어디에 있는지.
country info를 한번 보고, cfg 찾는다.
스케줄러 컨테이너로 로그인 한다. 윈도우 환경에선 앞에 winpty를 붙여준다.
홈 디렉토리로 바로 로그인 됨
저기 airflow.cfg가 보임
그 전에
상단에 내가 만든 WorldInfo.py가 보인다
cat dags/WorldInfo.py
롤백 뒤에 raise를 붙여주는게 좋음.
vi. 처럼 편집은 안됨. 별도의 yml 환경이나 도커파일로 접근하라고 한다.
cat처럼 읽기전용으로는 접근 가능함.
/dag_dir_list를 검색하니 300
/[api]
섹션에 가보면
이 에어플로우 서버는 외부에서 엑세스 불가함.
fernet 키 : 암호화 여부. 메타데이터 DB 저장 시.
dags 처럼 로컬시스템의 특정 디렉토리를 연결해본다.
Docker를 사용하여 로컬 시스템의 특정 디렉토리를 컨테이너 내부의 디렉토리와 매핑할 수 있습니다. 이를 "볼륨 마운팅(Volume Mounting)"이라고 합니다. 이 방법을 사용하면 airflow.cfg 파일을 로컬 시스템에서 편집하고, 이 변경 사항이 컨테이너에 바로 반영되게 할 수 있습니다.
예를 들어, Docker Compose를 사용하는 경우 다음과 같이 docker-compose.yml 파일을 편집하여 airflow.cfg 파일을 컨테이너와 매핑할 수 있습니다:
version: '3.7'
services:
airflow:
image: apache/airflow
volumes:
- ./dags:/usr/local/airflow/dags
- ./airflow.cfg:/usr/local/airflow/airflow.cfg
docker cp d720237b3f17:/opt/airflow/airflow.cfg ./airflow.cfg
우선 파일을 매핑하면 로컬에서는 파일이 바로 뜨지 않음. 연동을 위해. 위의 복사 명령어로 airflow.cfg를 복사함
이제 여기서 편집한 값은 에어플로우에도 반영 예정.