Data Load

박지은·2023년 8월 29일
0
post-thumbnail


데이터를 OLAP Database인 Druid에 적재하는 과정이다!
최종적으로 집계가 완료된 Data를 Druid에 적재해 주었다. OLAP와 Druid 구조에 대해서는 다른 포스팅에 다시 다뤄봐야 할 것 같다.

지금은 간단하게 VM하나에서 Data가 ingestion될 경우 처리하는 걸로 진행을 했다.

Ingestion spec을 작성할 때 ingestion spec의 type을 어떤걸로 이용할지 고민이 되었는데, 현재는 multi threading 방식으로 한 번에 여러 작업 처리가 가능한 index-parallel이 적합하다고 생각되어 이것을 선택했다.

type에는 아래와 같이 여러개가 있다.

  • index: 한번에 하나의 작업만 수행, 개발환경에 적합하다.
  • index parallel: 한번에 여러개의 작업을 멀티 스레딩으로 처리 가능하다.
  • query controller: SQL문을 이용해 작업을 수행한다. 데이터 ingestion 보다는 검색을 수행하는데 적합한 것 같았다.
  • index hadoop: 하둡 클러스터 상에서 작업을 진행한다. 본 프로젝트에서는 하둡 클러스터를 이용하지 않으니 Pass.

Druid에 데이터를 적재하기 위해서는 ingestion spec을 작성해 Broker로 요청을 보내야 한다.
아래는 실제로 구성한 ingestion spec이다.

1. ingestion_spec

Ingestion 단계에서, Druid는 입력된 source로부터 데이터를 읽어들이고, Segment의 형태로 만들어 Deep Storage에 저장한다.

ingestion_spec = {
	"type": "index_parallel",
    "spec": {
    	"ioConfig": {
        	"type": "index_parallel",
            "inputSource": {
            	"type": "s3",
                "prefixes": [s3_input_path],
                "endpointConfig": {"url": endpoint_url, "signingRegion": region},
                "properties": {
                	"s3AccessKey": access_key,
                    "s3SecretKey": secret_key,
                },
            },
            "inputFormat": {"type": "parquet"},
        },
        "dataSchema": data_schema,
    },
}
  • type: index_parallel: Parallel task indexing으로 Multi Thread를 통해 여러개의 indexing 작업을 동시에 수행하며, 배포 환경에 이용된다. 개발 밑 테스트 환경에서는 한번에 하나의 작업을 수행하는 index, Simple task indexing을 이용한다.
  • ioConfig: Source System 으로 부터 데이터를 어떻게 읽어들일지 결정한다.
  • ioConfig.inputSource: Source System의 정보와 더불어 해당 시스템에서 읽어들일 데이터의 위치 정보를 전달한다.
  • inputFormat: 포맷을 지정하여 해당 포맷을 가진 데이터만을 읽어들인다.
  • dataSchema: 읽어들일 데이터의 형태를 지정한다.

2. Data schema

Data schema에는 읽어들일 데이터들의 이름을 지정하고, 읽어들인 데이터의 행을 timestamp, dimension, metrics 로 분류하여 해당 형식에 맞게 데이터를 읽어들일 수 있도록 지정한다.

schema = {
    "listen": [
        {"name": "artist_name", "type": "string"},
        {"name": "title", "type": "string"},
        {"name": "duration", "type": "double"},
        {"name": "auth", "type": "string"},
        {"name": "level", "type": "string"},
        {"name": "city", "type": "string"},
        ...
        ]
        ...
     }

data_schema = {
        "dataSource": data_source,
        "timestampSpec": {"column": "ts", "format": "millis"},
        "dimensionsSpec": {"dimensions": schema[f"{data_type}"]},
    }
  • dataSource: 읽어들일 데이터 집합의 이름을 지정한다.
  • timestampSpec: Primary timestamp가 될 컬럼을 지정한다. Primary timestamp는 병렬 처리 등에서 데이터를 나누고 정렬하는데 이용하거나, 시간 범위를 지정해서 데이터를 불러오는데 이용된다. 준비한 데이터에는 ts라는 이벤트 발생 시간을 나타내는 컬럼이 있으니 이를 입력해 주었다.
  • dimensionSpec: 컬럼들의 데이터을 있는 그대로 저장한다. 여기에 저장된 데이터들은 groupby, filtering등 어떤 목적으로도 사용 가능하다. 이벤트 데이터별로 이용할 컬럼의 이름과 데이터 타입을 지정해 입력해 주었다.
  • metricsSpec: 각 열을 입력할 때마다 집계를 수행해 결과를 저장한 컬럼이다. 입력된 데이터에서 dimension을 기반으로 미리 집계해 둘 것이 없기 때문에 지금은 지정하지 않았다.

Reference

https://druid.apache.org/docs/latest/ingestion/ - Druid docs

profile
Today I learned...

1개의 댓글

comment-user-thumbnail
2024년 4월 12일

이렇게 아키텍처 구성하시는데 비용 얼마나 드셨나여?

답글 달기