ELK Stack (2) - logstash

Jae Min·2023년 12월 21일
0

ELK STACK

목록 보기
3/5
post-thumbnail

이전 글에 이어서... 긁어온 로그 데이터를 어떻게 가공하는지에 대해서 알아보자.

logstash

무료 개방형 서버의 데이터 처리 파이프라인인 Logstash는 다양한 소스에서 데이터를 수집하여 변환한 후 자주 사용하는 저장소로 전달합니다.
logstash 공식문서

데이터를 수집해서 원하는 stash 로 전송한다.
데이터를 가공할 필요가 없다면, logstash 를 사용하지 않고, 바로 elasticsearch 혹은 다른 stash 로 전송하면 되지만, 비즈니스 상황 및 로직에 따라 데이터를 가공해야 할 경우 logstash 를 사용하면 효용성을 높일 수 있다.

how to install

  1. wget https://artifacts.elastic.co/downloads/logstash/logstash-8.6.2-x86_64.rpm
  2. rpm -Uvh logstash-8.6.2-x86_64.rpm
  3. sudo systemctl daemon-reload
    sudo systemctl enable logstash.service
    sudo systemctl start logstash.service
    sudo systemctl status logstash.service

config file

logstash 는 filebeat 가 설치된 ec2(서버가 돌아가고 있는) 에 같이 설치해도 되지만, 관리의 편의성과 logstash 때문에 서버에 문제가 발생하면 안되기 때문에 다른 ec2 에 설치를 해주었다.

logstash 도 filebeat 와 마찬가지로 기본적으로 /etc/logstash 에 설정파일이 있다.

jvm.options

## JVM configuration

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space

#-Xms1g
#-Xmx1g
-Xms256m
-Xmx256m

기본적으로 logstash 는 자바로 만들어져있어 따로 메모리 관리를 해줘야 한다.
처음에는 Xms1g 로 되어 있을텐데, 이건 용량이 너무 작아서 늘려주는 것(Xms256m)이 좋다.

pipelines.yml

# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: main
  path.config: "/etc/logstash/pipeline.conf"

pipeline.yml 파일에는 어떤 설정파일을 가지고 logstash 를 구동할지에 대한 정의가 들어있다. 만약에 본인이 설정파일의 이름을 바꾸던지 위치를 바꿨다면 여기도 같이 바꿔줘야 한다.

pipeline.conf

# Beats -> Logstash -> Elasticsearch pipeline.


input {
  beats {
    port => 5044
  }
}

filter {
    grok {
        match => [ "message", "#(?<current_time>[^#]*)#(?<log_level>[^#]*)#(?<app_name>[^#]*)#(?<http_method>[^#]*)#(?<host_url>[^#]*)#(?<url>[^#]*)#(?<error_message>[^#]*)#(?<body>[^#]*)#(?<client_ip>[^#]*)#(?<server_ip>[^#]*)#(?<stack_trace>[^#]*)" ]
    }
    #current_time에서 년월일 추출
    grok {
        match => [ "current_time", "(?<py>[^-]*)-(?<pm>[^-]*)-(?<pd>[^-]*) (?<ph>[^:]*)" ]
    }

    mutate {
        convert => {
            # convert 구문에서 integer는 정수로 long, integer 같이 사용함
            "server_type" => "integer"
            "server_id" => "integer"
            "thread_id" => "integer"
        }
        rename => {
            "[fields][instance_name]" => "instance_name"
            "[py]" => "[@metadata][py]"
            "[pm]" => "[@metadata][pm]"
            "[pd]" => "[@metadata][pd]"
            "[ph]" => "[@metadata][ph]"
        }
        remove_field => ["_id", "_score", "_type", "beat", "source", "offset", "prospector", "message", "tags", "@version", "fields", "host", "input", "log", "ecs", "agent"]
    }
    if [app_name] =~ "undefined" {
        mutate {
          replace => { "log_level" => "info" }
        }
    }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "log-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "[password]"
  }
  s3 {
    access_key_id => "[access_key_id]"
    secret_access_key => "[secret_access_key]"
    bucket => "[bucket_name]"
    codec => rubydebug
  }
}

여기가 가장 중요하다.

input

  • beats :
    filebeat 로 부터 데이터를 받는다. elk 는 prometheus 와 다르게 데이터를 pull 해서 받는것이 아니라 push 해서 받는것이기 때문에 filebeat 가 설치되어 있는 ec2 의 ip를 따로 설정을 해줄 필요가 없다. 대신 port 번호(5044)는 있어야 한다.

filter

  • grok :
    쿼리를 짤 수 있도록 raw data 를 변형시켜주는 플러그인이다.
    match 를 통해 특정 field를 정형화된 형태로 매칭시켜줄 수 있다.
    첫번째 match 를 보면 message field를 두번쨰 인자값인 #(?<current_time>[^#]*)#(?<log_level>[^#]*)#(?<app_name>[^#]*)#(?<http_method>[^#]*)#(?<host_url>[^#]*)#(?<url>[^#]*)#(?<error_message>[^#]*)#(?<body>[^#]*)#(?<client_ip>[^#]*)#(?<server_ip>[^#]*)#(?<stack_trace>[^#]*)" 형태로 매칭시켜줬다.

    잘 보면 (?<...>[^#]*) 와 같은 형태와 filebeat 설정파일에서 세팅한 multiline pattern 을 구분자로 나누는 것을 확인할 수 있다.

    (# 를 기준으로 (?<current_time>[^#]*) 과 같이 원하는 값을 구분지을 수 있다)

    current_time, log_level, app_name 이런 값들이 나중에 kibana 에서 쿼리를 짤때 쓰이는 field들이다.

    대신 중요한 점이 있다. filebeat 에서 수집하는 로그 파일에도 저 형태와 동일하게 데이터가 쌓여야 한다. 즉, 반드시 # 를 구분자로 로그를 쌓아야 하다는 뜻이다.

    두번째 match 를 보면 첫번째 match 에서 사용한 current_time field를 두번째 형태 "(?<py>[^-]*)-(?<pm>[^-]*)-(?<pd>[^-]*) (?<ph>[^:]*)" 로 매칭시켜줬다. 년월일시간으로 형태화 시키기 위한 정규식을 통해서 kibana 에서 정형화된 날짜 형태 데이터로 확인할 수 있다.

  • mutate :
    mutate 는 형태를 정형화 시켜주는 match 와는 다르게 값을 바꿔주는 역할을 한다
    mutate 에서 쓰이는 기능들로는
    coerce, rename, update, replace, convert, gsub, uppercase, capitalize, lowercase, strip, split, join, merge, copy 들이 있다.
    convert는 사용하는 field 타입을 변경해주는 type casting 을 할 수 있다.
    rename은 field 의 이름을 바꿔주는 역할을 해준다.
    filebeat 의 설정파일에서 세팅한 fields 키 값을 보면 instance_name 을 설정했었다.

    fields:
      instance_name: "instance_1"

    여기서 쓰이는 instance_1instance_name field 로 사용하겠다라는 의미이다.
    아래 있는 [@metadata][py] 는 kibana 에서 쓰이는 metadata 로 사용하겠다라는 의미이다.
    remove_field는 kibana 에서 특정 field 들을 보이지 않게 할 수 있다.

    마지막으로 에러 로그중에서 app_name 이 undefined 는 우리 서비스 페이지에서 보낸 요청이 아닌, 외부에서 의도적으로 공격성을 띈 요청이라고 판단되어, log_level 을 info 로 전환시킬 수도 있다.

    if [app_name] =~ "undefined" {
           mutate {
             replace => { "log_level" => "info" }
           }
       }

output

  • elasticsearch :
    logstash 를 통해서 데이터를 가공하고 나면 해당 데이터들을 elasticsearch 로 전송하는 역할이다.
    logstash 와 elasticsearch 를 같은 ec2 에 설치를 하였기 때문에 localhost:9200 임을 알 수 있다.
  • s3 :
    가공된 데이터를 elasticsearch 뿐만 아니라, s3에도 저장을 하여, 추후에 분쟁이 있거나 혹시 모를 문제에 대응하기 위해 s3에 저장을 하게끔 설정한다.

다음에는 elasticsearch 에서 어떻게 kibana 로 보내는 지 알아보자.


REF

https://www.elastic.co/kr/logstash
https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
사진: UnsplashCrystal Mapes

profile
자유로워지고 싶다면 기록하라.

0개의 댓글