ELK Side Project 1 (2/4)

KIMAKUMA·2023년 2월 21일

ELK

목록 보기
5/7
post-thumbnail

Part 1

python
datetime
strftime
timedelta
api 호출
파일쓰기, 플러그인
schedul, time module


Part 2

01. 데이터 수집💥

Logstash에서 HTTP 요청으로 데이터를 수집하려 했다..🥲

그런데 API 호출이 안되기도 하고 매일 API 주소를 바꿔야 해서 Python을 사용하기로 했다.

02. API 호출 및 파일 저장

  • API 호출
  • JSON 파일로 저장 (이어쓰기)
  • Schedule 모듈을 통해 매일 함수 실행

1) API 호출

오늘 날짜, 어제 날짜 구하기

import datetime as datetime

today = datetime.datetime.now()
todayYmd = today.strftime("%Y%m%d")
ocrnYmd = str(int(today_ymd)-1)
  • 당일이 지나야 해당 날짜의 정보가 다 쌓이기 때문에 datetime모듈을 통해 오늘 날짜를 구하고 하루 빼주었다.
  • API 호출 양식에 맞게 YYYYMMDD 형식으로 바꿔주었다.

또는 아래 방식으로 해도 된다.

from datetime import date, timedelta
 
today = date.today()
yesterday = date.today() - timedelta(1)
ocrnYmd = yesterday.strftime("%Y%m%d")

API 호출

import requests

serviceKey = "{공공데이터포털에서 받은 인증키}"
pageNo = "1" # 페이지 번호
numOfRows = "100" # 한 페이지 결과 수
resultType = "json" # 결과 형식
ocrn_ymd = ocrnYmd # 발생 일자
    
url = "https://apis.data.go.kr/1661000/FireInformationService/getOcBysidoFireSmrzPcnd?serviceKey={}&pageNo={}&numOfRows={}&resultType={}&ocrn_ymd={}".format(serviceKey, pageNo, numOfRows, resultType, ocrn_ymd)

responses = requests.get(url).json()['response']['body']['items']['item']
  • API에 필요한 변수들을 설정하고 URL을 완성시켜주었다.
  • 결과가 원하는 모양으로 오지 않기 때문에 가공시켰다.

2) 파일 저장💥

파일 저장 v1 - JSON 파일 저장

import json

with open(file_path, 'a') as file:
	json.dumps(responses)
  • 파일에 이어쓰기 위해 a를 사용해준다.
  • JSON 형식으로 저장하기 위해 json 모듈을 사용해준다.

파일 저장 v2 - Date 형식 적용

with open(file_path, 'a') as file:
	for response in responses:
		response['newYmd'] = datetime.datetime.strptime(response['ocrnYmd'], '%Y%m%d').strftime("%Y-%m-%d")

	json.dumps(responses)
  • ElasticSearch 사용 중 수정한 부분
  • 문제) 발생 일자를 Date형식으로 받아와야 하는데 str형식으로 받아와서 사용하기 불편했다.
  • 해결1) 데이터를 쪼개서 Date형식으로 저장해줬다.
  • 해결2) 모양을 바꿔서 좀 더 가독성도 높여주었다.
  • 해결3) Kibana에서 적용이 안된다면 index pattern에서 새로고침을 해주면 적용이 된다.

파일 저장 v3 - 구분자(?) 추가

with open(file_path, 'a') as file:
	for response in responses:
		response['newYmd'] = datetime.datetime.strptime(response['ocrnYmd'], '%Y%m%d').strftime("%Y-%m-%d")

		response_json = json.dumps(response)

    	file.write(response_json)
    	file.write('\n')
  • Logstash 사용 중 수정한 부분
  • 문제) 파일에 변화가 있음에도 불구하고 Logstash에서 변화를 감지하지 못했다.
  • 해결) 줄바꿈을 넣어줌으로써 구분자가 생겨 Logstash에서도 파일 변화 감지를 할 수 있게 되었다.
  • 좋아) txt 형식도 해보고 했는데 줄바꿈 넣으니까 JSON으로 인식하고 예쁘게 출력도 된다.

3) 특정 시간마다 함수 실행

import schedule, time

schedule.every(1).day.at("00:05").do(write_file)

while True:
    schedule.run_pending()
    time.sleep(1)
  • Schedule 모듈을 통해 매일 자정이 지나면 API 호출과 JSON 파일로 저장하도록 함수를 실행시켜준다.

4) 최종 코드

import datetime as datetime
import json, requests
import schedule, time

def write_file():
    today = datetime.datetime.now()

    serviceKey = "{공공데이터포털에서 받은 인증키}"
    pageNo = "1" # 페이지 번호
    numOfRows = "100" # 한 페이지 결과 수
    resultType = "json" # 결과 형식
    today_ymd = today.strftime("%Y%m%d")
    ocrn_ymd = str(int(today_ymd)-1)

    url = "https://apis.data.go.kr/1661000/FireInformationService/getOcBysidoFireSmrzPcnd?serviceKey={}&pageNo={}&numOfRows={}&resultType={}&ocrn_ymd={}".format(serviceKey, pageNo, numOfRows, resultType, ocrn_ymd)

    responses = requests.get(url).json()['response']['body']['items']['item']

    file_path = "{파일을 저장할 위치}/{파일명}.json"

    with open(file_path, 'a') as file:
        for response in responses:
            response['newYmd'] = datetime.datetime.strptime(response['ocrnYmd'], '%Y%m%d').strftime("%Y-%m-%d")

            response_json = json.dumps(response)

            file.write(response_json)
            file.write('\n')

schedule.every(1).day.at("00:05").do(write_file)

while True:
    schedule.run_pending()
    time.sleep(1)

5) 저장된 파일 - Sample Data

{
 	"sidoNm": "\uc11c\uc6b8\ud2b9\ubcc4\uc2dc", // 시도명
  	"flsrpPrcsMnb": 22, // 오보처리건수
	"slfExtshMnb": 3, // 자체진화건수
  	"fireRcptMnb": 46, // 화재접수건수
  	"stnEndMnb": 14, // 상황종료건수
  	"ocrnYmd": "20230215", // 발생일자
  	"falsDclrMnb": 0, // 허위신고건수
  	"newYmd": "2023-02-15"
}
  • 나중에 ELK로 확인할 때 시도명은 한글로 출력된다.

03. Logstash 기반 데이터 수집

1) input💥

input {
    file {
        path => "{파일 경로}/{파일명}.json"
        start_position => "beginning"
    }
}
  • 파일의 변화를 감지하기 때문에 input으로는 Python으로 저장한 파일을 지정해주었다.
  • sincedb를 통하여 어디까지 읽었는지 감지하고 그 이후부터 읽어온다.
  • 문제) JSON 파일을 읽지 못함, 변화를 감지하지 못함
  • 해결x) JSON codec 추가 또는 filter 부분에서 field 변경
  • 해결o) Python 코드에서 줄바꿈 문자를 넣어줌으로써 해결

2) filter

filter {
    json {
        source => "message"
    }

    mutate {
        rename => {
            "sidoNm" => "시도명"
            "flsrpPrcsMnb" => "오보처리건수"
            "slfExtshMnb" => "자체진화건수"
            "fireRcptMnb" => "화재접수건수"
            "stnEndMnb" => "상황종료건수"
            "newYmd" => "발생일자"
            "falsDclrMnb" => "허위신고건수"
        }

        remove_field => [
            "_type",
            "@version",
            "tags",
            "@timestamp",
            "path",
            "message",
            "host",
            "ocrnYmd"
        ]
	}
}
  • 변수명을 알아보기 쉽게 rename을 사용하여 변경해주었다.
  • 필요없는 field들은 remove_field를 통해 지워주었다.

3) output

output {
    stdout { }

    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "fire"
    }
    
}
  • Logstash Console에서도 확인하기 위해 stdout으로 출력해주었다.
  • 최종적으로는 ElasticSearch와 Kibana에서 데이터를 사용해야하기 때문에 elasticsearch로도 전송해주었다.

4) 최종 코드

input {
    file {
        path => "C:/Users/ibricks/Downloads/esk/logstash-7.10.2/files/fire3.json"
        start_position => "beginning"
    }
}

filter {
    json {
        source => "message"
    }

    mutate {
        rename => {
            "sidoNm" => "시도명"
            "flsrpPrcsMnb" => "오보처리건수"
            "slfExtshMnb" => "자체진화건수"
            "fireRcptMnb" => "화재접수건수"
            "stnEndMnb" => "상황종료건수"
            "newYmd" => "발생일자"
            "falsDclrMnb" => "허위신고건수"
        }

        remove_field => [
            "_type",
            "@version",
            "tags",
            "@timestamp",
            "path",
            "message",
            "host",
            "ocrnYmd"
        ]
    }
}

output {
    stdout { }

    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "fire"
    }
    
}

Part 3

01. Logstash에서 HTTP 요청

02. Python으로 파일 저장

03. Logstash로 데이터 수집

0개의 댓글