[과제]
1. 여러 csv, json 동시 적재 진행해보고, metric beat & file beat도 pipeline 사용하여 동시 적재 진행해본다.
[버전]
8.17.4 tar
[서버]
192.168.219.159 (master) : Elasticsearch, Kibana, Logstash, CA 인증서
192.168.219.157 (data) : Elasticsearch, Metricbeat
192.168.219.158 (data) : Elasticsearch
filter {
csv {
columns => ["기관아이디","기관명","정책구분","정책명","신청시
간","시설사용대상기간","지역","위도","경도","연락처","홈페이지주소","상품명","성수기평일가격","성수기주말가격","비수기평일가격","비수기주말가격","추첨대상
확인","비고"]
separator => ","
}
mutate {convert => ["기관아이디","integer"]}
mutate {convert => ["위도","string"]}
mutate {convert => ["경도","string"]}
mutate {convert => ["성수기평일가격","integer"]}
mutate {convert => ["성수기주말가격","integer"]}
mutate {convert => ["비수기평일가격","integer"]}
mutate {convert => ["비수기주말가격","integer"]}
mutate {strip => ["기관명","정책구분","정책명","지역","상품명","비고","추첨대상확인","연락처"]}
}
output {
elasticsearch {
hosts => ["https://192.168.219.159:9200"]
index => "sanrim_data"
user => "elastic"
password => "elastic"
cacert => "/home/elastic/elasticsearch-8.17.4/config/certs/http_ca.crt"
}
stdout{}
input {
file {
path => "/home/elastic/jeju.json"
start_position => "beginning"
codec => "json"
}
}
filter {
mutate {
convert => {
"경도" => "float"
"위도" => "float"
"시도코드" => "integer"
"시군구코드" => "integer"
"행정동코드" => "integer"
"법정동코드" => "integer"
"지번코드" => "integer"
"도로명코드" => "integer"
}
}
}
output {
elasticsearch {
hosts => ["https://192.168.219.159:9200"]
index => "jeju_data"
user => "elastic"
password => "elastic"
cacert => "/home/elastic/elasticsearch-8.17.4/config/certs/http_ca.crt"
}
stdout {}
}
nohup ./bin/logstash -f config/sanrim.conf &
GET sanrim_data/_search
{
"query":{
"match":{
"정책명" : "산림 복지"
}
}
}

GET sanrim_data/_search
{
"size": 0,
"aggs": {
"지역별_정책_개수": {
"terms": {
"field": "지역.keyword",
"size": 10
}
}
}
}

kill <logstash_pid>
./bin/logstash -f config/jeju.conf
GET jeju_data/_search
{
"size": 0,
"query": {
"match": {
"시군구명": "제주시"
}
},
"aggs": {
"업종대분류_집계": {
"terms": {
"field": "상권업종대분류명.keyword"
}
}
}
}
csv 파일에 대해 logstash 실행(sanrim.conf) → 실행 중지 → json 파일에 대해 logstash 실행(jeju.conf)
위의 과정이 복잡하다 싶어 두 개의 conf를 동시에 실행시켜보려 한다.
단!! 주의해야할 것이 있다. 테스트를 진행하기 위해, conf 파일에서 index 이름은 모두 sanrim_data_test01 / jeju_data_test01 로 바꾸어 주어 동시 진행했을 때 인덱스가 잘 생성 되엇는지 확인한다.
추가로 주의할 점이 있다. 이미 두 개의 인덱스(sanrim_data / jeju_data) 를 읽어와서 저장햇다. 그러기에 conf 파일에서 /dev/null를 필수로 써주어야 한다.
-pipeline.id: sanrim_pipeline
path.config: "/home/elastic/logstash-8.17.4/config/sanrim.conf"
-pipeline.id: jeju_pipeline
path.config: "/home/elastic/logstash-8.17.4/config/jeju.conf"
# Enable multiple pipelines
pipeline.ecs_compatibility: disabled # → 필요 시
# vi config/sanrim.conf
input {
file {
path => "/home/elastic/sanrim.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
# vi config/jeju.conf
input {
file {
path => "/home/elastic/jeju.json"
start_position => "beginning"
sincedb_path => "/dev/null"
codec => "json"
}
}
/dev/null (처음부터 반복 읽기 OK)sincedb_path 지정 (중복 방지, 성능 유지 필수)./bin/logstash --path.settings ./config/
디렉토리 확인
/home/elastic/logstash-8.17.4/
├── config/
│ ├── pipelines.yml
│ ├── sanrim.conf
│ └── jeju.conf
curl -XGET "http://localhost:9600/_node/pipelines?pretty"
