[Elasticsearch] Logstash - postgresql 연동

김민재·2025년 3월 14일

Elasticsearch

목록 보기
12/13

👣 Logstash와 postgresql 연동

  • Logstash와 postgresql을 연동하려고 하면 logstash input에 pgJDBC를 넣어줘야한다.

🕸️ pgJDBC 설치

💼 config - conf 파일 생성

  • config 폴더에 logstash-conf 파일을 생성한다.

  • input을 작성한다.

input {
  jdbc {
    jdbc_driver_library => "C:\Program Files\logstash-8.17.3\logstash-core\lib\jars/postgresql-42.7.5.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://{db_host}/{db_name}"
    jdbc_user => "{db_username}"
    jdbc_password => "{db_password}"
    statement => "SELECT * FROM tableName WHERE true"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    user => "elasticsearch_user"
    password => "elasticsearch_password"
    index => "elasticsearch_index"
  }
}

🍽️ conf 파일 실행

  • 터미널 config 경로에서 파일을 실행한다.
logstash -f logstash.conf
--config.reload.automatic // 파일 변경 시 자동실행 (default: 변경 감지 3초마다)

⛔️ db에 데이터가 새로 들어오면 logstash를 통해 새로 elasticsearch에 데이터를 넣기

input {
  jdbc {
    jdbc_driver_library => "C:\Program Files\logstash-8.17.3\logstash-core\lib\jars/postgresql-42.7.5.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://{db_host}:{db_port}/{db_name}"
    jdbc_user => "{db_username}"
    jdbc_password => "{db_password}"
    schedule => "10 * * * * *" # 갱신 주기
    statement => "SELECT *, updated_at FROM tableName WHERE true"
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
    use_column_value => true
    last_run_metadata_path => "C:/Program Files/logstash-8.17.3/data/plugins/inputs/jdbc/logstash_jdbc_last_run"
  }
}

filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
    rename => { "a_id" => "common_id" }  
    rename => { "b_id" => "common_id" }  
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "{elasticsearch_usrename}"
    password => "{elasticsearch_password}"
    index => "{elasticsearch_indexName}"
    document_id => "%{고유id}" 
    doc_as_upsert => true
  }
}

// json에서 값을 빼오는 법

 statement => "SELECT item_id  AS id,    metadata->'test'->'one'->>'desc' AS desc,  metadata->'test'->'two'->>'title' AS title, updated_at, 'category' AS category FROM {db_filed} WHERE true"

여러 테이블 또는 여러 인덱스에 데이터를 넣기 위해서는 한 필드 더 작성하면 된다.

ex) jdbc {tableName = one} jdbc {tableName = two} / elasticsearch {index="one"} elasticsearch {index="two"}

고유 ID가 모두 다를 때, AS를 이용해 하나로 통일

ex) jdbc {statement => "SELECT a_id AS common_id} jdbc {statement => "SELECT b_id AS common_id}

elasticsearch {document_id => "%{common_id}"} elasticsearch {document_id => "%{common_id}"}

  • 이렇게 해야지 모두 다른 데이터로 인식을 한다.
profile
개발 경험치 쌓는 곳

0개의 댓글