로컬에 logstash 소스가 있어야만 filter plugin 빌드가 가능하다.
logstash download
https://github.com/elastic/logstash.git
빌드하기
gradlew.bat assemble
해당 위치에 jar가 생성되었는지 확인
{플젝위치}/build/libs에 logstash-core-*.*.*.jar생성되었는지 확인
환경변수 설정
set LS_SRC_HOME = {LOGSTASHCORE플젝위치}
filter - example download
https://github.com/logstash-plugins/logstash-filter-java_filter_example
C:\Users\SDS.gradle\gradle.properties에 아래 추가
LOGSTASH_CORE_PATH={LOGSTASHCORE플젝위치}/logstash-core/
#LOGSTASH_CORE_PATH=$LS_SRC_HOME/logstash-core/
#->아래꺼로 해도 된다고 하는데 뭔가 잘 안맞아서 그냥 절대경로로 세팅함..
#프록시가 있다면 그것도 여기에 넣어줘야함
systemProp.proxySet=true
systemProp.http.proxyHost={IP}
systemProp.http.proxyPort={PORT}
systemProp.http.nonProxyHosts={exclude할주소}|localhost
systemProp.https.proxyHost={IP}
systemProp.https.proxyPort={PORT}
systemProp.https.nonProxyHosts={exclude할주소}|localhost
build.gradle수정 (dependency가져오기, 이름바꾸기)
나는 기존 maven repository에 배포된 것들을 가져와야해서 다음과 같이 gradle 설정을 바꿨다.
아래와같이 maven url 모두 추가 (settings.xml에 있는그대로)
repositories{
maven {
credentials {
username '{id}'
password '{password}'
}
url "{url}"
}
}
아래와 같이 dependencies 추가 (pom.xml에 있는 그대로)
compile '{groupID}:{artifactId}:{version(RELEASE일 경우 +)}'
이름 바꿈
pluginInfo.pluginClass = "{yourNewFilterName}"
pluginInfo.pluginName = "{your_new_filter_name}"
java plugin작성..
dependency로 가져온 것들 사용해가며 filter 작성..
위에서 지정한 {pluginInfo.pluginClass}로 class생성하여 만듬
gradlew.bat gem
logstash-filter-your_new_name-1.00.gem
plugin install
여기서 삽질을 많이했는데, window에서는 경로의 backslash를 forwardslash로 바꿔야한다.
For Windows platforms: Substitute backslashes for forward slashes as appropriate in the command.
또 local plugin을 설치하는데 자꾸 https://rubygems.org 에 연결할수 없다고 나오는지 모르겠지만.. 환경변수에 아래와 같이 프록시를 설정했다.
HTTPS_PROXY=https://{IP}:{PORT}
HTTP_PROXY=http://{IP}:{PORT}
bin\logstash-plugin.bat install --no-verify --local {경로}\logstash-filter-your_new_name-1.00.gem
Installation successful
test
input {
kafka {
bootstrap_servers => "{IP:PORT}"
topics => "{ACTION_NAME}"
group_id => "log-to-es-group"
consumer_threads => 1
}
}
filter {
your_new_filter_name {}
}
output {
elasticsearch {
hosts => ["{ES_HOST}"]
index => "log-insert"
}
stdout { codec => rubydebug }
}
문제는 kafka input에서 byte[]의 kafkaMessage를 filter로 줄 때 String으로 변환하여 주고있다. 직렬화된 byte[]를 특정 Class로 역직렬화 하고 싶은데, String으로 변환해서 들어오니 Class -> byte[] -> String -> byte[] -> Class의 비효율이 생긴다.
또 String -> byte[] -> Class로 변환하려고 할때 아래와 같은 exception이 발생함..
byte변환 시 getBytes("8859_1")로 charsetName을 주면 누락이 없다고 해서 kafka input 설정의 encoding도 아래와 같이 놓고 시도해보았지만 변환은 계속 안된다.
codec => plain {
charset => "ISO-8859-1"
}
invalid stream header: EFBFBDEF
몇일째 삽질중이다. 로그스태시 input에서 어떤 일이 발생하는지 디버깅하기가 쉽지 않다. 다만 그냥 어떤 stream을 그대로 es에 손쉽게 때려박기는 용이한 서비스같다. 일단 flume을 작성해보고 생각하자.