[AWS] Kinesis Data Firehose Dynamic Partitioning

Jeongtae Kim·2023년 9월 10일
1

AWS

목록 보기
9/13
post-thumbnail
post-custom-banner

일반적으로 Kinesis Data Firehose를 통해 S3에 아무 설정 없이 저장하면 다음과 같이 저장됩니다.

Kinesis Data Firehose에서 S3로 전달되는 year/month/day/hour/ 의 Prefix를 사용하게 됩니다.
(물론 한국 시간이 아닌 UTC를 사용합니다...)

로그에 기록된 시간이나 항목을 기준으로 S3 Prefix를 가지게 하고 싶다면 Kinesis Data Firehose의 Dynamic Partitioning 기능을 이용하면 됩니다.

0. 실습 환경 준비

https://velog.io/@arcokim/ec2-fluent-bit-logging
로그는 이러한 형식으로 보내지게 됩니다.

{
  "host": "127.0.0.1",
  "method": "GET",
  "path": "/v1/color/red",
  "HTTP": "HTTP/1.1",
  "code":"200", 
  "time":"2023-09-09 23:44:40"
}

1. Dynamic Partitioning with JQ

Kinesis Data Firehose를 생성할 때 Destination settings 에서 Dynamic Partitioning 이라는 옵션이 있습니다. 이 옵션을 Enabled 로 설정하면 Dynamic Partitioning이 가능해집니다.

Multi record deaggregation 옵션은 JSON이나 특정 구분자로 구분되는 다중 데이터를 분해할 때 사용합니다. 타입으로 JSON 을 선택하거나 구분자를 설정해야 하는 Delimited 를 선택할 수 있습니다. 저는 필요하지 않기 때문에 Not enabled 로 설정하겠습니다.

New line delimiter 옵션은 레코드 사이에 한 줄을 더 추가할 때 사용합니다. 저는 Not enabled 로 설정하겠습니다.

  • Not enabled
  • Enabled

Inline parsing for JSON 은 JSON 레코드를 분석하여 Dynamic partitioning key 를 설정하게 할 때 사용합니다. Enabled 로 설정했다면 Dynamic partitioning key 의 키 이름과 JQ 표현식을 설정해야 합니다. 저는 로그의 시간을 분석하여 year, month, day, hour 4가지의 Dynamic partitioning key를 가지게 하였습니다.

time : 2023-09-09 23:44:40
year : .time| strptime("%Y-%m-%d %H:%M:%S")| strftime("%Y")
month : .time| strptime("%Y-%m-%d %H:%M:%S")| strftime("%m")
day : .time| strptime("%Y-%m-%d %H:%M:%S")| strftime("%d")
hour : .time| strptime("%Y-%m-%d %H:%M:%S")| strftime("%H")

Dynamic partitioning key를 사용하여 S3 bucket prefix 를 지정할 수 있습니다. Inline parsing for JSON 에서 지정한 Dynamic partitioning key는 !{partitionKeyFromQuery:keyName} 의 형태로 사용할 수 있습니다. 저는 Athena Partitioning을 고려하여 다음과 같이 설정했습니다.

year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/hour=!{partitionKeyFromQuery:hour}/

Dynamic partitioning이나 레코드 형식 변환 등에 에러가 생겼을 때 에러 내용을 저장하는 S3 bucket error output prefix 를 지정할 수 있습니다. 저는 간단하게 error/ 로 지정하겠습니다.

이제 Kinesis Data Firehose를 생성하고 curl을 통해 저장 결과를 확인하겠습니다.

curl localhost:8080/v1/color/red
curl localhost:8080/v1/color/orange
curl localhost:8080/v1/color/melon

로그 시간에 맞게 저장이 잘 된 것을 확인할 수 있습니다.

2. Dynamic Partitioning with Lambda

로그가 JSON 타입이 아니거나 압축, 암호화됐을 경우 Inline parsing for JSON 사용이 어려울 수 있습니다. 이때는 Lambda 함수를 통해 Dynamic Partitioning이 가능합니다.

저는 아래와 같은 Python 코드를 Lambda 함수에 Deploy 했습니다.
(참고 코드 : https://docs.aws.amazon.com/ko_kr/firehose/latest/dev/dynamic-partitioning.html)

from __future__ import print_function
import base64
import json
import datetime
 
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']
          + ", Region: " + firehose_records_input['region']
          + ", and InvocationId: " + firehose_records_input['invocationId'])
 
    # Create return value.
    firehose_records_output = {'records': []}
 
    # Create result object.
    # Go through records and process them
 
    for firehose_record_input in firehose_records_input['records']:
        # Get user payload
        payload = base64.b64decode(firehose_record_input['data'])
        json_value = json.loads(payload)
 
        print("Record that was received")
        print(json_value)
        print("\n")
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {}
        time_value = datetime.datetime.strptime(json_value['time'], '%Y-%m-%d %H:%M:%S')
        partition_keys = {"year": time_value.strftime('%Y'),
                          "month": time_value.strftime('%m'),
                          "day": time_value.strftime('%d'),
                          "hour": time_value.strftime('%H'),
                          }
 
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {'recordId': firehose_record_input['recordId'],
                                  'data': firehose_record_input['data'],
                                  'result': 'Ok',
                                  'metadata': { 'partitionKeys': partition_keys }}
 
        # Must set proper record ID
        # Add the record to the list of output records.
 
        firehose_records_output['records'].append(firehose_record_output)
 
    # At the end return processed records
    return firehose_records_output

이제 생성한 Lambda 함수를 Transform source records with AWS Lambda 에 등록합니다.

Inline parsing for JSON 옵션을 Not enabled 로 설정하고, Lambda 함수를 통해 Dynamic Partitioning이 되었기 때문에 !{partitionKeyFromLambda:keyName} 의 형태로 Dynamic partitioning key를 사용할 수 있습니다.

다시 curl을 통해 저장 결과를 확인하겠습니다.

curl localhost:8080/v1/color/red
curl localhost:8080/v1/color/orange
curl localhost:8080/v1/color/melon

로그 시간에 맞게 저장이 잘 된 것을 확인할 수 있습니다.

오늘의 글은 여기까지입니다. 감사합니다!

profile
유용할지도 모른다.
post-custom-banner

1개의 댓글

comment-user-thumbnail
2023년 12월 5일

덕분에 문제 해결 잘 되었습니다! 감사합니다!!!

답글 달기