일반적으로 Kinesis Data Firehose를 통해 S3에 아무 설정 없이 저장하면 다음과 같이 저장됩니다.
Kinesis Data Firehose에서 S3로 전달되는 year/month/day/hour/ 의 Prefix를 사용하게 됩니다.
(물론 한국 시간이 아닌 UTC를 사용합니다...)
로그에 기록된 시간이나 항목을 기준으로 S3 Prefix를 가지게 하고 싶다면 Kinesis Data Firehose의 Dynamic Partitioning 기능을 이용하면 됩니다.
https://velog.io/@arcokim/ec2-fluent-bit-logging
로그는 이러한 형식으로 보내지게 됩니다.
{
"host": "127.0.0.1",
"method": "GET",
"path": "/v1/color/red",
"HTTP": "HTTP/1.1",
"code":"200",
"time":"2023-09-09 23:44:40"
}
Kinesis Data Firehose를 생성할 때 Destination settings 에서 Dynamic Partitioning 이라는 옵션이 있습니다. 이 옵션을 Enabled 로 설정하면 Dynamic Partitioning이 가능해집니다.
Multi record deaggregation 옵션은 JSON이나 특정 구분자로 구분되는 다중 데이터를 분해할 때 사용합니다. 타입으로 JSON 을 선택하거나 구분자를 설정해야 하는 Delimited 를 선택할 수 있습니다. 저는 필요하지 않기 때문에 Not enabled 로 설정하겠습니다.
New line delimiter 옵션은 레코드 사이에 한 줄을 더 추가할 때 사용합니다. 저는 Not enabled 로 설정하겠습니다.
Inline parsing for JSON 은 JSON 레코드를 분석하여 Dynamic partitioning key 를 설정하게 할 때 사용합니다. Enabled 로 설정했다면 Dynamic partitioning key 의 키 이름과 JQ 표현식을 설정해야 합니다. 저는 로그의 시간을 분석하여 year, month, day, hour 4가지의 Dynamic partitioning key를 가지게 하였습니다.
time : 2023-09-09 23:44:40
year : .time| strptime("%Y-%m-%d %H:%M:%S")| strftime("%Y")
month : .time| strptime("%Y-%m-%d %H:%M:%S")| strftime("%m")
day : .time| strptime("%Y-%m-%d %H:%M:%S")| strftime("%d")
hour : .time| strptime("%Y-%m-%d %H:%M:%S")| strftime("%H")
Dynamic partitioning key를 사용하여 S3 bucket prefix 를 지정할 수 있습니다. Inline parsing for JSON 에서 지정한 Dynamic partitioning key는 !{partitionKeyFromQuery:keyName} 의 형태로 사용할 수 있습니다. 저는 Athena Partitioning을 고려하여 다음과 같이 설정했습니다.
year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/hour=!{partitionKeyFromQuery:hour}/
Dynamic partitioning이나 레코드 형식 변환 등에 에러가 생겼을 때 에러 내용을 저장하는 S3 bucket error output prefix 를 지정할 수 있습니다. 저는 간단하게 error/ 로 지정하겠습니다.
이제 Kinesis Data Firehose를 생성하고 curl을 통해 저장 결과를 확인하겠습니다.
curl localhost:8080/v1/color/red
curl localhost:8080/v1/color/orange
curl localhost:8080/v1/color/melon
로그 시간에 맞게 저장이 잘 된 것을 확인할 수 있습니다.
로그가 JSON 타입이 아니거나 압축, 암호화됐을 경우 Inline parsing for JSON 사용이 어려울 수 있습니다. 이때는 Lambda 함수를 통해 Dynamic Partitioning이 가능합니다.
저는 아래와 같은 Python 코드를 Lambda 함수에 Deploy 했습니다.
(참고 코드 : https://docs.aws.amazon.com/ko_kr/firehose/latest/dev/dynamic-partitioning.html)
from __future__ import print_function
import base64
import json
import datetime
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']
+ ", Region: " + firehose_records_input['region']
+ ", and InvocationId: " + firehose_records_input['invocationId'])
# Create return value.
firehose_records_output = {'records': []}
# Create result object.
# Go through records and process them
for firehose_record_input in firehose_records_input['records']:
# Get user payload
payload = base64.b64decode(firehose_record_input['data'])
json_value = json.loads(payload)
print("Record that was received")
print(json_value)
print("\n")
# Create output Firehose record and add modified payload and record ID to it.
firehose_record_output = {}
time_value = datetime.datetime.strptime(json_value['time'], '%Y-%m-%d %H:%M:%S')
partition_keys = {"year": time_value.strftime('%Y'),
"month": time_value.strftime('%m'),
"day": time_value.strftime('%d'),
"hour": time_value.strftime('%H'),
}
# Create output Firehose record and add modified payload and record ID to it.
firehose_record_output = {'recordId': firehose_record_input['recordId'],
'data': firehose_record_input['data'],
'result': 'Ok',
'metadata': { 'partitionKeys': partition_keys }}
# Must set proper record ID
# Add the record to the list of output records.
firehose_records_output['records'].append(firehose_record_output)
# At the end return processed records
return firehose_records_output
이제 생성한 Lambda 함수를 Transform source records with AWS Lambda 에 등록합니다.
Inline parsing for JSON 옵션을 Not enabled 로 설정하고, Lambda 함수를 통해 Dynamic Partitioning이 되었기 때문에 !{partitionKeyFromLambda:keyName} 의 형태로 Dynamic partitioning key를 사용할 수 있습니다.
다시 curl을 통해 저장 결과를 확인하겠습니다.
curl localhost:8080/v1/color/red
curl localhost:8080/v1/color/orange
curl localhost:8080/v1/color/melon
로그 시간에 맞게 저장이 잘 된 것을 확인할 수 있습니다.
오늘의 글은 여기까지입니다. 감사합니다!
덕분에 문제 해결 잘 되었습니다! 감사합니다!!!