데이터 엔지니어링 스터디 6주차

skh951225·2023년 4월 29일
0

들어가기 앞서

질문1 AWS의 서비스에 권한을 부여하는 방법

  • IAM에서 별도의 acount를 만들고 특정 기능만 부여
  • ec2 자체에 권한을 부여

질문2 SqltoS3Operator의 성능?

  • MySQLtoS3Operator와 같이 특정 데이터베이스에 종속된 Operator에서 SqlAlchemy를 통해 모든 관계형 데이터에 엑세스 가능한 SqltoS3Operator로 바뀜
  • 데이터를 읽어올때 pandas로 읽어오는 방식을 채용하였는데 pandas로 읽어오게되면 데이터를 메모리에 올려두기 때문에 대량의 데이터를 다룰때 메모리 부족 문제가 발생할 수 있다.
  • 그래서 Production 환경에서는 쓰기에는 적절치 않다.
    • 프로덕션 환경에서는 SQL 결과를 파일로 덤프하는 방법이 좋다.
    • 또한 하나의 파일에 저장하는 것이 아니라 10~20개로 나누어 병렬처리가 가능하도록 하는 것이 좋다.
    • 이러한 operator를 customize하는 것이 좋다.

질문3 airflow의 timezone 관련

  • airflow.cfg 의 [core] default_timezone, [webserver] default_ui_timezone으로 설정 가능
  • default_timezone은 start_date,end_date,schedule이 timezone을 의미
  • default_ui_timezone 는 web ui의 좌측 상단에 표시되는 timezone을 의미
  • 하지만 execution_date, log에 나타나는 시간은 UTC로 고정되어 있음
  • 혼란을 피하기위해 왠만하면 timezone을 변경하지 않는 것이 좋음

질문4 EC2 서버 기반 Airflow 접속 이슈

  • 카페 등에서 접속시 블록킹 이슈(SSH포트인 22번을 차단하는 경우가 많음)
  • EC2 서버 자체가 불안정한 경우
    • Ec2 서버가 항상 잘 동작하는 것이 나한테 할당되지 않는다.
    • 그 서버가 고장이 안난다고 생각하고 서비스를 만들면 안된다.
    • instance status checks / instance reachability check failed 와 같은 오류문구가 나타남

질문5 airflow 2.5.1로 오면서 변경된 내용

  • Connection : S3가 사라지고 Amazon Web Service로 대체
  • MySQLtoS3Operator → SqltoS3Operator
  • S3DeleteObjectsOperator
    • SqltoS3Operator에 replace 파라미터로 대체
  • S3ToRedshiftOperator : Upsert 지원

유용한 tool

Docker & Kubernetes

  • Docker : 다양한 소프트웨어들을 Container 단위로 실행하여 소프트웨어 간의 충돌을 막을 수 있는 가벼운 가상화 기술
    • 컴퓨팅 자원을 필요한 만큼 끌어다 쓰고 다 쓰면 반납할 수 있다.
    • AWS의 ami를 구성해서 ec2 서버를 구성할 수 있다.
    • 마치 도커 이미지를 실행하여 컨테이너를 만드는 것 처럼
  • Kubernetes
    • 하나의 기능을 담당하는 docker container를 Pod단위로 구성하며 이러한 Pod를 관리해주는 도구
  • Airflow에서 Kubernetes를 활용하는 법
    • Kubernetesexecutor
      • airflow의 마스터는 따로 서버를 구성하고 쿠버네티스 위에서 워커노드를 돌리는 방식
      • Dag level(전체 Task)를 하나의 Pod로 실행하는 방법
      • airflow.cfg 를 변경하여 설정 가능
    • KubernetesPodOperator
      • 특정 Task(Operator) 단위로 별개의 Pod로 실행하는 방법
  • 점점 Kubernetes를 활용하는 것이 일반적인 방법이 되고 있다.
    • dag가 많이 생기면 그들간의 충돌을 피하기위해 Kubernetes활용
    • virtual environment에 비해 Kubernetes를 활용하는 것이 간단함
    • 규모가 작을때는 굳이 Kubernetes를 활용하지 않아도 됨
      • Single Server (scale up) → 클라우드 → Kubernetes(AWS:EKS,ECS)

DBT

  • ELT용 오픈소스 : in-warehouse data transformation
  • dbt labs라는 회사가 상용화
  • 기능이 워낙 강력하다 보니까 analytics engineer라는 말을 만들어냄
  • CTAS를 고도화한 tool로 다양한 기능을 지원
    • input check, output check, snapshot, Lineage 추적
    • snapshot : table의 변화 history를 저장하여 과거의 데이터를 조회할 수 있음
    • 다양한 데이터 웨어하우스 지원
      • Redshift, Snowflake, Bigquery, Spark
  • 클라우드 버전도 존재 : dbtCloud

airflow configuration for production usage

  • airflow.cfg 를 주기적으로 백업
    • 보통 이런것은 개발자들이 백업하기보다는 devops팀과 같이 일을해서 테라폼과 같은 다양한 데브옵스관련된 서비스로 관리
    • airflow.cfg를 내가 마음대로 바꾸는 것이 아니라 데브옵스팀이 정한 프로세스를 따라서 테스트해보고 동작하면 airflow.cfg가 저장되어 있는 github 레포에 push. 그것이 테라폼과 같은 것을 통해 배포되는 식
  • airflow database
    • Sqlite → Postgres or MySQL 로 업그레이드
    • airflow.cfg [core] 섹션의 sql_alchemy_conn
    • 이 데이터베이스도 주기적으로 백업해줘야함
    • 보통 클라우드에서 쓸때 airflow 바깥에 RDS(AWS)와 같은 서비스를 이용해서 Postgres or MySQL 서버를 하나 구성하여 그것을 연결해서 쓰는 것이 일반적
      • RDS에는 백업 기능이 존재
  • 보안
    • airflow 2.0 부터는 authentication 이 기본적으로 ON
    • airflow web UI에는 민감한 정보가 포함되어 있어 pulic으로 노출하는 것은 바람직하지 않다.
      • 보통 회사에서는 vpn 뒤에 감춤. 그래서 vpn으로 별도로 로그인해야 접근할 수 있게 만듬
  • airflow를 운영하게 되면 Log 데이터가 쌓이게 된다.
    • 이것을 주기적으로 clean up을 해줘야함. Ex) 최근 30일 데이터만 보관
    • log 를 저장하는 곳의 볼륨을 충분히(100GB) 설정
  • Scale up을 먼저 하고 그 후 Scale out을 고려
    • Cloud Composer(GCP)
      • 다수의 airflow 서비스를 동시에 운영할 수 있다. 각 서비스를 enviroment라고 부른다.
      • kubernetes를 쓰지않고 다수의 worker 노드를 가지고 시작을 함
    • MWAA(AWS)
      • GCP와 동일하게 enviroment라는 용어를 사용
      • kubernetes를 쓰지않고 다수의 worker 노드를 가지고 시작을 함
      • 최소 서버 3대를 돌려야함. 처음 시작하면 낭비가 될 가능성이 높다.
    • Data Factory(Azure)
      • 비추..

slack 과 연동

  1. https://api.slack.com/messaging/webhooks 접속
  2. Create your slack app 클릭
  3. From scratch - App Name, workspace 설정 - create app
  4. Incoming Webhooks - Activate Incoming Webhooks - Add New Webhooks to Workspace
  5. Post할 채널 설정 - WEBHOOK_URL
  • slack_url이라는 Variable 로 WEBHOOK_URL 저장
  • application/json 의 형식으로 https://https://hooks.slack.com/services/{WEBHOOK_URL} 에 post 요청을 보내면 해당 workspace-channel에 메시지가 전달됨
  • Dag의 default_args["on_failure_callback"], default_args["on_success_callback"] 의 값을 slack 과 연동하는 Python 함수 지정

airflow API 활성화

  • airflow.cfg 의 [api] 섹션의 auth_backend=airflow.api.auth.backend.basic_auth로 설정하면 id, password 를 이용해 airflow에 api로 접근가능
  • GET 방식으로 --user "{userid:password}" http://{airflow_server}:8080/{option}
    • health
      • metadatabase, scheduler의 status확인
      • scheduler의 최근 heartbeat가 언제인지 확인가능 만약 현재보다 30초 이전에 마지막 heartbeat가 왔다면 문제가 있다고 판단
    • api/v1/dags
      • root 유저로 접근해야함
      • dag list를 보여줌
    • api/v1/dags/{dag_id}/dagRuns
      • dag를 실행해줌
    • ...

Google Spreadsheet 연동

  • Google Cloud에서 poject 생성
  • library - Google Spread Sheets API - ENABLE
  • 사용자 인증 정보 생성
  • 서비스 계정 - 키 추가
  • ...

Dag Dependencies

  • TriggerDagOperator
    • 특정 dag의 task를 trigger할 수 있음
    • 실행될 dag에게 데이터를 넘겨줄 수 있음
    • reset_dag_run=True : 실행될 dag에대해 이미 해당 날짜가 실행되었더라도 재실행
    • wait_for_completion=True : 실행될 dag가 모두 완료할때까지 wait
  • ExternalTaskSensor
    • 특정 task가 실행완료되었는지 timeout동안 주기적으로 polling
    • 그 task가 실행완료되면 자신이 실행
    • timeout동안 주기적으로 polling하기 때문에 비효율적(timeout동안 cpu를 쥐고 있음)
  • BranchPythonOperator
    • 뒤에 실행될 task가 어떤 task인지 조건문을 통해 결정할 수 있음

0개의 댓글