Alert and Chatbot Project

Q·2023년 10월 16일

✅ 개요

  • 실시간 Spark Streaming으로 되어 있는 플랫폼의 모니터링을 위한 알람을 구성하고
  • Chatbot 등을 이용하여 Job을 관리 할 수 있는 프로젝트
  • Elasticsearch - 검색 및 집계 쿼리에 대한 이해가 있어야 합니다.

✅ 프로젝트 디렉토리 구조

**폴더 구조**

Project /
|-- __init__.py
|-- _Run.sh
|-- project.cron
|-- project_config.sh
|-- alert/ 
|   |-- __init__.py
|   |-- alert.py
|   |-- chatbot.py
|   |-- rest.py
|-- monitor/ 
|   |-- __init__.py
|   |-- moniterAlert.sh
|   |-- moniterChatbot.sh
|   |-- moniterFB.sh
|   |-- moniterSC.sh
|   |-- sc/
|   |   |-- __init__.py
|   |   |-- monitorSC.sh
|   |-- parser/
|   |   |-- __init__.py
|   |   |-- parseRecentCount.py
|   |   |-- parseUniqueCount.py

✅ Alert 구성

**Alert 실행 구조**

 project_config.sh → monitor/monitorAlert.sh → alert/rest.py → alert/alert.py

⚡ project_config.sh

SMS 알림을 보내는 기능 (SMS_ALERT_SEND), 프로젝트 내에서 사용할 경로를 미리 변수로 저장

function SMS_ALERT_SEND() {
    local ALERT_MESSAGE=${@}
    curl http://10.0.7.1:5044 -d "txtMsg=${ALERT_MESSAGE}"
}

ALERT_PATH = "path_to/Project/alert"
MONITOR_PATH = "path_to/Project/monitor"
PARSER_PATH = "path_to/Project/monitor/parser"
  • SMS_ALERT_SEND 함수
    • 이 함수는 로컬 변수 ALERT_MESSAGE를 받아서 curl 명령어를 사용해 SMS 알림을 보내는 역할을 합니다.
  • ALERT_PATH, MONITOR_PATH, PARSER_PATH
    • 이 변수들은 각각 알림, 모니터링, 파싱에 관련된 프로젝트의 경로를 저장합니다.

⚡ monitor/monitorAlert.sh

rest.py 의 실행 여부를 확인하고 종료되어 있으면 실행

. path_to/Project/project_config.sh

RESULT = `pgrep -f "python ${ALERT_PATH}/rest.py"`

if [[ -z "${RESULT}" ]]]; then
  nohup python ${ALERT_PATH}/rest.py &>/dev/null &
  echo "Alert Service Start"
  SMS_ALERT_SEND "Project Alert: Alert Service Start!"
else
  echo "running now..."
fi
  • . path_to/Project/project_config.sh
    • 이 코드는 project_config.sh 스크립트를 소스로 가져와 실행합니다.
  • RESULT = \pgrep -f "python ${ALERT_PATH}/rest.py"`
    • pgrep명령어를 이용해${ALERT_PATH}/rest.py스크립트를 실행 중인 Python 프로세스가 있는지 확인하고 그 결과를RESULT 변수에 저장합니다.
  • if [[ -z "${RESULT}" ]]]; then ... fi
    • RESULT가 빈 문자열인지 검사합니다. 빈 문자열이면 ${ALERT_PATH}/rest.py가 실행되고 있지 않다는 의미이므로, nohup 명령어로 백그라운드에서 실행합니다.
  • SMS_ALERT_SEND "Project Alert: Alert Service Start!"
    • 알림 서비스가 시작되면 이를 SMS로 알립니다.

⚡ alert/rest.py

ChatBot System 으로 Alert을 보내기 위한 python code로 메시지를 받아 외부 스크립트(alert.py)를 실행하는 기능을 수행합니다.

HTTP POST 요청을 받을 때마다 해당 요청의 "txtMsg" 필드를 읽어 alert.py에 전달하며 실행시킵니다.

import http.server
import socketserver
import subprocess
import urllib.parse

class SimpleHandler(http.server.SimpleHTTPRequestHandler):
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        body = self.rfile.read(content_length).decode('utf-8')
        self.send_response(200)
        self.end_headers()

        parsed_body = urllib.parse.parse_qs(body)
        message = parsed_body.get("txtMsg", [None])[0]

        if message:
            subprocess.call(["python", "path_to/Project/alert/alert.py", message])

host = "10.0.7.1"
port = 5044
with socketserver.TCPServer((host, port), SimpleHandler) as httpd:
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass

    httpd.server_close()
  • SimpleHandler 클래스를 정의하고, http.server.SimpleHTTPRequestHandler를 상속받습니다.
  • do_POST 메서드에서는 먼저 요청 본문의 길이를 가져온 다음, 본문을 읽어 body 변수에 저장합니다.
  • 읽어온 bodyurllib.parse.parse_qs를 이용해 파싱합니다. 파싱한 결과에서 "txtMsg"라는 키에 해당하는 값을 가져와 message 변수에 저장합니다.
  • message가 존재하면 subprocess.call을 통해 외부 Python 스크립트(alert.py)를 실행하고, 이때 message를 인자로 전달합니다.
  • 마지막으로 socketserver.TCPServer를 이용해 HTTP 서버를 10.0.7.1의 5044 포트에서 실행합니다. 서버는 키보드 인터럽트가 발생할 때까지 계속 실행됩니다.

⚡ alert/alert.py

전달된 인자를 Telegram 메시지로 전송하는 Alert System

import asyncio
import sys
import telegram

TOKEN = "51############################################DFs"
CHAT_ID = "50######84"

async def send_alert(message):
    bot = telegram.Bot(token=TOKEN)
    await bot.sendMessage(chat_id=CHAT_ID, text=message)

def main():
    message = " ".join(sys.argv[1:]) or 'Project: Invalid Alert Request...'
    asyncio.run(send_alert(message))

if __name__ == '__main__':
    main()

✅ Chatbot 구성

**Chatbot 실행 구조**

 project_config.sh → monitor/monitorChatbot.sh → alert/chatbot.py

⚡ monitor/monitorChatbot.sh

chatbot.py의 실행 여부를 확인하고 종료되어 있으면 실행

. path_to/Project/project_config.sh

RESULT = `pgrep -f "python ${ALERT_PATH}/chatbot.py"`

if [[ -z "${RESULT}" ]]]; then
  nohup python ${ALERT_PATH}/chatbot.py &>/dev/null &
  echo "Chatbot Service Start"
  SMS_ALERT_SEND "Project Alert: Chatbot Service Start!"
else
  echo "running now..."
fi
  • . path_to/Project/project_config.sh
    • 이 코드는 project_config.sh 스크립트를 소스로 가져와 실행합니다.
  • RESULT = \pgrep -f "python ${ALERT_PATH}/rest.py"`
    • pgrep명령어를 이용해${ALERT_PATH}/chatbot.py스크립트를 실행 중인 Python 프로세스가 있는지 확인하고 그 결과를RESULT 변수에 저장합니다.
  • if [[ -z "${RESULT}" ]]]; then ... fi
    • RESULT가 빈 문자열인지 검사합니다. 빈 문자열이면 ${ALERT_PATH}/chatbot.py가 실행되고 있지 않다는 의미이므로, nohup 명령어로 백그라운드에서 실행합니다.
  • SMS_ALERT_SEND "Project Alert: Chatbot Service Start!"
    • 알림 서비스가 시작되면 이를 SMS로 알립니다.

⚡ alert/chatbot.py

Telegram Bot을 사용하여 특정 YARN 애플리케이션을 중지하는 등의 작업을 수행하는 ChatBot System

import datetime
import subprocess

from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes

TOKEN = "51############################################DFs"
print("strat telegram chat bot")

dic_app = {               # 예시
    '뉴스': 'NEWS'
}
dic_job = {               # 예시
    '분석': 'ANALYZER'
}

async def stop_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    start_time = datetime.datetime.now()

    command = update.message.text.strip().split()
    cmd = 'ssh -p포트 계정@IP "yarn application -appStates RUNNING -list"'
    if len(command) == 3:
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=None, shell=True)
        output = process.communicate()

        result = []
        if dic_app[command[1]]:
            result.append(dic_app[command[1]])
        if dic_job[command[2]]:
            result.append(dic_job[command[2]])
        job = '_'.join(result)

        for line in output[0].decode('utf-8').strip().split("\n"):
            if line.startswith("application") and line.strip().split()[1] == job:
                app_id = line.strip().split()[0]
                cmd = f'ssh -p포트 계정@IP "yarn application -kill {app_id}"'
                subprocess.call(cmd, shell=True)
                elapsed_time = int((datetime.datetime.now() - start_time).total_seconds() * 1000)
                await update.message.reply_text(f"Kill the application ({job}, {app_id})\n* Elapsed: {elapsed_time} ms")
                return

application = Application.builder().token(TOKEN).build()
application.add_handler(CommandHandler("stop", stop_command))

application.run_polling(timeout=3)
  • stop_command라는 비동기 함수를 정의하며, 이 함수는 Telegram 업데이트를 처리합니다.
  • 해당 함수 내에서는 yarn application -appStates RUNNING -list 명령을 SSH를 통해 원격으로 실행하여, 실행 중인 YARN 애플리케이션 목록을 가져옵니다.
  • 사용자로부터 받은 메시지를 분석하여 애플리케이션과 작업의 키를 찾고, 이를 바탕으로 YARN 애플리케이션을 찾아냅니다.
  • 찾아낸 YARN 애플리케이션을 yarn application -kill {app_id} 명령으로 중지합니다.

✅ SC 구성

**SC 실행 구조**

 project_config.sh → monitor/sc/monitorSC.sh → monitor/monitorSC.sh → monitor/parser/parseRecentCount.py

⚡ monitor/sc/monitorSC.sh

. path_to/Project/project_config.sh

APP="news" # 예시
TIME_RANGE="5m/m"
JOB_LIST="analysis" # 예시
INDEX="APP" # 예시

sh ${MONITOR_PATH}/monitorSC.sh ${APP} ${TIME_RANGE} ${JOB_LIST} ${INDEX}

⚡ monitor/monitorSC.sh

Spark Streaming job을 통해 ES로 유입된 Log를 통해 해당 Job의 데이터 처리 결과(outputCount)를 통해 데이터가 처리되고 있는 지를 모니터링

. path_to/Project/project_config.sh

DATE=`date +%Y.%m.%d`
SC="$1"
NAME="${SC^^} SC"
TIME_RANGE="$2"
JOB_LIST="$3"
INDEX="project-$4-${DATE}" # 실제 ES에서 사용하는 Index Pattern으로 변환 

echo "Checking ${NAME} Status... ${INDEX}"

RESULT=$(curl -XPOST "http://10.0.7.1:9200/${INDEX}/_search?q=sc.keyword:${SC}" -H "Content-Type: application/json" -d '{
	"size": 0,
	"aggs": {
		"range": {
			"date_range": {
				"field": "eventTime",
				"time_zone": "+09:00",
				"format": "HH:mm",
				"ranges": [{
					"from": "now-'"${TIME_RANGE}"'",
					"key": "time_'"${TIME_RANGE}"'"
				}]
			}, "aggs": {
				"jobs": {
					"terms": {
						"field": "job.keyword"
					}, "aggs": {
						"success": {
							"sum": {
								"field": "outputCount"
							}
						}
					}
				}
			}
		}
	}
}')

PY_RESULT=`echo ${RESULT} | python ${PARSER_PATH}/parseRecentCount.py ${JOB_LIST}`

if [[ ${PY_RESULT} == "true" ]]; then
  echo "${NAME} is fully online"
else
	echo "ES >>> ${RESULT}"
	echo "PY >>> ${PY_RESULT}"

  MESSAGE="Project Alert: ${NAME} is offline, ${PY_RESULT}"

	echo "${MESSAGE}"
	SMS_ALERT_SEND ${MESSAGE}
fi
  • . path_to/Project/project_config.sh: project_config.sh 파일의 내용을 현재 스크립트로 불러옵니다.
  • DATE, SC, NAME, TIME_RANGE, JOB_LIST, INDEX: 변수 설정입니다. 이 변수들은 스크립트 동작에 필요한 정보를 담고 있으며, INDEX는 Elasticsearch에서 사용될 인덱스 패턴을 형성합니다.
  • curl -XPOST ...: Elasticsearch에 쿼리를 보냅니다. 이 쿼리는 ${INDEX}에 해당하는 데이터에서 ${SC}와 관련된 정보를 얻어옵니다.
  • PY_RESULT=...: 파이썬 스크립트 parseRecentCount.py를 호출하여 Elasticsearch로부터 받은 결과를 처리합니다.
  • if [[ ${PY_RESULT} == "true" ]] ...: 파이썬 스크립트의 결과에 따라 조건 분기를 합니다. 만약 PY_RESULT가 "true"라면 서비스가 온라인 상태라고 판단하고, 그렇지 않으면 경고 메시지를 보냅니다.
  • SMS_ALERT_SEND ${MESSAGE}: SMS 경고 메시지를 보내는 함수를 호출합니다.

⚡ monitor/parser/parseRecentCount.py

Elasticsearch로부터 받은 JSON 응답을 분석하여 특정 작업(job)들이 온라인 상태인지를 확인

import json
import sys

def is_valid_string(s):
    return bool(s.strip())

def finish(s):
    print(s, end='')
    sys.exit(0)

def main():
    result = json.load(sys.stdin)
    job_list = sys.argv[1].split(",")

    if len(result) == 0 or len(job_list) == 0:
        finish("false")
    elif 'aggregations' in result:
        check_list = {}
        for bucket in result['aggregations']['range']['buckets'][0]['jobs']['buckets']:
            if is_valid_string(bucket['key']):
                check_list[bucket['key']] = int(bucket['success']['value'])
        for job in job_list:
            if not (job in check_list) or (check_list.get(job) == 0):
                finish(job)
        finish('true')
    else:
        finish('false')

if __name__ == "__main__":
    main()

✅ FB 구성

FB **실행 구조**

 project_config.sh → monitor/monitorFB.sh → monitor/parser/parseUniqueCount.py

⚡ monitor/monitorFB.sh

Hadoop Slave Server의 FileBeat 에서 Log가 정상적으로 전달되는지 모니터링

Elasticsearch(ES)에서 특정 인덱스의 "agent.hostname.keyword" 필드에 대한 고유한 카운트를 확인하고, 이를 특정 기준과 비교하여 상태를 체크

. path_to/Project/project_config.sh

DATE=$(date +%Y.%m.%d)
INDEX="project-app-${DATE}" # 실제 ES에서 사용하는 Index Pattern으로 변환 
NAME="FileBeat"
COUNT=3

echo "Checking ${NAME} Status... ${INDEX}"

RESULT=$(curl -XPOST "http://10.0.7.1:9200/${INDEX}/_search?size=0" -H 'Content-Type: application/json' -d '{
	"aggs": {
		"types_count": {
			"cardinality": {
				"field": "agent.hostname"
			}
		}
	}
}')

PY_RESULT=$(echo "${RESULT}" | python "${PARSER_PATH}"/parseUniqueCount.py)
declare -i PY_RESULT

COUNT_DIFF=$(( COUNT - PY_RESULT ))

HOUR=$(date +%H)
declare -i HOUR
IS_WORKING_TIME=FALSE

if [ "${HOUR}" -ge 8 ] && [ "${HOUR}" -lt 17 ]; then
		IS_WORKING_TIME=TRUE
fi

if [ ${COUNT_DIFF} == 0 ]; then
		echo "${NAME} is fully online, current ${COUNT_DIFF}"
elif [ ${COUNT_DIFF} -gt 0 ]; then
		echo "ES >>> ${RESULT}"
		echo "PY >>> ${PY_RESULT}"
		MESSAGE="Project Alert: ${NAME} is offline, current ${PY_RESULT}"
		echo "${MESSAGE}"

		if [ ${COUNT_DIFF} -ge 4 ] && [ ${IS_WORKING_TIME} == TRUE ]; then
				SMS_ALERT_SEND "${MESSAGE}"
		fi
elif [ "${PY_RESULT}" == -1 ]; then
	echo "ES >>> ${RESULT}"
	echo "PY >>> ${PY_RESULT}"
	MESSAGE="Project Alert: ${NAME} Parsing exception, PY Msg: ${PY_RESULT}, ES Msg: ${RESULT}"
	echo "${MESSAGE}"
	SMS_ALERT_SEND "${MESSAGE}"
fi
  • COUNT_DIFF는 기대하는 카운트(COUNT)와 실제 카운트(PY_RESULT)의 차이입니다.
  • 근무 시간 내에 4개 이상의 차이가 발생하면 SMS 알림을 보냅니다.
  • 파싱 중 예외가 발생하면(-1의 값이 반환되면), 이를 알림으로 보냅니다.

⚡ monitor/parser/parseUniqueCount.py

현재 정상적으로 작동하는 Filebeat 갯수를 Count

import json
import sys

result = json.load(sys.stdin)
if len(result) == 0:
    print(-1)
elif "aggregations" in result:
    print(result["aggregations"]["types_count"]["value"])
else:
    print(-1)

✅ Cron 실행기

**Cron 실행 구조**

project.cron → _Run.sh  (Alert, Chatbot, FB)

⚡ project.cron

Linux Cron(스케줄러) 에 등록하기 위한 파일

####################### MONITOR & ALERT #######################
10-59/5 * * * * path_to/Project/_Run.sh monitorFB.sh
10-59/5 * * * * path_to/Project/_Run.sh sc/monitorSC.sh

* * * * * path_to/Project/_Run.sh monitorAlert.sh
* * * * * path_to/Project/_Run.sh monitorChatbot.sh
  • 10-59/5 * * * * path_to/Project/_Run.sh monitorFB.sh:
    • 매 시간 10분부터 59분까지 5분 간격으로 path_to/Project/_Run.sh monitorFB.sh를 실행합니다.
  • 10-59/5 * * * * path_to/Project/_Run.sh sc/monitorSC.sh:
    • 또한 매 시간 10분부터 59분까지 5분 간격으로 path_to/Project/_Run.sh sc/monitorSC.sh를 실행합니다.
  • * * * * path_to/Project/_Run.sh monitorAlert.sh:
    • 매 분마다 path_to/Project/_Run.sh monitorAlert.sh를 실행합니다.
  • * * * * path_to/Project/_Run.sh monitorChatbot.sh:
    • 매 분마다 path_to/Project/_Run.sh monitorChatbot.sh를 실행합니다.

⚡ _Run.sh

주어진 인자($@)로 모니터링 스크립트를 실행하고, 표준 출력은 로그 파일에, 표준 에러는 에러 파일에 기록

NAME=`echo ${1} | sed 's/[ \/]/_/g;s/\.sh//'`
DIR=path_to/`date +%y%m%d`
LOG=${DIR}/${NAME}.log
ERR=${DIR}/${NAME}.err

mkdir -p $DIR

msg="Start: `date`"
echo ${msg} >> ${LOG}
echo ${msg} >> ${ERR}

sh path_to/Project/monitor/$@ >>${LOG} 2>>${ERR}

msg="End: `date`"
echo ${msg} >> ${LOG}
echo ${msg} >> ${ERR}
echo >> ${LOG}
echo >> ${ERR}

⚡ Cron 등록 방법

  • sudo crontab path_to/Project/project.cron

⚡ Cron 등록 확인

  • 작업이 제대로 등록되었는지 확인하려면 crontab -l을 실행

✅ 결과

⚡ SC Alert

⚡ FB Alert

⚡ Chatbot

profile
Data Engineer

0개의 댓글