vi ~/logstash/logstash.conf
input {
kafka {
bootstrap_servers => ["192.168.65.193:9092"]
topics =>["rsyslog-logstash", "collectd-logstash"]
codec => json
consumer_threads => 2
decorate_events => true
}
}
filter {
if "rsyslog" in [@metadata][kafka][topic] {
mutate {
add_field => {"topicName" => 'rsyslog'}
}
}
if "collectd" in [@metadata][kafka][topic] {
mutate {
add_field => {'topicName' => 'collectd'}
}
}
json {
source => "message"
}
}
output {
elasticsearch{
hosts => "192.168.65.159:9200"
index => "index-%{+YYYY.MM.dd}"
document_type => "logs"
}
}
kafka 토픽은 두 개가 있다. 각각의 프로듀서가 동일한 이름의 토픽으로 데이터를 보내고 있다. 그리고 logstashs는 컨슈머로서 데이터를 끌어온 뒤 elasticsearch로 보내준다.
이때, 토픽 명에 따라서 해당 토픽 명을 필드로 추가하고 있는데 filter
가 그 역할을 하는 것이다. 그리고 이때 if
를 사용해 토픽 명에 따라 분기를 나눴는데, 토픽명을 읽어오기 위해서는 input
에서 decorate_events => true
항목이 반드시 추가되어야 한다.
output
에서는 elasticsearch 호스트와 인덱스명을 넣어주면 된다.