AWS - Kinesis(2) : 사용

이윤택·2022년 8월 25일
0

인프라

목록 보기
10/11

회사에서는 자바 혹은 노드로 서버단을 처리하지만, 프로젝트에 적용하기 전 실습이니만큼 파이썬으로 기능을 구현하고자 한다

aws configure가 마무리 되었다는 가정하에 본 포스트를 작성한다

1. Kinesis Data Stream 생성

1. 데이터 스트림 생성

데모용이므로 온디맨드로 가볍게 만든다. 데이터 스트림을 생성하는 것은 복잡하지 않다

2. Firehose 생성

데이터 스트림에서 데이터를 받아와 S3로 보낼 예정이므로, 소스와 데스티네이션을 알맞게 설정한다 후에 필요할 수 있지만, 람다를 사용하거나 데이터 포멧을 변경하는건 disabled로 저장한다타겟에 해당하는 S3 버킷의 저장 위치를 설정한다. eeyoontaek-pipeline-demo라는 버킷의 raw/ 폴더에 저장한다

2. Python - boto3 이용하여 데이터 스트림에 데이터 전송

import boto3
import time
import json

# 만약 aws configure이 설정되어있다면
kinesis_client = boto3.client('kinesis')

# aws configure이 설정되어있지 않았을 때
kinesis_client = boto3.client('kinesis', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, region_name=REGION_NAME)

def put_records(records):
    kinesis_records = []
    for r in records:
        kinesis_records.append({
            "Data": json.dumps(r).encode('utf-8'),
            "PartitionKey": "string_for_partition"
        })
    response = kinesis_client.put_records(
        Records=kinesis_records,
        StreamName='kinesis-demo'
    )
    return response

def main():
    while True:
        print('Start to send')
        data = [
            {
                'time': time.time()
            },
            {
                'time': time.time() + 10
            }
        ]
        response = put_records(data)
        print('response : {}'.format(response))
        time.sleep(10)
        
if __name__ == "__main__":
    main()

코드를 실행하면 위와 같이 10초마다 데이터를 전송한다

3. 결과

위의 사진과 같이 데이터 전송이 완료된 시각으로 연/월/일/시 폴더에 담기는 것을 확인할 수 있다

profile
데이터 엔지니어로 전향중인 백엔드 개발자입니다

0개의 댓글