https://jdbc.postgresql.org/download/
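Download the driver version that matches your environment.
Put the JAR file in the Logstash - logstash-core - lib - jars directory.
Create a logstash.conf file in the config folder.
Write the input and output sections.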
input {
  jdbc {
    # Forward slashes throughout, so the path does not mix separators
    jdbc_driver_library => "C:/Program Files/logstash-8.17.3/logstash-core/lib/jars/postgresql-42.7.5.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://{db_host}/{db_name}"
    jdbc_user => "{db_username}"
    jdbc_password => "{db_password}"
    statement => "SELECT * FROM tableName WHERE true"
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    user => "elasticsearch_user"
    password => "elasticsearch_password"
    index => "elasticsearch_index"
  }
}
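Run Logstash with the config file:
logstash -f logstash.conf
Optionally append this flag to the command:
--config.reload.automatic // reloads the pipeline automatically when the config file changes (by default, changes are checked every 3 seconds)
Next, a version that runs on a schedule and upserts documents by ID: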
input {
  jdbc {
    jdbc_driver_library => "C:/Program Files/logstash-8.17.3/logstash-core/lib/jars/postgresql-42.7.5.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://{db_host}:{db_port}/{db_name}"
    jdbc_user => "{db_username}"
    jdbc_password => "{db_password}"
    schedule => "10 * * * * *" # refresh schedule (six-field cron; the first field is seconds)
    statement => "SELECT *, updated_at FROM tableName WHERE true"
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
    use_column_value => true
    last_run_metadata_path => "C:/Program Files/logstash-8.17.3/data/plugins/inputs/jdbc/logstash_jdbc_last_run"
  }
}
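The tracking settings above store the last seen updated_at value, but the statement only benefits from it when it references :sql_last_value. A minimal sketch of an incremental query follows, assuming updated_at is set on every insert and update; the table name and the {…} placeholders are the same stand-ins used above, not real values.

```
input {
  jdbc {
    jdbc_driver_library => "C:/Program Files/logstash-8.17.3/logstash-core/lib/jars/postgresql-42.7.5.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://{db_host}:{db_port}/{db_name}"
    jdbc_user => "{db_username}"
    jdbc_password => "{db_password}"
    schedule => "10 * * * * *"
    # Only fetch rows changed since the previous run (assumes updated_at is maintained on every write)
    statement => "SELECT * FROM tableName WHERE updated_at > :sql_last_value ORDER BY updated_at ASC"
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
    use_column_value => true
    last_run_metadata_path => "C:/Program Files/logstash-8.17.3/data/plugins/inputs/jdbc/logstash_jdbc_last_run"
  }
}
```

With use_column_value => true, :sql_last_value is taken from the tracking column of the last row read, so ordering by updated_at keeps the stored value at the newest row.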
filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
    rename => { "a_id" => "common_id" }
    rename => { "b_id" => "common_id" }
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "{elasticsearch_username}"
    password => "{elasticsearch_password}"
    index => "{elasticsearch_indexName}"
    document_id => "%{unique_id}"
    doc_as_upsert => true
  }
}
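One detail worth checking in the output above: as far as I know, doc_as_upsert only takes effect when the action is "update". With the default "index" action, a fixed document_id already overwrites the existing document. A sketch of the update-based variant, assuming events carry the common_id field produced by the rename in the filter:

```
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "{elasticsearch_username}"
    password => "{elasticsearch_password}"
    index => "{elasticsearch_indexName}"
    # Update the existing document, or create it from the event if it is missing
    action => "update"
    document_id => "%{common_id}"
    doc_as_upsert => true
  }
}
```

The update action merges the event's fields into the stored document, whereas the index action replaces the whole document. Which one fits depends on whether other fields on the document should survive a refresh.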
// How to extract values from a JSON column
statement => "SELECT item_id AS id, metadata->'test'->'one'->>'desc' AS desc, metadata->'test'->'two'->>'title' AS title, updated_at, 'category' AS category FROM {db_table} WHERE true"
To read from multiple tables or write to multiple indices, add one more block of the same kind.
ex) jdbc {tableName = one} jdbc {tableName = two} / elasticsearch {index => "one"} elasticsearch {index => "two"}
When the unique ID column is named differently in each table, use AS to unify them under one name.
ex) jdbc {statement => "SELECT a_id AS common_id ..."} jdbc {statement => "SELECT b_id AS common_id ..."}
elasticsearch {document_id => "%{common_id}"} elasticsearch {document_id => "%{common_id}"}
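The two notes above can be combined into one pipeline. The sketch below is an illustration under assumptions, not the only way to do it: table_a, table_b, the index names, and the tag values are hypothetical. It uses tags plus conditionals because, without a conditional, every event is sent to every output block.

```
input {
  jdbc {
    jdbc_driver_library => "C:/Program Files/logstash-8.17.3/logstash-core/lib/jars/postgresql-42.7.5.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://{db_host}:{db_port}/{db_name}"
    jdbc_user => "{db_username}"
    jdbc_password => "{db_password}"
    # Hypothetical table; a_id is exposed under the shared name common_id
    statement => "SELECT a_id AS common_id, updated_at FROM table_a"
    tags => ["table_a"]
  }
  jdbc {
    jdbc_driver_library => "C:/Program Files/logstash-8.17.3/logstash-core/lib/jars/postgresql-42.7.5.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://{db_host}:{db_port}/{db_name}"
    jdbc_user => "{db_username}"
    jdbc_password => "{db_password}"
    # Hypothetical table; b_id is exposed under the same name
    statement => "SELECT b_id AS common_id, updated_at FROM table_b"
    tags => ["table_b"]
  }
}
output {
  # Route each event to the index that matches the input it came from
  if "table_a" in [tags] {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      user => "{elasticsearch_username}"
      password => "{elasticsearch_password}"
      index => "one"
      document_id => "%{common_id}"
    }
  } else if "table_b" in [tags] {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      user => "{elasticsearch_username}"
      password => "{elasticsearch_password}"
      index => "two"
      document_id => "%{common_id}"
    }
  }
}
```

If tracking_column is added to these inputs, each jdbc block should get its own last_run_metadata_path so the two inputs do not overwrite each other's saved state.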