아래 연결 정보 입력 후 [Test] 진행하여 연결을 확인한다.
dev
입력5439
apache-airflow-providers-amazon
모듈을 설치해준다.
pip3 install apache-airflow-providers-amazon
Python에서는 다음과 같이 import한다.
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
connect_to_redshift.py
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
sql_create_table = """
CREATE TABLE IF NOT EXISTS offices (
officeCode varchar(10) NOT NULL,
city varchar(50) NOT NULL,
phone varchar(50) NOT NULL,
addressLine1 varchar(50) NOT NULL,
addressLine2 varchar(50) DEFAULT NULL,
state varchar(50) DEFAULT NULL,
country varchar(50) NOT NULL,
postalCode varchar(15) NOT NULL,
territory varchar(10) NOT NULL,
PRIMARY KEY (officeCode)
);
"""
sql_insert_data = """
insert into offices (officeCode,city,phone,addressLine1,addressLine2,state,country,postalCode,territory) values
('1','San Francisco','+1 650 219 4782','100 Market Street','Suite 300','CA','USA','94080','NA'),
('2','Boston','+1 215 837 0825','1550 Court Place','Suite 102','MA','USA','02107','NA'),
('3','NYC','+1 212 555 3000','523 East 53rd Street','apt. 5A','NY','USA','10022','NA'),
('4','Tokyo','+81 33 224 5000','4-1 Kioicho',NULL,'Chiyoda-Ku','Japan','102-8578','Japan'),
('5','Sydney','+61 2 9264 2451','5-11 Wentworth Avenue','Floor #2',NULL,'Australia','NSW 2010','APAC'),
('6','London','+44 20 7877 2041','25 Old Broad Street','Level 7',NULL,'UK','EC2N 1HN','EMEA')
;
"""
with DAG(
dag_id = "connect_to_redshift",
start_date = datetime(2022, 1, 1),
schedule_interval = None,
tags = ['Redshift']
) as dag:
t1 = RedshiftSQLOperator(
task_id="create_table",
sql=sql_create_table
),
t2 = RedshiftSQLOperator(
task_id="insert_data",
sql=sql_insert_data
)
t1 >> t2
DAG가 리스팅되는 것을 확인 후 DAG를 실행한다.
정상적으로 실행 완료된 것을 확인한다.
Redshift 서비스에서 [쿼리 편집기] 또는 [쿼리 편집기 v2]에 접속한다.
테이블이 생성되고 데이터가 정상적으로 적재된 것을 확인할 수 있다.