[GCP] API로 받은 정보 Cloud Composer를 이용해서 GCS에 적재하기 - 2

HOU·2023년 3월 7일
0

gcp로pipline만들기

목록 보기
3/11
post-thumbnail

Cloud Composer(CC)란?

클라우드 및 온프레미스 데이터 센터 전체의 워크플로 파이프라인을 생성, 예약, 모니터링, 관리할 수 있는 완전 관리형 워크플로 조정 서비스이다.

워크플로?
workflow 는 작업 절차를 통한 정보 또는 업무의 이동을 의미하며, 작업 흐름이라고도 부른다. 더 자세히 말해, 워크플로는 작업 절차의 운영적 측면이다. 업무들이 어떻게 구성되고, 누가 수행하며, 순서가 어떻게 되며, 어떻게 동기화를 시킬지, 업무를 지원하기 위한 정보가 어떻게 흐르는지 그리고 업무가 어떻게 추적되는지이다. 위키피디아

  1. 작업의 흐름도
  2. 작업 절차
  3. 업무의 이동성

이 3가지 키워드로 정리할 수 있다.

CloudComposer(CC) 는 Airflow를 기반으로 Python으로 작동한다. CC의 장점은 Airflow를 로컬에서 사용하면 자원관리, 오버헤드 발생등 어려운 문제들이 존재하는데 이 부분을 구글이 처리해줘서 그냥 dag 만드는 거에 집중하면 된다.

Apache Airflow 와 Cloud Composer 구성


워크플로, DAG, 태스크

워크플로는 위에서 설명했지만 빅데이터 관점으로 보면 데이터 수집, 변환, 분석, 활용을 위한 일련의 태스크를 나타낸다. 그리고 이 워크플로는 DAG을 통해서 생성된다.

DAG은 관계 및 종속 항목을 반영하는 방식으로 구성된 예약하고 실행하려는 태스크의 모음이다. DAG는 코드를 사용하여 DAG 구조를 정의하는 Python 스크립트에서 생성됩니다. DAG은 아래와 같은 일을 수행할 수 있다. 좀 좋다!

  • 수집을 위한 데이터 준비
  • API 모니터링
  • 이메일 보내기
  • 파이프라인 실행

DAG은 각 구성 task의 기능과 관련되지 않는다. 이러한 이유는 각 태스크가 적절한 시점에, 올바른 순서로 실행되거나 올바른 문제 처리를 통해 실행되도록 하기 위함이다.

GCP에서 말하는 DAG

Task는 실행하려고 하는 코드와 같다.!

내가 하려고 하는 일

그래서 나는 두개의 태스크를 만들 생각이다.

  1. json 파일을 받아와 저장하는 task
  2. GCS로 적재하는 task를 만들 생각이다.

아! airflow는 여러가지 DAG을 제공하는데, DAG 안에서 python 코드를 사용하기 위해서는 PythonOperator를 사용해야한다.

import datetime
import json
import logging

def get_facebook_posts():
    # Your code to fetch data from Facebook API
    url = "https://graph.facebook.com/v16.0/me/posts"
    access_token = "발급받은 토큰" #string형태로 넣어주세요!
    params = {
        "access_token": access_token
    }
    response = requests.get(url, params=params)
    json_data = response.json()
    
    # 로그 추가
    logging.info(f"Response: {json_data}")

    with open('file.json', 'w') as f:
        json.dump(json_data, f)

    return json_data

위와같이 코드를 작성했다. 이 상태로도 잘 돌아가긴했는데.. 개인적으로 추후에 진행할 사항 중 하나로는 return을 받는 json_data를 바로 적재 할 수 없을까 하는부분이다

이렇게 짜놓은 파이썬 함수를 dag으로 만들어 1차 테스트를 진행했다.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import requests
import json

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2022, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

def get_facebook_posts():
    url = "https://graph.facebook.com/v16.0/me/posts"
    access_token = "발급받은 토큰" #string 형태로 입력해주세요.
    params = {
        "access_token": access_token
    }
    
    response = requests.get(url, params=params)
    response_json = response.json()
    return response_json
    
dag = DAG('my_dag', default_args=default_args, schedule_interval='@daily')

get_facebook_posts_operator = PythonOperator(
    task_id='get_facebook_posts',
    python_callable=get_facebook_posts,
    dag=dag
)

get_facebook_posts_operator

위 코드로 테스트를 진행했다. Cloud Composer를 사용하면 dag을 업로드 할 수 있는 버킷이 만들어진다.! 저 dag안에 작성한 python 파일을 넣어줘야 작동한다.


위 사진 보면 airflow, DAG목록, 로그, DAG폴더가 보인다.

  • airflow : airflow가 잘 돌아가는지 확인 하는 UI 제공 화면이다.
  • dag목록 : gcp에서 제공하는 dag목록과 모니터링 할 수있는 화면 제공
  • 로그 : auditlog에 기록된것을 확인할 수 있다.
  • DAG : dag을 업로드 할 수 있는 GCS로 이동한다.

DAG 등록

아래 사진에서 dags라는 폴더에 들어가거나 위의 사진에서 dag폴더를 클릭하면 바로 dag폴더 하위로 들어간다.

dag파일을 업로드 해주고 airflow ui를 들어가서 확인해보자!

facebook_test로 만든 dag이 정상적으로 돌아갓음을 확인 할 수 있다.

제대로 데이터가 불려졌는지도 확인해보자. 난 airflow ui에서 로그 보는 방법을 찾는데 오래 걸렸다.. 😂


Graph 탭을 클릭하고 get_facebook_posts task를 클릭하자!


클릭하면 나오는 모달창에서 Log클릭!

클릭하면 로그가 나온다. 좀 쉬운 방법 있으면 좀알려주세요 ㅠㅠ

확인해보니까 json파일로 잘 받아오는 걸 확인 할 수 있었다.!

결론

Cloud Composer는 굉장히 강력한 기능인거 같다. 좀 만능같은 느낌 아주 좋았다. 다음엔 이 파일을 Google Cloud Storage에 담는 거 해보자!

profile
하루 한 걸음 성장하는 개발자

0개의 댓글