Cloud Storage(CS)라고 말하는 저장 서비스는 Google Cloud에 객체를 저장하는 서비스이다. 여기서 말하는 객체는 모든 형식의 파일로 구성된 변경할 수 없는 데이터 조각이다. 객체를 버킷이라는 컨테이너에 저장하며, 모든 버킷은 프로젝트와 연결되고 프로젝트를 조직 아래에서 그룹화할 수 있다.
examplein.org
라는 Google Cloud 조직을 만든다. Google Cloud 최상위 단계puppy.png
라는 이미지 파일과 같은 개별 파일이다.여기서 진짜 엄청나게 삽질을 많이 했는데.. document를 안 보고 이곳 적소 검색어로 찾아보고 작성하다가 삽질을 많이 했다. 😂
아래코드는 검색하며 찾은 gcs에 upload 하는 소스이다. 하지만 버전 문제 인지 이 소스는 정상적으로 작동하지 않았다. ㅠㅠ
def upload_to_gcs(**context):
# GCS에 접근하는 Hook 생성
gcs_hook = storage.GoogleCloudStorageHook()
# 파일 이름과 버킷 이름 지정
filename = 'facebook_posts.json'
bucket_name = 'my_bucket_name'
# 이전 task에서 반환된 JSON 파일 GCS에 업로드
gcs_hook.upload(bucket_name=bucket_name, object_name=filename, filename='/tmp/facebook_posts.json')
구글링을 해서 documnet에 CS에 파일 업로드 하는 방법에 대해 찾아보았고 google cloud document에 클라이언트 라이브러리에 자세히 나와있었다. document
from google.cloud import storage
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
# The ID of your GCS bucket
# bucket_name = "your-bucket-name"
# The path to your file to upload
# source_file_name = "local/path/to/file"
# The ID of your GCS object
# destination_blob_name = "storage-object-name"
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
# Optional: set a generation-match precondition to avoid potential race conditions
# and data corruptions. The request to upload is aborted if the object's
# generation number does not match your precondition. For a destination
# object that does not yet exist, set the if_generation_match precondition to 0.
# If the destination object already exists in your bucket, set instead a
# generation-match precondition using its generation number.
generation_match_precondition = 0
blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)
print(
f"File {source_file_name} uploaded to {destination_blob_name}."
)
아래가 위에 것을 반영한 코드이다.
import datetime
import json
from google.cloud import storage
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import requests
import logging
import os
def get_facebook_posts():
# Your code to fetch data from Facebook API
url = "https://graph.facebook.com/v16.0/me/posts"
access_token = "받은코드" #string값을 넣어주세요.
params = {
"access_token": access_token
}
response = requests.get(url, params=params)
json_data = response.json()
# 로그 추가
print(f"Response: {json_data}")
# 여기부터 안써지네 .. ?
print("file writing start")
with open('file.json', 'w') as f:
json.dump(json_data, f)
print("file writing end")
path = os.getcwd()
print(f"directory {path}, file list: {os.listdir()}")
return json_data
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
generation_match_precondition = 0
print("file upload start")
blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.datetime(2023, 3, 7),
'retries': 1,
}
with DAG('facebook_to_gcs_dag', default_args=default_args, schedule_interval=None) as dag:
get_facebook_posts_task = PythonOperator(
task_id='get_facebook_posts',
python_callable=get_facebook_posts,
)
upload_to_gcs_task = PythonOperator(
task_id='upload_to_gcs',
python_callable=upload_to_gcs,
op_kwargs={
'bucket_name': 'corded-palisade-378506-import',
'source_file_name': './file.json',
'destination_blob_name': 'file.json',
}
)
get_facebook_posts_task >> upload_to_gcs_task # get_facebook... 을 실행하고 완료되면 upload_to_gcs_task에 실행할 것
위에 코드를 보면 json 파일을 저장해 놓았는데 upload_to_gcs
정상적으로 사용했음에도 불구하고 upload_to_gcs_task
에서 계속해서 오류가 발생했다. 확인해 보니 file을 찾을 수 없다는 오류였다.
그래서 파일을 만들어주는 코드를 데이터 파일을 받아오는 곳에 추가하였다.
dag을 돌리니 정상작동 하였고 원하는 곳에 json 파일이 추가된 것도 확인할 수 있었다.
but.. DAG이 airflow에 올라가는데 시간이 조금 걸리는 거 같다.. 꼭 저렇게 해줘야 할까? return 받는 값을 그냥 파일로 내릴 순 없을까? 궁금하다.!
이제 정상적으로 만들긴 했다. ㅎㅎㅎ 하지만 변경 사항을 좀 더 추가하자면 dag에 스케줄링을 추가하고 싶다.! 그리고 but에 적어 놓은 것도 적용해 봐야겠다.!