ELK란?
ElasticSearch, LogStash, Kibana 조합으로 로그 수집 - 로그 저장 및 검색 - 시각화로 쓰이게 된다.
Logstash는 실시간 파이프라인 기능을 갖는 데이터 수집 엔지이며, Input을 받아 Filter를통해 가공하고 Output이 ElasticSearch의 Input이 되거나 MQ의 producer가 되는 등 파이프라인의 첫번째 단계이다.
Logstash는 여러 시스템에서 데이터 수집 및 위해 다양한 입력 Plugin을 지원하기 때문에 유연성 갖고 있다.
Streamsets, Nifi 등 ETL 툴과 다른점은 Logstash는 로그 수집하고 구문 분석해서 나중에 사용할 수 있도록하는 Log Management로 분류된다. 반면 다른 툴들은 Stream Processing 범주의 도구로 분류된다.
Input : 로그데이터가 쌓이는 파일
Filter : 로그데이터에서 필요한 칼럼, 형변환 등 Transformation과정
Output : ElasticSearch와 연동될 경우에는 hosts는 ElasticSearch서버, 해당 서버의 index 설정
> logstash_stage1.conf
input {
file {
path => "/home/minoh1227/demo-spark-analytics/00.stage1/tracks_live.csv"
start_position => "end"
}
}
filter {
csv {
columns => ["event_id","customer_id","track_id","datetime","ismobile","listening_zip_code"]
separator => ","
}
date {
match => [ "datetime", "YYYY-MM-dd HH:mm:ss"]
target => "datetime"
}
mutate {
convert => { "ismobile" => "integer" }
}
}
output {
stdout {
codec => rubydebug{ }
}
elasticsearch {
hosts => "http://localhost:9200"
index => "ba_realtime"
}
}
기본적으로 Memory Queue이용하기 때문에 장애 시에 memory에 있는 데이터가 사라진다. 해당 문제를 방지하기 위해서 파일에 저장하는 Persistent Queue사용하면 된다.
input - PQ - filter - output
input - Head checkpoint - filter - output - Tail checkpoint
HC지점에 큐에 들어가고 TC지점에서 ack상태 되고 큐에서 빠져나간다. 만약 비정상 종료 했다면 ack아닌 것들은 filter만 다시 처리하게 된다.
input -> Queue -> Workers 부분의 Queue에서 batch size 및 pipeline.workers 개수를 늘려서 스레드 개수를 늘려주는 방안이 있다.
https://www.elastic.co/guide/en/logstash/current/logstash-settings-file.html
데이터가 들어오면 Tokernizer, Token Filter가 특정 기준으로 토큰을 분리해서 검색 가능하도록(searchable) 가공한다.
Term Query경우에는 analyzer거치지 않고 검색어와 일치하는 문서 찾기. 정형 데이터 예시로는 날짜, IP주소, 제품ID 등.
Full-Text Query경우에는 analyzer 거쳐서 좀 더 복잡한 조회에 적합
<row 기반>
<Inverted Index 기반>
ElasticSearch에 있는 데이터를 검색하여 분석 및 시각화한다. Histograme, Geo 맵 등 여러 시각화를 편리하게 할 수 있다. ElasticSearch의 결과를 보여주는 것이기 때문에 ElasticSearch가 필수로 선행된다.
ElasticSearch의 Index에있는 데이터 탐색 시 사용
필요한 차트를 구성해서 DashBoard 구성
(logstash)
https://blog.naver.com/PostView.nhn?blogId=wideeyed&logNo=222153700165