GCP04.airflow docker-compose 수정 작업

hyeok2·2023년 7월 31일
0

Airflow_stu

목록 보기
3/5
post-thumbnail

이번 포스팅에서는 3개의 수정 작업을 다룬다.
1. 에어플로우 예시 덱 삭제
2. 에어플로우 도커컴포즈에서 requirements.txt 설치
3. op_args, op_kwargs 관련

8080포트로 웹서버를 접속하면 49개의 예시 덱이 올려져있다.
삭제한다.

수정작업

1.vim docker-compose.yaml


AIRFLOW_CORE_LOAD_EXAMPLES : 'true' 로 설정되어있다.
-> false로 수정

2.다시 도커 컴포즈 업을 실행한다.

docker-compose up -d


덱이 빈 것 확인 완료

도커컴포즈에서 requirements.txt

공식문서를 잘 읽어보면 답이 나온다.
https://airflow.apache.org/docs/docker-stack/build.html

1. 문제점

원하는 작업은 에어플로우로 주기적인 requests로 몽고db에 적재하는 작업이었다.
추후 bigquery 작업도 에어플로우로 돌릴 예정이었다.
하지만 하라고 하는대로 했지만 안되는 문제가 발생햇다.

시도01.Dockerfile, requirements.txt 생성

  • Dockerfile
    FROM apache/airflow:2.6.3
    COPY requirements.txt /
    RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" -r /requirements.txt
  • requirements.txt
    pymongo
    requests

sudo docker exec -it pshyeok_airflow-init_1 /bin/bash
cd /
requirements 파일이 해당 폴더에 없었다.

(https://velog.velcdn.com/images/pshyeok2/post/9cca3983-ac0e-4f08-b128-2e48f36112aa/image.png)

시도02.docker-compose.yaml 파일 수정

docker-compose.yaml 파일에서 airflow-webserver를 실행하기 전에 pymongo를 설치하게 세팅해놓으면 된다고 하였지만...
두개의 파일을 생성하고 docker-compose up을 실행했다.
는... 되지 않았다.
airflow-webserver는 계속 starting 상태에서 벗어나지 못하는 상태가 지속되었다.

시도03.docker-compose.yaml 파일 수정 2

  • image를 주석처리
  • 주석처리 되어있던 build를 주석 해제

    docker-compose up 을 하였을때 각 모듈을 설치하는 과정을 보여준다.

    성공적으로 설치한 것도 볼 수 있다.

또 오류


문제가된 코드는 다음과 같다.

def update_movies_for_collections(urls):
    for url in urls:
        movies = fetch_movies_data(url)
        update_movies_data(movies)

        print(url)
        print(movies)

        if not movies:
            print("Warning: The 'movies' list is empty. Skipping database insertion.")
            continue

        # MongoDB에 데이터 적재
        client = pymongo.MongoClient("mongodb://주소")
        db = client["movie"]

        # Create the 'movies' collection if it doesn't exist
        collection_name = "movies"
        if collection_name not in db.list_collection_names():
            db.create_collection(collection_name)

        collection = db[collection_name]

        for movie in movies:
            existing_movie = collection.find_one({'title_kr': movie['title_kr'], 'OTT': movie['OTT']})

            if existing_movie:

                continue

            # Remove the 'href' field from the movie data
            movie.pop('href', None)

            # Insert the movie into the MongoDB collection
            collection.insert_one(movie)

update_movies_for_collections 함수에 인자를 전달하지 않고 있기 때문에 오류가 발생.
op_args 또는 op_kwargs를 사용하여 인자를 전달해야 한다.

ott_comming_update = PythonOperator(
    task_id='update_movies_for_collections',
    python_callable=update_movies_for_collections,
    op_args=[urls],
    dag=dag,
)


정상 실행된 것을 볼 수 있었다.

profile
땅을 파다보면 흙과 물을 보겠지만, 코드를 파다보면 답이 보일것이다.

0개의 댓글