6. airflow-day3-6

data_hamster·2023년 6월 12일
0

학습주제
숙제

학습내용
http://www.kocw.net/home/search/majorCourses.do#subject/03
강의가 오픈소스로 열려있다 괜찮은 것 같다.
혼자 공부하면 헷갈리니 저걸 수강하자.


https://velog.io/@skh951225/%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%97%94%EC%A7%80%EB%8B%88%EC%96%B4%EB%A7%81-%EC%8A%A4%ED%84%B0%EB%94%94-5%EC%A3%BC%EC%B0%A8

Q3. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까? :)

password, secret, passwd, authorization, api_key, apikey, access_token

https://restcountries.com

https://restcountries.com/v3/all
-> 엔드포인트, 별도의 API 키 없이 부를 수 있음.
부르면 나라별로 JSON 레코드가 리턴될 것임.

결과에서 아래 3개 정보를 추출하여 각자 스키마 밑에 테이블 생성.
나라도 여러 정보가 있는데 오피셜
1주일 한번 토요일 6시 30분 실행. 대그 스케줄. 크론탭 문법으로 세팅.
개인 리포에

숙제


JSON은 다음과 같은 형태를 가지고 있다.
파이썬에서 웹에서 JSON을 가져와서 redshift 테이블에 적재하는 작업은 대략적으로.

  1. 웹에서 JSON 데이터 가져오기
  2. JSON 데이터를 판다스 데이터 프레임으로 변환하기
  3. 판다스 데이터프레임을 SQL 테이블로 전송하기.

pip install pandas requests


requests.get(url)로 전체를 받아서
response.json()으로 json을 받음.

HTTP 응답의 본문이 JSON 형태로 되어 있을 때, response.json() 함수를 호출하면 해당 JSON 데이터를 파이썬 객체로 변환하여 반환합니다. 이때, 반환되는 파이썬 객체의 타입은 JSON 데이터의 최상위 구조에 따라 결정됩니다.
만약 JSON 데이터의 최상위 구조가 {} 즉, 객체(Object)라면, 파이썬에서는 이를 사전(Dictionary)으로 변환하여 반환합니다.
만약 JSON 데이터의 최상위 구조가 [] 즉, 배열(Array)라면, 파이썬에서는 이를 리스트(List)로 변환하여 반환합니다.
따라서 response.json()의 결과가 리스트라는 것은 원래 받은 JSON 데이터의 최상위 구조가 배열이었을 가능성이 높습니다

최상위 구조는 list임 그리고 리스트의 각 요소는 나라의 정보를 갖고 있음.
for 문으로 각 컬럼의 값을 추출함.


for 문으로 접근하니 나라별 이름이 추출됨. 모든 정보가 온전히 있다고 가정. get을 안쓰고 바로 [][] 형식으로 접근.

population, area도 시도.

잘 뜸

대략적인 extract는 끝난거 같음.

transform은 굳이 안해도 될꺼 같음.

load에 분리해서 넣기 보다 records로 한번에 넘기기로 함.

schedule은 크론식으로 매주 토요일 6시 30분이면 '30 6 * * 6'


파일을 생성해서
에어플로우로 돌려보니
requests가 없어서 실패.
환경설정에 추가해준다.

컨테이너 내리고 다시 up --build


돌리니까 잘 돌아간거 같다.


웹 UI에선 실패함

03ef6082f0d2
*** Reading local file: /opt/airflow/logs/dag_id=WorldInfo/run_id=manual__2023-06-12T14:53:14.486068+00:00/task_id=load/attempt=1.log
[2023-06-12, 14:53:18 UTC] {taskinstance.py:1083} INFO - Dependencies all met for <TaskInstance: WorldInfo.load manual__2023-06-12T14:53:14.486068+00:00 [queued]>
[2023-06-12, 14:53:18 UTC] {taskinstance.py:1083} INFO - Dependencies all met for <TaskInstance: WorldInfo.load manual__2023-06-12T14:53:14.486068+00:00 [queued]>
[2023-06-12, 14:53:18 UTC] {taskinstance.py:1279} INFO - 
--------------------------------------------------------------------------------
[2023-06-12, 14:53:18 UTC] {taskinstance.py:1280} INFO - Starting attempt 1 of 1
[2023-06-12, 14:53:18 UTC] {taskinstance.py:1281} INFO - 
--------------------------------------------------------------------------------
[2023-06-12, 14:53:18 UTC] {taskinstance.py:1300} INFO - Executing <Task(_PythonDecoratedOperator): load> on 2023-06-12 14:53:14.486068+00:00
[2023-06-12, 14:53:18 UTC] {standard_task_runner.py:55} INFO - Started process 1795 to run task
[2023-06-12, 14:53:18 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'WorldInfo', 'load', 'manual__2023-06-12T14:53:14.486068+00:00', '--job-id', '52', '--raw', '--subdir', 'DAGS_FOLDER/WorldInfo.py', '--cfg-path', '/tmp/tmpdfmp5hwx']
[2023-06-12, 14:53:18 UTC] {standard_task_runner.py:83} INFO - Job 52: Subtask load
[2023-06-12, 14:53:18 UTC] {task_command.py:388} INFO - Running <TaskInstance: WorldInfo.load manual__2023-06-12T14:53:14.486068+00:00 [running]> on host 03ef6082f0d2
[2023-06-12, 14:53:18 UTC] {taskinstance.py:1509} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=WorldInfo
AIRFLOW_CTX_TASK_ID=load
AIRFLOW_CTX_EXECUTION_DATE=2023-06-12T14:53:14.486068+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-06-12T14:53:14.486068+00:00
[2023-06-12, 14:53:18 UTC] {WorldInfo.py:44} INFO - load started
[2023-06-12, 14:53:18 UTC] {base.py:73} INFO - Using connection ID 'redshift_dev_db' for task execution.
[2023-06-12, 14:53:18 UTC] {logging_mixin.py:137} INFO - syntax error at or near "{"
LINE 1: DROP TABLE IF EXISTS {schema}.{table};
                             ^
[2023-06-12, 14:53:18 UTC] {taskinstance.py:1768} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/decorators/base.py", line 217, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/WorldInfo.py", line 49, in load
    cur.execute("DROP TABLE IF EXISTS {schema}.{table};")
psycopg2.errors.SyntaxError: syntax error at or near "{"
LINE 1: DROP TABLE IF EXISTS {schema}.{table};
                             ^

[2023-06-12, 14:53:18 UTC] {taskinstance.py:1323} INFO - Marking task as FAILED. dag_id=WorldInfo, task_id=load, execution_date=20230612T145314, start_date=20230612T145318, end_date=20230612T145318
[2023-06-12, 14:53:18 UTC] {standard_task_runner.py:105} ERROR - Failed to execute job 52 for task load (syntax error at or near "{"
LINE 1: DROP TABLE IF EXISTS {schema}.{table};
                             ^
; 1795)
[2023-06-12, 14:53:18 UTC] {local_task_job.py:208} INFO - Task exited with return code 1
[2023-06-12, 14:53:18 UTC] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check

디버깅 잘해준다. get_world_info 는 성공했는데,
load에서 실패함

f string 미적용. 적용함

INSERT INTO 에서 나라 이름이 ''로 감싸져 있어 넣지 못했음.
cur.execute(f"INSERT INTO {schema}.{table} VALUES (%s, %s, %s);", (r[0], r[1], r[2]))
로 적용해봄

성공함


DB에도 들어가 있음을 확인.

멱등성 검사시도.

대그 한번더 수행함.

성공

다시 DB를 불러와봄

행 수의 변함 없음.

profile
반갑습니다 햄스터 좋아합니다

0개의 댓글