import os
import smtplib
from email.mime.text import MIMEText
from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError
# 이메일 정보 설정
EMAIL_SENDER = "tastyTiramisu110@gmail.com"
EMAIL_PASSWORD = "duca bjnc ynsf mmup"
# MongoDB 연결 설정
MONGO_URI = "mongodb://mongo1:30001,mongo2:30002,mongo3:30003/?replicaSet=my-rs"
DB_NAME = "example_collection"
COLLECTION_NAME = "example_collection"
# 이메일 전송 함수
def send_email(subject, body):
try:
msg = MIMEText(body, "plain")
msg["Subject"] = subject
msg["From"] = EMAIL_SENDER
msg["To"] = EMAIL_SENDER
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(EMAIL_SENDER, EMAIL_PASSWORD)
server.sendmail(EMAIL_SENDER, EMAIL_SENDER, msg.as_string())
print("[INFO] 이메일 전송 성공!")
except Exception as e:
print(f"[ERROR] 이메일 전송 실패: {e}")
# MongoDB 변경 사항 감지 및 알림 발신
def watch_collection():
try:
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
print(f"[INFO] {COLLECTION_NAME} 컬렉션 변경 사항 모니터링 시작...")
changed_titles = [] # 변경된 title 저장 리스트
with collection.watch() as stream:
for change in stream:
print("[INFO] 변경 사항 발생:", change)
operation_type = change["operationType"]
# insert, update, replace 이벤트에 대해 title만 수집
if operation_type in ["insert", "update", "replace"]:
document_key = change["documentKey"]["_id"]
updated_document = collection.find_one({"_id": document_key})
# title이 존재하는 경우만 수집
if updated_document and "title" in updated_document:
changed_titles.append(updated_document["title"])
# 변경 사항이 여러 개라면, 누적된 title 리스트로 알림 발송
if changed_titles:
subject = f"[MongoDB 알림] {COLLECTION_NAME} 컬렉션 변경 사항"
body = "다음 데이터가 변경되었습니다:\n" + "\n".join(changed_titles)
send_email(subject, body)
changed_titles.clear() # 알림 발송 후 리스트 초기화
except ServerSelectionTimeoutError as e:
print(f"[ERROR] MongoDB 연결 실패: {e}")
except Exception as e:
print(f"[ERROR] Change Stream 모니터링 중 오류 발생: {e}")
if __name__ == "__main__":
watch_collection()

성공!
net start MongoDB
net start
net stop MongoDB
mongo1 컨테이너에서 MongoDB 셸에 접속docker exec -it mongo1 mongosh
rs.initiate({
_id: "my-rs",
members: [
{ _id: 0, host: "mongo1:30001" },
{ _id: 1, host: "mongo2:30002" },
{ _id: 2, host: "mongo3:30003" }
]
});
rs.status();
rs.reconfig({
_id: "my-rs",
members: [
{ _id: 0, host: "mongo1:30001" },
{ _id: 1, host: "mongo2:30002" },
{ _id: 2, host: "mongo3:30003" }
]
}, { force: true });
localhost가 아닌, Docker 네트워크에서 설정된 컨테이너 이름(mongo1, mongo2, mongo3)으로 접근해야 함mongo1 컨테이너의 MongoDB에 접속docker exec -it mongo1 mongosh --host mongo1 --port 30001
{"ok": 0, ...} 메시지가 출력되면 Replica Set이 구성되지 않은 상태rs.status()
rs.initiate()
Replica Set 구성 문제
my-rs maps to this node"라는 오류 메시지가 발생했습니다.DNS 및 호스트 이름 문제
mongo1, mongo2, mongo3 호스트 이름이 서로 올바르게 해석되지 못한다는 것을 의미합니다.Primary 노드 설정 문제
PrimarySteppedDown: No primary exists currently라는 메시지가 지속적으로 출력되고 있습니다. 이는 primary 노드가 설정되지 않았거나, replica set 구성이 잘못되었음을 나타냅니다.docker network inspect darkweb_and_osint_viewer_my-cluster
hosts 파일을 수정하여 컨테이너 이름을 로컬 IP에 매핑C:\Windows\System32\drivers\etc\hosts 파일을 관리자 권한으로 열고 다음 내용을 추가127.0.0.1 mongo1
127.0.0.1 mongo2
127.0.0.1 mongo3
import smtplib
from email.mime.text import MIMEText
sender = "your_email@gmail.com"
password = "your_password"
recipient = "recipient_email@gmail.com"
msg = MIMEText("테스트 이메일 본문", "plain")
msg["Subject"] = "테스트 이메일"
msg["From"] = sender
msg["To"] = recipient
try:
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender, password)
server.sendmail(sender, recipient, msg.as_string())
print("이메일 전송 성공")
except Exception as e:
print(f"이메일 전송 실패: {e}")
use darkweb_db;
db.example_collection.insertOne({ title: "Change Stream Test", content: "Testing Change Stream functionality" });
https://myaccount.google.com/security로 이동smtp.gmail.com, 포트: 465(SSL) 또는 587(TLS)현재는 collection 단위로 파악해서 이메일을 보내 줌. 하지만 DB 단위로 변경 사항을 파악해서 하나의 메일에 담아 보내주도록 바꿀 것.
최종적으로 메일로 보내야 할 내용의 형태는 다음과 같이 생겼음.
참고로, UTC 시간과 한국 시간은 일정한 처리를 거쳤느냐의 차이이며, UTC 시간 표현은 MongoDB상에 저장된 _id 값을 받아 변환하면 돼.
# email_alarm.py
import os
import smtplib
from email.mime.text import MIMEText
from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError
from datetime import datetime, timezone, timedelta
# 이메일 정보 설정
EMAIL_SENDER = "tastyTiramisu110@gmail.com"
EMAIL_PASSWORD = "duca bjnc ynsf mmup"
# MongoDB 연결 설정
MONGO_URI = "mongodb://mongo1:30001,mongo2:30002,mongo3:30003/?replicaSet=my-rs"
DB_NAME = "example_collection"
# 이메일 전송 함수
def send_email(subject, body):
try:
msg = MIMEText(body, "plain")
msg["Subject"] = subject
msg["From"] = EMAIL_SENDER
msg["To"] = EMAIL_SENDER
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(EMAIL_SENDER, EMAIL_PASSWORD)
server.sendmail(EMAIL_SENDER, EMAIL_SENDER, msg.as_string())
print("[INFO] 이메일 전송 성공!")
except Exception as e:
print(f"[ERROR] 이메일 전송 실패: {e}")
# MongoDB 변경 사항 감지 및 알림 발신 (DB 단위로 처리)
def watch_database():
try:
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
db = client[DB_NAME]
print(f"[INFO] 데이터베이스 변경 사항 모니터링 시작...")
changed_documents = [] # 변경된 문서 저장 리스트
with client.watch() as stream: # 데이터베이스 단위로 감시
for change in stream:
print("[INFO] 변경 사항 발생:", change)
operation_type = change["operationType"]
collection_name = change["ns"]["coll"]
document_key = change["documentKey"]["_id"]
# 변경된 문서 가져오기
collection = db[collection_name]
updated_document = collection.find_one({"_id": document_key})
if updated_document:
# UTC 시간 (_id의 Timestamp에서 추출)
utc_time = updated_document["_id"].generation_time
korean_time = utc_time.astimezone(timezone(timedelta(hours=9)))
# 변경된 문서의 title 필드가 있을 경우 추가
if "title" in updated_document:
changed_documents.append({
"title": updated_document["title"],
"collection": collection_name,
"utc_time": utc_time,
"korean_time": korean_time
})
# 변경 사항이 누적되었을 경우, 이메일 발송
if changed_documents:
email_subject = "[MongoDB 알림] 데이터베이스 변경 사항"
email_body = "다음 변경 사항이 감지되었습니다:\n\n"
for doc in changed_documents:
email_body += (
f"- **제목**: {doc['title']}\n"
f"- **유출한 사이트**: {doc['collection']}\n"
f"- **UTC 시간**: {doc['utc_time']}\n"
f"- **한국 시간**: {doc['korean_time']}\n\n"
)
send_email(email_subject, email_body)
changed_documents.clear() # 이메일 발송 후 리스트 초기화
except ServerSelectionTimeoutError as e:
print(f"[ERROR] MongoDB 연결 실패: {e}")
except Exception as e:
print(f"[ERROR] Change Stream 모니터링 중 오류 발생: {e}")
if __name__ == "__main__":
watch_database()
watch_database() 함수with client.watch()를 통해 데이터베이스 전체에서 발생하는 변경 사항(insert, update, replace 등)을 지속적으로 감지watch_database() 함수가 종료되지 않는 한, 스크립트는 백그라운드에서 계속 동작하며 변경 사항을 모니터링import os
import smtplib
from email.mime.text import MIMEText
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import ServerSelectionTimeoutError
import asyncio
from datetime import datetime, timedelta
# 이메일 정보 설정
EMAIL_SENDER = "tastyTiramisu110@gmail.com"
EMAIL_PASSWORD = "duca bjnc ynsf mmup"
# MongoDB 연결 설정
MONGO_URI = "mongodb://mongo1:30001,mongo2:30002,mongo3:30003/?replicaSet=my-rs"
DB_NAME = "example_collection"
# 이메일 전송 함수
async def send_email(subject, body):
try:
msg = MIMEText(body, "plain")
msg["Subject"] = subject
msg["From"] = EMAIL_SENDER
msg["To"] = EMAIL_SENDER
# SMTP 클라이언트 생성 및 이메일 전송
loop = asyncio.get_event_loop()
def smtp_send():
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(EMAIL_SENDER, EMAIL_PASSWORD)
server.sendmail(EMAIL_SENDER, EMAIL_SENDER, msg.as_string())
# SMTP 전송을 비동기로 실행
await loop.run_in_executor(None, smtp_send)
print("[INFO] 이메일 전송 성공!")
except Exception as e:
print(f"[ERROR] 이메일 전송 실패: {e}")
# UTC 시간 및 한국 시간 변환 함수
def format_times_from_id(change_id):
utc_time = change_id.generation_time
kor_time = utc_time + timedelta(hours=9) # UTC+9 시간대
return utc_time, kor_time
# 변경 사항 모니터링 및 알림 발신
async def watch_database():
try:
client = AsyncIOMotorClient(MONGO_URI, serverSelectionTimeoutMS=5000)
db = client[DB_NAME]
print("[INFO] 데이터베이스 변경 사항 모니터링 시작...")
changed_docs = []
async with client.watch() as stream:
async for change in stream:
print("[INFO] 변경 사항 발생:", change)
operation_type = change["operationType"]
# 변경 사항 처리
if operation_type in ["insert", "update", "replace"]:
collection_name = change["ns"]["coll"]
document_key = change["documentKey"]["_id"]
updated_document = await db[collection_name].find_one({"_id": document_key})
# title이 존재하는 경우만 수집
if updated_document and "title" in updated_document:
utc_time, kor_time = format_times_from_id(document_key)
changed_docs.append(
f"- **제목**: {updated_document['title']}\n"
f"- **유출한 사이트**: {collection_name}\n"
f"- **UTC 시간**: {utc_time}\n"
f"- **한국 시간**: {kor_time}"
)
# 변경 사항이 여러 개라면, 누적된 내용으로 이메일 발송
if changed_docs:
subject = "[MongoDB 알림] 데이터베이스 변경 사항"
body = "\n\n".join(changed_docs)
await send_email(subject, body)
changed_docs.clear() # 이메일 발송 후 리스트 초기화
except ServerSelectionTimeoutError as e:
print(f"[ERROR] MongoDB 연결 실패: {e}")
except Exception as e:
print(f"[ERROR] Change Stream 모니터링 중 오류 발생: {e}")
# 메인 함수
async def main():
await watch_database()
if __name__ == "__main__":
asyncio.run(main())