개발 세션 - 코드 완성

goldenGlow_21·2024년 12월 20일
0

email_alarm.py 완성

import os
import smtplib
from email.mime.text import MIMEText
from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError

# 이메일 정보 설정
EMAIL_SENDER = "tastyTiramisu110@gmail.com"
EMAIL_PASSWORD = "duca bjnc ynsf mmup"

# MongoDB 연결 설정
MONGO_URI = "mongodb://mongo1:30001,mongo2:30002,mongo3:30003/?replicaSet=my-rs"
DB_NAME = "example_collection"
COLLECTION_NAME = "example_collection"

# 이메일 전송 함수
def send_email(subject, body):
    try:
        msg = MIMEText(body, "plain")
        msg["Subject"] = subject
        msg["From"] = EMAIL_SENDER
        msg["To"] = EMAIL_SENDER

        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(EMAIL_SENDER, EMAIL_PASSWORD)
            server.sendmail(EMAIL_SENDER, EMAIL_SENDER, msg.as_string())
        print("[INFO] 이메일 전송 성공!")
    except Exception as e:
        print(f"[ERROR] 이메일 전송 실패: {e}")

# MongoDB 변경 사항 감지 및 알림 발신
def watch_collection():
    try:
        client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        db = client[DB_NAME]
        collection = db[COLLECTION_NAME]

        print(f"[INFO] {COLLECTION_NAME} 컬렉션 변경 사항 모니터링 시작...")
        changed_titles = []  # 변경된 title 저장 리스트

        with collection.watch() as stream:
            for change in stream:
                print("[INFO] 변경 사항 발생:", change)
                operation_type = change["operationType"]

                # insert, update, replace 이벤트에 대해 title만 수집
                if operation_type in ["insert", "update", "replace"]:
                    document_key = change["documentKey"]["_id"]
                    updated_document = collection.find_one({"_id": document_key})

                    # title이 존재하는 경우만 수집
                    if updated_document and "title" in updated_document:
                        changed_titles.append(updated_document["title"])

                # 변경 사항이 여러 개라면, 누적된 title 리스트로 알림 발송
                if changed_titles:
                    subject = f"[MongoDB 알림] {COLLECTION_NAME} 컬렉션 변경 사항"
                    body = "다음 데이터가 변경되었습니다:\n" + "\n".join(changed_titles)
                    send_email(subject, body)
                    changed_titles.clear()  # 알림 발송 후 리스트 초기화
    except ServerSelectionTimeoutError as e:
        print(f"[ERROR] MongoDB 연결 실패: {e}")
    except Exception as e:
        print(f"[ERROR] Change Stream 모니터링 중 오류 발생: {e}")

if __name__ == "__main__":
    watch_collection()

성공!


MongoDB

  • 명령 프롬프트(CMD) 또는 PowerShell을 관리자 권한으로 실행
  • 아래 명령어를 입력하여 MongoDB 서비스를 시작
net start MongoDB
  • 서비스가 정상적으로 시작되었는지 확인
net start
  • MongoDB를 중지
net stop MongoDB

Replica Set Prep

Replica Set 초기화

  • mongo1 컨테이너에서 MongoDB 셸에 접속
docker exec -it mongo1 mongosh
  • 다음 명령어로 복제 세트를 초기화
rs.initiate({
  _id: "my-rs",
  members: [
    { _id: 0, host: "mongo1:30001" },
    { _id: 1, host: "mongo2:30002" },
    { _id: 2, host: "mongo3:30003" }
  ]
});
  • Replica Set 상태 확인
rs.status();

if Fail

  • Replica Set 설정이 올바르지 않은 경우, 설정을 재구성
rs.reconfig({
  _id: "my-rs",
  members: [
    { _id: 0, host: "mongo1:30001" },
    { _id: 1, host: "mongo2:30002" },
    { _id: 2, host: "mongo3:30003" }
  ]
}, { force: true });

Replica Set 접근

  • MongoDB는 localhost가 아닌, Docker 네트워크에서 설정된 컨테이너 이름(mongo1, mongo2, mongo3)으로 접근해야 함
  • 예시: mongo1 컨테이너의 MongoDB에 접속
docker exec -it mongo1 mongosh --host mongo1 --port 30001

Replica Set

  • Replica Set 상태 확인
  • 정상적인 Replica Set 구성이라면 상태가 출력
  • {"ok": 0, ...} 메시지가 출력되면 Replica Set이 구성되지 않은 상태
rs.status()
  • MongoDB 클라이언트를 다시 열고 Replica Set 초기화
rs.initiate()

Replica Set Fail 시의 정보(gpt 피셜)

  • Replica Set 구성 문제

    • "No host described in new configuration with {version: 1, term: 1} for replica set my-rs maps to this node"라는 오류 메시지가 발생했습니다.
    • 현재 복제 세트 구성이 제대로 동작하지 않고 있습니다.
  • DNS 및 호스트 이름 문제

    • "HostNotFound"와 "getaddrinfo() failed"와 같은 오류는 컨테이너 내부에서 mongo1, mongo2, mongo3 호스트 이름이 서로 올바르게 해석되지 못한다는 것을 의미합니다.
  • Primary 노드 설정 문제

    • PrimarySteppedDown: No primary exists currently라는 메시지가 지속적으로 출력되고 있습니다. 이는 primary 노드가 설정되지 않았거나, replica set 구성이 잘못되었음을 나타냅니다.

Docker-Interior

  • 도커 내부 네트워크 상태를 점검
docker network inspect darkweb_and_osint_viewer_my-cluster

Host Name Mapping - 윈도우에서

  • Windows 환경에서는 hosts 파일을 수정하여 컨테이너 이름을 로컬 IP에 매핑
  • C:\Windows\System32\drivers\etc\hosts 파일을 관리자 권한으로 열고 다음 내용을 추가
127.0.0.1   mongo1
127.0.0.1   mongo2
127.0.0.1   mongo3

Test

  • 간단한 스크립트를 작성하여 SMTP 서버와 연결 및 전송 테스트
import smtplib
from email.mime.text import MIMEText

sender = "your_email@gmail.com"
password = "your_password"
recipient = "recipient_email@gmail.com"

msg = MIMEText("테스트 이메일 본문", "plain")
msg["Subject"] = "테스트 이메일"
msg["From"] = sender
msg["To"] = recipient

try:
    with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
        server.login(sender, password)
        server.sendmail(sender, recipient, msg.as_string())
    print("이메일 전송 성공")
except Exception as e:
    print(f"이메일 전송 실패: {e}")
  • MongoDB 셸에서 데이터를 삽입
use darkweb_db;
db.example_collection.insertOne({ title: "Change Stream Test", content: "Testing Change Stream functionality" });

Email config

Gmail 기준

SMTP 설정 확인

  • Google 계정에 로그인
  • https://myaccount.google.com/security로 이동
  • "앱 비밀번호" 설정을 찾기
  • 앱 비밀번호가 설정되지 않았다면 "앱 비밀번호" 생성 후 기록
  • SMTP 서버 주소: smtp.gmail.com, 포트: 465(SSL) 또는 587(TLS)

알림 명세 변경

변경할 명세

  1. 현재는 collection 단위로 파악해서 이메일을 보내 줌. 하지만 DB 단위로 변경 사항을 파악해서 하나의 메일에 담아 보내주도록 바꿀 것.

  2. 최종적으로 메일로 보내야 할 내용의 형태는 다음과 같이 생겼음.

  • 제목: Market Update: Stocks Surge
  • 유출한 사이트: articles
  • UTC 시간: 2024-12-16 07:07:06+00:00
  • 한국 시간: 2024-12-16 16:07:06+09:00

참고로, UTC 시간과 한국 시간은 일정한 처리를 거쳤느냐의 차이이며, UTC 시간 표현은 MongoDB상에 저장된 _id 값을 받아 변환하면 돼.

  1. 유출한 사이트 이름은 컬렉션 이름을 그대로 가져와 쓰도록 만들어줘.

변경한 버전

# email_alarm.py
import os
import smtplib
from email.mime.text import MIMEText
from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError
from datetime import datetime, timezone, timedelta

# 이메일 정보 설정
EMAIL_SENDER = "tastyTiramisu110@gmail.com"
EMAIL_PASSWORD = "duca bjnc ynsf mmup"

# MongoDB 연결 설정
MONGO_URI = "mongodb://mongo1:30001,mongo2:30002,mongo3:30003/?replicaSet=my-rs"
DB_NAME = "example_collection"

# 이메일 전송 함수
def send_email(subject, body):
    try:
        msg = MIMEText(body, "plain")
        msg["Subject"] = subject
        msg["From"] = EMAIL_SENDER
        msg["To"] = EMAIL_SENDER

        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(EMAIL_SENDER, EMAIL_PASSWORD)
            server.sendmail(EMAIL_SENDER, EMAIL_SENDER, msg.as_string())
        print("[INFO] 이메일 전송 성공!")
    except Exception as e:
        print(f"[ERROR] 이메일 전송 실패: {e}")

# MongoDB 변경 사항 감지 및 알림 발신 (DB 단위로 처리)
def watch_database():
    try:
        client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        db = client[DB_NAME]
        print(f"[INFO] 데이터베이스 변경 사항 모니터링 시작...")

        changed_documents = []  # 변경된 문서 저장 리스트

        with client.watch() as stream:  # 데이터베이스 단위로 감시
            for change in stream:
                print("[INFO] 변경 사항 발생:", change)
                operation_type = change["operationType"]
                collection_name = change["ns"]["coll"]
                document_key = change["documentKey"]["_id"]

                # 변경된 문서 가져오기
                collection = db[collection_name]
                updated_document = collection.find_one({"_id": document_key})

                if updated_document:
                    # UTC 시간 (_id의 Timestamp에서 추출)
                    utc_time = updated_document["_id"].generation_time
                    korean_time = utc_time.astimezone(timezone(timedelta(hours=9)))

                    # 변경된 문서의 title 필드가 있을 경우 추가
                    if "title" in updated_document:
                        changed_documents.append({
                            "title": updated_document["title"],
                            "collection": collection_name,
                            "utc_time": utc_time,
                            "korean_time": korean_time
                        })

                # 변경 사항이 누적되었을 경우, 이메일 발송
                if changed_documents:
                    email_subject = "[MongoDB 알림] 데이터베이스 변경 사항"
                    email_body = "다음 변경 사항이 감지되었습니다:\n\n"

                    for doc in changed_documents:
                        email_body += (
                            f"- **제목**: {doc['title']}\n"
                            f"- **유출한 사이트**: {doc['collection']}\n"
                            f"- **UTC 시간**: {doc['utc_time']}\n"
                            f"- **한국 시간**: {doc['korean_time']}\n\n"
                        )

                    send_email(email_subject, email_body)
                    changed_documents.clear()  # 이메일 발송 후 리스트 초기화

    except ServerSelectionTimeoutError as e:
        print(f"[ERROR] MongoDB 연결 실패: {e}")
    except Exception as e:
        print(f"[ERROR] Change Stream 모니터링 중 오류 발생: {e}")

if __name__ == "__main__":
    watch_database()

작동 원리

  • watch_database() 함수
    - with client.watch()를 통해 데이터베이스 전체에서 발생하는 변경 사항(insert, update, replace 등)을 지속적으로 감지
    • 이 함수는 무한 루프처럼 작동하며, MongoDB에서 감지 가능한 이벤트가 발생할 때까지 대기 상태를 유지
  • 변경 사항 발생 시
    • MongoDB에서 이벤트를 스트리밍 방식으로 제공하며, 이벤트가 발생하면 해당 이벤트를 처리
    • 처리된 내용을 기반으로 변경된 문서를 가져오고, 필요 시 이메일 전송을 수행
  • 에이전트처럼 지속 실행
    • watch_database() 함수가 종료되지 않는 한, 스크립트는 백그라운드에서 계속 동작하며 변경 사항을 모니터링
    • 이는 MongoDB와 연결이 유지되는 한 작동을 멈추지 않음

비동기 버전(최종본!)

import os
import smtplib
from email.mime.text import MIMEText
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import ServerSelectionTimeoutError
import asyncio
from datetime import datetime, timedelta

# 이메일 정보 설정
EMAIL_SENDER = "tastyTiramisu110@gmail.com"
EMAIL_PASSWORD = "duca bjnc ynsf mmup"

# MongoDB 연결 설정
MONGO_URI = "mongodb://mongo1:30001,mongo2:30002,mongo3:30003/?replicaSet=my-rs"
DB_NAME = "example_collection"

# 이메일 전송 함수
async def send_email(subject, body):
    try:
        msg = MIMEText(body, "plain")
        msg["Subject"] = subject
        msg["From"] = EMAIL_SENDER
        msg["To"] = EMAIL_SENDER

        # SMTP 클라이언트 생성 및 이메일 전송
        loop = asyncio.get_event_loop()

        def smtp_send():
            with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
                server.login(EMAIL_SENDER, EMAIL_PASSWORD)
                server.sendmail(EMAIL_SENDER, EMAIL_SENDER, msg.as_string())

        # SMTP 전송을 비동기로 실행
        await loop.run_in_executor(None, smtp_send)
        print("[INFO] 이메일 전송 성공!")
    except Exception as e:
        print(f"[ERROR] 이메일 전송 실패: {e}")

# UTC 시간 및 한국 시간 변환 함수
def format_times_from_id(change_id):
    utc_time = change_id.generation_time
    kor_time = utc_time + timedelta(hours=9)  # UTC+9 시간대
    return utc_time, kor_time

# 변경 사항 모니터링 및 알림 발신
async def watch_database():
    try:
        client = AsyncIOMotorClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        db = client[DB_NAME]

        print("[INFO] 데이터베이스 변경 사항 모니터링 시작...")
        changed_docs = []

        async with client.watch() as stream:
            async for change in stream:
                print("[INFO] 변경 사항 발생:", change)
                operation_type = change["operationType"]

                # 변경 사항 처리
                if operation_type in ["insert", "update", "replace"]:
                    collection_name = change["ns"]["coll"]
                    document_key = change["documentKey"]["_id"]
                    updated_document = await db[collection_name].find_one({"_id": document_key})

                    # title이 존재하는 경우만 수집
                    if updated_document and "title" in updated_document:
                        utc_time, kor_time = format_times_from_id(document_key)
                        changed_docs.append(
                            f"- **제목**: {updated_document['title']}\n"
                            f"- **유출한 사이트**: {collection_name}\n"
                            f"- **UTC 시간**: {utc_time}\n"
                            f"- **한국 시간**: {kor_time}"
                        )

                # 변경 사항이 여러 개라면, 누적된 내용으로 이메일 발송
                if changed_docs:
                    subject = "[MongoDB 알림] 데이터베이스 변경 사항"
                    body = "\n\n".join(changed_docs)
                    await send_email(subject, body)
                    changed_docs.clear()  # 이메일 발송 후 리스트 초기화
    except ServerSelectionTimeoutError as e:
        print(f"[ERROR] MongoDB 연결 실패: {e}")
    except Exception as e:
        print(f"[ERROR] Change Stream 모니터링 중 오류 발생: {e}")

# 메인 함수
async def main():
    await watch_database()

if __name__ == "__main__":
    asyncio.run(main())
profile
안드로이드는 리눅스의 꿈을 꾸는가

0개의 댓글