
로그를 수집하고 가공해주는 파이프라인 단계를 간편하게 구현해줌

Docker를 이용해 ELK를 각각 컨테이너로 올려 구성했습니다

# p1.config
input{
}
filter {
}
output {
}
- pipeline.id: my-pipeline_1
path.config: "/etc/path/to/p1.config"
pipeline.workers: 3
- pipeline.id: dead-letter-queue
path.config: "/etc/different/path/dead-level-queue.conf"
queue.type: persisted
queue.type → disabled defaultPersistent Queue : 인스턴스 내장 디스크에 데이터 저장
- 로그스태시 장애 상황에서 도큐먼트 유실 최소화.
- Redis나 Kafka같은 외부 버퍼링 없이 burst 처리 가능
Dead Letter Queue : 실패한 이벤트들을 모아 큐에 저장

# dead-level-queue.conf
input {
dead_letter_queue {
path => "/path/to/data/dead_letter_queue"
commit_offsets => true
pipeline_id => "main"
}
}
output {
stdout {
codec => rubydebug { metadata => true }
}
}
💡 The dead letter queue is currently supported only for the [Elasticsearch output](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html). The dead letter queue is used for documents with response codes of 400 or 404, both of which indicate an event that cannot be retried.
tail -f 처럼 파일을 스트리밍하며 읽어옴input {
file {
path => "/path/to/file"
start_position => "beginning"
sincedb_path => "/dev/null"
stat_interval => "1m"
}
kafka {
client_id => "kafka-logstash"
group_id => "kafka-logstash"
topics => ["topic1", "topic2", "topic3"]
codec => json
bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
}
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
jdbc_user => "mysql"
parameters => { "favorite_artist" => "Beethoven" }
schedule => "* * * * *"
statement => "SELECT * from songs where artist = :favorite_artist"
}
beats{
port => 5044
}
}
필터를 거쳐 가공된 데이터를 내보내는 단계
output {
elasticsearch {
hosts => "hostname"
index => "%{[field][subfield]}-%{+YYYY.M.dd}"
user => "user_id"
password => "password"
}
file {
path => "/path/to/output/file"
codec => "json"
flush_interval => 10
}
kafka {
codec => "json"
topic_id => "topic name"
}
stdout {
codec => rubydebug
}
}
비정형 혹은 반정형화된 데이터를 정형화 시켜 다음 스토리지에 저장하게 해주는 기능
grok
정규 표현식을 사용해 문자열을 파싱하게 해줌 → %{패턴: 필드명} 의 형태로 특정 필드를 파싱
GREEDYDATA
filter {
grok {
match => {
"message" => "%{NUMBER:number}-%{TIMESTAMP_ISO8601:time}-%{GREEDYDATA:rest}"
}
}
}
NUMBER, SPACE, URI, IP, SYSLOGBASE, TIMESTAMP_ISO8601, DATA, GREEDYDATA
## Custom Pattern
# content of pattern
TEST_PATTERN [^{.*}$]
####################################
filter {
grok {
match => {
patterns_dir => ["./pattern"]
"message" => "%{TEST_PATTERN:custom}"
}
}
}
mutate
필드를 변형하는 다양한 기능 제공
https://bactoria.github.io/2020/04/12/로그스태시-mutate-filter-plugin-파헤치기/
gsub => ['field', '[pattern]', '']dissect
정규식을 사용하지 않고 구분자를 통해 새로운 필드로 추출
dessect {
mapping => {
"message" => "[%{timestamp}] [%{id}] %{ip}:%{port}"
}
}
message 필드의 문자열을 적절하게 파싱하여 새로운 필드로 나눠주는 것grok에 비해 빠름grok에 비해 단순하여 복잡한 문자열의 경우 쓰기 어려움date
data {
match => ["timestamp", "YYYY-MM-dd HH:mm", "yyyy/MM/dd HH:mm:ss"]
}
조건문