# Redshift Connection
from airflow.providers.postgres.hooks.postgres import PostgresHook
def get_Redshift_connection(autocommit = True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
@task
def get_country_info(url):
# requests
data = requests.get(url)
records = []
for row in data.json():
records.append([row["name"]["official"], row["population"], row["area"]])
return records
conn.cursor()
를 넣어주어야한다.country varchar(150)
: 길이가 official이기 때문에 매우 길다.cur.execute(sql, (r[0], r[1], r[2]))
: 값들 중에 's로 끝나는 단어에서 에러가 많이 발생하였다. 원래는 sql에 값들을 모두 넣고 진행하였으나, GPT의 추천을 받아 다음과 같이 작성하였고 해결하였다.@task
def load(schema, table, records):
# load start
logging.info("load started")
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE {schema}.{table}(
country varchar(150),
population bigint,
area varchar(150)
)
;""")
for r in records:
sql = f"INSERT INTO {schema}.{table} VALUES (%s, %s::bigint, %s);"
print(sql, (r[0], r[1], r[2]))
cur.execute(sql, (r[0], r[1], r[2]))
cur.execute("COMMIT;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info("load done")
with DAG(
dag_id='CountryInfo',
start_date=datetime(2023, 12, 13),
schedule='30 6 * * Sat',
max_active_runs=1,
catchup=False,
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=3),
# 'on_failure_callback': slack.on_failure_callback,
}
) as dag:
url = Variable.get("country_url")
country_data=get_country_info(url)
load("areacmzl","country_info",country_data)
VSCODE로 WSL의 환경을 불러오다가 permission Error가 발생
: root계정으로 접근하지 않은 점이 문제였다. root로 접속하거나, sudo chown -R <계정명> <작업폴더>
권한을 변경해주었다.
airflow dags test <DAG_id> <time>
을 이용하여 test 진행 시에 DAG_id를 DAG파일의 이름으로 착각하여 에러가 발생하였다.
: DAG_id를 파일명과 동일하게 수정하였다.