AWS SQS와 AWS Lambda로 서버 간 통신 구현하기

boseung·2024년 6월 24일
0

이전 글인 AWS Sagemaker로 AI 모델 배포하기에서 AI 모델 배포는 했지만, Spring 서버와의 통신을 어떻게 구현할지 기획 초기부터 걱정이 많았다. 그래도 결국 아래처럼 구현할 수 있었다. 😄

프로젝트 구현 화면

프로젝트 구현 이미지

프로젝트 아키텍처 고민

이전 글에서 https://github.com/kyopark2014/stable-diffusion-api-server 이 링크에서 AI 모델 배포와 아키텍처에 대한 단서를 얻게 되었다고 언급했었다.

SageMaker 아키텍처

그래서

Spring Server <-> AWS API Gateway <-> AWS Lambda <-> AWS SageMaker

이런 식으로 서버 아키텍처를 정하고, 구현을 해나가다가 문제가 생기게 되었다.

문제 발생 !

Spring Server <-> AWS API Gateway <-> AWS Lambda

위 아키텍처 기반으로 간단하게 테스트를 진행해보았다.

먼저 AWS Lambda에서 sleep 함수를 사용해서 1분 정도 시간 후 응답을 보내도록 간단한 테스트 코드를 작성해보았다.

import time
def lambda_handler(event, context):
	time.sleep(60)
	return {
    	'statusCode': 200,
        'body': json.dumps("Hello")
   }

하지만 AWS API Gateway는 제한시간이 최대 30초이고, 더 늘릴 수도 없었다.

API gateway 타임 아웃

그래서 다른 방법을 찾아 헤매게 되었다. 그러다가 StackOverFlow - How can I set the AWS API Gateway timeout higher than 30 seconds?에서 단서를 찾을 수 있었다.

AWS SQS(Simple Queue Service)를 사용하면 메시지 큐를 기반으로 요청과 응답을 비동기로 처리할 수 있기 때문에 타임 아웃 문제를 해결할 수 있었다.

메시지 큐 중에서 RabbitMQ와 Kafka도 있었지만, 프로젝트 규모에 비해 인프라 규모가 너무 커지는 것 같아서 제외하게 되었다.

그래서 결국 아래와 같은 구조로 서버 간 통신을 구현하게 되었다.

Spring Server <-> AWS SQS <-> AWS Lambda(API Server) <-> AWS SageMaker

AWS SQS 구축하기

SQS 생성

SQS의 경우 표준 방식으로 2개를 만들었다. 표준 방식으로 선택한 이유는 단순 이미지 생성이고 순서가 엄청 중요한 것 같지는 않았기 때문이다. (그리고 필요하면 나중에 바꾸면 되니까)

SQS

SQS를 2개 만든 이유는 1개만으로도 요청이랑 응답이 처리 가능한지 시도해봤는데, 1개만으로는 제대로 작동하지 않았다.

참고로 오른쪽 위에 있는 메시지 전송 및 수신 버튼을 눌러서 간단하게 메시지를 작성해서 테스트 해볼 수 있다.

SQS 메시지 전송

AWS Lambda로 AI API 서버 구축하기

lambda main

AWS Lambda를 생성하기 위해 함수 생성하기를 눌러준다.

lambda 생성

여기서 적당한 함수 이름과 자신에게 필요한 런타임 환경을 정해준다. 나는 SageMaker에서 python 3.10 환경이어서 동일하게 python 3.10 환경으로 맞춰 주었다.

이제 Lambda를 AI API 서버로 구축하기 위해 몇 가지 설정들이 필요하다.

lambda와 SQS

먼저 요청 메시지를 담당한 SQS를 Lambda에 트리거로 연결하였다.

그래서 요청 메시지가 오면 자동으로 Lambda 코드가 실행되도록 만들었다.

응답 메시지의 경우 아래의 lambda 코드에서 함수 호출 방식으로 처리했다.

SQS 트리거 설정

그리고 아래처럼 필요에 따라 제한 시간을 늘려줘야 한다. 이 프로젝트는 1분 정도면 충분하긴 하지만 2분으로 설정해주었다.

이후 아래 권한 메뉴에서 SageMaker로 접근할 수 있도록 권한 설정도 해주어야 한다.

lambda 권한

그리고 SageMaker 엔드포인트 이름을 환경 변수로 등록해주어야 한다.

환경변수

이때 필요한 값은 SageMaker EndPoints에서 빨간 박스 표시가 된 엔드 포인트 이름이다.

엔드포인트

이렇게 Lambda의 환경 설정 이후 Lambda 코드를 살펴보자.

import json
import boto3
import os
from io import BytesIO
import base64
import uuid
import time
import google.generativeai as genai

ENDPOINT_NAME = os.getenv("ENDPOINT")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
runtime= boto3.client('runtime.sagemaker')
model = genai.GenerativeModel('gemini-1.5-pro-latest')

위 코드처럼 환경 변수 설정을 해주고, 필요한 값들을 전역 변수로 등록해준다.

boto3는 S3나 Sagemaker와 같은 AWS 서비스에 접근할 때 사용하는 모듈이다.

추가로 genai는 gemini API로 사용자가 입력 값을 넘겨주면 프롬프트를 영어로 재구성할 때 사용하였다.

Lambda 코드를 작성하면서 이 레퍼런스를 많이 참고했다.

def change_prompt(inputs, images):
    return {
        "inputs": inputs,
        "input_id_images": [img for img in images]
    }

def lambda_handler(event, context):
    genai.configure(api_key=GOOGLE_API_KEY)
    request = json.loads(event['Records'][0]['body'])
    inputs = request["inputs"]
    if(inputs == ""): 
        inputs = "캐주얼"
    input_id_images = request["inputIdImages"]
    chat = model.start_chat(history=words_prompt_history())
    print(chat.send_message(inputs).candidates[0].content.parts[0].text)
    prompt = chat.send_message(inputs).candidates[0].content.parts[0].text[:-3] + " women img"
    data = change_prompt(prompt, input_id_images)
    print(data)

처음에 SQS로 전달된 메시지가 어떤 형식을 가지고 있는지 알지 못해서 에러가 많이 발생했다.

그래서 print 함수로 event로 넘어온 값들을 AWS CloudWatch에서 관찰하면서 위와 같이 코드를 구성하였다.

    response = runtime.invoke_endpoint(EndpointName=ENDPOINT_NAME, 
                                    ContentType='application/json', 
                                    Accept='application/json',
                                    Body=json.dumps(data))
    response = json.loads(response['Body'].read().decode('utf-8'))
    image = base64.b64decode(response["images"][0])
    upload_urls = upload_s3(image)
    print(*upload_urls)
    client = boto3.client("sqs")
    client.send_message(
        QueueUrl="https://sqs.ap-northeast-2.amazonaws.com/responseQueue",
        MessageBody=json.dumps(*upload_urls)
        )

def upload_s3(image):
    upload_urls = []
    s3 = boto3.client("s3")
    key = f'{uuid.uuid4()}.png'
    s3.put_object(Bucket='ai-styling-s3', Key=key, Body=image)
    upload_urls.append(f'https://ai-styling-s3.s3.ap-northeast-2.amazonaws.com/{key}')
    return upload_urls

프로젝트 초반에 Lambda 코드를 구성하면서 시행착오가 많았는데, 가장 궁금했던 부분은 'SageMaker에게 어떻게 값을 전달하는 거지 ' 였다.

runtime.invoke_endpoint로 EndpointName에 엔드포인트 이름을 지정해주고, Body에 input 값을 넘겨주면 response 값을 받을 수 있다.

물론 모델마다 입력 값이 다르기 때문에 배포한 모델에 따라 입력값을 잘 구성해서 모델을 배포해줘야 한다. (나의 경우에는 입력값이 이미지랑 텍스트 프롬프트였다.)

이 모델은 base64로 이미지를 리턴해주는데 이미지 용량이 커서 SQS로 보낼 수 없어서 upload_s3로 S3를 통해 Image URI로 바꾸어 주었다.

그 후 SQS를 통해 send_message()로 응답 SQS URL을 호출해서 이미지로 응답 메시지를 보내도록 했다.

작성한 전체 코드는 여기서 확인할 수 있다.

그리고 Spring에서 SQS로 요청, 응답 메시지를 처리하는 코드를 고민하는 과정은 Spring에서 AWS SQS 요청, 응답 메시지 처리하기 글에서 이어진다!

profile
Dev Log

0개의 댓글

관련 채용 정보