Blackbasta crawler
import json
import os
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
# Path to ChromeDriver (chromedriver.exe located in the project folder)
current_dir = os.path.dirname(os.path.abspath(__file__))
chromedriver_path = os.path.join(current_dir, "chromedriver.exe")
# Tor proxy settings
proxy_address = "127.0.0.1:9050"  # Tor SOCKS5 proxy address
# Selenium WebDriver options
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument(f"--proxy-server=socks5://{proxy_address}") # Tor 프록시 사용
chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36")
# WebDriver 초기화
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)
# Target URL to crawl
base_url = "http://stniiomyjliimcgkvdszvgen3eaaoz55hreqqx6o77yvmpwt7gklffqd.onion"
category_url = f"{base_url}/"
# List for collected records
all_data = []
# Crawling function
def crawl_blackbasta():
try:
        # Open the page with Selenium
        driver.get(category_url)
        # Wait for JavaScript to finish loading (explicit wait until the element is present)
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "title"))
        )
        # Parse the HTML with BeautifulSoup
soup = BeautifulSoup(driver.page_source, "html.parser")
posts = soup.find_all("div", class_="title")
for post in posts:
try:
title_element = post.find("a", class_="blog_name_link")
if not title_element:
continue
title = title_element.text.strip()
url = title_element["href"].strip()
                # Extract the description (only the p tag with data-v-md-line="3")
                description_element = post.find_next("p", {"data-v-md-line": "3"})
                description = description_element.get_text(strip=True) if description_element else ""
                # Store the record
post_data = {
"title": title,
"url": url,
"description": description,
}
all_data.append(post_data)
print(f"추출 완료: {title}")
except Exception as e:
print(f"데이터 추출 중 오류 발생: {e}")
# JSON 파일로 저장
with open("blackbasta.json", "w", encoding="utf-8") as f:
json.dump(all_data, f, ensure_ascii=False, indent=4)
print("blackbasta.json 파일 저장 완료.")
except Exception as e:
print(f"크롤링 중 오류 발생: {e}")
finally:
driver.quit()
if __name__ == "__main__":
crawl_blackbasta()
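The crawler above assumes a Tor client is already running locally and exposing a SOCKS5 proxy on 127.0.0.1:9050. A minimal sketch for checking that assumption before launching the browser; this is a convenience check, not part of the crawler, and it assumes requests with SOCKS support is installed (pip install requests[socks]) and uses the public check.torproject.org/api/ip endpoint, which reports whether the requesting IP is a Tor exit:
import requests

TOR_PROXY = "socks5h://127.0.0.1:9050"  # socks5h so DNS is also resolved over Tor

def tor_is_reachable() -> bool:
    """Return True if traffic routed through the local Tor proxy exits via Tor."""
    proxies = {"http": TOR_PROXY, "https": TOR_PROXY}
    try:
        # check.torproject.org reports whether the requesting IP is a Tor exit node
        resp = requests.get("https://check.torproject.org/api/ip",
                            proxies=proxies, timeout=30)
        resp.raise_for_status()
        return bool(resp.json().get("IsTor"))
    except Exception as exc:
        print(f"Tor proxy check failed: {exc}")
        return False

if __name__ == "__main__":
    print("Tor reachable:", tor_is_reachable())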
Crawler return values reworked and JSON Schema validation applied
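Each crawler below was changed to stop writing its own JSON file and instead return a list of records that passed JSON Schema validation. A minimal sketch of that shared pattern; the schema fields and the function name here are illustrative, not the ones used by the actual crawlers:
from jsonschema import validate, ValidationError

# Illustrative schema: each real crawler defines its own fields and required keys.
record_schema = {
    "type": "object",
    "properties": {"title": {"type": "string"}},
    "required": ["title"],
}

def collect_valid_records(raw_records):
    """Validate each record against the schema and return only the valid ones."""
    valid_records = []
    for record in raw_records:
        try:
            validate(instance=record, schema=record_schema)
            valid_records.append(record)
        except ValidationError as e:
            # Invalid records are reported and skipped instead of aborting the crawl
            print(f"Data validation failed: {e.message}")
    return valid_records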
abyss
import json
import os
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import time
from jsonschema import validate, ValidationError
# Path to ChromeDriver
current_dir = os.path.dirname(os.path.abspath(__file__))
chromedriver_path = os.path.join(current_dir, "chromedriver.exe")
# Tor proxy settings
proxy_address = "127.0.0.1:9050"  # Tor SOCKS5 proxy address
# Selenium WebDriver options
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument(f"--proxy-server=socks5://{proxy_address}") # TOR 프록시 사용
# WebDriver 초기화
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)
# Target URL to crawl
base_url = "http://3ev4metjirohtdpshsqlkrqcmxq6zu3d7obrdhglpy5jpbr7whmlfgqd.onion"
# JSON Schema definition
schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"title": {"type": "string"},
"description": {"type": "string"}
},
"required": ["title", "description"]
}
}
def crawl_posts():
    # Open the page with Selenium
    driver.get(base_url)
    time.sleep(5)  # Wait for the page to load
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(driver.page_source, "html.parser")
    cards = soup.find_all("div", class_="card-body")  # Extract the news-card elements
results = []
for card in cards:
try:
            title = card.find("h5", class_="card-title").text.strip()  # Extract the title
            description = card.find("p", class_="card-text").text.strip()  # Extract the description
post_data = {"title": title, "description": description}
results.append(post_data)
print(f"추출 완료: {title}")
except Exception as e:
print(f"크롤링 중 오류 발생: {e}")
# JSON Schema 검증
try:
validate(instance=results, schema=schema)
print("데이터 검증 성공!")
except ValidationError as ve:
print(f"데이터 검증 실패: {ve}")
# JSON 파일 저장 (테스트용, 실제 사용 시 반환만 수행)
# with open("abyss.json", "w", encoding="utf-8") as f:
# json.dump(results, f, ensure_ascii=False, indent=4)
# print("abyss.json 파일 저장 완료.")
return results
if __name__ == "__main__":
try:
data = crawl_posts()
        print(json.dumps(data, ensure_ascii=False, indent=4))  # Print the results
    finally:
        # Shut down the WebDriver
driver.quit()
blackbasta
import json
import os
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from jsonschema import validate, ValidationError
current_dir = os.path.dirname(os.path.abspath(__file__))
chromedriver_path = os.path.join(current_dir, "chromedriver.exe")
proxy_address = "127.0.0.1:9050"
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument(f"--proxy-server=socks5://{proxy_address}")
chrome_options.add_argument(
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
)
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)
base_url = "http://stniiomyjliimcgkvdszvgen3eaaoz55hreqqx6o77yvmpwt7gklffqd.onion"
category_url = f"{base_url}/"
schema = {
"type": "object",
"properties": {
"title": {"type": "string"},
"url": {"type": "string", "format": "uri"},
"description": {"type": "string"},
},
"required": ["title", "url", "description"],
}
def crawl_blackbasta():
all_data = []
try:
driver.get(category_url)
WebDriverWait(driver, 15).until(
EC.presence_of_element_located((By.CLASS_NAME, "title"))
)
soup = BeautifulSoup(driver.page_source, "html.parser")
posts = soup.find_all("div", class_="title")
for post in posts:
try:
title_element = post.find("a", class_="blog_name_link")
if not title_element:
continue
title = title_element.text.strip()
url = title_element["href"].strip()
description_element = post.find_next("p", {"data-v-md-line": "3"})
description = (
description_element.get_text(strip=True)
if description_element
else ""
)
post_data = {
"title": title,
"url": url,
"description": description,
}
try:
validate(instance=post_data, schema=schema)
all_data.append(post_data)
print(f"추출 완료: {title}")
except ValidationError as e:
print(f"데이터 검증 실패: {e.message}")
except Exception as e:
print(f"데이터 추출 중 오류 발생: {e}")
return all_data
except Exception as e:
print(f"크롤링 중 오류 발생: {e}")
return []
finally:
driver.quit()
if __name__ == "__main__":
result = crawl_blackbasta()
print(result)
breachdetector
from telethon import TelegramClient
import json
import os
from jsonschema import validate, ValidationError
api_id = os.getenv("TELEGRAM_API_ID")
api_hash = os.getenv("TELEGRAM_API_HASH")
channel_username = "breachdetector"
if not api_id or not api_hash:
raise EnvironmentError(
"API ID 또는 API Hash가 설정되지 않았습니다. "
"환경 변수 TELEGRAM_API_ID와 TELEGRAM_API_HASH를 설정해주세요."
)
schema = {
"type": "object",
"properties": {
"content": {"type": "string"},
"date": {"type": "string", "format": "date-time"},
"sender_id": {"type": ["string", "number"]}
},
"required": ["content", "date", "sender_id"]
}
client = TelegramClient("session_name", api_id, api_hash)
async def fetch_messages():
await client.start()
messages = await client.get_messages(channel_username, limit=100)
data = []
for message in messages:
try:
text = message.text
if text:
entry = {
"content": text,
"date": str(message.date),
"sender_id": message.sender_id
}
try:
validate(instance=entry, schema=schema)
data.append(entry)
print(f"메시지 저장 완료: {entry}")
except ValidationError as e:
print(f"데이터 검증 실패: {e.message}")
except Exception as e:
print(f"오류 발생: {e}")
return data
def crawl_breachdetector():
with client:
return client.loop.run_until_complete(fetch_messages())
if __name__ == "__main__":
result = crawl_breachdetector()
print(result)
- The API ID / hash values required by Telethon must be set as environment variables in the user's runtime environment (one way to do this is sketched below).
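A minimal sketch of providing these variables; the credential values are placeholders and breachdetector_crawler is a hypothetical module name (use whatever filename the script above is saved as). Setting them in the shell (setx on Windows, export on Linux/macOS) is the normal approach; the in-process os.environ assignment is only convenient for a quick local test:
import os

# Quick local test only: inject placeholder credentials before importing the crawler.
# In normal use, set them in the shell instead, e.g.
#   Windows:      setx TELEGRAM_API_ID "1234567"
#                 setx TELEGRAM_API_HASH "0123456789abcdef0123456789abcdef"
#   Linux/macOS:  export TELEGRAM_API_ID=1234567
#                 export TELEGRAM_API_HASH=0123456789abcdef0123456789abcdef
os.environ.setdefault("TELEGRAM_API_ID", "1234567")  # placeholder, not a real ID
os.environ.setdefault("TELEGRAM_API_HASH", "0123456789abcdef0123456789abcdef")  # placeholder

import breachdetector_crawler  # hypothetical module name for the script above

print(breachdetector_crawler.crawl_breachdetector())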
ctifeeds
import json
import os
import requests
from jsonschema import validate, ValidationError
json_sources = [
{"url": "https://ctifeeds.andreafortuna.org/dataleaks.json", "categories": "dataleaks"},
{"url": "https://ctifeeds.andreafortuna.org/cybercrime_on_telegram.json", "categories": "cybercrime_on_telegram"},
{"url": "https://ctifeeds.andreafortuna.org/phishing_sites.json", "categories": "phishing_sites"},
{"url": "https://ctifeeds.andreafortuna.org/datamarkets.json", "categories": "datamarkets"},
{"url": "https://ctifeeds.andreafortuna.org/ransomware_victims.json", "categories": "ransomware_victims"},
{"url": "https://ctifeeds.andreafortuna.org/recent_defacements.json", "categories": "recent_defacements"},
]
schema = {
"type": "object",
"properties": {
"categories": {"type": "string"},
"name": {"type": "string"},
"url": {"type": "string"},
"source": {"type": "string"},
"screenshot": {"type": ["string", "null"]},
"urlscan": {"type": ["string", "null"]}
},
"required": ["categories", "name", "url", "source"]
}
all_data = []
def fetch_json_data():
for source in json_sources:
try:
response = requests.get(source["url"], timeout=10)
response.raise_for_status()
data = response.json()
for item in data:
item["categories"] = source["categories"]
try:
validate(instance=item, schema=schema)
all_data.append(item)
except ValidationError as e:
print(f"데이터 검증 실패 ({source['categories']}): {e.message}")
print(f"데이터 수집 완료: {source['categories']}")
except Exception as e:
print(f"데이터 수집 중 오류 발생 ({source['categories']}): {e}")
return all_data
if __name__ == "__main__":
result = fetch_json_data()
print(result)
darkleak
import json
import os
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
from jsonschema import validate, ValidationError
import time
current_dir = os.path.dirname(os.path.abspath(__file__))
chromedriver_path = os.path.join(current_dir, "chromedriver.exe")
proxy_address = "127.0.0.1:9050"
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument(f"--proxy-server=socks5://{proxy_address}")
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)
base_url = "http://darkleakyqmv62eweqwy4dnhaijg4m4dkburo73pzuqfdumcntqdokyd.onion"
category_url = f"{base_url}/index.html"
schema = {
"type": "object",
"properties": {
"file_name": {"type": "string"},
"url": {"type": ["string", "null"]}
},
"required": ["file_name", "url"]
}
all_data = []
def crawl_files():
try:
driver.get(category_url)
time.sleep(5)
soup = BeautifulSoup(driver.page_source, "html.parser")
rows = soup.find_all("tr", onclick=True)
for row in rows:
try:
file_name = row.find("strong").text.strip()
onclick_attr = row.get("onclick")
if onclick_attr and "window.location='" in onclick_attr:
relative_url = onclick_attr.split("'")[1]
full_url = f"{base_url}/{relative_url}"
else:
full_url = None
post_data = {
"file_name": file_name,
"url": full_url
}
try:
validate(instance=post_data, schema=schema)
all_data.append(post_data)
print(f"추출 완료: {file_name}, URL: {full_url}")
except ValidationError as e:
print(f"데이터 검증 실패: {e.message}")
except Exception as e:
print(f"데이터 추출 중 오류 발생: {e}")
return all_data
finally:
driver.quit()
if __name__ == "__main__":
result = crawl_files()
print(result)
island
import json
import os
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
from jsonschema import validate, ValidationError
import time
current_dir = os.path.dirname(os.path.abspath(__file__))
chromedriver_path = os.path.join(current_dir, "chromedriver.exe")
proxy_address = "127.0.0.1:9050"
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument(f"--proxy-server=socks5://{proxy_address}")
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)
base_url = "https://crackingisland.net/"
category_url = f"{base_url}/categories/combolists"
schema = {
"type": "object",
"properties": {
"title": {"type": "string"},
"url": {"type": "string"},
"type": {"type": "string"},
"dateCreated": {"type": "string"},
"description": {"type": "string"}
},
"required": ["title", "url", "type", "dateCreated", "description"]
}
all_data = []
def crawl_combolists():
try:
driver.get(category_url)
time.sleep(5)
soup = BeautifulSoup(driver.page_source, "html.parser")
posts = soup.find_all("a", itemprop="url")
for post in posts:
try:
title = post.find("h2", itemprop="headline").text.strip()
post_url = base_url + post["href"]
post_type = post.find("span", itemprop="about").text.strip()
post_date = post.find("span", itemprop="dateCreated").text.strip()
description = post.find("p", itemprop="text").text.strip()
post_data = {
"title": title,
"url": post_url,
"type": post_type,
"dateCreated": post_date,
"description": description,
}
try:
validate(instance=post_data, schema=schema)
all_data.append(post_data)
print(f"추출 완료: {title}")
except ValidationError as e:
print(f"데이터 검증 실패: {e.message}")
except Exception as e:
print(f"크롤링 중 오류 발생: {e}")
return all_data
finally:
driver.quit()
if __name__ == "__main__":
result = crawl_combolists()
print(result)