[GCP] API로 받은 정보 Cloud Composer를 이용해서 GCS에 적재하기 - 3

HOU·2023년 3월 9일
0

gcp로pipline만들기

목록 보기
4/11
post-thumbnail

GCS

Cloud Storage(CS)라고 말하는 저장 서비스는 Google Cloud에 객체를 저장하는 서비스이다. 여기서 말하는 객체는 모든 형식의 파일로 구성된 변경할 수 없는 데이터 조각이다. 객체를 버킷이라는 컨테이너에 저장하며, 모든 버킷은 프로젝트와 연결되고 프로젝트를 조직 아래에서 그룹화할 수 있다.

Gcp 조직

  • 조직 : Example Inc. 라는 회사에서 examplein.org라는 Google Cloud 조직을 만든다. Google Cloud 최상위 단계
  • 프로젝트 : Example Inc.는 여러 애플리케이션을 빌드하고 각 애플리케이션은 프로젝트와 연결된다. 각 프로젝트에는 Cloud Storage API 세트와 리소스가 있다.
  • 버킷: 각 프로젝트에는 객체를 저장할 컨테이너인 버킷이 여러 개 존재할 수 있다.
  • 객체 : puppy.png 라는 이미지 파일과 같은 개별 파일이다.

GCS에 적재하는 python code를 작성해 보자

여기서 진짜 엄청나게 삽질을 많이 했는데.. document를 안 보고 이곳 적소 검색어로 찾아보고 작성하다가 삽질을 많이 했다. 😂

아래코드는 검색하며 찾은 gcs에 upload 하는 소스이다. 하지만 버전 문제 인지 이 소스는 정상적으로 작동하지 않았다. ㅠㅠ


def upload_to_gcs(**context):
    # GCS에 접근하는 Hook 생성
    gcs_hook = storage.GoogleCloudStorageHook()

    # 파일 이름과 버킷 이름 지정
    filename = 'facebook_posts.json'
    bucket_name = 'my_bucket_name'

    # 이전 task에서 반환된 JSON 파일 GCS에 업로드
    gcs_hook.upload(bucket_name=bucket_name, object_name=filename, filename='/tmp/facebook_posts.json')

구글링을 해서 documnet에 CS에 파일 업로드 하는 방법에 대해 찾아보았고 google cloud document에 클라이언트 라이브러리에 자세히 나와있었다. document

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

아래가 위에 것을 반영한 코드이다.

import datetime
import json
from google.cloud import storage
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import requests
import logging
import os

def get_facebook_posts():
    # Your code to fetch data from Facebook API
    url = "https://graph.facebook.com/v16.0/me/posts"
    access_token = "받은코드" #string값을 넣어주세요.
    params = {
        "access_token": access_token
    }
    response = requests.get(url, params=params)
    json_data = response.json()

    # 로그 추가
    print(f"Response: {json_data}")
    # 여기부터 안써지네 .. ? 
    print("file writing start")
    with open('file.json', 'w') as f:
        json.dump(json_data, f)

    print("file writing end")
    path = os.getcwd()

    print(f"directory {path}, file list: {os.listdir()}")

    return json_data

def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    generation_match_precondition = 0
    print("file upload start")
    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2023, 3, 7),
    'retries': 1,
}

with DAG('facebook_to_gcs_dag', default_args=default_args, schedule_interval=None) as dag:

    get_facebook_posts_task = PythonOperator(
        task_id='get_facebook_posts',
        python_callable=get_facebook_posts,
    )

    upload_to_gcs_task = PythonOperator(
        task_id='upload_to_gcs',
        python_callable=upload_to_gcs,
        op_kwargs={
            'bucket_name': 'corded-palisade-378506-import',
            'source_file_name': './file.json',
            'destination_blob_name': 'file.json',
        }
    )

get_facebook_posts_task >> upload_to_gcs_task # get_facebook... 을 실행하고 완료되면 upload_to_gcs_task에 실행할 것

위에 코드를 보면 json 파일을 저장해 놓았는데 upload_to_gcs 정상적으로 사용했음에도 불구하고 upload_to_gcs_task에서 계속해서 오류가 발생했다. 확인해 보니 file을 찾을 수 없다는 오류였다.

그래서 파일을 만들어주는 코드를 데이터 파일을 받아오는 곳에 추가하였다.

dag을 돌리니 정상작동 하였고 원하는 곳에 json 파일이 추가된 것도 확인할 수 있었다.

but.. DAG이 airflow에 올라가는데 시간이 조금 걸리는 거 같다.. 꼭 저렇게 해줘야 할까? return 받는 값을 그냥 파일로 내릴 순 없을까? 궁금하다.!

결론

이제 정상적으로 만들긴 했다. ㅎㅎㅎ 하지만 변경 사항을 좀 더 추가하자면 dag에 스케줄링을 추가하고 싶다.! 그리고 but에 적어 놓은 것도 적용해 봐야겠다.!

profile
하루 한 걸음 성장하는 개발자

0개의 댓글