들어가기 앞서
질문1
AWS의 서비스에 권한을 부여하는 방법
- IAM에서 별도의 acount를 만들고 특정 기능만 부여
- ec2 자체에 권한을 부여
질문2
SqltoS3Operator의 성능?
- MySQLtoS3Operator와 같이 특정 데이터베이스에 종속된 Operator에서 SqlAlchemy를 통해 모든 관계형 데이터에 엑세스 가능한 SqltoS3Operator로 바뀜
- 데이터를 읽어올때 pandas로 읽어오는 방식을 채용하였는데 pandas로 읽어오게되면 데이터를 메모리에 올려두기 때문에 대량의 데이터를 다룰때 메모리 부족 문제가 발생할 수 있다.
- 그래서 Production 환경에서는 쓰기에는 적절치 않다.
- 프로덕션 환경에서는 SQL 결과를 파일로 덤프하는 방법이 좋다.
- 또한 하나의 파일에 저장하는 것이 아니라 10~20개로 나누어 병렬처리가 가능하도록 하는 것이 좋다.
- 이러한 operator를 customize하는 것이 좋다.
질문3
airflow의 timezone 관련
- airflow.cfg 의 [core] default_timezone, [webserver] default_ui_timezone으로 설정 가능
- default_timezone은 start_date,end_date,schedule이 timezone을 의미
- default_ui_timezone 는 web ui의 좌측 상단에 표시되는 timezone을 의미
- 하지만 execution_date, log에 나타나는 시간은 UTC로 고정되어 있음
- 혼란을 피하기위해 왠만하면 timezone을 변경하지 않는 것이 좋음
질문4
EC2 서버 기반 Airflow 접속 이슈
- 카페 등에서 접속시 블록킹 이슈(SSH포트인 22번을 차단하는 경우가 많음)
- EC2 서버 자체가 불안정한 경우
- Ec2 서버가 항상 잘 동작하는 것이 나한테 할당되지 않는다.
- 그 서버가 고장이 안난다고 생각하고 서비스를 만들면 안된다.
- instance status checks / instance reachability check failed 와 같은 오류문구가 나타남
질문5
airflow 2.5.1로 오면서 변경된 내용
- Connection : S3가 사라지고 Amazon Web Service로 대체
- MySQLtoS3Operator → SqltoS3Operator
- S3DeleteObjectsOperator
- SqltoS3Operator에 replace 파라미터로 대체
- S3ToRedshiftOperator : Upsert 지원
Docker & Kubernetes
Docker
: 다양한 소프트웨어들을 Container 단위로 실행하여 소프트웨어 간의 충돌을 막을 수 있는 가벼운 가상화 기술
- 컴퓨팅 자원을 필요한 만큼 끌어다 쓰고 다 쓰면 반납할 수 있다.
- AWS의 ami를 구성해서 ec2 서버를 구성할 수 있다.
- 마치 도커 이미지를 실행하여 컨테이너를 만드는 것 처럼
Kubernetes
- 하나의 기능을 담당하는 docker container를 Pod단위로 구성하며 이러한 Pod를 관리해주는 도구
Airflow에서 Kubernetes
를 활용하는 법
Kubernetesexecutor
- airflow의 마스터는 따로 서버를 구성하고 쿠버네티스 위에서 워커노드를 돌리는 방식
- Dag level(전체 Task)를 하나의 Pod로 실행하는 방법
- airflow.cfg 를 변경하여 설정 가능
KubernetesPodOperator
- 특정 Task(Operator) 단위로 별개의 Pod로 실행하는 방법
- 점점 Kubernetes를 활용하는 것이 일반적인 방법이 되고 있다.
- dag가 많이 생기면 그들간의 충돌을 피하기위해 Kubernetes활용
- virtual environment에 비해 Kubernetes를 활용하는 것이 간단함
- 규모가 작을때는 굳이 Kubernetes를 활용하지 않아도 됨
- Single Server (scale up) → 클라우드 → Kubernetes(AWS:EKS,ECS)
DBT
- ELT용 오픈소스 : in-warehouse data transformation
- dbt labs라는 회사가 상용화
- 기능이 워낙 강력하다 보니까 analytics engineer라는 말을 만들어냄
- CTAS를 고도화한 tool로 다양한 기능을 지원
- input check, output check, snapshot, Lineage 추적
- snapshot : table의 변화 history를 저장하여 과거의 데이터를 조회할 수 있음
- 다양한 데이터 웨어하우스 지원
- Redshift, Snowflake, Bigquery, Spark
- 클라우드 버전도 존재 : dbtCloud
airflow configuration for production usage
- airflow.cfg 를 주기적으로 백업
- 보통 이런것은 개발자들이 백업하기보다는 devops팀과 같이 일을해서 테라폼과 같은 다양한 데브옵스관련된 서비스로 관리
- airflow.cfg를 내가 마음대로 바꾸는 것이 아니라 데브옵스팀이 정한 프로세스를 따라서 테스트해보고 동작하면 airflow.cfg가 저장되어 있는 github 레포에 push. 그것이 테라폼과 같은 것을 통해 배포되는 식
- airflow database
- Sqlite → Postgres or MySQL 로 업그레이드
- airflow.cfg [core] 섹션의 sql_alchemy_conn
- 이 데이터베이스도 주기적으로 백업해줘야함
- 보통 클라우드에서 쓸때 airflow 바깥에 RDS(AWS)와 같은 서비스를 이용해서 Postgres or MySQL 서버를 하나 구성하여 그것을 연결해서 쓰는 것이 일반적
- 보안
- airflow 2.0 부터는 authentication 이 기본적으로 ON
- airflow web UI에는 민감한 정보가 포함되어 있어 pulic으로 노출하는 것은 바람직하지 않다.
- 보통 회사에서는 vpn 뒤에 감춤. 그래서 vpn으로 별도로 로그인해야 접근할 수 있게 만듬
- airflow를 운영하게 되면 Log 데이터가 쌓이게 된다.
- 이것을 주기적으로 clean up을 해줘야함. Ex) 최근 30일 데이터만 보관
- log 를 저장하는 곳의 볼륨을 충분히(100GB) 설정
- Scale up을 먼저 하고 그 후 Scale out을 고려
- Cloud Composer(GCP)
- 다수의 airflow 서비스를 동시에 운영할 수 있다. 각 서비스를 enviroment라고 부른다.
- kubernetes를 쓰지않고 다수의 worker 노드를 가지고 시작을 함
- MWAA(AWS)
- GCP와 동일하게 enviroment라는 용어를 사용
- kubernetes를 쓰지않고 다수의 worker 노드를 가지고 시작을 함
- 최소 서버 3대를 돌려야함. 처음 시작하면 낭비가 될 가능성이 높다.
- Data Factory(Azure)
slack 과 연동
- https://api.slack.com/messaging/webhooks 접속
- Create your slack app 클릭
- From scratch - App Name, workspace 설정 - create app
- Incoming Webhooks - Activate Incoming Webhooks - Add New Webhooks to Workspace
- Post할 채널 설정 - WEBHOOK_URL
- slack_url이라는 Variable 로 WEBHOOK_URL 저장
- application/json 의 형식으로 https://https://hooks.slack.com/services/{WEBHOOK_URL} 에 post 요청을 보내면 해당 workspace-channel에 메시지가 전달됨
- Dag의 default_args["on_failure_callback"], default_args["on_success_callback"] 의 값을 slack 과 연동하는 Python 함수 지정
airflow API 활성화
- airflow.cfg 의 [api] 섹션의 auth_backend=airflow.api.auth.backend.basic_auth로 설정하면 id, password 를 이용해 airflow에 api로 접근가능
- GET 방식으로 --user "{userid:password}" http://{airflow_server}:8080/{option}
- health
- metadatabase, scheduler의 status확인
- scheduler의 최근 heartbeat가 언제인지 확인가능 만약 현재보다 30초 이전에 마지막 heartbeat가 왔다면 문제가 있다고 판단
- api/v1/dags
- root 유저로 접근해야함
- dag list를 보여줌
- api/v1/dags/{dag_id}/dagRuns
- ...
Google Spreadsheet 연동
- Google Cloud에서 poject 생성
- library - Google Spread Sheets API -
ENABLE
- 사용자 인증 정보 생성
- 서비스 계정 - 키 추가
- ...
Dag Dependencies
- TriggerDagOperator
- 특정 dag의 task를 trigger할 수 있음
- 실행될 dag에게 데이터를 넘겨줄 수 있음
- reset_dag_run=True : 실행될 dag에대해 이미 해당 날짜가 실행되었더라도 재실행
- wait_for_completion=True : 실행될 dag가 모두 완료할때까지 wait
- ExternalTaskSensor
- 특정 task가 실행완료되었는지 timeout동안 주기적으로 polling
- 그 task가 실행완료되면 자신이 실행
- timeout동안 주기적으로 polling하기 때문에 비효율적(timeout동안 cpu를 쥐고 있음)
- BranchPythonOperator
- 뒤에 실행될 task가 어떤 task인지 조건문을 통해 결정할 수 있음