[Kafka] Kafka로 로깅하기 - 1. 카프카로 로그를 보내는 방법들

DEINGVELOP·2022년 10월 18일
0

Apache Kafka

목록 보기
5/6

ELK 스택을 사용해 Log 관리 시스템을 만들때 Kafka가 중간에 메세지 큐로서 주로 사용된다.

여러 app들이 kafka로 log 메세지를 보내는 방법에는 여러가지가 있다.

참조: https://www.elastic.co/kr/blog/just-enough-kafka-for-the-elastic-stack-part1



방법 1. logstash(shipper)

위 그림처럼 Logstash를 사용해서 Kafka로 app에서 생성한 로그파일을 메세지로 전달할 수 있다.

https://www.elastic.co/kr/downloads/logstash 에서 logstash를 다운받아 압축을 푼다.

config파일을 만들어서 logstash를 실행할 수 있다.

input은 파일이고 output은 kafka가 되도록 config파일을 지정해보자.

logstash-kafka.conf

 input {
  file {
    path => "/Users/jiminsub/Apps/Dev/test/test-kafka.log"
    start_position => "beginning"  
    sincedb_path => "/dev/null"  
  }
}
output {  
    kafka {
        bootstrap_servers => "localhost:9092"
        topic_id => "test-logstash"
        codec => plain {
            format => "%{message}"
        }
    }
    stdout {}
}

path에서 log파일의 경로를 지정하고 start_position에선 log파일을 어디서부터 읽을 것인지 지정한다.

output으로 kafka를 지정하고 bootstrap_servers로 kafka url를 지정한다. (8.0 이하 버전은 zookeeper로 설정해야 한다)

topic_id에서 토픽 아이디를 지정하고 codec에서 메세지 포멧을 지정한다.

실행은 아래와 같다.

> bin/logstash -f logstash-kafka.conf

방법 2. filebeat

filebeat은 로그파일에 대한 logstash Shipper의 역할을 수행하는 Beats의 한 종류이다.

https://www.elastic.co/kr/downloads/beats 에서 filebeat을 다운로드 받고 압축을 풀고 app이 있는 서버에서 실행하면 되는 구조이다.

filebeat.yml을 다음과 같이 설정한다.

 filebeat.prospectors:
- input_type: log
  document_type: log-filebeat
  paths:
    - /Users/jiminsub/Apps/Dev/test/test.log

output.kafka:
  # The Logstash hosts
  hosts: ["localhost:9092"]
  topic: "test-filebeat"

내용은 /Users/jiminsub/Apps/Dev/test/test.log의 로그파일이 변경이 있을때마다 localhost:9092의 test-filebeat 토픽으로 메세지를 보낸다.

filebeat을 실행하는 OS 별 방식이다.

  • deb:

    sudo /etc/init.d/filebeat start
  • rpm:

    sudo /etc/init.d/filebeat start
  • mac:

    sudo ./filebeat -e -c filebeat.yml -d "publish"
  • win:

    PS C:\Program Files\Filebeat> Start-Service filebeat




방법 3. logback에서 직접 메세지 전달

위 두가지 방식은 로그파일을 만들고 파일을 주기적으로 kafka로 전달하는 방식이다.

이번 방법은 application에서 바로 TCP를 통해 kafka로 전달하는 방식이다.

이를 위해 아래 두 dependency가 필요하다

<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.1.0</version>
</dependency>

<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>4.8</version>
</dependency>
  • logback-kafka-appender 는 log를 kafka로 전달할 때 사용되고, logstash
  • logback-encoder는 logstash 레이아웃으로 메세지를 만들 때 사용한다.

다음은 logback.xml이다.

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
        </layout>
    </appender>

    <!-- kafkaAppender -->
    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            </layout>
        </encoder>
        <topic>test-logback</topic>
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
    </appender>

    <!-- kafkaAppender with Logstash -->
    <appender name="logstashKafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
            <layout class="net.logstash.logback.layout.LogstashLayout" />
        </encoder>
        <topic>test-logback</topic>
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
    </appender>


    <!-- logger -->
    <logger name="org.apache.kafka" level="ERROR"/>
    <logger name="com.minsub.java.logger.kafka" level="DEBUG">
        <appender-ref ref="kafkaAppender" />
        <!--
        <appender-ref ref="logstashKafkaAppender" />
        -->
    </logger>

    <root level="DEBUG">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

kafkaAppender를 사용할 때 중요한건 topic과 boostrap_server 지정이다. 아래 두 줄이 kafka URL과 토픽을 지정하는 부분이다.

<topic>test-logback</topic>

<producerConfig>bootstrap.servers=localhost:9092</producerConfig>

두가지 레이아웃을 사용할 수 있는데 기본적으로 Logback의 Layout을 사용해 그대로 kafka로 전달할 수 있다.

kafkaAppender가 그 예제이다.

<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>

위 형태로 메세자기 전달 될 것이다.

두번째 레이아웃은 LogstashLayout 을 이용하는 것이다.

logstashKafkaLayout을 보면 layout에 LogstashLayout으로 지정하면 logstash에서 자동으로 다른 필드들이 지정된다.

아래 두 layout의 메시지 예제이다.

#without logstash
10:14:30.177 [main] DEBUG c.m.j.l.kafka.KafkaLogbackSample - Kafka log message 1

# with logstash
{ "@timestamp": "2017-01-15T10:18:25.776+09:00",
  "@version": 1,
  "message": "Kafka log message 1",
  "logger_name": "com.minsub.java.logger.kafka.KafkaLogbackSample",
  "thread_name": "main",
  "level": "DEBUG",
  "level_value": 10000,
    "HOSTNAME": "Minsubsui-MacBook-Pro.local" }



방법 4. Log4j에서 메세지 직접 전달

위 3번째 logback을 사용한 방법과 거의 유사하다.

이를 위한 dependency는 아래와 같다. 그리고 logback에서 사용할 수 있는 logstash layout은 사용할 수 가 없다.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>0.10.1.0</version>
</dependency>

다음은 log4j.xml 이다

<?xml version="1.0" encoding="UTF-8"?>  
<!DOCTYPE log4j:configuration SYSTEM "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">

<log4j:configuration>
    <!-- console -->
    <appender name="console" class="org.apache.log4j.ConsoleAppender">
         <layout class="org.apache.log4j.PatternLayout">
             <param name="ConversionPattern" value="[%-5p] [%d] [Thread:%t] %-17c{2} (%13F:%L) %3x - %m%n" />
         </layout>   
    </appender>
     
    <!-- fileAppender -->
    <appender name="fileAppender" class="org.apache.log4j.DailyRollingFileAppender">
        <param name="File" value="C:/Test/log/logfile_TEST.log"/>
        <param name="Append" value="true"/>
        <!-- param name="MaxFileSize" value="100MB"/ -->
        <!-- param name="MaxBackupIndex" value="2"></param -->
        <param name="DatePattern" value="'.'yyyy-MM-dd"/> 
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="[%-5p] [%d] [Thread:%t] %-17c{2} (%13F:%L) %3x - %m%n" />
        </layout>   
    </appender>

    <!-- kafkaAppender -->
    <appender name="kafkaAppender" class="org.apache.kafka.log4jappender.KafkaLog4jAppender">
        <param name="Topic" value="test-log4j" />
        <param name="BrokerList" value="localhost:9092" />
        <param name="syncSend" value="true" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n" />
        </layout>
    </appender>

    <!-- logger -->
    <logger name="com.minsub.java.logger.kafka">
        <level value="debug" />
        <appender-ref ref="kafkaAppender" />
    </logger>

    <logger name="org.apache.kafka">
        <level value="ERROR" />
    </logger>

    <root>
        <level value="DEBUG" />
        <appender-ref ref="console" />
        <appender-ref ref="fileAppender"/>
    </root>

</log4j:configuration>

logback방식에서도 설명했듯이 kafka url과 토픽을 지정하는 부분이 필수이다. 아래 두줄이 log4j에서 토픽과 서버를 지정하는 방식이다.

<param name="Topic" value="test-log4j" />

<param name="BrokerList" value="localhost:9092" />



방법 5. Log4j2에서 메세지 직접 전달

log4j2는 자체적으로 kafka로 메세지 전달하는 방식을 기본적으로 제공한다.

따라서 부가적인 dependency는 필요없다.

아래 log4j2.xml을 보면 태그가 존재한다. properties로 topic과 boostrap.servers를 지정할 수 있다.

이방식도 마찬가지로 logstash layout을 사용 할 수 없다.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <!-- Appender, Layout 설정 -->
    <Appenders>
        <Console name="console" target="SYSTEM_OUT">
            <PatternLayout pattern="[%-5p] [%d] [Thread:%t] %-17c{2} (%13F:%L) %3x - %m%n"/>
        </Console>

        <File name="fileAppender" fileName="C:/Test/log/log4j2_test.log" append="true">
            <PatternLayout pattern="[%-5p] [%d] [Thread:%t] %-17c{2} (%13F:%L) %3x - %m%n"/>
        </File>

        <Kafka name="kafkaAppender" topic="test">
            <PatternLayout pattern="[%-5p] [%d] [Thread:%t] %-17c{2} (%13F:%L) %3x - %m%n"/>
            <Property name="bootstrap.servers">localhost:9092</Property>
        </Kafka>
    </Appenders>

    <!-- Logger 설정 -->
    <Loggers>
        <Logger name="com.minsub.java.logger" level="DEBUG" additivity="false">
            <AppenderRef ref="console"/>
            <AppenderRef ref="fileAppender"/>
        </Logger>

        <Logger name="com.minsub.java.logger.kafka" level="DEBUG" additivity="false">
            <AppenderRef ref="kafkaAppender"/>
        </Logger>

        <Root level="DEBUG">
            <AppenderRef ref="console"/>
        </Root>
    </Loggers>
</Configuration>





참고자료

0개의 댓글