Airflow Pipeline 만들기 - Redshift Query하기

bradley·2022년 8월 17일
1

Airflow

목록 보기
4/16

Redshift 연결 만들기


아래 연결 정보 입력 후 [Test] 진행하여 연결을 확인한다.

  • Connection Id : 연결 이름 기입
  • Connection Type : Amazon Redshift 선택
  • Host : 엔드포인트 입력 (뒤에 Port와 DB명은 제거)
  • Schema : 따로 Schema를 만들지 않았다면, default인 dev 입력
  • Login : DB 계정
  • Password :
  • Port : 따로 Custom Port를 설정하지 않았다면, 5439

DAG 작성하기


Library 설치

apache-airflow-providers-amazon 모듈을 설치해준다.

pip3 install apache-airflow-providers-amazon

Python에서는 다음과 같이 import한다.

from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

DAG 작성하기

connect_to_redshift.py

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

sql_create_table = """
    CREATE TABLE IF NOT EXISTS offices (
        officeCode varchar(10) NOT NULL,
        city varchar(50) NOT NULL,
        phone varchar(50) NOT NULL,
        addressLine1 varchar(50) NOT NULL,
        addressLine2 varchar(50) DEFAULT NULL,
        state varchar(50) DEFAULT NULL,
        country varchar(50) NOT NULL,
        postalCode varchar(15) NOT NULL,
        territory varchar(10) NOT NULL,
        PRIMARY KEY (officeCode)
    );
"""

sql_insert_data = """
    insert  into offices (officeCode,city,phone,addressLine1,addressLine2,state,country,postalCode,territory) values 
        ('1','San Francisco','+1 650 219 4782','100 Market Street','Suite 300','CA','USA','94080','NA'),
        ('2','Boston','+1 215 837 0825','1550 Court Place','Suite 102','MA','USA','02107','NA'),
        ('3','NYC','+1 212 555 3000','523 East 53rd Street','apt. 5A','NY','USA','10022','NA'),
        ('4','Tokyo','+81 33 224 5000','4-1 Kioicho',NULL,'Chiyoda-Ku','Japan','102-8578','Japan'),
        ('5','Sydney','+61 2 9264 2451','5-11 Wentworth Avenue','Floor #2',NULL,'Australia','NSW 2010','APAC'),
        ('6','London','+44 20 7877 2041','25 Old Broad Street','Level 7',NULL,'UK','EC2N 1HN','EMEA')
        ;
"""

with DAG(
    dag_id = "connect_to_redshift",
    start_date = datetime(2022, 1, 1),
    schedule_interval = None,
    tags = ['Redshift']
) as dag:
    t1 = RedshiftSQLOperator(
        task_id="create_table",
        sql=sql_create_table
    ),
    t2 = RedshiftSQLOperator(
        task_id="insert_data",
        sql=sql_insert_data
    )

    t1 >> t2

Python Script 동작 확인

Airflow에서 DAG 실행하기


DAG가 리스팅되는 것을 확인 후 DAG를 실행한다.

정상적으로 실행 완료된 것을 확인한다.

Redshift 확인하기


Redshift 서비스에서 [쿼리 편집기] 또는 [쿼리 편집기 v2]에 접속한다.

테이블이 생성되고 데이터가 정상적으로 적재된 것을 확인할 수 있다.

profile
데이터 엔지니어링에 관심이 많은 홀로 삽질하는 느림보

0개의 댓글