데이터 파이프라인 (3) - Logstash 설정

스르륵·2022년 5월 27일
0

데이터파이프라인

목록 보기
4/10
vi ~/logstash/logstash.conf

input {
  kafka {
    bootstrap_servers => ["192.168.65.193:9092"]
    topics =>["rsyslog-logstash", "collectd-logstash"]
    codec => json
    consumer_threads => 2
    decorate_events => true
  }
}

filter {
  if "rsyslog" in [@metadata][kafka][topic] {
    mutate {
      add_field => {"topicName" => 'rsyslog'}
    }
  }
  if "collectd" in  [@metadata][kafka][topic] {
    mutate {
      add_field => {'topicName' => 'collectd'}
    }
  }
  json {
    source => "message"
  }
}

output {
    elasticsearch{
      hosts => "192.168.65.159:9200"
      index => "index-%{+YYYY.MM.dd}"
      document_type => "logs"
    }
}

kafka 토픽은 두 개가 있다. 각각의 프로듀서가 동일한 이름의 토픽으로 데이터를 보내고 있다. 그리고 logstashs는 컨슈머로서 데이터를 끌어온 뒤 elasticsearch로 보내준다.

이때, 토픽 명에 따라서 해당 토픽 명을 필드로 추가하고 있는데 filter가 그 역할을 하는 것이다. 그리고 이때 if를 사용해 토픽 명에 따라 분기를 나눴는데, 토픽명을 읽어오기 위해서는 input에서 decorate_events => true 항목이 반드시 추가되어야 한다.

output에서는 elasticsearch 호스트와 인덱스명을 넣어주면 된다.

profile
기록하는 블로그

0개의 댓글