오픈소스 서버측 데이터 처리로 파이프 라인
다양한 소스에서 동시에 데이터를 수집하여 변환
그 다음 자주 사용하는 Elasticsearch 에 전달 (굳이 이 모듈이 아니더라도 전달 가능하지만 Elasticsearch 와 가장 좋은 효율을 보임)
Logstash 파이프라인이 구축되는 구성은 DataSource -> INPUTS -> FILTERS -> OUTPUT -> Elasticsearch 로 구성되어 있음.
아래에 글에서 하나하나 살펴보자
데이터는 여러 시스템에 다양한 모습으로 산발적으로 보관된 경우가 많다. Losstash는 다양한 입력을 지원하여 여러 공통 소스에서 이벤트들로 모두 동시에 가져온다.
모든 로그, 메트릭, 웹 애플리케이션, 데이터 저장소, 및 다양한 AWS 서비스를 연속 스트리밍 형태로 간편하게 수집한다.
file : UNIX 명령 tail -0F 와 매우 비슷하게 파일 시스템의 파일에서 읽음(로그들을 읽어들여서 출력을 해주고 끝나는게 아니라 커서가 깜빡이는 상태에서 로그 발생 감지를 한다)
syslog : RFC3164 형식에 따라 syslog 메시지 및 구문 분석을 위해 잘 알려진 포트 514를 수신
redis : redis 채널과 redis 목록을 모두 사용하여 redis 서버에서 읽음
beat : Filebeat 에서 보낸 이벤트를 처리한다
데이터가 소스에서 저장소로 이동함에 따라 Logstash필터는 각 이벤트를 구문 분석하고, 명명된 필드를 식별하여 구조를 구성하고, 이를 공통 형식으로 변환 통합하여 분석을 쉽게 만들고 시간을 단축할 뿐만 아니라 비즈니스 가치를 높여준다.
grok 을 통해 비정형 데이터에서 구조 도출
IP 주소에서 위지 좌표 해독
PII 데이터를 익명화하고, 민감한 정보 필드를 완전히 제외
데이터 소스, 형태 또는 스키마의 전체의 용이한 처리
grok : 임의의 텍스트를 구성. 현재 구조화되지 않은 로그 데이터를 구문 분석
mutate : 이벤트 필드에서 일반적인 변환을 수행한다. 이벤트 및 데이터 수정 및 제거
drop : 이벤트 완전 삭제
clone : 이벤트 복사
geoip : IP주소의 지리적 위치에 대한 정보를 추가 (Kibana의 Map 차트로 사용했었음, Kibana 포스팅 확인)
Elasticsearch 뿐 아니라 다른 프로토콜 형태로 날릴수도있음.
Elasticsearch : es에 데이터 전송, 데이터를 효율적이고 편리하게 쿼리 형식으로 저장
file : 이벤트 데이터를 디스크의 파일로 저장
graphite : 이 데이터는 통계를 저장하고 그래프로 나타내기 위해 널리 사용되는 오픈소스 도구
statsd: "카운터 및 타이머와 같은 통계를 수신하고 UDP 를 통해 전송되며 하나이상의 플러그 가능한 백엔드 서비스에 집계를 보내는 서비스
https://www.elastic.co/kr/downloads/past-releases/logstash-7-2-0
C 밑에다가 압축풀기
https://www.elastic.co/guide/en/logstash/7.2/input-plugins.html
https://www.elastic.co/guide/en/logstash/7.2/filter-plugins.html
https://www.elastic.co/guide/en/logstash/7.2/output-plugins.html
로그스태시의 설치 폴더 /bin으로 가서 logstash.bat 실행
실행 시 어떤 설정파일을 사용할 지 지정해줘야 함( -f 옵션 )
C:\logstash-7.2.0\bin>logstash.bat -f ../config/sample.conf
input {
stdin {}
}
output {
stdout {}
}
input {
stdin {}
}
output {
stdout {}
elasticsearch {
hosts => ["localhost:9200"]
}
}
input {
file {
path => "C:/logstash-7.2.0/config/weblog-sample.log"
start_position => "beginning"
sincedb_path => "c:/logstash-7.2.0/null"
}
}
output {
stdout {}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
geoip {
source => "clientip"
}
}
filter {
grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
geoip { source => "clientip" }
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "logdate"
}
}
filter {
grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
geoip { source => "clientip" }
date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] target => "logdate" }
mutate {
remove_field => [ "timestamp"]
convert {
“bytes” => “integer”
}
}
}
input {
file {
path => "C:/logstash-7.2.0/config/weblog-sample.log"
start_position => "beginning"
sincedb_path => "c:/logstash-7.2.0/null"
}
}
filter {
grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
geoip { source => "clientip" }
date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] target => "logdate" }
mutate {
remove_field => [ "timestamp"]
convert { “bytes” => “integer” }
}
}
output {
stdout {}
elasticsearch {
hosts => ["localhost:9200"]
}
}
//logstath encoder
implementation group: 'net.logstash.logback', name: 'logstash-logback-encoder', version: '7.2'
//elasticsearch
implementation 'co.elastic.clients:elasticsearch-java:8.8.1'
implementation 'org.elasticsearch.client:elasticsearch-rest-high-level-client:7.17.4'
//jsp
implementation 'javax.servlet:jstl'
implementation "org.apache.tomcat.embed:tomcat-embed-jasper"
resources 밑에 logback.xml 추가
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>localhost:4560</destination>
<encoder class="net.logstash.logback.encoder.LogstashE
ncoder" />
</appender>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="stdout" />
<appender-ref ref="stash" />
</root>
</configuration>
input {
tcp {
port => 4560
codec => json
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "springlog"
}
}
public static RestHighLevelClient getElasticsearchClient(){
final String HOST_NAME = "localhost";
final int PORT = 9200;
final String SCHEMA = "http";
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost(HOST_NAME, PORT, SCHEMA)));
return client;
}
public List<Map<String, Object>> findByKeyword(String keyword){
List<Map<String, Object>> resultList = new ArrayList<>();
try (RestHighLevelClient elasticsearchClient = ElasticsearchClientConfig.getElasticsearchClient()){
SearchRequest searchRequest = new SearchRequest("petitions*");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchQuery("content", keyword));
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = searchResponse.getHits();
for (SearchHit hit : hits) {
Map<String, Object> sourceMap = hit.getSourceAsMap();
resultList.add(sourceMap);
}
} catch (IOException e) {
e.printStackTrace();
}
return resultList;
}
private final ElasticsearchTemplate elasticsearchTemplate;
public SearchService(ElasticsearchTemplate elasticsearchTemplate) {
this.elasticsearchTemplate = elasticsearchTemplate;
}
public List<Map<String, Object>> findByKeyword(String keyword){
return elasticsearchTemplate.findByKeyword(keyword);
}
private final SearchService searchService;
public SearchController(SearchService searchService) {
this.searchService = searchService;
}
@GetMapping("/home")
public String home(){
return "home";
}
@GetMapping("/search")
public ModelAndView search(
ModelAndView mav,
@RequestParam String keyword){
List<Map<String, Object>> result = searchService.findByKeyword(keyword);
mav.addObject("result", result);
mav.setViewName("search");
return mav;
}
spring.mvc.view.prefix=/WEB-INF/views/
spring.mvc.view.suffix=.jsp
<%@ page language="java" contentType="text/html; charset=utf-8"
pageEncoding="utf-8" isELIgnored="false"%>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core"%>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Spring Boot Application with JSP</title>
</head>
<body>
<form method="get" action="/search">
<input type="text" name="keyword">
<input type="submit" value="찾기">
</form>
</body>
</html>
<%@ page import="java.util.Map" %>
<%@ page import="java.util.List" %>
<%@ page language="java" contentType="text/html; charset=utf-8"
pageEncoding="utf-8" isELIgnored="false"%>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core"%>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Spring Boot Application with JSP</title>
</head>
<body>
<div>
<%
List<Map<String, Object>> result = (List<Map<String, Object>>) request.getAttribute("result");
for (Map<String, Object> stringObjectMap : result) { %>
<div>
<p>종료일 : <%= stringObjectMap.get("end")%></p>
<p>제목 : <%= stringObjectMap.get("title")%></p>
<p><%= stringObjectMap.get("content")%></p>
</div>
<% } %>
</div>
</body>
</html>
private Logger loggerFactory = LoggerFactory.getLogger(SearchController.class);
loggerFactory.info(“검색어 : ” + keyword);