데이터를 OLAP Database인 Druid에 적재하는 과정이다!
최종적으로 집계가 완료된 Data를 Druid에 적재해 주었다. OLAP와 Druid 구조에 대해서는 다른 포스팅에 다시 다뤄봐야 할 것 같다.
지금은 간단하게 VM하나에서 Data가 ingestion될 경우 처리하는 걸로 진행을 했다.
Ingestion spec을 작성할 때 ingestion spec의 type을 어떤걸로 이용할지 고민이 되었는데, 현재는 multi threading 방식으로 한 번에 여러 작업 처리가 가능한 index-parallel이 적합하다고 생각되어 이것을 선택했다.
type에는 아래와 같이 여러개가 있다.
Druid에 데이터를 적재하기 위해서는 ingestion spec을 작성해 Broker로 요청을 보내야 한다.
아래는 실제로 구성한 ingestion spec이다.
Ingestion 단계에서, Druid는 입력된 source로부터 데이터를 읽어들이고, Segment의 형태로 만들어 Deep Storage에 저장한다.
ingestion_spec = {
"type": "index_parallel",
"spec": {
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"prefixes": [s3_input_path],
"endpointConfig": {"url": endpoint_url, "signingRegion": region},
"properties": {
"s3AccessKey": access_key,
"s3SecretKey": secret_key,
},
},
"inputFormat": {"type": "parquet"},
},
"dataSchema": data_schema,
},
}
type: index_parallel
: Parallel task indexing으로 Multi Thread를 통해 여러개의 indexing 작업을 동시에 수행하며, 배포 환경에 이용된다. 개발 밑 테스트 환경에서는 한번에 하나의 작업을 수행하는 index
, Simple task indexing을 이용한다.ioConfig
: Source System 으로 부터 데이터를 어떻게 읽어들일지 결정한다.ioConfig.inputSource
: Source System의 정보와 더불어 해당 시스템에서 읽어들일 데이터의 위치 정보를 전달한다.inputFormat
: 포맷을 지정하여 해당 포맷을 가진 데이터만을 읽어들인다.dataSchema
: 읽어들일 데이터의 형태를 지정한다.Data schema에는 읽어들일 데이터들의 이름을 지정하고, 읽어들인 데이터의 행을 timestamp, dimension, metrics 로 분류하여 해당 형식에 맞게 데이터를 읽어들일 수 있도록 지정한다.
schema = {
"listen": [
{"name": "artist_name", "type": "string"},
{"name": "title", "type": "string"},
{"name": "duration", "type": "double"},
{"name": "auth", "type": "string"},
{"name": "level", "type": "string"},
{"name": "city", "type": "string"},
...
]
...
}
data_schema = {
"dataSource": data_source,
"timestampSpec": {"column": "ts", "format": "millis"},
"dimensionsSpec": {"dimensions": schema[f"{data_type}"]},
}
dataSource
: 읽어들일 데이터 집합의 이름을 지정한다.timestampSpec
: Primary timestamp가 될 컬럼을 지정한다. Primary timestamp는 병렬 처리 등에서 데이터를 나누고 정렬하는데 이용하거나, 시간 범위를 지정해서 데이터를 불러오는데 이용된다. 준비한 데이터에는 ts라는 이벤트 발생 시간을 나타내는 컬럼이 있으니 이를 입력해 주었다.dimensionSpec
: 컬럼들의 데이터을 있는 그대로 저장한다. 여기에 저장된 데이터들은 groupby, filtering등 어떤 목적으로도 사용 가능하다. 이벤트 데이터별로 이용할 컬럼의 이름과 데이터 타입을 지정해 입력해 주었다.metricsSpec
: 각 열을 입력할 때마다 집계를 수행해 결과를 저장한 컬럼이다. 입력된 데이터에서 dimension을 기반으로 미리 집계해 둘 것이 없기 때문에 지금은 지정하지 않았다.https://druid.apache.org/docs/latest/ingestion/ - Druid docs
이렇게 아키텍처 구성하시는데 비용 얼마나 드셨나여?