Fluentd랑 친해지기 시도 1차

min·2023년 9월 19일
0

💻 인프라

목록 보기
10/10

Fluentd

출처: https://docs.fluentd.org/

  • 로그(데이터) 수집기
  • 다양한 input, output 플러그인을 가졌음 - 다양한 데이터를 받아서 다양한 곳에 저장 할 수 있음.

이벤트(Event)

  • Fluentd가 읽어들인 데이터는 tag, time, record로 구성된 이벤트로 처리된다.
    • tag: 이벤트를 어디로 보낼지 결정하기 위한 구분값
    • time: 이벤트가 발생한 시간
    • record: 데이터 (JSON)

태그(Tag)

  • 핵심 기능
  • 이벤트의 적절한 Filter, Parser, Output 플러그인 이동 기준
# tag 사용 예시
<source>
  @type tail
  tag dev.sample
  path /var/log/sample.log
</source>

<match dev.sample>
  @type stdout
<match>

fluentd 테스트 환경 구축

그냥 fluent 기본 이미지를 사용하려고 했는데 내가 필요한건

input: kafka

output: opensearch 구조여서 기본적으로 제공하지 않음.

대신 장점인 plugin 설치를 통해서 해당 부분을 해결 할 수 있었는데 그런 도커 이미지가 제공되지 않고 있어서 내디내만해서 이미지를 빌드해서 사용했다.

FROM fluent/fluentd:edge
USER root

RUN gem install fluent-plugin-opensearch
RUN apk --update add ruby-dev
RUN gem install fluent-plugin-kafka

USER fluent
CMD ["fluentd"]
# fluent.conf
<source>
  @type kafka
  brokers ${브로커주소}
  topics ${토픽}
</source>

<match *.*>
  @type opensearch
  scheme https
  host ${호스트 주소}
  port ${포트}
  index_name ${인덱스 이름}
  routing_key routing
  id_key _id
  user ${아이디}
  password ${패스워드}
  ssl_verify false
</match>
  • input kafka plugin관련 문서: https://github.com/fluent/fluent-plugin-kafka
    <source>
      @type kafka
    
      brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
      topics <listening topics(separate with comma',')>
      format <input text type (text|json|ltsv|msgpack)> :default => json
      message_key <key (Optional, for text format only, default is message)>
      add_prefix <tag prefix (Optional)>
      add_suffix <tag suffix (Optional)>
    
      # Optionally, you can manage topic offset by using zookeeper
      offset_zookeeper    <zookeer node list (<zookeeper1_host>:<zookeeper1_port>,<zookeeper2_host>:<zookeeper2_port>,..)>
      offset_zk_root_node <offset path in zookeeper> default => '/fluent-plugin-kafka'
    
      # ruby-kafka consumer options
      max_bytes     (integer) :default => nil (Use default of ruby-kafka)
      max_wait_time (integer) :default => nil (Use default of ruby-kafka)
      min_bytes     (integer) :default => nil (Use default of ruby-kafka)
    </source>
  • output opensearch plugin 관련문서: https://github.com/fluent/fluent-plugin-opensearch routing_key remove_keys

그 와중에 만들어진 컨테이너 안에 접속이 안되는 문제가 있었음.

# 자연스럽게 기존에 쓰던 명령어
docker exec -it ${컨테이너 아이디} /bin/bash

# 이미지가 Alpine 이라면 /bin/bash 명령어가 안 먹을 수 있다고 함
docker exec -it ${컨테이너 아이디} /bin/sh

2023-09-13 05:14:10 +0000 [warn]: #0 no patterns matched tag=${토픽이름}

이런 로그가 뜨면서 해결이 되지 않았다.

# fluent.conf
<source>
  @type kafka
  brokers ${브로커주소}
  topics ${토픽}
</source>

<match *.*>
  @type opensearch
  scheme https
  host ${호스트 주소}
  port ${포트}
  index_name ${인덱스 이름}
  routing_key routing
  id_key _id
  user ${아이디}
  password ${패스워드}
  ssl_verify false
</match>

fluentd에서 netstat 했을 때는 kafka랑 연결되는건 확인했었다.

추가적으로 뭐지 하고 하다가 partition, offset을 지정해주니까 10개 중에 1~2개 데이터 정도씩 들어가는게 보임. 일단 다른 일이 겹쳐서 여기까지 접근했다.

<source>
    @type kafka
    brokers ${브로커주소}
    format "json"
    <topic>
      topic ${토픽이름}
      partition 0
      offset 1
    </topic>
  </source>
  <match ${토픽이름}>
    @type opensearch
    scheme https
    host ${opensearch 주소}
    port ${opensearch 포트}
    index_name ${인덱스 이름}
    routing_key "routing"
    id_key "_id"
    user ${id}
    password ${비밀번호}
    ssl_verify false
  </match>

fluentd 말고 fluentbit 이용해서 접근해보라는 조언을 받았음. 다음 접근 때는 bit을 이용해볼 것 같다.

profile
기록으로 기억하기

0개의 댓글