Logstash 개요, 개념, 실습

이동명·2023년 6월 28일
0
post-thumbnail

# Logstash 개요

  • 오픈소스 서버측 데이터 처리로 파이프 라인

  • 다양한 소스에서 동시에 데이터를 수집하여 변환

  • 그 다음 자주 사용하는 Elasticsearch 에 전달 (굳이 이 모듈이 아니더라도 전달 가능하지만 Elasticsearch 와 가장 좋은 효율을 보임)

  • Logstash 파이프라인이 구축되는 구성은 DataSource -> INPUTS -> FILTERS -> OUTPUT -> Elasticsearch 로 구성되어 있음.

  • 아래에 글에서 하나하나 살펴보자

입력

데이터는 여러 시스템에 다양한 모습으로 산발적으로 보관된 경우가 많다. Losstash는 다양한 입력을 지원하여 여러 공통 소스에서 이벤트들로 모두 동시에 가져온다.

모든 로그, 메트릭, 웹 애플리케이션, 데이터 저장소, 및 다양한 AWS 서비스를 연속 스트리밍 형태로 간편하게 수집한다.

Logstash 로 데이터를 가져온다

  • file : UNIX 명령 tail -0F 와 매우 비슷하게 파일 시스템의 파일에서 읽음(로그들을 읽어들여서 출력을 해주고 끝나는게 아니라 커서가 깜빡이는 상태에서 로그 발생 감지를 한다)

  • syslog : RFC3164 형식에 따라 syslog 메시지 및 구문 분석을 위해 잘 알려진 포트 514를 수신

  • redis : redis 채널과 redis 목록을 모두 사용하여 redis 서버에서 읽음

  • beat : Filebeat 에서 보낸 이벤트를 처리한다

필터

데이터가 소스에서 저장소로 이동함에 따라 Logstash필터는 각 이벤트를 구문 분석하고, 명명된 필드를 식별하여 구조를 구성하고, 이를 공통 형식으로 변환 통합하여 분석을 쉽게 만들고 시간을 단축할 뿐만 아니라 비즈니스 가치를 높여준다.

Logstash 변환 준비 과정

  • grok 을 통해 비정형 데이터에서 구조 도출

  • IP 주소에서 위지 좌표 해독

  • PII 데이터를 익명화하고, 민감한 정보 필드를 완전히 제외

  • 데이터 소스, 형태 또는 스키마의 전체의 용이한 처리

Logstash 파이프 라인의 중간 처리 장치

  • grok : 임의의 텍스트를 구성. 현재 구조화되지 않은 로그 데이터를 구문 분석

  • mutate : 이벤트 필드에서 일반적인 변환을 수행한다. 이벤트 및 데이터 수정 및 제거

  • drop : 이벤트 완전 삭제

  • clone : 이벤트 복사

  • geoip : IP주소의 지리적 위치에 대한 정보를 추가 (Kibana의 Map 차트로 사용했었음, Kibana 포스팅 확인)

출력

Elasticsearch 뿐 아니라 다른 프로토콜 형태로 날릴수도있음.

최종 단계로 이벤트는 여러 출력 가능

  • Elasticsearch : es에 데이터 전송, 데이터를 효율적이고 편리하게 쿼리 형식으로 저장

  • file : 이벤트 데이터를 디스크의 파일로 저장

  • graphite : 이 데이터는 통계를 저장하고 그래프로 나타내기 위해 널리 사용되는 오픈소스 도구

  • statsd: "카운터 및 타이머와 같은 통계를 수신하고 UDP 를 통해 전송되며 하나이상의 플러그 가능한 백엔드 서비스에 집계를 보내는 서비스

로그스태시 설치 및 실습

https://www.elastic.co/kr/downloads/past-releases/logstash-7-2-0

C 밑에다가 압축풀기

https://www.elastic.co/guide/en/logstash/7.2/input-plugins.html

https://www.elastic.co/guide/en/logstash/7.2/filter-plugins.html

https://www.elastic.co/guide/en/logstash/7.2/output-plugins.html

실행

로그스태시의 설치 폴더 /bin으로 가서 logstash.bat 실행

실행 시 어떤 설정파일을 사용할 지 지정해줘야 함( -f 옵션 )

C:\logstash-7.2.0\bin>logstash.bat -f ../config/sample.conf

input(표준입력) -> output(표준출력)

input {
   stdin {}
}
output {
  stdout {}
}

input(표준입력) -> output(elasticsearch )

input {
   stdin {}
}
output {
  stdout {}
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}

input(weblog-sample.log) -> output(표준출력 )

input {
  file {
    path => "C:/logstash-7.2.0/config/weblog-sample.log"
    start_position => "beginning"
    sincedb_path => "c:/logstash-7.2.0/null"
  }
}
output {
  stdout {}
}

filter(grok)

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

filter(grok, geoip)

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
geoip {
    source => "clientip"
  }
}

filter(grok, geoip, date)

filter {
  grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
  geoip { source => "clientip" }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "logdate"
  }
}

filter(grok, geoip, date, mutate)

filter {
  grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
  geoip { source => "clientip" }
  date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] target => "logdate" }
  mutate {
    remove_field => [ "timestamp"]
    convert {
        “bytes” => “integer”
    }
  }
}
input {
  file {
    path => "C:/logstash-7.2.0/config/weblog-sample.log"
    start_position => "beginning"
    sincedb_path => "c:/logstash-7.2.0/null"
  }
}
filter {
  grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
  geoip { source => "clientip" }
  date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] target => "logdate" }
  mutate {
    remove_field => [ "timestamp"]
    convert { “bytes” => “integer” }
  }
}
output {
  stdout {}
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}

프로젝트 생성

의존성 추가

//logstath encoder
implementation group: 'net.logstash.logback', name: 'logstash-logback-encoder', version: '7.2'

//elasticsearch
implementation 'co.elastic.clients:elasticsearch-java:8.8.1'
implementation 'org.elasticsearch.client:elasticsearch-rest-high-level-client:7.17.4'

//jsp
implementation 'javax.servlet:jstl'
implementation "org.apache.tomcat.embed:tomcat-embed-jasper"

logback.xml

resources 밑에 logback.xml 추가

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>localhost:4560</destination>
<encoder class="net.logstash.logback.encoder.LogstashE
ncoder" />
</appender>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="stdout" />
<appender-ref ref="stash" />
</root>
</configuration>

로그스태시 설정

input {
    tcp {
    port => 4560
    codec => json
  }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "springlog"
    }
}

ElasticsearchClientConfig

public static RestHighLevelClient getElasticsearchClient(){

final String HOST_NAME = "localhost";
final int PORT = 9200;
final String SCHEMA = "http";

RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost(HOST_NAME, PORT, SCHEMA)));

return client;
}

ElasticsearchTemplate

public List<Map<String, Object>> findByKeyword(String keyword){
	List<Map<String, Object>> resultList = new ArrayList<>();
try (RestHighLevelClient elasticsearchClient = ElasticsearchClientConfig.getElasticsearchClient()){
SearchRequest searchRequest = new SearchRequest("petitions*");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchQuery("content", keyword));
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = searchResponse.getHits();
for (SearchHit hit : hits) {
Map<String, Object> sourceMap = hit.getSourceAsMap();
resultList.add(sourceMap);
}
} catch (IOException e) {
e.printStackTrace();
}

return resultList;
}

SearchService

private final ElasticsearchTemplate elasticsearchTemplate;

public SearchService(ElasticsearchTemplate elasticsearchTemplate) {
this.elasticsearchTemplate = elasticsearchTemplate;
}

public List<Map<String, Object>> findByKeyword(String keyword){
return elasticsearchTemplate.findByKeyword(keyword);
}

SearchController

private final SearchService searchService;

public SearchController(SearchService searchService) {
this.searchService = searchService;
}

@GetMapping("/home")
  public String home(){
  return "home";
}

@GetMapping("/search")
public ModelAndView search(
ModelAndView mav,
@RequestParam String keyword){

List<Map<String, Object>> result = searchService.findByKeyword(keyword);
mav.addObject("result", result);
mav.setViewName("search");
return mav;
}

views 폴더 생성

application.properties

spring.mvc.view.prefix=/WEB-INF/views/
spring.mvc.view.suffix=.jsp

home.jsp

<%@ page language="java" contentType="text/html; charset=utf-8"
pageEncoding="utf-8" isELIgnored="false"%>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core"%>

<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Spring Boot Application with JSP</title>
</head>
<body>
<form method="get" action="/search">
<input type="text" name="keyword">
<input type="submit" value="찾기">
</form>
</body>
</html>

search.jsp

<%@ page import="java.util.Map" %>
<%@ page import="java.util.List" %>
<%@ page language="java" contentType="text/html; charset=utf-8"
pageEncoding="utf-8" isELIgnored="false"%>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core"%>

<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Spring Boot Application with JSP</title>
</head>
<body>
<div>
<%
List<Map<String, Object>> result = (List<Map<String, Object>>) request.getAttribute("result");

for (Map<String, Object> stringObjectMap : result) { %>

<div>
<p>종료일 : <%= stringObjectMap.get("end")%></p>
<p>제목 : <%= stringObjectMap.get("title")%></p>
<p><%= stringObjectMap.get("content")%></p>
</div>

<% } %>
</div>
</body>
</html>

로깅 추가

private Logger loggerFactory = LoggerFactory.getLogger(SearchController.class);

loggerFactory.info(“검색어 : ” + keyword);

profile
Web Developer

0개의 댓글