ELK 스택을 사용해 Log 관리 시스템을 만들때 Kafka가 중간에 메세지 큐로서 주로 사용된다.
여러 app들이 kafka로 log 메세지를 보내는 방법에는 여러가지가 있다.
참조: https://www.elastic.co/kr/blog/just-enough-kafka-for-the-elastic-stack-part1
위 그림처럼 Logstash를 사용해서 Kafka로 app에서 생성한 로그파일을 메세지로 전달할 수 있다.
https://www.elastic.co/kr/downloads/logstash 에서 logstash를 다운받아 압축을 푼다.
config파일을 만들어서 logstash를 실행할 수 있다.
input은 파일이고 output은 kafka가 되도록 config파일을 지정해보자.
logstash-kafka.conf
input {
file {
path => "/Users/jiminsub/Apps/Dev/test/test-kafka.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
output {
kafka {
bootstrap_servers => "localhost:9092"
topic_id => "test-logstash"
codec => plain {
format => "%{message}"
}
}
stdout {}
}
path에서 log파일의 경로를 지정하고 start_position에선 log파일을 어디서부터 읽을 것인지 지정한다.
output으로 kafka를 지정하고 bootstrap_servers로 kafka url를 지정한다. (8.0 이하 버전은 zookeeper로 설정해야 한다)
topic_id에서 토픽 아이디를 지정하고 codec에서 메세지 포멧을 지정한다.
실행은 아래와 같다.
> bin/logstash -f logstash-kafka.conf
filebeat은 로그파일에 대한 logstash Shipper의 역할을 수행하는 Beats의 한 종류이다.
https://www.elastic.co/kr/downloads/beats 에서 filebeat을 다운로드 받고 압축을 풀고 app이 있는 서버에서 실행하면 되는 구조이다.
filebeat.yml을 다음과 같이 설정한다.
filebeat.prospectors:
- input_type: log
document_type: log-filebeat
paths:
- /Users/jiminsub/Apps/Dev/test/test.log
output.kafka:
# The Logstash hosts
hosts: ["localhost:9092"]
topic: "test-filebeat"
내용은 /Users/jiminsub/Apps/Dev/test/test.log의 로그파일이 변경이 있을때마다 localhost:9092의 test-filebeat 토픽으로 메세지를 보낸다.
filebeat을 실행하는 OS 별 방식이다.
deb:
sudo /etc/init.d/filebeat start
rpm:
sudo /etc/init.d/filebeat start
mac:
sudo ./filebeat -e -c filebeat.yml -d "publish"
win:
PS C:\Program Files\Filebeat> Start-Service filebeat
위 두가지 방식은 로그파일을 만들고 파일을 주기적으로 kafka로 전달하는 방식이다.
이번 방법은 application에서 바로 TCP를 통해 kafka로 전달하는 방식이다.
이를 위해 아래 두 dependency가 필요하다
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.8</version>
</dependency>
다음은 logback.xml
이다.
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
</layout>
</appender>
<!-- kafkaAppender -->
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</layout>
</encoder>
<topic>test-logback</topic>
<producerConfig>bootstrap.servers=localhost:9092</producerConfig>
</appender>
<!-- kafkaAppender with Logstash -->
<appender name="logstashKafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
<layout class="net.logstash.logback.layout.LogstashLayout" />
</encoder>
<topic>test-logback</topic>
<producerConfig>bootstrap.servers=localhost:9092</producerConfig>
</appender>
<!-- logger -->
<logger name="org.apache.kafka" level="ERROR"/>
<logger name="com.minsub.java.logger.kafka" level="DEBUG">
<appender-ref ref="kafkaAppender" />
<!--
<appender-ref ref="logstashKafkaAppender" />
-->
</logger>
<root level="DEBUG">
<appender-ref ref="STDOUT" />
</root>
</configuration>
kafkaAppender를 사용할 때 중요한건 topic과 boostrap_server 지정이다. 아래 두 줄이 kafka URL과 토픽을 지정하는 부분이다.
<topic>test-logback</topic>
<producerConfig>bootstrap.servers=localhost:9092</producerConfig>
두가지 레이아웃을 사용할 수 있는데 기본적으로 Logback의 Layout을 사용해 그대로 kafka로 전달할 수 있다.
kafkaAppender가 그 예제이다.
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
위 형태로 메세자기 전달 될 것이다.
두번째 레이아웃은 LogstashLayout 을 이용하는 것이다.
logstashKafkaLayout을 보면 layout에 LogstashLayout으로 지정하면 logstash에서 자동으로 다른 필드들이 지정된다.
아래 두 layout의 메시지 예제이다.
#without logstash
10:14:30.177 [main] DEBUG c.m.j.l.kafka.KafkaLogbackSample - Kafka log message 1
# with logstash
{ "@timestamp": "2017-01-15T10:18:25.776+09:00",
"@version": 1,
"message": "Kafka log message 1",
"logger_name": "com.minsub.java.logger.kafka.KafkaLogbackSample",
"thread_name": "main",
"level": "DEBUG",
"level_value": 10000,
"HOSTNAME": "Minsubsui-MacBook-Pro.local" }
위 3번째 logback을 사용한 방법과 거의 유사하다.
이를 위한 dependency는 아래와 같다. 그리고 logback에서 사용할 수 있는 logstash layout은 사용할 수 가 없다.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>0.10.1.0</version>
</dependency>
다음은 log4j.xml 이다
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
<log4j:configuration>
<!-- console -->
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="[%-5p] [%d] [Thread:%t] %-17c{2} (%13F:%L) %3x - %m%n" />
</layout>
</appender>
<!-- fileAppender -->
<appender name="fileAppender" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="C:/Test/log/logfile_TEST.log"/>
<param name="Append" value="true"/>
<!-- param name="MaxFileSize" value="100MB"/ -->
<!-- param name="MaxBackupIndex" value="2"></param -->
<param name="DatePattern" value="'.'yyyy-MM-dd"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="[%-5p] [%d] [Thread:%t] %-17c{2} (%13F:%L) %3x - %m%n" />
</layout>
</appender>
<!-- kafkaAppender -->
<appender name="kafkaAppender" class="org.apache.kafka.log4jappender.KafkaLog4jAppender">
<param name="Topic" value="test-log4j" />
<param name="BrokerList" value="localhost:9092" />
<param name="syncSend" value="true" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n" />
</layout>
</appender>
<!-- logger -->
<logger name="com.minsub.java.logger.kafka">
<level value="debug" />
<appender-ref ref="kafkaAppender" />
</logger>
<logger name="org.apache.kafka">
<level value="ERROR" />
</logger>
<root>
<level value="DEBUG" />
<appender-ref ref="console" />
<appender-ref ref="fileAppender"/>
</root>
</log4j:configuration>
logback방식에서도 설명했듯이 kafka url과 토픽을 지정하는 부분이 필수이다. 아래 두줄이 log4j에서 토픽과 서버를 지정하는 방식이다.
<param name="Topic" value="test-log4j" />
<param name="BrokerList" value="localhost:9092" />
log4j2는 자체적으로 kafka로 메세지 전달하는 방식을 기본적으로 제공한다.
따라서 부가적인 dependency는 필요없다.
아래 log4j2.xml을 보면 태그가 존재한다. properties로 topic과 boostrap.servers를 지정할 수 있다.
이방식도 마찬가지로 logstash layout을 사용 할 수 없다.
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<!-- Appender, Layout 설정 -->
<Appenders>
<Console name="console" target="SYSTEM_OUT">
<PatternLayout pattern="[%-5p] [%d] [Thread:%t] %-17c{2} (%13F:%L) %3x - %m%n"/>
</Console>
<File name="fileAppender" fileName="C:/Test/log/log4j2_test.log" append="true">
<PatternLayout pattern="[%-5p] [%d] [Thread:%t] %-17c{2} (%13F:%L) %3x - %m%n"/>
</File>
<Kafka name="kafkaAppender" topic="test">
<PatternLayout pattern="[%-5p] [%d] [Thread:%t] %-17c{2} (%13F:%L) %3x - %m%n"/>
<Property name="bootstrap.servers">localhost:9092</Property>
</Kafka>
</Appenders>
<!-- Logger 설정 -->
<Loggers>
<Logger name="com.minsub.java.logger" level="DEBUG" additivity="false">
<AppenderRef ref="console"/>
<AppenderRef ref="fileAppender"/>
</Logger>
<Logger name="com.minsub.java.logger.kafka" level="DEBUG" additivity="false">
<AppenderRef ref="kafkaAppender"/>
</Logger>
<Root level="DEBUG">
<AppenderRef ref="console"/>
</Root>
</Loggers>
</Configuration>
참고자료