
이번 포스팅에서는 3개의 수정 작업을 다룬다.
1. 에어플로우 예시 덱 삭제
2. 에어플로우 도커컴포즈에서 requirements.txt 설치
3. op_args, op_kwargs 관련

8080포트로 웹서버를 접속하면 49개의 예시 덱이 올려져있다.
삭제한다.

AIRFLOW_CORE_LOAD_EXAMPLES : 'true' 로 설정되어있다.
-> false로 수정

docker-compose up -d


덱이 빈 것 확인 완료
공식문서를 잘 읽어보면 답이 나온다.
https://airflow.apache.org/docs/docker-stack/build.html
원하는 작업은 에어플로우로 주기적인 requests로 몽고db에 적재하는 작업이었다.
추후 bigquery 작업도 에어플로우로 돌릴 예정이었다.
하지만 하라고 하는대로 했지만 안되는 문제가 발생햇다.
- Dockerfile
FROM apache/airflow:2.6.3
COPY requirements.txt /
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" -r /requirements.txt- requirements.txt
pymongo
requests
sudo docker exec -it pshyeok_airflow-init_1 /bin/bash
cd /
requirements 파일이 해당 폴더에 없었다.
(https://velog.velcdn.com/images/pshyeok2/post/9cca3983-ac0e-4f08-b128-2e48f36112aa/image.png)
docker-compose.yaml 파일에서 airflow-webserver를 실행하기 전에 pymongo를 설치하게 세팅해놓으면 된다고 하였지만...
두개의 파일을 생성하고 docker-compose up을 실행했다.
는... 되지 않았다.
airflow-webserver는 계속 starting 상태에서 벗어나지 못하는 상태가 지속되었다.






문제가된 코드는 다음과 같다.
def update_movies_for_collections(urls):
for url in urls:
movies = fetch_movies_data(url)
update_movies_data(movies)
print(url)
print(movies)
if not movies:
print("Warning: The 'movies' list is empty. Skipping database insertion.")
continue
# MongoDB에 데이터 적재
client = pymongo.MongoClient("mongodb://주소")
db = client["movie"]
# Create the 'movies' collection if it doesn't exist
collection_name = "movies"
if collection_name not in db.list_collection_names():
db.create_collection(collection_name)
collection = db[collection_name]
for movie in movies:
existing_movie = collection.find_one({'title_kr': movie['title_kr'], 'OTT': movie['OTT']})
if existing_movie:
continue
# Remove the 'href' field from the movie data
movie.pop('href', None)
# Insert the movie into the MongoDB collection
collection.insert_one(movie)
update_movies_for_collections 함수에 인자를 전달하지 않고 있기 때문에 오류가 발생.
op_args 또는 op_kwargs를 사용하여 인자를 전달해야 한다.
ott_comming_update = PythonOperator( task_id='update_movies_for_collections', python_callable=update_movies_for_collections, op_args=[urls], dag=dag, )

정상 실행된 것을 볼 수 있었다.