[GCP] API로 받은 정보 Cloud Composer를 이용해서 GCS에 적재하기 - 4

HOU·2023년 3월 9일
0

gcp로pipline만들기

목록 보기
5/11
post-thumbnail

생각지도 못했던 오류들!

gcp에서는 덮어쓰기 기능이 없는듯하다.. 412 오류가 발생해서 이래저래 검색을 해보니, 똑같은 이름의 디렉토리에 파일명은 허용하지 않는 것 마냥. 오류가 계속해서 발생했다. 아니 왜 덮어쓰기가 안된단 말인가..😢😢😢 그래서 document도 뒤져보고 여러가지를 해보았다. 나의 삽질을 기록해본다.

나의 삽질들!

document를 보면서 오지게 삽질을 했는데! 여러가지 함수들을 찾았다. 그리고 물론 모르는 것들도 다수 발견했다.

  1. 덮어쓰기 하는 방법 찾기
  2. 단위테스트 중요도를 너무 느끼게 되었다.
  3. json encoding이 깨지는 문제를 발견했다.
  4. scheduler 사용에 문제점 발견

덮어쓰기 하는 방법 찾기

GCS는 기본적으로 덮어쓰기를 금지 하는거 같은 느낌이든다.. Object형태로 저장되기 때문일까..? 기본적으로 제공되는 update라는 함수와 update_storage_class 라는 함수가 있지만 이건 storage_class 를 변경하는 함수들이였다. 그리고 update 함수는 빌링관련 업데이트 같고..

몇 가지를 찾아보니 upload_from_string 이라는 함수가 있었다. 이것은 blob 파일 객체를 string의 형태로 만들어서 넣어주는데, 저장및 변경이 가능하다고 한다.

다른 한가지 방법으로는 blob.exists() 라는 함수를 활용하면 그 해당 blob명이 존재할 경우 boolean값을 반환해주는 함수가 존재했는데, 이 함수를 활용해서 적재하기 전에 내가 처음에 저장한 파일이 존재하는지 존재한다면 해당 파일을 지우고 다시 파일을 적재하는 코드를 짜볼까 했지만,, 빅데이터 관점으로 봣을 때 별로 효율적이지 못하다는 생각이 들었다.

하지만 string을 update 하는 건 효율적인가에 대해서 생각해봣을 때 이것 역시 효율적이지 못하다고 생각은 변하지 않는다.. 방법을 알고 싶다 ㅠㅠ

unit test!

airflow도 unittest 하는게 분명히 존재할것이다. 하지만 나는 아직 그정도는 못 도달했고, 함수 하나하나 테스트 해보면서 하기로 결정했다. 왜냐하면 덮어쓰기 하는 방법들을 하나하나 적용하고 dag을 돌려보고 하기에는 .. 너무 비효율적이였기 때문에..

local에서 테스트 할때는 IAM에 등록해야 되고 뭐 이래 저래 설정할게 많았다. 하지만 구글 코랩에서는 간단한 코드 몇줄이면 바로 적용이 되어서 코랩으로 진행하였다.

from google.colab import auth
auth.authenticate_user()
print('Authenticated')

위에 코드를 돌리고 실행하면 인증 완료다.!

json 파일을 받아오는 코드를 돌려보았다.

update하는 파일을 두고 테스트 해보았다. gcs에는 해당파일이 존재한다. 원래 다시 파일을 업로드 하면 오류가 발생하였다. upload_from_string함수를 사용하고 서는 오류가 발생하지 않았다.

from google.cloud import storage

def update_blob(bucket_name, blob_name, content):
    # Instantiate a client
    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob.upload_from_string(json.dumps(content, ensure_ascii = False).encode(encoding="EUC-KR"))
    print(f"Blob {blob_name} in {bucket_name} was updated with new content: {content}.")

bucket_name = 'corded-palisade-378506-import'
blob_name = 'file.json'
file_path = './file.json'

# Example usage:
update_blob(bucket_name, blob_name, getJsondata())

getJsondata()란 함수를 만들어서 string으로 넣어줬더니 정상 작동한다! 하지만 세번째 문제 파일이 전부다 깨져있었다. 딱봐도 encoding 문제라는 생각이 들었다.

encoding..

encoding을 하는 여러 방법이 있는 건 알겠다. 왜인지 모르겠지만 기존에 알고 있던 방법으로 다 잘 되지 않아서 여러가지를 시도해보았다.

  1. dumps()안에 ensure_ascii = Faslse 넣기 , 이거 적용하니까 아스키코드로 변환이 안되긴하였지만 원하는게 아니였다.
  2. encode 함수를 사용해서 encoding을 UTF-8로 적용하였으나 원하는 형태로 바뀌지 않았고 EUC-KR로 변경하니 정상 작동 되었다.

그리고 여러가지를 확인해 보니 dump 와 dumps의 차이가 존재함을 확인 할 수 있었는데
dumps : Python dict object를 JSON 문자열로 변환할 수 있다.
dump : Json 파일에 write할 때 사용할 수 있는 차이가 존재했다.

자 이제 airflow에 적용을 해보니 몇번 시도해도 에러가 나지 않는다!! 이제 다 도달했다..

스케줄로 1시간에 한번씩 다시 읽고 적재만 해보자!!!

Scheduling

기존의 코드에 scheduler만 바꿔주면 제대로 돌아가겠지 하고 시도했는데 왠일 계속 변경되지 않고 Scheduling에 계속해서 None이라고 나와있는것이다.

확인해보니 stack overflow에 default_args에 schedule_interval을 설정하면 되는 줄 알았는데 DAG에 None 이라고 명시 되어 있기 때문에 안되는 것이였다. 그래서 보기론 default_args에 설정하는 것보다 DAG 바로 옆에 property를 우선시 하는거 처럼 보인다. 하지만 아니였다..ㅋㅋㅋ;;

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'schedule_interval':'*/20 * * * *',
    'start_date': datetime(2023, 3, 7, tzinfo=local_tz)
}

with DAG('facebook_to_gcs_dag', 
         default_args=default_args, 
         schedule_interval='None') as dag:

    t1 = PythonOperator(
        task_id='get_facebook_posts',
        python_callable=get_facebook_posts,
    )

    t2 = PythonOperator(
        task_id='upload_to_gcs',
        python_callable=update_blob,
        op_kwargs={
            'bucket_name': 'corded-palisade-378506-import',
            'blob_name': 'file.json',
            'content': get_facebook_posts()
        }
    )

t1 >> t2

그래서 아래 코드로 수정했다. 이러면 돌아가야지 했지만 scheduled의 날짜가 1day 00:00:00으로 변경 되어 있었다. 아니. 왜 적용이 안되는거야.. 생각하고 기본적인 Airflow의 상태를 확인하는 곳의 코드를 확인해보니 schedule_intervaldefault_args가 아닌 DAG 안에 설정되어 었었다

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'schedule_interval':'*/20 * * * *',
    'start_date': datetime(2023, 3, 7, tzinfo=local_tz)
}

with DAG('facebook_to_gcs_dag', 
         default_args=default_args, 
         ) as dag:

    t1 = PythonOperator(
        task_id='get_facebook_posts',
        python_callable=get_facebook_posts,
    )

    t2 = PythonOperator(
        task_id='upload_to_gcs',
        python_callable=update_blob,
        op_kwargs={
            'bucket_name': 'corded-palisade-378506-import',
            'blob_name': 'file.json',
            'content': get_facebook_posts()
        }
    )

t1 >> t2

그래서 코드를 다시 수정했다. 이렇게 수정하니까 원하는데로 스케줄러가 작동하는 것을 확인할 수 있었다. 여기서 주의 할 점은 처음에 20분씩 호출했는데, 하루 지나니까 호출 할 수 있는 수가 초과 되버렸다.. ㅠㅠ 이런;;;

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 7, tzinfo=local_tz)
}

with DAG('facebook_to_gcs_dag', 
         default_args=default_args, 
         schedule_interval='@daily',
         max_active_runs=2,) as dag:

    t1 = PythonOperator(
        task_id='get_facebook_posts',
        python_callable=get_facebook_posts,
    )

    t2 = PythonOperator(
        task_id='upload_to_gcs',
        python_callable=update_blob,
        op_kwargs={
            'bucket_name': 'corded-palisade-378506-import',
            'blob_name': 'file.json',
            'content': get_facebook_posts()
        }
    )

결론

  1. meta graph api 사용
  2. cloud composer 사용 및 스케줄 등록 (다양한 덱들은 좀 더 진행 예정)
  3. encoding 방법 숙지
  4. cloud storage의 기능에 대해 습득

정도 숙지 된 거 같다. airflow 기반의 composer는 굉장히 강력한 기능인 거 같다. 하지만 scheduling 좀 조심해야 할 거 같고.. 어떻게 하면 더 나이스하게 작성할 수 있을지 고민해보자!

개선할점

1.graph API 가 원하는 정보를 다 가져오지 않는 거 같다. 확인이 필요하다! page에 정보를 가지고 오고 싶은데 안된다.
1. gcs로 가져오지 않고 바로 bigquery로 전송하는 파이프 라인을 만들어보자!

profile
하루 한 걸음 성장하는 개발자

0개의 댓글