Airflow 사용하여 AWS 서비스 연결하기

ohyujeong·2023년 6월 7일
0

Airflow

목록 보기
4/6
post-thumbnail

Airflow는 PythonOperator로 여러 Python 라이브러리들을 사용하여 모든 Task를 구현할 수 있지만, 특정 작업을 수행하는 구체적인 목적을 위한 다양한 Operator들을 제공한다.

이러한 Operator들을 이용해 AWS, Postgres, GCP 등등의 외부 시스템과 연결할 수 있고, 원하는 기능을 지원하는 Operator가 있다면 Python 함수를 따로 만들어 PythonOperator를 사용하는 것을 대체할 수 있다.

이를 설명하기 위해 AWS S3, SageMaker (머신러닝 모델 개발 및 배포)를 사용해 머신러닝 모델을 개발하는 DAG를 생성해보자.

클라우드 서비스 연결

오늘날 대부분의 소프트웨어는 클라우드 서비스에서 실행되고 이 서비스들은 클라우드 공급자가 제공하는 API와 함께 제공되는 클라이언트로 제어할 수 있다. 클라이언트는 요청에 필요한 세부 정보들을 입력하면 요청 및 응답처리를 클라이언트 내에서 처리해준다.

아래는 현재 대표적인 3가지 클라우드 서비스들의 클라이언트이다.

Cloud ServiceClient
AWSboto3
GCPCloud SDK
AzureAzure SDK for Python

Operator는 클라우드 서비스에 요청하는 세부정보를 전달하는 편리한 클래스로서 오퍼레이터는 기술적인 구현을 내부적으로 처리한다.

다시말해, 프로그래머가 필요한 세부 정보(연결정보, 버킷 이름 등등..)들을 Operator에 전달하면 Operator는 내부적으로 클라우드 SDK 를 이용해 구현된 코드로 클라우드 서비스에 요청을 보낸다.

추가 의존성 패키지 설치

클라우드 서비스 연결을 위한 오퍼레이터를 사용하기 위해서는 아래 명령어를 통해 추가 패키지를 설치해야 한다. 이는 다른 외부시스템도 마찬가지이고 공식문서에 설치 가능한 패키지 목록이 나와있다.

pip install apache-airflow-providers-[외부서비스명]

외부시스템 사용을 위한 추가 패키지 목록
https://airflow.apache.org/docs/apache-airflow-providers-google/stable/index.html

예시) S3CopyObjectOperator

S3CopyObjectOperator 는 특정 버킷의 오브젝트를 다른 지정한 다른 곳으로 복사하는 기능을 한다. boto3로 구현 필요없이 간단하게 복사할 버킷/오브젝트 이름과 복사될 버킷/오브젝트 이름을 인수로 넘겨주면 된다.

from airflow.providers.amazon.aws.operators.s3_copy_object import S3CopyObjectOperator

download_mnist_data = S3CopyObjectOperator(
    task_id="download_mnist_data",
    source_bucket_name="sagemaker-sample-data-eu-west-1",
    source_bucket_key="algorithms/kmeans/mnist/mnist.pkl.gz",
    dest_bucket_name="your-bucket",
    dest_bucket_key="mnist.pkl.gz",
    dag=dag,
)

AWS Operator로 머신러닝 모델 개발하기

손글씨 숫자 분류기(모델) 구현을 위한 데이터 파이프라인을 AWS Operator로 구현해보자. 완성된 모델은 새로운 손글씨로 쓴 번호 이미지를 입력받으면 해당 손글씨를 번호로 분류할 수 있어야 한다.

이 모델을 생성하는 과정은 아래와 같이 오프라인, 온라인으로 구분된다. Airflow는 여기서 오프라인 부분을 담당한다. 주기적으로 모델을 재학습하는 것이 배치 프로세스 기능에 잘 맞아떨어지기 때문이다.

  • 오프라인
    1. MNIST (http://yann.lecun.com/exdb/mnist)에서 0~9 까지 있는 약 7만개의 숫자데이터 가져오기
    2. 가져온 데이터를 S3 버킷으로 복사
    3. 데이터를 모델에서 사용할 수 있는 형식으로 변환
    4. SageMaker를 통해 모델을 학습
  • 온라인
    1. 모델 배포
    2. 새로운 데이터를 REST API 호출하여 결과 확인

이 과정을 DAG로 나타내면 다음과 같다. 하나하나씩 살펴보자...

과정을 살펴보기에 앞서 책에서 제공하는 코드가 업데이트가 되어 수정이 필요한 부분이 있다.

패키지 가져오기

AWS Provider Operator를 가져오는 부분의 수정이 필요하다. s3_copy_objectsagemaker_training 이 사라지고 s3 , sagemaker 로 서비스명으로 통일된 것으로 보인다.

##  변경 전
from airflow.providers.amazon.aws.operators.s3_copy_object import S3CopyObjectOperator
from airflow.providers.amazon.aws.operators.sagemaker_endpoint import (
    SageMakerEndpointOperator,
)
from airflow.providers.amazon.aws.operators.sagemaker_training import (
    SageMakerTrainingOperator,
)
...
## 변경 후
from airflow.providers.amazon.aws.operators.s3 import (
    S3CopyObjectOperator
)
from airflow.providers.amazon.aws.operators.sagemaker import (
    SageMakerTrainingOperator,
    SageMakerEndpointOperator,
)

추가 패키지 설치

Airflow 공식 홈페이지에서 제공하는 docker-compose.yaml 파일에는 airflow이미지만을 사용하고 따로 추카 패키지를 설치하는 부분이 없어 Dockerfile을 생성하고 그 안에서 image와 추가 설치할 패키지를 명시해야한다. 다음과 같은 파일을 수정/생성하면 된다.

  • docker-compose.yaml 파일 수정 : image속성을 지우고 build속성을 추가한다.
# docker-compose.yaml
x-airflow-common: &airflow-common 
  #image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.1}
  build: . # image 설정이 지정되지 않은 경우에 사용, 현재 디렉토리에 있는 Dockerfile을 사용하여 이미지를 빌드하고 해당 이미지를 서비스에 사용함
  • Dockerfile 파일 생성 : 사용할 image를 지정하고 명령어를 실행한다. 내용 중에 사용자를 airflow로 전환하는 부분이 있는데 이는 컨테이너 내에서 root 권한을 사용하는 것이 보안 상의 문제로 권장되지 않기 때문에 변경을 해주어야 하고, 하지 않으면 에러가 난다.
# Dockerfile
FROM apache/airflow:2.6.1 # 사용할 image
USER airflow # root->airflow로 사용자 전환
COPY requirements.txt / # 호스트의 requirements.txt 파일을 컨테이너 내부로 복사
RUN pip install --no-cache-dir "apache-airflow==2.6.1" -r /requirements.txt # requirements.txt 파일에 명시된 패키지를 설치
  • requirements.txt 파일 생성 : 여기에 필요한 추가 패키지를 넣으면 된다.
# requirements.txt
boto3
sagemaker
apache-airflow[aws]

download_mnist_data

S3CopyObjectOperator 를 사용하여 두 S3 버킷 간에 데이터를 복사한다.
복사할 버킷/오브젝트 이름과 복사될 버킷/오브젝트 이름을 인수로 넘겨준다.

download_mnist_data = S3CopyObjectOperator(
    task_id="download_mnist_data",
    source_bucket_name="sagemaker-sample-data-eu-west-1",
    source_bucket_key="algorithms/kmeans/mnist/mnist.pkl.gz",  # 파일이름
    dest_bucket_name="yujeong-test-bucket",
    dest_bucket_key="mnist.pkl.gz",
    dag=dag,
)

하지만 이렇게 Operator에 인수를 전달하는 게 끝이 아니고 액세스 키를 등록하여 AWS의 리소스에 접근할 권한을 얻어야 한다. 액세스 키를 관리하는 방법은 아래와 같이 여러가지가 있는데(더 있을수도...), 그중 나는 Connetion을 이용했다.

1. AWS SDK에서 사용하는 AWS 액세스 키를 저장하는 파일인 ~/.aws/credentials 사용

이 파일은 AWS CLI를 통해 프로필로 관리되며, 각 프로필에는 AWS 액세스 키 ID와 비밀 액세스 키가 저장된다. [default] 프로필은 기본 프로필로 사용되며, 다른 프로필은 책에서 나온 [myaws] 와 같이 사용자가 정의한 이름으로 지정할 수 있어 여러 AWS 액세스 키 쌍을 저장할 수 있다.

[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
[profile_name]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY

.aws/credentials 파일은 일반적으로 사용자의 홈 디렉토리(~/.aws/credentials)에 저장된다.

2. Airflow Variables 사용

아래와 같이 웹 UI에서 변수를 생성할 수도 있고, Airflow CLI를 통해 변수를 생성할 수도 있다.

# Airflow CLI 사용
airflow variables --set key_name key_value

이렇게 설정한 Variable 은 아래와 같이 코드에서 사용할 수 있다.

from airflow.models import Variable
access_key = Variable.get("key_name")

3. Airflow Connections 사용

Variables 와 마찬가지로 웹 UI에서 생성하거나 Airflow CLI를 통해 연결을 생성한다. Connection Type을 Amazon Web Services 로 지정해주면 된다.

# Airflow CLI 사용
airflow connections add connection_name --conn-type conn_type --conn-login conn_login --conn-password conn_password --conn-host conn_host --conn-port conn_port

이렇게 설정한 Connection 은 아래와 같이 코드에서 사용할 수 있다.

from airflow.models import BaseHook
connection = BaseHook.get_connection("connection_name")
access_key = connection.login
secret_key = connection.password

4. 환경변수 사용

컨테이너 내에서 환경변수를 설정할 수 있다.

services:
  my_service:
    environment:
      - ACCESS_KEY=your_access_key
      - SECRET_KEY=your_secret_key

이렇게 설정하게 되면 컨테이너 수준에서 os 모듈을 사용하여 설정한 환경변수에 접근할 수 있다.

import os
access_key = os.environ.get('ACCESS_KEY')
secret_key = os.environ.get('SECRET_KEY')

5. 안전한 저장소 사용

AWS Secrets Manager 와 같은 관리형 비밀 정보 저장소를 사용한다. 이건 사용하는 방법이 좀.. 설명이 길어져서 따로 포스팅 해야할 것 같다.

Task 개별로 테스트하기

이렇게 AWS S3에 접근할 수 있게 되었다. 이제 생성한 Task를 테스트 해보자.
테스트를 수행하는 Airflow 명령어는 다음과 같다.

# airflow tasks test <DAG_ID> <TASK_ID> <EXECUTION_DATE>
airflow tasks test chapter9_aws_handwritten_digit_classifier download_mnist_data 2022-06-06

해당 명령어를 입력하면 download_mnist_data Task를 수행하고 로그를 표시한다. INFO - Marking task as SUCCESS 를 통해 성공적으로 수행되었음을 알 수 있다. (WARNING이 왜저렇게 많은지 모르겠다..) 지정한 S3 버킷에서 복사된 파일을 확인했다.

extract_mnist_data

이 Task에서는 S3로부터 데이터를 다운로드, 데이터 세트 추출, 변환 후 다시 S3로 업로드를 하는 작업을 하는데, 이 기능들은 Airflow에서 자체적으로 지원하지 않아 직접 구현해야 한다.

def _extract_mnist_data():
    # S3와 통신을 위한 S3Hook 초기화
    s3hook = S3Hook()

    # Download S3 dataset into memory
    mnist_buffer = io.BytesIO()
    mnist_obj = s3hook.get_key(
        bucket_name="yujeong-test-bucket", key="mnist.pkl.gz")
    mnist_obj.download_fileobj(mnist_buffer)

    # Unpack gzip file, extract dataset, convert to dense tensor, upload back to S3
    mnist_buffer.seek(0)
    with gzip.GzipFile(fileobj=mnist_buffer, mode="rb") as f:
        train_set, _, _ = pickle.loads(f.read(), encoding="latin1")
        output_buffer = io.BytesIO()
        write_numpy_to_dense_tensor(
            file=output_buffer, array=train_set[0], labels=train_set[1]
        )
        output_buffer.seek(0)
        # S3에 결과 업로드
        s3hook.load_file_obj(
            output_buffer, key="mnist_data", bucket_name="yujeong-test-bucket", replace=True
        )

이 Task 또한 download_mnist_data 와 같이 개별로 테스트 해볼 수 있다. 시간이 좀 더 걸리긴 했지만 성공적으로 S3에 결과 파일이 업로드된 것을 확인할 수 있었다.

sagemaker_train_model, sagemaker_deploy_model

이 부분을 실행하기 위해서는 AWS에서 세부 설정이 필요하고 과금이 되기 때문에 실행해보지 않았다.
좀 더 AWS Sagemaker를 알아보고 난 후에 진행해야할 것 같다. 😅


모델의 사용

이렇게 전체 파이프라인이 완료되면 SageMaker 모델과 엔드포인트가 배포된다. 하지만 SageMaker의 엔드포인트는 외부에서 접근할 수 없고, 이를 완성하기 위해서는 손글씨 숫자를 입력(이미지 파일로 전달)하면 결과를 얻을 수 있는 인터페이스나 API가 필요하다.

이를 위해서는 SageMaker 엔드포인트를 트리거하는 AWS Lambda를 개발 및 배포하고, API Gateway를 생성 및 연결하여 외부에서 접속할 수 있는 HTTP 엔드포인트를 생성해야 한다.

그렇다면 왜 이 부분은 파이프라인에 포함되지 않았을까?
이는 배포가 배치작업처럼 주기적으로 배포하지 않고 한 번만 배포하면 되기 때문이다.

위 과정을 거쳐 외부에서 접속할 수 있는 HTTP 엔드포인트를 생성을 완료했다면, 간단하게 curl 명령어를 통해 입력의 결과를 반환받을 수 있다.

curl --request POST --url [API url] --header 'content-type: image/jpeg' --data-binary @'/path/to/iamge.jpeg'

손글씨 2 이미지 입력한 반환 결과

{
  "predictions":[
    {
      "distance_to_cluster": 2284.04786,
      "closest_cluster": 2.0
    }
  ]  
}
profile
거친 돌이 다듬어져 조각이 되듯

2개의 댓글

comment-user-thumbnail
2024년 1월 3일

안녕하세요, 같은 책 공부하고 있는 학생입니다. 혹시 S3 버킷 정책을 어떻게 설정하셨는지 여쭙습니다. botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the CopyObject operation: Access Denied 로컬 환경에서 테스팅 중에 에러가 자꾸 떠서요 ㅠ. ./aws/credentials 에도 IAM에 맞게 access_key_id, secret_access_key 잘 박혀있는 상태이고, 혹시 몰라 export 통해 환경변수로도 올려 놓았는데, boto가 해당 정보를 못 읽어 오는 걸까요...

1개의 답글