# tag 사용 예시
<source>
@type tail
tag dev.sample
path /var/log/sample.log
</source>
<match dev.sample>
@type stdout
<match>
그냥 fluent 기본 이미지를 사용하려고 했는데 내가 필요한건
input: kafka
output: opensearch 구조여서 기본적으로 제공하지 않음.
대신 장점인 plugin 설치를 통해서 해당 부분을 해결 할 수 있었는데 그런 도커 이미지가 제공되지 않고 있어서 내디내만해서 이미지를 빌드해서 사용했다.
FROM fluent/fluentd:edge
USER root
RUN gem install fluent-plugin-opensearch
RUN apk --update add ruby-dev
RUN gem install fluent-plugin-kafka
USER fluent
CMD ["fluentd"]
# fluent.conf
<source>
@type kafka
brokers ${브로커주소}
topics ${토픽}
</source>
<match *.*>
@type opensearch
scheme https
host ${호스트 주소}
port ${포트}
index_name ${인덱스 이름}
routing_key routing
id_key _id
user ${아이디}
password ${패스워드}
ssl_verify false
</match>
<source>
@type kafka
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
topics <listening topics(separate with comma',')>
format <input text type (text|json|ltsv|msgpack)> :default => json
message_key <key (Optional, for text format only, default is message)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
# Optionally, you can manage topic offset by using zookeeper
offset_zookeeper <zookeer node list (<zookeeper1_host>:<zookeeper1_port>,<zookeeper2_host>:<zookeeper2_port>,..)>
offset_zk_root_node <offset path in zookeeper> default => '/fluent-plugin-kafka'
# ruby-kafka consumer options
max_bytes (integer) :default => nil (Use default of ruby-kafka)
max_wait_time (integer) :default => nil (Use default of ruby-kafka)
min_bytes (integer) :default => nil (Use default of ruby-kafka)
</source>
그 와중에 만들어진 컨테이너 안에 접속이 안되는 문제가 있었음.
# 자연스럽게 기존에 쓰던 명령어
docker exec -it ${컨테이너 아이디} /bin/bash
# 이미지가 Alpine 이라면 /bin/bash 명령어가 안 먹을 수 있다고 함
docker exec -it ${컨테이너 아이디} /bin/sh
2023-09-13 05:14:10 +0000 [warn]: #0 no patterns matched tag=${토픽이름}
이런 로그가 뜨면서 해결이 되지 않았다.
# fluent.conf
<source>
@type kafka
brokers ${브로커주소}
topics ${토픽}
</source>
<match *.*>
@type opensearch
scheme https
host ${호스트 주소}
port ${포트}
index_name ${인덱스 이름}
routing_key routing
id_key _id
user ${아이디}
password ${패스워드}
ssl_verify false
</match>
fluentd에서 netstat 했을 때는 kafka랑 연결되는건 확인했었다.
추가적으로 뭐지 하고 하다가 partition
, offset
을 지정해주니까 10개 중에 1~2개 데이터 정도씩 들어가는게 보임. 일단 다른 일이 겹쳐서 여기까지 접근했다.
<source>
@type kafka
brokers ${브로커주소}
format "json"
<topic>
topic ${토픽이름}
partition 0
offset 1
</topic>
</source>
<match ${토픽이름}>
@type opensearch
scheme https
host ${opensearch 주소}
port ${opensearch 포트}
index_name ${인덱스 이름}
routing_key "routing"
id_key "_id"
user ${id}
password ${비밀번호}
ssl_verify false
</match>
fluentd 말고 fluentbit 이용해서 접근해보라는 조언을 받았음. 다음 접근 때는 bit을 이용해볼 것 같다.