Cloud Composer는 데이터 분석 워크플로우를 오케스트레이션 해주는 Apache Airflow의 구글 클라우드 managed 리소스이다.
프로그래밍 방식으로 워크플로를 작성, 예약 및 모니터링하는 플랫폼
Airflow를 사용하여 작업의 DAG(Directed Acyclic Graphs)로 워크플로를 작성.
워크플로 안에 Operator를 이용해 Task를 만들어 담을 수 있다.
Cloud Shell 활성화
Composer는 쿠버네티스 엔진을 기반으로 한 배포 환경을 가지고 있기 때문에 쿠버네티스 API를 활성화 해줘야 한다.
Kubernetes Engine API
API 활성화
Composer 인스턴스가 생성되는데는 10분 이상이 걸린다.
Cloud Storage에 버킷도 하나 생성해준다.
Airflow
Cloud Storage에 DAG 업로드
DAG를 업로드하려면 hadoop_tutorial.py를 Cloud Shell에서 Composer환경 생성 시 자동으로 생성된 Cloud Storage 버킷 dags폴더에 파일을 업로드 한다.
hadoop_tutorial.py를 열어보면 알겠지만 우리는 아래의 텍스트 파일에서 WordCount를 해볼 것이다.
rose.txt
What's in a name? That which we call a rose
By any other name would smell as sweet.
gsutil cp gs://cloud-training/datawarehousing/lab_assets/hadoop_tutorial.py gs://<Composer가 만들어지면서 생성된 버킷DAGs폴더 경로>
DAG 파일이 dags 폴더에 추가되면 Cloud Composer는 DAG를 Airflow에 추가하고 자동으로 예약
DAG 변경은 3-5분 이내에 발생한다.
DAG 파일을 Cloud Storage의 dags 폴더에 업로드하면 Cloud Composer가 파일을 파싱
오류가 발견되지 않으면 워크플로 이름이 DAG 목록에 나타나고 워크플로가 즉시 실행되도록 대기열에 추가된다.
각 작업 위에 마우스를 올려 놓으면 해당 상태가 표시된다. 테두리는 상태를 나타낸다.
녹색 테두리 = 실행 중, 빨간색 = 실패 등등
이 워크플로우 코드에는 세가지 작업을 위해 다음 연산자들을 사용한다.
Apache Hadoop 단어 수 작업을 모니터링하는 작업.
DAG에서 모든 단계가 완료되면 각 단계에 짙은 녹색 테두리가 생긴다. 또한 생성된 Dataproc 클러스터가 이제 삭제된다.
Cloud Storage의 버킷에서 wordcount확인