**폴더 구조**
Project /
|-- __init__.py
|-- _Run.sh
|-- project.cron
|-- project_config.sh
|-- alert/
| |-- __init__.py
| |-- alert.py
| |-- chatbot.py
| |-- rest.py
|-- monitor/
| |-- __init__.py
| |-- moniterAlert.sh
| |-- moniterChatbot.sh
| |-- moniterFB.sh
| |-- moniterSC.sh
| |-- sc/
| | |-- __init__.py
| | |-- monitorSC.sh
| |-- parser/
| | |-- __init__.py
| | |-- parseRecentCount.py
| | |-- parseUniqueCount.py
**Alert 실행 구조**
project_config.sh → monitor/monitorAlert.sh → alert/rest.py → alert/alert.py
⚡ project_config.sh
SMS 알림을 보내는 기능 (SMS_ALERT_SEND), 프로젝트 내에서 사용할 경로를 미리 변수로 저장
function SMS_ALERT_SEND() {
local ALERT_MESSAGE=${@}
curl http://10.0.7.1:5044 -d "txtMsg=${ALERT_MESSAGE}"
}
ALERT_PATH = "path_to/Project/alert"
MONITOR_PATH = "path_to/Project/monitor"
PARSER_PATH = "path_to/Project/monitor/parser"
SMS_ALERT_SEND 함수ALERT_MESSAGE를 받아서 curl 명령어를 사용해 SMS 알림을 보내는 역할을 합니다.ALERT_PATH, MONITOR_PATH, PARSER_PATH⚡ monitor/monitorAlert.sh
rest.py 의 실행 여부를 확인하고 종료되어 있으면 실행
. path_to/Project/project_config.sh
RESULT = `pgrep -f "python ${ALERT_PATH}/rest.py"`
if [[ -z "${RESULT}" ]]]; then
nohup python ${ALERT_PATH}/rest.py &>/dev/null &
echo "Alert Service Start"
SMS_ALERT_SEND "Project Alert: Alert Service Start!"
else
echo "running now..."
fi
. path_to/Project/project_config.shproject_config.sh 스크립트를 소스로 가져와 실행합니다.RESULT = \pgrep -f "python ${ALERT_PATH}/rest.py"`명령어를 이용해${ALERT_PATH}/rest.py스크립트를 실행 중인 Python 프로세스가 있는지 확인하고 그 결과를RESULT 변수에 저장합니다.if [[ -z "${RESULT}" ]]]; then ... fiRESULT가 빈 문자열인지 검사합니다. 빈 문자열이면 ${ALERT_PATH}/rest.py가 실행되고 있지 않다는 의미이므로, nohup 명령어로 백그라운드에서 실행합니다.SMS_ALERT_SEND "Project Alert: Alert Service Start!"⚡ alert/rest.py
ChatBot System 으로 Alert을 보내기 위한 python code로 메시지를 받아 외부 스크립트(alert.py)를 실행하는 기능을 수행합니다.
HTTP POST 요청을 받을 때마다 해당 요청의 "txtMsg" 필드를 읽어 alert.py에 전달하며 실행시킵니다.
import http.server
import socketserver
import subprocess
import urllib.parse
class SimpleHandler(http.server.SimpleHTTPRequestHandler):
def do_POST(self):
content_length = int(self.headers['Content-Length'])
body = self.rfile.read(content_length).decode('utf-8')
self.send_response(200)
self.end_headers()
parsed_body = urllib.parse.parse_qs(body)
message = parsed_body.get("txtMsg", [None])[0]
if message:
subprocess.call(["python", "path_to/Project/alert/alert.py", message])
host = "10.0.7.1"
port = 5044
with socketserver.TCPServer((host, port), SimpleHandler) as httpd:
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()
SimpleHandler 클래스를 정의하고, http.server.SimpleHTTPRequestHandler를 상속받습니다.do_POST 메서드에서는 먼저 요청 본문의 길이를 가져온 다음, 본문을 읽어 body 변수에 저장합니다.body를 urllib.parse.parse_qs를 이용해 파싱합니다. 파싱한 결과에서 "txtMsg"라는 키에 해당하는 값을 가져와 message 변수에 저장합니다.message가 존재하면 subprocess.call을 통해 외부 Python 스크립트(alert.py)를 실행하고, 이때 message를 인자로 전달합니다.socketserver.TCPServer를 이용해 HTTP 서버를 10.0.7.1의 5044 포트에서 실행합니다. 서버는 키보드 인터럽트가 발생할 때까지 계속 실행됩니다.⚡ alert/alert.py
전달된 인자를 Telegram 메시지로 전송하는 Alert System
import asyncio
import sys
import telegram
TOKEN = "51############################################DFs"
CHAT_ID = "50######84"
async def send_alert(message):
bot = telegram.Bot(token=TOKEN)
await bot.sendMessage(chat_id=CHAT_ID, text=message)
def main():
message = " ".join(sys.argv[1:]) or 'Project: Invalid Alert Request...'
asyncio.run(send_alert(message))
if __name__ == '__main__':
main()
**Chatbot 실행 구조**
project_config.sh → monitor/monitorChatbot.sh → alert/chatbot.py
⚡ monitor/monitorChatbot.sh
chatbot.py의 실행 여부를 확인하고 종료되어 있으면 실행
. path_to/Project/project_config.sh
RESULT = `pgrep -f "python ${ALERT_PATH}/chatbot.py"`
if [[ -z "${RESULT}" ]]]; then
nohup python ${ALERT_PATH}/chatbot.py &>/dev/null &
echo "Chatbot Service Start"
SMS_ALERT_SEND "Project Alert: Chatbot Service Start!"
else
echo "running now..."
fi
. path_to/Project/project_config.shproject_config.sh 스크립트를 소스로 가져와 실행합니다.RESULT = \pgrep -f "python ${ALERT_PATH}/rest.py"`명령어를 이용해${ALERT_PATH}/chatbot.py스크립트를 실행 중인 Python 프로세스가 있는지 확인하고 그 결과를RESULT 변수에 저장합니다.if [[ -z "${RESULT}" ]]]; then ... fiRESULT가 빈 문자열인지 검사합니다. 빈 문자열이면 ${ALERT_PATH}/chatbot.py가 실행되고 있지 않다는 의미이므로, nohup 명령어로 백그라운드에서 실행합니다.SMS_ALERT_SEND "Project Alert: Chatbot Service Start!"⚡ alert/chatbot.py
Telegram Bot을 사용하여 특정 YARN 애플리케이션을 중지하는 등의 작업을 수행하는 ChatBot System
import datetime
import subprocess
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
TOKEN = "51############################################DFs"
print("strat telegram chat bot")
dic_app = { # 예시
'뉴스': 'NEWS'
}
dic_job = { # 예시
'분석': 'ANALYZER'
}
async def stop_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
start_time = datetime.datetime.now()
command = update.message.text.strip().split()
cmd = 'ssh -p포트 계정@IP "yarn application -appStates RUNNING -list"'
if len(command) == 3:
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=None, shell=True)
output = process.communicate()
result = []
if dic_app[command[1]]:
result.append(dic_app[command[1]])
if dic_job[command[2]]:
result.append(dic_job[command[2]])
job = '_'.join(result)
for line in output[0].decode('utf-8').strip().split("\n"):
if line.startswith("application") and line.strip().split()[1] == job:
app_id = line.strip().split()[0]
cmd = f'ssh -p포트 계정@IP "yarn application -kill {app_id}"'
subprocess.call(cmd, shell=True)
elapsed_time = int((datetime.datetime.now() - start_time).total_seconds() * 1000)
await update.message.reply_text(f"Kill the application ({job}, {app_id})\n* Elapsed: {elapsed_time} ms")
return
application = Application.builder().token(TOKEN).build()
application.add_handler(CommandHandler("stop", stop_command))
application.run_polling(timeout=3)
stop_command라는 비동기 함수를 정의하며, 이 함수는 Telegram 업데이트를 처리합니다.yarn application -appStates RUNNING -list 명령을 SSH를 통해 원격으로 실행하여, 실행 중인 YARN 애플리케이션 목록을 가져옵니다.yarn application -kill {app_id} 명령으로 중지합니다.**SC 실행 구조**
project_config.sh → monitor/sc/monitorSC.sh → monitor/monitorSC.sh → monitor/parser/parseRecentCount.py
⚡ monitor/sc/monitorSC.sh
. path_to/Project/project_config.sh
APP="news" # 예시
TIME_RANGE="5m/m"
JOB_LIST="analysis" # 예시
INDEX="APP" # 예시
sh ${MONITOR_PATH}/monitorSC.sh ${APP} ${TIME_RANGE} ${JOB_LIST} ${INDEX}
⚡ monitor/monitorSC.sh
Spark Streaming job을 통해 ES로 유입된 Log를 통해 해당 Job의 데이터 처리 결과(outputCount)를 통해 데이터가 처리되고 있는 지를 모니터링
. path_to/Project/project_config.sh
DATE=`date +%Y.%m.%d`
SC="$1"
NAME="${SC^^} SC"
TIME_RANGE="$2"
JOB_LIST="$3"
INDEX="project-$4-${DATE}" # 실제 ES에서 사용하는 Index Pattern으로 변환
echo "Checking ${NAME} Status... ${INDEX}"
RESULT=$(curl -XPOST "http://10.0.7.1:9200/${INDEX}/_search?q=sc.keyword:${SC}" -H "Content-Type: application/json" -d '{
"size": 0,
"aggs": {
"range": {
"date_range": {
"field": "eventTime",
"time_zone": "+09:00",
"format": "HH:mm",
"ranges": [{
"from": "now-'"${TIME_RANGE}"'",
"key": "time_'"${TIME_RANGE}"'"
}]
}, "aggs": {
"jobs": {
"terms": {
"field": "job.keyword"
}, "aggs": {
"success": {
"sum": {
"field": "outputCount"
}
}
}
}
}
}
}
}')
PY_RESULT=`echo ${RESULT} | python ${PARSER_PATH}/parseRecentCount.py ${JOB_LIST}`
if [[ ${PY_RESULT} == "true" ]]; then
echo "${NAME} is fully online"
else
echo "ES >>> ${RESULT}"
echo "PY >>> ${PY_RESULT}"
MESSAGE="Project Alert: ${NAME} is offline, ${PY_RESULT}"
echo "${MESSAGE}"
SMS_ALERT_SEND ${MESSAGE}
fi
. path_to/Project/project_config.sh: project_config.sh 파일의 내용을 현재 스크립트로 불러옵니다.DATE, SC, NAME, TIME_RANGE, JOB_LIST, INDEX: 변수 설정입니다. 이 변수들은 스크립트 동작에 필요한 정보를 담고 있으며, INDEX는 Elasticsearch에서 사용될 인덱스 패턴을 형성합니다.curl -XPOST ...: Elasticsearch에 쿼리를 보냅니다. 이 쿼리는 ${INDEX}에 해당하는 데이터에서 ${SC}와 관련된 정보를 얻어옵니다.PY_RESULT=...: 파이썬 스크립트 parseRecentCount.py를 호출하여 Elasticsearch로부터 받은 결과를 처리합니다.if [[ ${PY_RESULT} == "true" ]] ...: 파이썬 스크립트의 결과에 따라 조건 분기를 합니다. 만약 PY_RESULT가 "true"라면 서비스가 온라인 상태라고 판단하고, 그렇지 않으면 경고 메시지를 보냅니다.SMS_ALERT_SEND ${MESSAGE}: SMS 경고 메시지를 보내는 함수를 호출합니다.⚡ monitor/parser/parseRecentCount.py
Elasticsearch로부터 받은 JSON 응답을 분석하여 특정 작업(job)들이 온라인 상태인지를 확인
import json
import sys
def is_valid_string(s):
return bool(s.strip())
def finish(s):
print(s, end='')
sys.exit(0)
def main():
result = json.load(sys.stdin)
job_list = sys.argv[1].split(",")
if len(result) == 0 or len(job_list) == 0:
finish("false")
elif 'aggregations' in result:
check_list = {}
for bucket in result['aggregations']['range']['buckets'][0]['jobs']['buckets']:
if is_valid_string(bucket['key']):
check_list[bucket['key']] = int(bucket['success']['value'])
for job in job_list:
if not (job in check_list) or (check_list.get(job) == 0):
finish(job)
finish('true')
else:
finish('false')
if __name__ == "__main__":
main()
FB **실행 구조**
project_config.sh → monitor/monitorFB.sh → monitor/parser/parseUniqueCount.py
⚡ monitor/monitorFB.sh
Hadoop Slave Server의 FileBeat 에서 Log가 정상적으로 전달되는지 모니터링
Elasticsearch(ES)에서 특정 인덱스의 "agent.hostname.keyword" 필드에 대한 고유한 카운트를 확인하고, 이를 특정 기준과 비교하여 상태를 체크
. path_to/Project/project_config.sh
DATE=$(date +%Y.%m.%d)
INDEX="project-app-${DATE}" # 실제 ES에서 사용하는 Index Pattern으로 변환
NAME="FileBeat"
COUNT=3
echo "Checking ${NAME} Status... ${INDEX}"
RESULT=$(curl -XPOST "http://10.0.7.1:9200/${INDEX}/_search?size=0" -H 'Content-Type: application/json' -d '{
"aggs": {
"types_count": {
"cardinality": {
"field": "agent.hostname"
}
}
}
}')
PY_RESULT=$(echo "${RESULT}" | python "${PARSER_PATH}"/parseUniqueCount.py)
declare -i PY_RESULT
COUNT_DIFF=$(( COUNT - PY_RESULT ))
HOUR=$(date +%H)
declare -i HOUR
IS_WORKING_TIME=FALSE
if [ "${HOUR}" -ge 8 ] && [ "${HOUR}" -lt 17 ]; then
IS_WORKING_TIME=TRUE
fi
if [ ${COUNT_DIFF} == 0 ]; then
echo "${NAME} is fully online, current ${COUNT_DIFF}"
elif [ ${COUNT_DIFF} -gt 0 ]; then
echo "ES >>> ${RESULT}"
echo "PY >>> ${PY_RESULT}"
MESSAGE="Project Alert: ${NAME} is offline, current ${PY_RESULT}"
echo "${MESSAGE}"
if [ ${COUNT_DIFF} -ge 4 ] && [ ${IS_WORKING_TIME} == TRUE ]; then
SMS_ALERT_SEND "${MESSAGE}"
fi
elif [ "${PY_RESULT}" == -1 ]; then
echo "ES >>> ${RESULT}"
echo "PY >>> ${PY_RESULT}"
MESSAGE="Project Alert: ${NAME} Parsing exception, PY Msg: ${PY_RESULT}, ES Msg: ${RESULT}"
echo "${MESSAGE}"
SMS_ALERT_SEND "${MESSAGE}"
fi
COUNT_DIFF는 기대하는 카운트(COUNT)와 실제 카운트(PY_RESULT)의 차이입니다.⚡ monitor/parser/parseUniqueCount.py
현재 정상적으로 작동하는 Filebeat 갯수를 Count
import json
import sys
result = json.load(sys.stdin)
if len(result) == 0:
print(-1)
elif "aggregations" in result:
print(result["aggregations"]["types_count"]["value"])
else:
print(-1)
**Cron 실행 구조**
project.cron → _Run.sh → (Alert, Chatbot, FB)
⚡ project.cron
Linux Cron(스케줄러) 에 등록하기 위한 파일
####################### MONITOR & ALERT #######################
10-59/5 * * * * path_to/Project/_Run.sh monitorFB.sh
10-59/5 * * * * path_to/Project/_Run.sh sc/monitorSC.sh
* * * * * path_to/Project/_Run.sh monitorAlert.sh
* * * * * path_to/Project/_Run.sh monitorChatbot.sh
10-59/5 * * * * path_to/Project/_Run.sh monitorFB.sh:path_to/Project/_Run.sh monitorFB.sh를 실행합니다.10-59/5 * * * * path_to/Project/_Run.sh sc/monitorSC.sh:path_to/Project/_Run.sh sc/monitorSC.sh를 실행합니다.* * * * path_to/Project/_Run.sh monitorAlert.sh:path_to/Project/_Run.sh monitorAlert.sh를 실행합니다.* * * * path_to/Project/_Run.sh monitorChatbot.sh:path_to/Project/_Run.sh monitorChatbot.sh를 실행합니다.⚡ _Run.sh
주어진 인자($@)로 모니터링 스크립트를 실행하고, 표준 출력은 로그 파일에, 표준 에러는 에러 파일에 기록
NAME=`echo ${1} | sed 's/[ \/]/_/g;s/\.sh//'`
DIR=path_to/`date +%y%m%d`
LOG=${DIR}/${NAME}.log
ERR=${DIR}/${NAME}.err
mkdir -p $DIR
msg="Start: `date`"
echo ${msg} >> ${LOG}
echo ${msg} >> ${ERR}
sh path_to/Project/monitor/$@ >>${LOG} 2>>${ERR}
msg="End: `date`"
echo ${msg} >> ${LOG}
echo ${msg} >> ${ERR}
echo >> ${LOG}
echo >> ${ERR}
⚡ Cron 등록 방법
sudo crontab path_to/Project/project.cron⚡ Cron 등록 확인
crontab -l을 실행⚡ SC Alert

⚡ FB Alert

⚡ Chatbot
