[AWS] Step Functions 및 Lambda, SQS를 이용한 Call back 패턴 구현

NewNewDaddy·2023년 9월 12일
0

AWS

목록 보기
6/13

0. INTRO

  • Step Functions은 AWS의 서비스들을 순차적으로 혹은 병렬적으로 실행할 수 있는 Workflow를 만들어주는 아주 강력한 기능을 제공한다. 단순히 Task들을 순차적으로 실행시켜주는 기능뿐만 아니라 특정한 응답을 받은 이후에 다음 task를 실행하거나, input으로 받은 인자값에 따라 다른 workflow가 실행되도록 하는 등 만들어낼 수 있는 아키텍쳐는 정말 무궁무진하다고 할 수 있다.
  • 이 중 이번 실습에서는 Step Functions, SQS, Lambda를 이용한 Call Back 패턴에 대한 내용을 다뤄볼까 한다.
  • 실행 순서 및 Workflow 아키텍처
    Workflow Architecture
    1. Step Functions -> SQS로 메세지 전달
    2. SQS가 Trigger로 설정된 Lambda에서 SQS 메세지와 Token을 받아온다.
    3. 해당 Token을 포함시켜 Task Success 메세지를 Step Functions으로 보낸다.
    4. Step Functions의 다음 Process가 진행된다.

1. SQS 및 Lambda 생성

1. SQS 생성 (CallBackQueue)

  • SQS의 이름을 설정하고 나머지는 다 default로 둔 후 queue를 생성한다.

2. Lambda 함수 생성
1) Role 생성

  • 아래와와 같이 세 가지 Policy가 들어간 Role을 생성한다. (LambdaSQSAccessRole)

2) Python Lambda 함수 생성

  • Python 언어의 Lambda Function을 생성하면서 위에서 만든 LambdaSQSAccessRole 역할을 연결해준다.

3) Timeout 시간 늘림

  • Queue에서 data를 읽어온 후 State machine으로 보내는데 3초 이상 걸릴 수 있기 때문에 Timeout 시간을 넉넉히 30초 정도로 설정해준다.

2. Lambda에 SQS Trigger 설정

  • 위에서 생성한 CallBackQueue를 Lambda 함수의 Trigger로 설정해준다.

3. Step Functions에서 State machines 생성

  • Step functions의 경우 ASL(Amazon States Language) 으로 작성을 해야하는데 처음에는 쫌 까다롭지만 익숙해지면 특정 포맷을 돌려쓰면서 빠르게 작성해나갈 수 있다. 또한 Step Functions에서는 workflow를 drag and drop으로 생성할 수 있도록 UI를 제공하는데 이 기능을 사용하면 아주 쉽고 직관적으로 State Machine을 설계할 수 있다.

    {
      "Comment": "A description of my state machine",
      "StartAt": "SQS SendMessage",
      "States": {
        "SQS SendMessage": {
          "Type": "Task",
          "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
          "Parameters": {
            "MessageBody": {
              "input.$": "$",
              "MyTaskToken.$": "$$.Task.Token"
            },
            "QueueUrl": "https://sqs.ap-northeast-2.amazonaws.com/123456789123/CallBackQueue"
          },
          "Next": "Pass"
        },
        "Pass": {
          "Type": "Pass",
          "End": true
        }
      }
    }

4. Lambda 코드 작성

import json
import boto3
import time

# step functions 커넥터 생성
step = boto3.client('stepfunctions')

def lambda_handler(event, context):
    print(event)
    main_message=json.loads(event['Records'][0]['body'])
    print("Main Message Part : {}".format(main_message))
    
    step_function_input=main_message['input']
	
    # Trigger된 SQS로부터 step function token 추출
    task_token=main_message['MyTaskToken']
    
    print("The task token is : {}".format(task_token))
    
    time.sleep(7)
    
    # send_task_success 메소드에 token을 함께 넣어서 보냄
    response = step.send_task_success(
    taskToken=task_token,
    output=json.dumps({'body':'Return from Lambda Callback'})
    )

5. State machines 실행

  • State machine Start 후 조금 있으면 아래와 같이 모든 Task가 성공했다고 표시가 된다.
    1. Step Functions -> SQS로 메세지 push

    2. SQS에 input이 들어오면 Lambda가 실행 (SQS -> Lambda)

    3. Lambda에서 Token 추출하여 success 메소드와 함께 step functions로 전송 (Lambda -> Step functions)

profile
데이터 엔지니어의 작업공간 / #PYTHON #CLOUD #SPARK #AWS #GCP #NCLOUD

0개의 댓글