Airflow + SageMaker ML 파이프라인 개발 삽질기

YGHwang·2022년 5월 29일
2
post-thumbnail

아직 많이 부족하지만 실전을 통해 체득한 지식과 노하우를 기록해보려고 합니다. 잘못된 내용이 있다면 지적도 부탁드립니다! 🤚

지금 다니는 회사에서는 AWS를 사용하고 있습니다. 전 회사에서는 BigQuery(빛☀️쿼리..), Composer(Airflow) 조합으로 GCP 기반 데이터 파이프라인을 구축하고 운영했었는데, 이번 기회에 AWS를 통해 ML Pipeline을 만들면서 겪은 시행착오(?)를 간단하게 기록해보려고 합니다.

MWAA

Amazon Managed Workflow of Apache Airflow (MWAA)는 관리형 오케스트레이션 서비스로, 사실상 Apache Airflow를 AWS에서 제공하는 것이라 볼 수 있습니다. 요즘엔 Airflow를 모르는 분이 없을 정도로 많이 사용하는 스케줄러인데요, Data Lakehouse을 구축하기 위한 DataOps과 머신러닝 파이프라인을 운영하기 위한 MLOps 영역에서 많이 사용하는 것 같습니다. 원래 작년 초부터 사용하고 싶었으나, Asia/Seoul 리전에는 지원이 안되고 있었는데 2021년 9월 경에 출시되어 드디어 사용하게 되었습니다.


SageMaker

SageMaker는 머신러닝에 특화된 완전 관리형 인프라로, 모델을 직접 구현하지 않아도 각종 컴포넌트를 통해 훈련, 검증, 배포를 손쉽게 할 수 있는 기능을 제공합니다. 특히 ML Engineer가 해야 할 업무를 SageMaker에서 제공하는 컴포넌트를 활용하여 구현할 수 있도록하여 거의 SaaS(Software-as-a-Service)로 여겨지는 추세입니다.

이처럼 ML에 필요한 각 단계를 컴포넌트화하여 추상화된 기능으로 제공합니다. 다만 이번 글에서는 해당 기능을 사용하기 보다는, 직접 개발한 ML 모듈을 SageMaker의 노트북 인스턴스를 통해 일배치로 실행하기 위한 파이프라인 설계 과정 위주로 작성했습니다.

(좀 있어 보이고 싶어서 들고온 SageMaker 설명 사진.. 머쓱)

MWAA + SageMaker 파이프라인

저는 아래의 단계를 통해 배치 파이프라인을 구현했으며, 각 단계를 모두 Airflow의 Task로 실행하도록 개발했습니다.

  • (Task 1) 노트북 인스턴스 lifecycle configurations 생성
  • (Task 2) 노트북 인스턴스 생성 및 ML 모듈 실행 (데이터 로드, 모델 학습, 예측 실행, 예측 값 DB 업데이트 등)
  • (Task 3) 노트북 인스턴스 중지 및 삭제
  • (Task 4) 노트북 인스턴스 lifecycle configurations 삭제
(위에서 나열한 Task를 DAG에서 구현한 모습)

노트북 인스턴스 생성 & 삭제

그러면 각 Task를 통해 SageMaker 노트북 인스턴스의 생성과 삭제를 어떻게 실행하는지 살펴보겠습니다.

GCP와 마찬가지로 AWS는 각 서비스를 CLI로 명령할 수 있는 SDK를 제공하는데요, Python에서는 Boto3(공식 문서)라는 라이브러리로 이 기능을 동일하게 사용할 수 있습니다. 따라서 이것을 통해 SageMaker의 노트북 생성과 삭제를 직접 컨트롤할 수 있습니다. 즉, Boto3로부터 SageMaker 클라이언트 객체를 생성할 수 있고, 해당 객체에서 필요한 메서드를 호출하여 코드 레벨에서 명령할 수 있습니다.

import boto3
client = boto3.client('sagemaker')

이제 Task 순서 별로 필요한 함수를 만들어보겠습니다.

Task 1) lifecycle configurations 생성

노트북 인스턴스를 새로 생성하거나, 혹은 중지했던 것을 다시 시작할 때 해당 인스턴스 환경 설정 셋팅에 필요한 명령어를 사전에 정의해 놓을 수 있는 기능

SageMaker 노트북 인스턴스를 생성하여 들어가보면 사진과 같이 다양한 Conda 환경을 제공하는데요, 이 가상환경 별로 머신러닝에 필요한 라이브러리들이 사전에 설치되어 있는 형태입니다. 여기에는 범용적으로 사용하는 Scikit-learn, Tensorflow, Torch 등이 설치 되어있긴 하지만, 로컬에서 개발한 코드가 해당 라이브러리 버전과 일치하지 않을 수 있고 혹은 새롭게 설치가 필요한 라이브러리가 있을 수 있습니다.

(SageMaker 노트북 인스턴스에서 조회한 Conda 가상환경)

그래서 로컬 개발 환경을 노트북 인스턴스에 동일하게 맞춰주기 위한 작업이 필요할 때가 있는데, 이것을 lifecycle configurations을 통해 자동화 할 수 있습니다. 예를 들어 새로운 Conda 가상환경을 생성하고, 해당 환경에 로컬에서 사용했던 requirements.txt를 그대로 설치할 수 있도록 정의할 수 있습니다.

#!/bin/bash
sudo -u ec2-user -i <<'EOF'
conda create -n new_conda_env_p38 python=3.8 -y --force # 새로운 가상환경 생성
source activate new_conda_env_p38

cd SageMaker # SageMaker 노트북 인스턴스에 default로 생성되어 있는 폴더 
mkdir new_folder # 새로운 폴더 생성
cd new_folder

# Github repo를 clone하여 필요한 라이브러리를 설치
username="my_github_account"
password="my_github_account_password"
GIT_URL="https://${username}:${password}@github.com/WORKSPACE/my-repository.git"
git clone ${GIT_URL}

pip install -r requirements.txt &

conda deactivate
EOF

그러면 이 작업을 DAG의 PythonOperator Task로 실행할 수 있도록 함수로 생성해야 합니다. 즉, 해당 shell을 읽어들여서 SageMaker의 lifecycle configurations를 생성할 수 있도록 하는 것입니다. 단, shell 스크립트 안에는 github 계정 등의 민감한 정보가 포함되어 있기 때문에 따로 파일로 관리하기 보다는 Airflow Variables을 통해 불러오도록 설계했습니다. 그래서 해당 함수에 content 파라미터를 만들었는데, Variables을 통해 읽어들인 shell을 string으로 전달받아서 그대로 lifecycle configurations를 생성하는 것입니다. (이것은 아래에서 다시 한번 설명합니다!)

import base64

def create_notebook_instance_lifecycle_config(NotebookInstanceLifecycleConfigName: str, content: str):
    content_bytes = content.encode("utf-8") 
    content_base64 = base64.b64encode(content_bytes)
    content_base64_str = content_base64.decode("utf-8")

    try:
        response = client.create_notebook_instance_lifecycle_config(
            NotebookInstanceLifecycleConfigName=NotebookInstanceLifecycleConfigName,
            OnStart=[
                {
                    "Content": content_base64_str
                },
            ]
        )
        print(" >>> NotebookInstanceLifecycleConfig is created. ")
        return response
    except Exception as err:
        return err
(lifecycle configurations을 통해 설치하는 라이브러리와 클론하는 github repository)

Task 2) 인스턴스 생성 및 ML 모듈 실행

앞서 설명한 lifecycle configurations를 토대로 노트북 인스턴스가 생성되게끔 해야합니다. 노트북 인스턴스로 사용할 이름과 lifecycle configurations를 입력 받도록 정의하면 됩니다. 특히 ML 모듈 실행을 위한 컴퓨팅 리소스를 잘 잡아줘야 하는데, 이것은 비용과 직결되는 문제이기 떄문에 상황에 맞게 선택하면 됩니다. 또, SubnetId, SecurityGroupIds, RoleArn의 파라미터도 필수로 입력해야 합니다. 이 부분은 VPC(네트워크 보안)와 관련된 문제이므로 본인이 사용하는 조직 내에서 사용하는 Security Group과 Role을 지정해주어야 합니다.

def create_notebook_instance(
        NotebookInstanceName: str, # 노트북 인스턴스로 사용할 이름
        LifecycleConfigName: str, # 노트북 인스턴스를 생성할 실행할 lifecycle configurations 
        RoleArn: str,
        SubnetId: str,
        SecurityGroupIds: str,
        InstanceType: str = "ml.t3.medium", # 노트북 인스턴스 타입
        DirectInternetAccess: str = "Enabled",
        VolumeSizeInGB: int = 50, # 노트북 인스턴스 용량
        DefaultCodeRepository: str = "my-github-repo", # 노트북 인스턴스가 생성되고 자동으로 Clone할 Repo
    ):
    try:
        response = client.create_notebook_instance(
            NotebookInstanceName=NotebookInstanceName,
            InstanceType=InstanceType,
            SubnetId=SubnetId,
            SecurityGroupIds=SecurityGroupIds,
            RoleArn=RoleArn,
            LifecycleConfigName=LifecycleConfigName,
            DirectInternetAccess=DirectInternetAccess,
            VolumeSizeInGB=VolumeSizeInGB,
            DefaultCodeRepository=DefaultCodeRepository,
            RootAccess=RootAccess
        )
        print(" >>> NotebookInstance is created. ")
        return response
    except Exception as err:
        return err

자 그런데.. 노트북 인스턴스까지 생성하는 건 알겠는데, 내가 실행하고자 하는 모듈은 어디에서 명령하는 걸까요? 🤔

가장 아름다운 그림은 노트북 인스턴스가 생성되면, Boto3로 생성한 client를 통해 해당 인스턴스 내의 터미널 CLI로 명령어를 전달하는 모습일 것입니다. 그런데 공식 문서에 나와있는 모든 함수를 다 찾아봤지만.. 이것을 제공하는 기능은 없는 것 같았습니다. SageMaker의 사상이 관리형이라 그런지는 몰라도 생성된 노트북 인스턴스 내에 특정 명령어를 전달할 수 있는 기능은 찾지 못했습니다. 🥲

결국.. lifecycle configurations를 활용하는 방법 뿐이 떠오르지 않았습니다. 노트북 인스턴스를 생성하면서 lifecycle configurations에 선언한 명령어를 실행할 때 파이썬 모듈까지 실행하도록 하는 것이죠..! 그래서 python 실행 코드가 담긴 shell을 실행하는 코드를 추가했습니다.

#!/bin/bash
sudo -u ec2-user -i <<'EOF'
conda create -n new_conda_env_p38 python=3.8 -y --force # 새로운 가상환경 생성
source activate new_conda_env_p38

cd SageMaker # SageMaker 노트북 인스턴스에 default로 생성되어 있는 폴더 
mkdir new_folder # 새로운 폴더 생성
cd new_folder

# Github repo를 clone하여 필요한 라이브러리를 설치
username="my_github_account"
password="my_github_account_password"
GIT_URL="https://${username}:${password}@github.com/WORKSPACE/my-repository.git"
git clone ${GIT_URL}

pip install -r requirements.txt

######## 추가된 부분 ^.^.. #########
chmod +x my_python_script.sh
nohup bash my_python_script.sh &
#################################

conda deactivate
EOF

전혀 아름다운 파이프라인이 아니긴 하지만, 직접 개발한 모듈을 실행하기 위해선 이 방법 밖엔 없었습니다. 흐흑. 뭐 그래도 여기까지는 그래도 괜찮았습니다. 문제는 lifecycle configurations은 최대 5분까지만 실행이 된다는 것입니다. 그래서 인스턴스가 생성되고 나서도 백그라운드에서 실행되게끔 nohup을 붙여줘야 했습니다. 😂

(lifecycle configurations 코드가 5분 이상 실행이 불가능하다는 뭐 대충 그런 뜻..ㅠㅠ)

사실 여기서 더 중요한 것은 파이썬으로 실행하는 모듈의 구성과 순서입니다. 이 부분이 파이프라인을 만드는 가장 큰 이유이기 때문에 아래의 구성을 잘 고려하여 개발해야 합니다. 이 부분에 대한 내용도 추후에 따로 포스팅 해보려고 합니다.

  • 데이터 로드
  • 데이터 전처리
  • 모델 훈련 및 검증
  • 훈련된 모델로 예측 결과 생성
  • 예측 결과 저장 (to AWS S3, Redshift, Athena 등)

Task 3) 인스턴스 삭제

모듈 실행이 끝났으면 인스턴스를 죽여..가 아니고 삭제해줘야 합니다. 회사에 돈이 많다면(...) 그냥 생성해놓고 쭉 놔둬도 상관은 없습니다. (만약 그렇다면 부럽습니다..)

하지만 인스턴스 타입에 GPU라도 포함되어 있다면 비용이 상당히 많이 발생합니다. 이처럼 비용까지 고려한다면 필요할 때만 노트북 인스턴스를 사용하고, 그 외에는 삭제하여 비용을 절감하는 것이 좋습니다.

그런데 SageMaker에서 노트북 인스턴스를 삭제하려면 중지(Stopped) 상태가 되어야 가능 합니다. '아 그러면 노트북 인스턴스를 중지하는 명령어를 실행하고, 그 이후에 삭제하면 되겠네!' 라고 생각하는 것이 자연스러운 사고의 흐름입니다. 에.. 근데 이게 안되겠더라고요. 왜냐면 위의 lifecycle configurations에서 실행한 파이썬 모듈이 끝날 때까지 기다려야 했기 때문이죠.

그래서 my_python_script.sh 스크립트에 파이썬 실행이 끝나면 노트북 인스턴스를 중지하는 CLI를 미리 추가 해줘야 했습니다.

#! /bin/bash
python ./run_my_script1.py
python ./run_my_script2.py
python ./run_my_script3.py
# 파이썬 모듈 실행이 끝나면 노트북 인스턴스를 중지하기 위한 명령어 추가.. ^.^..
aws sagemaker stop-notebook-instance --notebook-instance-name MY_NOTEBOOK_INSTANCE_NAME

그러면 여기서 필요한 것이 노트북 인스턴스의 상태를 계속 찔러보는 함수가 필요했습니다. (만두 속 찔러보는 것도 아니고 참내 이렇게까지 만들어야 하나 싶었지만 계속 만들었읍니다..)

def describe_notebook_instance(NotebookInstanceName: str = None):
    try:
        response = client.describe_notebook_instance(
            NotebookInstanceName=NotebookInstanceName
        )
        return response
    except Exception as err:
        return err

그리고 이 함수를 사용하여 주기적으로 상태를 조회하면서 'Stopped'가 되면 삭제를 실행하는 함수를 다시 만들었습니다.

import time

def delete_notebook_instance(NotebookInstanceName: str = None):
    try:
        response = client.delete_notebook_instance(
            NotebookInstanceName=NotebookInstanceName
        )
        print(" >>> NotebookInstance is deleting. ")
        return response
    except Exception as err:
        return err


def DeleteNotebookInstance(NotebookInstanceName: str, loop: int, time_sleep: int):
    for _ in range(loop):
        response = describe_notebook_instance(
            NotebookInstanceName=NotebookInstanceName
        )
        NotebookInstanceStatus = response["NotebookInstanceStatus"]
        if NotebookInstanceStatus == "Stopped":
            return delete_notebook_instance(
                NotebookInstanceName=NotebookInstanceName
            )
        else:
            print(f" >>> NotebookInstanceStatus is {NotebookInstanceStatus}. ")
            time.sleep(time_sleep)

DeleteNotebookInstance에서 looptime_sleep를 통해 노트북 인스턴스의 상태를 조회할 주기를 설정할 수 있도록 했는데, 예를 들면 아래와 같이 사용하도록 설계했습니다.

  • loop=100, time_sleep=60 이라면 60초 간격으로 100회 동안 인스턴스의 상태를 조회하겠다는 것입니다. 즉, 총 6,000초니까 최대 1.6시간 동안 계속 조회하는 Task가 유지되는 것입니다. 이 부분은 인스턴스의 타입과 파이썬 모듈 스크립트 실행 시간을 적절히 고려햐여 배분하도록 의도했습니다.
  • 위 함수를 통해 실행된 로그를 확인해보면 주기적으로 상태를 보면서 틈(?)을 노리다가 중지가 되면 삭제하는 것을 확인할 수 있습니다.
(실제 로그 캡쳐)

Task 4) lifecycle configurations 삭제

맨 처음에 생성한 lifecycle configurations까지 삭제를 해주면 파이프라인의 긴(?) 여정이 끝납니다. 해당 Task는 반드시 필요한 부분은 아니라고 생각하는데요, 어차피 계속 사용할 lifecycle configurations이라면 굳이 삭제할 필요는 없습니다.

다만, github 계정이나 repository 정보를 노출하지 않도록 하기 위해 해당 함수도 추가했습니다.

def delete_notebook_instance_lifecycle_config(NotebookInstanceLifecycleConfigName: str):
    try:
        response = client.delete_notebook_instance_lifecycle_config(
            NotebookInstanceLifecycleConfigName=NotebookInstanceLifecycleConfigName
        )
        print(" >>> NotebookInstanceLifecycleConfigName is deleted. ")
        return response
    except Exception as err:
        return err

Airflow DAG 생성

SageMaker 노트북 인스턴스 생성, 삭제에 필요한 함수는 모두 생성했으니 이것을 DAG에서 호출하여 Task로 만들면 원했던 파이프라인이 완성됩니다.

Variables

저는 DAG의 Task를 생성할 때 Operator에 전달할 파라미터의 값을 하드코딩하기 보다는 웬만하면 Variables로 빼놓고, 이것을 호출하여 파라미터로 전달하는 편입니다. 두 가지 이유 때문입니다.

  • 첫째로 보안 때문입니다. Github에 DAG 코드를 올릴 때 계정 정보나 네트워크 정보가 포함되어 올라가지 않도록 해야 하기 때문입니다.
  • 두 번째는 DAG 스크립트를 반복 사용할 수 있도록 하기 위함입니다. 해당 DAG.py를 복사해서 dag_id를 바꿔주고, Variables만 새로 추가 해준다면 새로운 파이프라인을 손쉽게 생성할 수 있기 때문입니다.

예시로 Variables을 아래와 같이 구성해봤습니다. 각 함수에 전달해야 할 파라미터를 key, value로 미리 정의 해놓는 것 뿐이라 전혀 어려운 작업은 아닙니다.


{
  "my_variable": {
    "NotebookInstanceLifecycleConfigName": "my-lifecycle-configurations",
    "NotebookInstanceName": "my-notebook-instance",
    "InstanceType": "ml.c5d.4xlarge",
    "RoleArn": "arn:aws:iam::*************:role/service-role/*************",
    "SubnetId": "****************",
    "SecurityGroupIds": [
      "sg-****************",
      "sg-****************"
    ],
    "content": "#!/bin/bash\nsudo ... 아까 위애서 작성한 shell을 여기에!! ... EOF"
  }
}
(Airflow Variables 추가 화면)

DAG

Variables를 DAG에서 호출하고, PythonOperatorop_kwargs에 전달해주어 Task별 Flow만 선언해주면 원했던 파이프라인이 완성됩니다. 👏

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta

from utils.sagemaker import SageMaker # 위에서 설계한 함수를 클래스로 만들어서 utils에 따로 저장

sagemaker = SageMaker()

## Variables
variables = Variable.get("my_variable", deserialize_json=True)

NotebookInstanceLifecycleConfigName = variables["NotebookInstanceLifecycleConfigName"]
NotebookInstanceName = variables["NotebookInstanceName"]
InstanceType = variables["InstanceType"]
SecurityGroupIds = variables["SecurityGroupIds"]
SubnetId = variables["SubnetId"]
RoleArn = variables["RoleArn"]
content = variables["content"]

## DAG
default_args={
    "depends_on_past": False,
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5)
}

dag = DAG(
    dag_id="my_dag_id",
    description="DAG for SageMaker Operating",
    schedule_interval="0 0 * * *",
    catchup=False,
    default_args=default_args
)

## Tasks
START = DummyOperator(dag=dag, task_id="START")
END = DummyOperator(dag=dag, task_id="END")

CreateNotebookInstanceLifecycleConfig = PythonOperator(
    task_id="CreateNotebookInstanceLifecycleConfig",
    python_callable=sagemaker.create_notebook_instance_lifecycle_config,
    provide_context=True,
    op_kwargs={
        "NotebookInstanceLifecycleConfigName": NotebookInstanceLifecycleConfigName,
        "content": content,
    },
    dag=dag
)

CreateNotebookInstance = PythonOperator(
    task_id="CreateNotebookInstance",
    python_callable=sagemaker.create_notebook_instance,
    provide_context=True,
    op_kwargs={
        "NotebookInstanceName": NotebookInstanceName,
        "LifecycleConfigName": NotebookInstanceLifecycleConfigName,
        "InstanceType": InstanceType,
        "RoleArn": RoleArn,
        "SubnetId": SubnetId,
        "SecurityGroupIds": SecurityGroupIds
    },
    dag=dag
)

DeleteNotebookInstance = PythonOperator(
    task_id="DeleteNotebookInstance",
    python_callable=sagemaker.DeleteNotebookInstance,
    provide_context=True,
    op_kwargs={
        "NotebookInstanceName": NotebookInstanceName,
        "time_sleep": 300, # 300초(5분) 마다 인스턴스 확인
        "loop": 12 * 6  # 최대 6 시간 동안 인스턴스 확인
    },
    dag=dag
)

DeleteNotebookInstanceLifecycleConfig = PythonOperator(
    task_id="DeleteNotebookInstanceLifecycleConfig",
    python_callable=sagemaker.delete_notebook_instance_lifecycle_config,
    provide_context=True,
    op_kwargs={
        "NotebookInstanceLifecycleConfigName": NotebookInstanceLifecycleConfigName
    },
    dag=dag
)

START >> \
CreateNotebookInstanceLifecycleConfig >> \
CreateNotebookInstance >> \
DeleteNotebookInstance >> \
DeleteNotebookInstanceLifecycleConfig >> \ 
END
(생성된 DAG의 Workflow) (생성된 DAG가 실제 실행된 간트 차트)

마무리 ⌛️

MWAA와 SageMaker 조합으로 ML 파이프라인을 만들기 위해 겪은 시행착오를 간단히 공유해봤습니다. 글이 좀 장황해서(..) 간단해 보이진 않았지만, Airflow의 스케줄링 기능을 바탕으로 boto3로 SageMaker 노트북 인스턴스를 생성하고 삭제하는 과정이 전부이기 때문에 그리 복잡한 파이프라인은 아닐 것입니다. 😅

물론 SageMaker 장점을 잘 살린 파이프라인은 아니었습니다. 데이터 로드, 모델 훈련, 검증, 모델 배포까지의 흐름을 SageMaker에서 제공하는 컴포넌트를 활용하여 만드는 사례가 훨씬 많기 때문입니다. 다만, SageMaker의 컴포넌트를 사용하지 않고 직접 개발한 모듈을 배치로 실행하기 위해 노트북 인스턴스를 컴퓨팅 리소스로 사용하기 위한 사례 정도로 보면 될 것 같습니다.

기회가 된다면 SageMaker Studio를 활용하여 파이프라인을 새로이 구성해보려고 합니다. 이것도 무사히 만들게 된다면(과아연..) 시행착오의 여정을 공유해보겠습니다. 뭐어.. 잘 실행만 된다면 되는 거니깐요 하하.. 끄-읕 👋

profile
사용자를 위한 데이터 프로덕트를 만드는 데에 즐거움을 느낍니다.

2개의 댓글

comment-user-thumbnail
2022년 6월 3일

넘나 좋은 글 감사합니다!

1개의 답글