Development Session - Converting the Crawler Code to Async

goldenGlow_21 · December 20, 2024

Task List

OSINT

  • tuts4you
  • x00org

Dark Web

  • abyss
  • blackbasta
  • blacksuit
  • ctifeeds
  • daixin
  • darkleak
  • darknetARMY

Code

tuts4you (done)

import asyncio
import aiohttp
from aiohttp_socks import ProxyConnector
from bs4 import BeautifulSoup
import json
import os
import re
from datetime import datetime
from motor.motor_asyncio import AsyncIOMotorClient

# Tor 프록시 설정
PROXY_URL = "socks5://127.0.0.1:9050"

# 비동기 Tor 요청 함수
async def tor_request(session, url):
    try:
        async with session.get(url, timeout=30) as response:
            if response.status == 200:
                return await response.text()
    except Exception as e:
        print(f"[ERROR] tuts4you_crawler.py - tor_request(): {e}")
    return None

# 페이지 수 추출 함수
def get_total_pages(soup):
    pagination_element = soup.find("li", class_="ipsPagination_pageJump")
    if pagination_element:
        text = pagination_element.get_text(strip=True)
        match = re.search(r"Page \d+ of (\d+)", text)
        if match:
            return int(match.group(1))
    return 1

# 키워드 검사 함수
def check_page(a_tag, keywords):
    return any(keyword in a_tag.get("title", "") for keyword in keywords)

def check_snippet_for_keywords(a_tag, keywords):
    parent_div = a_tag.find_parent("div", class_="ipsTopicSnippet__top")
    if parent_div:
        snippet_p = parent_div.find_next_sibling("div", class_="ipsTopicSnippet__snippet")
        if snippet_p:
            snippet_text = snippet_p.get_text(strip=True)
            return sum(1 for keyword in keywords if keyword in snippet_text) >= 5
    return False

# 페이지 크롤링 함수
async def search_page(session, db, target_url, keywords):
    collection = db["tuts4you"]
    try:
        html_content = await tor_request(session, target_url)
        if not html_content:
            return

        soup = BeautifulSoup(html_content, "html.parser")
        total_pages = get_total_pages(soup)

        for page_num in range(1, total_pages + 1):
            page_url = f"{target_url}page/{page_num}/" if page_num > 1 else target_url

            page_content = await tor_request(session, page_url)
            if not page_content:
                continue

            soup = BeautifulSoup(page_content, "html.parser")
            a_tags = soup.find_all("a")

            for a_tag in a_tags:
                if check_page(a_tag, keywords) and check_snippet_for_keywords(a_tag, keywords):
                    title = a_tag.get("title")
                    url = a_tag.get("href")
                    if not await collection.find_one({"title": title}):  # 중복 확인
                        post_data = {
                            "title": title,
                            "url": url,
                            "crawled_time": str(datetime.utcnow())
                        }
                        await collection.insert_one(post_data)
    except Exception as e:
        print(f"[ERROR] tuts4you_crawler.py - search_page(): {e}")

# 메인 실행 함수
async def tuts4you():
    # MongoDB 연결
    client = AsyncIOMotorClient("mongodb://localhost:27017/")
    db = client["darkweb_db"]

    # 현재 스크립트 기준 경로에서 키워드 파일 로드
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    KEYWORDS_FILE = os.path.join(BASE_DIR, "cleaned_keywords.json")
    try:
        with open(KEYWORDS_FILE, 'r') as f:
            data = json.load(f)
        keywords = data.get("keywords", [])
    except FileNotFoundError as e:
        print(f"[ERROR] tuts4you_crawler.py - tuts4you(): {e}")
        return

    # 타겟 URL 목록
    target_categories = [
        "https://forum.tuts4you.com/forum/47-programming-and-coding/",
        "https://forum.tuts4you.com/forum/121-programming-resources/",
        "https://forum.tuts4you.com/forum/133-software-security/",
        "https://forum.tuts4you.com/forum/146-challenge-of-reverse-engineering/",
        "https://forum.tuts4you.com/forum/124-hardware-reverse-engineering/",
        "https://forum.tuts4you.com/forum/122-network-security/",
        "https://forum.tuts4you.com/forum/93-reverse-engineering-articles/"
    ]

    # Tor 프록시 커넥터 생성
    connector = ProxyConnector.from_url(PROXY_URL)

    # 비동기 세션 생성 및 크롤링 작업 수행
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [search_page(session, db, url, keywords) for url in target_categories]
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(tuts4you())

x00org (done)

import asyncio
import aiohttp
from aiohttp_socks import ProxyConnector
from bs4 import BeautifulSoup
import json
import os
import re
from datetime import datetime
from motor.motor_asyncio import AsyncIOMotorClient

# Tor 프록시 설정
PROXY_URL = "socks5://127.0.0.1:9050"

# 비동기 Tor 요청 함수
async def tor_request(session, url, retries=3):
    for attempt in range(retries):
        try:
            await asyncio.sleep(2)  # 요청 간 딜레이 추가
            async with session.get(url, timeout=30) as response:
                if response.status == 200:
                    return await response.text()
        except Exception as e:
            print(f"[ERROR] x00org_crawler.py - tor_request(){e}")
    return None

# 키워드 로드 함수
def load_keywords(file_path):
    try:
        with open(file_path, 'r') as file:
            data = json.load(file)
            return data.get("keywords", [])
    except (FileNotFoundError, json.JSONDecodeError) as e:
        print(f"[ERROR] x00org_crawler.py - load_keywords(): {e}")
        return []

# 게시글 제목 및 URL 가져오기
async def fetch_post_titles(session, base_url):
    html_content = await tor_request(session, base_url)
    if not html_content:
        return []

    soup = BeautifulSoup(html_content, 'html.parser')
    posts = [
        {"title": link.get_text(strip=True), "url": link['href']}
        for link in soup.find_all('a', class_='title raw-link raw-topic-link', href=True)
    ]
    return posts

# 제목에서 키워드 매칭 확인
def match_keywords_in_titles(posts, keywords):
    results = []
    for post in posts:
        # Escape each word separately, then allow '-' or '_' between words, so that
        # multi-word keywords still match hyphen/underscore-joined titles.
        matched_keywords = [
            keyword for keyword in keywords
            if re.search(
                r"\b" + "[-_]".join(re.escape(part) for part in keyword.split()) + r"\b",
                post['title'],
                re.IGNORECASE
            )
        ]
        if matched_keywords:
            results.append({
                "title": post["title"],
                "keywords": ", ".join(matched_keywords),
                "url": post["url"]
            })
    return results

# 본문에서 키워드 매칭 확인
async def verify_keywords_in_content(session, url, keywords):
    html_content = await tor_request(session, url)
    if not html_content:
        return False

    soup = BeautifulSoup(html_content, 'html.parser')
    content = soup.get_text(strip=True)
    return any(content.lower().count(keyword.lower()) >= 3 for keyword in keywords)

# 크롤링 실행 함수
async def x00org():
    # MongoDB 연결
    client = AsyncIOMotorClient("mongodb://localhost:27017/")
    db = client["darkweb_db"]
    collection = db["0x00org"]

    # 키워드 파일 로드
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    KEYWORDS_FILE = os.path.join(BASE_DIR, "cleaned_keywords.json")
    keywords = load_keywords(KEYWORDS_FILE)
    if not keywords:
        return

    base_urls = [
        "https://0x00sec.org/c/bug-bounty/108",
        "https://0x00sec.org/c/pentesting/101",
        "https://0x00sec.org/c/red-team/102",
        "https://0x00sec.org/c/blue-team/105",
        "https://0x00sec.org/c/exploit-development/53",
        "https://0x00sec.org/c/reconnaissance/54",
        "https://0x00sec.org/c/malware/56",
        "https://0x00sec.org/c/cryptology/57",
        "https://0x00sec.org/c/reverse-engineering/58",
        "https://0x00sec.org/c/linux/64",
        "https://0x00sec.org/c/ai/71",
        "https://0x00sec.org/c/social/46",
        "https://0x00sec.org/c/uncategorized/1",
        "https://0x00sec.org/c/ctf/55",
        "https://0x00sec.org/c/web-hacking/59",
        "https://0x00sec.org/c/social-engineering/60",
        "https://0x00sec.org/c/programming/61",
        "https://0x00sec.org/c/databases/62",
        "https://0x00sec.org/c/networking/63",
        "https://0x00sec.org/c/algorithms/70",
        "https://0x00sec.org/c/anonymity/72",
        "https://0x00sec.org/c/hardware/68",
        "https://0x00sec.org/c/operations/86",
        "https://0x00sec.org/c/phone-hacking/92",
        "https://0x00sec.org/c/forensics/106"
    ]

    # Tor 프록시 커넥터 생성
    connector = ProxyConnector.from_url(PROXY_URL)
    async with aiohttp.ClientSession(connector=connector) as session:
        for base_url in base_urls:
            posts = await fetch_post_titles(session, base_url)
            if not posts:
                continue

            matched_posts = match_keywords_in_titles(posts, keywords)
            for post in matched_posts:
                if await verify_keywords_in_content(session, post["url"], post["keywords"].split(", ")):
                    if not await collection.find_one({"title": post["title"]}):
                        post_data = {
                            "title": post["title"],
                            "url": post["url"],
                            "keywords": post["keywords"],
                            "crawled_time": str(datetime.now())
                        }
                        await collection.insert_one(post_data)

if __name__ == "__main__":
    asyncio.run(x00org())

abyss (done)

import os
import asyncio
from concurrent.futures import ThreadPoolExecutor
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
from datetime import datetime
from jsonschema import validate, ValidationError
from pymongo import MongoClient


def crawl_page(base_url, chromedriver_path, proxy_address, schema, collection):
    """
    개별 페이지를 크롤링하는 동기 함수
    """
    chrome_options = Options()
    chrome_options.add_argument("--headless")  # 브라우저 창 표시 없이 실행
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument(f"--proxy-server=socks5://{proxy_address}")

    service = Service(chromedriver_path)
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
        print(f"[INFO] Crawling page: {base_url}")
        driver.get(base_url)
        driver.implicitly_wait(5)  # 페이지 로딩 대기

        soup = BeautifulSoup(driver.page_source, "html.parser")
        cards = soup.find_all("div", class_="card-body")  # 카드 데이터 추출

        for card in cards:
            try:
                # 데이터 추출
                title = card.find("h5", class_="card-title").text.strip()
                description = card.find("p", class_="card-text").text.strip()

                # 데이터 생성
                post_data = {
                    "title": title,
                    "description": description,
                    "crawled_time": str(datetime.now())
                }

                # JSON Schema 검증
                try:
                    validate(instance=post_data, schema=schema)

                    # 중복 확인 및 데이터 저장
                    if not collection.find_one({"title": title, "description": description}):
                        collection.insert_one(post_data)
                        print(f"[INFO] Saved: {title}")
                    else:
                        print(f"[INFO] Skipped (duplicate): {title}")

                except ValidationError as ve:
                    print(f"[WARNING] 데이터 검증 실패: {ve.message}")

            except Exception as e:
                print(f"[ERROR] 데이터 추출 중 오류 발생: {e}")

    except Exception as e:
        print(f"[ERROR] 페이지 크롤링 실패: {e}")
    finally:
        driver.quit()


async def abyss(db):
    """
    Abyss 크롤러 실행 및 MongoDB 컬렉션에 데이터 저장 (비동기 실행)
    """
    collection = db["abyss"]  # MongoDB 컬렉션 선택

    # ChromeDriver 경로 설정
    current_dir = os.path.dirname(os.path.abspath(__file__))
    chromedriver_path = os.path.join(current_dir, "chromedriver.exe")

    # 프록시 주소 (Tor SOCKS5)
    proxy_address = "127.0.0.1:9050"

    # JSON Schema 정의
    schema = {
        "type": "object",
        "properties": {
            "title": {"type": "string"},
            "description": {"type": "string"},
            "crawled_time": {"type": "string"}
        },
        "required": ["title", "description"]
    }

    # 대상 URL 목록 (명세 준수)
    base_urls = [
        "http://3ev4metjirohtdpshsqlkrqcmxq6zu3d7obrdhglpy5jpbr7whmlfgqd.onion"
    ]

    # 비동기 실행
    with ThreadPoolExecutor(max_workers=5) as executor:
        loop = asyncio.get_event_loop()
        tasks = [
            loop.run_in_executor(
                executor,
                crawl_page,
                url, chromedriver_path, proxy_address, schema, collection
            )
            for url in base_urls
        ]
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    # MongoDB 연결 설정
    mongo_client = MongoClient("mongodb://localhost:27017/")
    db = mongo_client["your_database_name"]

    # 비동기 실행
    asyncio.run(abyss(db))

blackbasta (done)

import os
import asyncio
from concurrent.futures import ThreadPoolExecutor
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from jsonschema import validate, ValidationError
from datetime import datetime
from pymongo import MongoClient

# JSON Schema 정의
schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "url": {"type": "string", "format": "uri"},
        "description": {"type": "string"},
        "crawled_time": {"type": "string", "format": "date-time"},
    },
    "required": ["title", "url", "description"],
}

def crawl_page(category_url, proxy_address, schema, collection):
    """
    개별 페이지를 동기적으로 크롤링하는 함수
    """
    # Selenium WebDriver 옵션 설정
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument(f"--proxy-server=socks5://{proxy_address}")
    chrome_options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
    )

    driver = webdriver.Chrome(options=chrome_options)

    try:
        print(f"[INFO] Crawling page: {category_url}")
        driver.get(category_url)

        # JavaScript 로딩 대기
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "title"))
        )

        # BeautifulSoup으로 HTML 파싱
        soup = BeautifulSoup(driver.page_source, "html.parser")
        posts = soup.find_all("div", class_="title")

        for post in posts:
            try:
                title_element = post.find("a", class_="blog_name_link")
                if not title_element:
                    continue

                title = title_element.text.strip()
                url = title_element["href"].strip()

                # Description 추출
                description_element = post.find_next("p", {"data-v-md-line": "3"})
                description = (
                    description_element.get_text(strip=True)
                    if description_element
                    else ""
                )

                # 데이터 생성
                post_data = {
                    "title": title,
                    "url": url,
                    "description": description,
                    "crawled_time": str(datetime.now()),
                }

                # JSON Schema 검증
                try:
                    validate(instance=post_data, schema=schema)

                    # 중복 확인 및 데이터 저장
                    if not collection.find_one({"title": title, "url": url}):
                        collection.insert_one(post_data)
                        print(f"[INFO] Saved: {title}")
                    else:
                        print(f"[INFO] Skipped (duplicate): {title}")

                except ValidationError as e:
                    print(f"[WARNING] 데이터 검증 실패: {e.message}")

            except Exception as e:
                print(f"[ERROR] 데이터 추출 중 오류 발생: {e}")

    except Exception as e:
        print(f"[ERROR] 페이지 크롤링 실패: {e}")

    finally:
        driver.quit()


async def blackbasta(db):
    """
    BlackBasta 크롤러 실행 및 MongoDB 컬렉션에 비동기적 저장
    """
    collection = db["blackbasta"]  # MongoDB 컬렉션 선택
    proxy_address = "127.0.0.1:9050"

    # 크롤링 대상 URL
    base_url = "http://stniiomyjliimcgkvdszvgen3eaaoz55hreqqx6o77yvmpwt7gklffqd.onion"
    category_url = f"{base_url}/"

    # 비동기 실행
    with ThreadPoolExecutor(max_workers=5) as executor:
        loop = asyncio.get_event_loop()
        tasks = [
            loop.run_in_executor(
                executor,
                crawl_page,
                category_url, proxy_address, schema, collection
            )
        ]
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    # MongoDB 연결 설정
    mongo_client = MongoClient("mongodb://localhost:27017/")
    db = mongo_client["your_database_name"]

    # 비동기 실행
    asyncio.run(blackbasta(db))

blacksuit (done)

import asyncio
from concurrent.futures import ThreadPoolExecutor
from requests_tor import RequestsTor
from bs4 import BeautifulSoup
from datetime import datetime
from pymongo import MongoClient

# RequestsTor 인스턴스 초기화
rt = RequestsTor(tor_ports=(9050,), tor_cport=9051)

def crawl_blacksuit_page(url, collection):
    """
    BlackSuit 개별 페이지를 크롤링하는 동기 함수
    """
    try:
        # 메인 페이지 요청 및 파싱
        r = rt.get(url)
        soup = BeautifulSoup(r.text, 'html.parser')

        # 페이지 번호 가져오기
        page_numbers = [a.text.strip() for a in soup.select('.pagination a')]

        for page_number in page_numbers:
            page_url = f'{url}?page={page_number}'
            page_response = rt.get(page_url)
            page_soup = BeautifulSoup(page_response.text, 'html.parser')

            # 게시글 정보 추출
            items = page_soup.find_all("div", class_='card')
            for item in items:
                result = {}

                # 제목
                title = item.find('div', class_='title')
                result['title'] = title.text.strip() if title else None
                result['post_url'] = url + title.find('a').get('href') if title else ''

                # 회사 정보
                try:
                    company = item.find('div', class_='url').find('a')
                    result['company'] = company['href'] if company else ''
                except Exception:
                    result['company'] = ''

                # 내용
                content = item.find('div', class_='text')
                result['content'] = content.text.strip() if content else None

                # 추가 링크
                links = []
                link_div = item.find('div', class_='links')
                if link_div:
                    link_tags = link_div.find_all('a')
                    links = [link.get('href') for link in link_tags if link.get('href')]
                result['links'] = links

                # 크롤링 시간 추가
                result['Crawled Time'] = str(datetime.now())

                # 중복 확인 및 데이터 저장
                if not collection.find_one({"title": result['title'], "post_url": result['post_url']}):
                    collection.insert_one(result)
                    print(f"Saved: {result['title']}")
                else:
                    print(f"Skipped (duplicate): {result['title']}")

    except Exception as e:
        print(f"[ERROR] BlackSuit 크롤링 중 오류 발생: {e}")


async def blacksuit(db):
    """
    BlackSuit 크롤러 실행 및 MongoDB 컬렉션에 비동기적 저장
    """
    collection = db["blacksuit"]
    base_url = 'http://weg7sdx54bevnvulapqu6bpzwztryeflq3s23tegbmnhkbpqz637f2yd.onion/'

    print("[INFO] BlackSuit 크롤러 실행 시작...")

    # ThreadPoolExecutor를 사용해 비동기적으로 동기 함수 실행
    with ThreadPoolExecutor() as executor:
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(executor, crawl_blacksuit_page, base_url, collection)

    print("[INFO] BlackSuit 크롤러 실행 완료")


if __name__ == "__main__":
    # MongoDB 연결 설정
    mongo_client = MongoClient("mongodb://localhost:27017/")
    db = mongo_client["your_database_name"]

    # 비동기 실행
    asyncio.run(blacksuit(db))

ctifeeds (done)

import asyncio
import aiohttp
from pymongo import MongoClient
from jsonschema import validate, ValidationError
from datetime import datetime

# JSON 데이터 URL 목록 및 카테고리 이름
json_sources = [
    {"url": "https://ctifeeds.andreafortuna.org/dataleaks.json", "categories": "dataleaks"},
    {"url": "https://ctifeeds.andreafortuna.org/cybercrime_on_telegram.json", "categories": "cybercrime_on_telegram"},
    {"url": "https://ctifeeds.andreafortuna.org/phishing_sites.json", "categories": "phishing_sites"},
    {"url": "https://ctifeeds.andreafortuna.org/datamarkets.json", "categories": "datamarkets"},
    {"url": "https://ctifeeds.andreafortuna.org/ransomware_victims.json", "categories": "ransomware_victims"},
    {"url": "https://ctifeeds.andreafortuna.org/recent_defacements.json", "categories": "recent_defacements"},
]

# JSON Schema 정의
schema = {
    "type": "object",
    "properties": {
        "categories": {"type": "string"},
        "name": {"type": "string"},
        "url": {"type": "string"},
        "source": {"type": "string"},
        "screenshot": {"type": ["string", "null"]},
        "urlscan": {"type": ["string", "null"]},
    },
    "required": ["categories", "name", "url", "source"],
}

async def fetch_json(session, source):
    """
    비동기적으로 JSON 데이터를 가져오는 함수
    """
    try:
        async with session.get(source["url"], timeout=10) as response:
            response.raise_for_status()
            data = await response.json()
            print(f"[INFO] 데이터 가져오기 성공: {source['categories']}")
            return source["categories"], data
    except Exception as e:
        print(f"[ERROR] 데이터 수집 중 오류 발생 ({source['categories']}): {e}")
        return source["categories"], None

async def process_data(db, source, data):
    """
    MongoDB에 데이터를 저장하는 함수
    """
    collection = db["ctifeeds"]
    for item in data:
        item["categories"] = source
        item["Crawled Time"] = str(datetime.now())  # 크롤링 시간 추가

        # JSON Schema 검증 및 저장
        try:
            validate(instance=item, schema=schema)
            if not collection.find_one({"categories": item["categories"], "name": item["name"]}):
                collection.insert_one(item)
                print(f"Saved: {item['name']} in category {item['categories']}")
            else:
                print(f"Skipped (duplicate): {item['name']} in category {item['categories']}")
        except ValidationError as e:
            print(f"[ERROR] 데이터 검증 실패 ({item['categories']}): {e.message}")
        except Exception as e:
            print(f"[ERROR] 데이터 저장 중 오류 발생: {e}")

async def ctifeeds(db):
    """
    ctifeeds 크롤러 실행 및 MongoDB 컬렉션에 비동기적으로 데이터 저장
    """
    print("[INFO] ctifeeds 크롤러 실행 시작...")
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_json(session, source) for source in json_sources]
        results = await asyncio.gather(*tasks)

        for source, data in results:
            if data:
                await process_data(db, source, data)

    print("[INFO] ctifeeds 크롤러 실행 완료")

if __name__ == "__main__":
    # MongoDB 연결 설정
    mongo_client = MongoClient("mongodb://localhost:27017/")
    db = mongo_client["your_database_name"]

    # 비동기 실행
    asyncio.run(ctifeeds(db))

daixin (done)

import asyncio
from aiohttp_socks import ProxyConnector
from aiohttp import ClientSession
from bs4 import BeautifulSoup
from pymongo import MongoClient
from datetime import datetime

# Tor 프록시 설정
TOR_PROXY = "socks5://127.0.0.1:9050"

async def fetch_page(session, url):
    """
    비동기적으로 페이지를 요청하는 함수
    """
    try:
        async with session.get(url, timeout=30) as response:
            response.raise_for_status()
            print(f"[INFO] 페이지 가져오기 성공: {url}")
            return await response.text()
    except Exception as e:
        print(f"[ERROR] 페이지 요청 실패: {url} - {e}")
        return None

async def process_page(db, html):
    """
    HTML 데이터를 파싱하고 MongoDB에 저장하는 함수
    """
    collection = db["daixin"]  # MongoDB 컬렉션 선택
    try:
        soup = BeautifulSoup(html, 'html.parser')
        items = soup.find_all("div", class_='border border-warning card-body shadow-lg')

        for item in items:
            try:
                result = {}

                # 제목 추출
                title = item.find('h4', class_='border-danger card-title text-start text-white')
                result['title'] = title.text.strip() if title else None

                # 회사 URL 추출
                company_url = item.find('h6', class_='card-subtitle mb-2 text-muted text-start')
                result['company_url'] = (
                    company_url.text.replace('Web Site:', '').strip()
                    if company_url else None
                )

                # 내용 추출
                content = item.find('p', class_='card-text text-start text-white')
                result['content'] = content.text.strip() if content else None

                # 추가 링크 추출
                links = item.find_all('a')
                result['links'] = [link.get('href') for link in links if link.get('href')]

                # 크롤링 시간 추가
                result['crawled_time'] = str(datetime.now())

                # 중복 확인 및 데이터 저장
                if not collection.find_one({"title": result['title'], "company_url": result['company_url']}):
                    collection.insert_one(result)
                    print(f"Saved: {result['title']}")
                else:
                    print(f"Skipped (duplicate): {result['title']}")

            except Exception as e:
                print(f"[ERROR] 데이터 추출 중 오류 발생: {e}")
    except Exception as e:
        print(f"[ERROR] HTML 파싱 중 오류 발생: {e}")

async def daixin(db):
    """
    Daixin 크롤러 비동기 실행 및 MongoDB 컬렉션에 데이터 저장
    """
    url = 'http://7ukmkdtyxdkdivtjad57klqnd3kdsmq6tp45rrsxqnu76zzv3jvitlqd.onion/'
    connector = ProxyConnector.from_url(TOR_PROXY)

    async with ClientSession(connector=connector) as session:
        print("[INFO] Daixin 크롤러 실행 시작...")
        html = await fetch_page(session, url)

        if html:
            await process_page(db, html)

    print("[INFO] Daixin 크롤러 실행 완료")

if __name__ == "__main__":
    # MongoDB 연결 설정
    mongo_client = MongoClient("mongodb://localhost:27017/")
    db = mongo_client["your_database_name"]

    # 비동기 실행
    asyncio.run(daixin(db))

darkleak (done)

import os
import asyncio
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium import webdriver
from bs4 import BeautifulSoup
from jsonschema import validate, ValidationError
from datetime import datetime
from motor.motor_asyncio import AsyncIOMotorClient  # 비동기 MongoDB 클라이언트

# JSON Schema 정의
SCHEMA = {
    "type": "object",
    "properties": {
        "file_name": {"type": "string"},
        "url": {"type": ["string", "null"]},
        "crawled_time": {"type": "string"}
    },
    "required": ["file_name", "url"]
}

# TOR Proxy 설정
TOR_PROXY = "socks5://127.0.0.1:9050"

async def fetch_page(driver, url):
    """
    Selenium으로 페이지를 가져오는 비동기 함수
    """
    print(f"[INFO] 페이지 로드: {url}")
    try:
        driver.get(url)
        await asyncio.sleep(3)  # 페이지 로드 대기
        return driver.page_source
    except Exception as e:
        print(f"[ERROR] 페이지 로드 실패: {e}")
        return None

async def process_page(db, html, base_url):
    """
    HTML을 파싱하고 데이터를 MongoDB에 저장하는 함수
    """
    collection = db["darkleak"]
    try:
        soup = BeautifulSoup(html, "html.parser")
        rows = soup.find_all("tr", onclick=True)

        for row in rows:
            try:
                # 파일 이름 추출
                file_name = row.find("strong").text.strip()

                # onclick 속성에서 URL 추출
                onclick_attr = row.get("onclick")
                if onclick_attr and "window.location='" in onclick_attr:
                    relative_url = onclick_attr.split("'")[1]
                    full_url = f"{base_url}/{relative_url}"
                else:
                    full_url = None

                # 데이터 생성
                post_data = {
                    "file_name": file_name,
                    "url": full_url,
                    "crawled_time": str(datetime.now())
                }

                # JSON Schema 검증
                validate(instance=post_data, schema=SCHEMA)

                # 중복 확인 및 데이터 저장
                if not await collection.find_one({"file_name": file_name, "url": full_url}):
                    await collection.insert_one(post_data)
                    print(f"Saved: {file_name}, URL: {full_url}")
                else:
                    print(f"Skipped (duplicate): {file_name}")

            except ValidationError as e:
                print(f"[ERROR] 데이터 검증 실패: {e.message}")
            except Exception as e:
                print(f"[ERROR] 데이터 처리 중 오류: {e}")

    except Exception as e:
        print(f"[ERROR] HTML 파싱 중 오류 발생: {e}")

async def darkleak(db):
    """
    DarkLeak 크롤러 실행 (비동기)
    """
    base_url = "http://darkleakyqmv62eweqwy4dnhaijg4m4dkburo73pzuqfdumcntqdokyd.onion"
    category_url = f"{base_url}/index.html"

    # ChromeDriver 설정
    current_dir = os.path.dirname(os.path.abspath(__file__))
    chromedriver_path = os.path.join(current_dir, "chromedriver.exe")

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument(f"--proxy-server={TOR_PROXY}")
    chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64)")

    service = Service(chromedriver_path)
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
        # 페이지 가져오기
        html = await fetch_page(driver, category_url)
        if html:
            # 페이지 처리 및 데이터 저장
            await process_page(db, html, base_url)
    except Exception as e:
        print(f"[ERROR] 크롤링 중 오류 발생: {e}")
    finally:
        driver.quit()
        print("[INFO] 드라이버 종료")

if __name__ == "__main__":
    # 비동기 MongoDB 연결
    MONGO_URI = "mongodb://localhost:27017"
    mongo_client = AsyncIOMotorClient(MONGO_URI)
    db = mongo_client["your_database_name"]

    # 비동기 실행
    asyncio.run(darkleak(db))

darknetARMY (done)

import asyncio
import aiohttp
from aiohttp_socks import ProxyConnector
from bs4 import BeautifulSoup
from datetime import datetime
from pymongo import MongoClient

async def fetch_page(session, url):
    """
    페이지 요청을 비동기적으로 처리
    """
    try:
        async with session.get(url, timeout=15) as response:
            response.raise_for_status()
            print(f"[INFO] Fetched: {url}")
            return await response.text()
    except Exception as e:
        print(f"[ERROR] Failed to fetch {url}: {e}")
        return None

async def process_page(db, session, base_url, page):
    """
    각 페이지를 비동기적으로 처리하고 MongoDB에 저장
    """
    collection = db["darknetARMY"]
    url = f"{base_url}page-{page}"
    print(f"[INFO] Processing page {page}: {url}")

    html_content = await fetch_page(session, url)
    if not html_content:
        print(f"[WARNING] Skipping page {page} due to fetch failure.")
        return

    # BeautifulSoup으로 HTML 파싱
    soup = BeautifulSoup(html_content, 'html.parser')
    threads = soup.find_all('div', class_='structItem')

    for thread in threads:
        title_tag = thread.find('div', class_='structItem-title')
        title = title_tag.get_text(strip=True) if title_tag else None

        author_tag = thread.find('a', class_='username')
        author = author_tag.get_text(strip=True) if author_tag else None

        time_tag = thread.find('time')
        post_time = time_tag["title"] if time_tag and "title" in time_tag.attrs else None

        post_data = {
            "title": title,
            "author": author,
            "posted Time": post_time,
            "crawled Time": str(datetime.now())
        }

        # 중복 확인 및 저장
        if title and not collection.find_one({"title": title, "posted Time": post_time}):
            collection.insert_one(post_data)
            print(f"Saved: {post_data}")
        else:
            print(f"Skipped (duplicate): {post_data['title'] if title else 'No Title'}")

async def darknetARMY(db):
    """
    DarknetARMY 크롤러 비동기 실행 및 MongoDB 저장
    """
    base_url = "http://dna777qhcrxy5sbvk7rkdd2phhxbftpdtxvwibih26nr275cdazx4uyd.onion/whats-new/posts/797681/"
    proxy_url = "socks5://127.0.0.1:9050"

    connector = ProxyConnector.from_url(proxy_url)
    headers = {
        "User-Agent": "Mozilla/5.0"
    }

    async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
        tasks = [process_page(db, session, base_url, page) for page in range(1, 4)]
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    # MongoDB 연결 설정
    MONGO_URI = "mongodb://localhost:27017/"
    DB_NAME = "darkweb_db"

    try:
        client = MongoClient(MONGO_URI)
        db = client[DB_NAME]
        print("[INFO] MongoDB 연결 성공")

        # 비동기 실행
        asyncio.run(darknetARMY(db))

    except Exception as e:
        print(f"[ERROR] MongoDB 연결 실패: {e}")

Crawler code: Selenium -> Playwright

abyss

import os
import asyncio
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
from datetime import datetime
from jsonschema import validate, ValidationError
from pymongo import MongoClient

async def crawl_page(base_url, proxy_address, schema, collection):
    """
    개별 페이지를 크롤링하는 비동기 함수 (Playwright 사용)
    """
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=True,
                proxy={
                    "server": f"socks5://{proxy_address}"
                }
            )
            page = await browser.new_page()
            await page.goto(base_url, timeout=60000)
            content = await page.content()
            await browser.close()
  
        soup = BeautifulSoup(content, "html.parser")
        cards = soup.find_all("div", class_="card-body")  # 카드 데이터 추출
  
        for card in cards:
            try:
                # 데이터 추출
                title = card.find("h5", class_="card-title").text.strip()
                description = card.find("p", class_="card-text").text.strip()

                # 데이터 생성
                post_data = {
                    "title": title,
                    "description": description,
                    "crawled_time": str(datetime.now())
                }

                # JSON Schema 검증
                try:
                    validate(instance=post_data, schema=schema)

                    # 중복 확인 및 데이터 저장
                    if not collection.find_one({"title": title, "description": description}):
                        collection.insert_one(post_data)
                        print(f"[INFO] Saved: {title}")
                    else:
                        print(f"[INFO] Skipped (duplicate): {title}")
                except ValidationError as ve:
                    print(f"[ERROR] abyss_crawler.py - crawl_page(): {ve.message}")

            except Exception as e:
                print(f"[ERROR] abyss_crawler.py - crawl_page(): {e}")

    except Exception as e:
        print(f"[ERROR] abyss_crawler.py - crawl_page(): {e}")

async def abyss(db):
    """
    Abyss 크롤러 실행 및 MongoDB 컬렉션에 데이터 저장 (비동기 실행)
    """
    collection = db["abyss"]  # MongoDB 컬렉션 선택

    # 프록시 주소 (Tor SOCKS5)
    proxy_address = "127.0.0.1:9050"

    # JSON Schema 정의
    schema = {
        "type": "object",
        "properties": {
            "title": {"type": "string"},
            "description": {"type": "string"},
            "crawled_time": {"type": "string"}
        },
        "required": ["title", "description"]
    }

    # 대상 URL 목록 (명세 준수)
    base_urls = [
        "http://3ev4metjirohtdpshsqlkrqcmxq6zu3d7obrdhglpy5jpbr7whmlfgqd.onion"
    ]

    # 비동기 실행
    tasks = [
        crawl_page(url, proxy_address, schema, collection) for url in base_urls
    ]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    # MongoDB 연결 설정
    mongo_client = MongoClient("mongodb://localhost:27017/")
    db = mongo_client["your_database_name"]

    # 비동기 실행
    asyncio.run(abyss(db))

blackbasta
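
The Playwright port of blackbasta was not finished in this session. The following is a minimal sketch of what it could look like, following the same pattern as the abyss and darkleak conversions: launch Chromium over the Tor SOCKS5 proxy, wait for the JS-rendered titles, then reuse the parsing, schema validation, and MongoDB logic from the Selenium version above. Treat it as an assumption-based outline rather than tested code.

import asyncio
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
from jsonschema import validate, ValidationError
from datetime import datetime
from pymongo import MongoClient

# Same schema as the Selenium version
schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "url": {"type": "string", "format": "uri"},
        "description": {"type": "string"},
        "crawled_time": {"type": "string", "format": "date-time"},
    },
    "required": ["title", "url", "description"],
}

TOR_PROXY = "socks5://127.0.0.1:9050"

async def crawl_page(category_url, collection):
    """
    Fetch one page with Playwright over Tor and store matching posts (sketch)
    """
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True, proxy={"server": TOR_PROXY})
            page = await browser.new_page(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
            )
            await page.goto(category_url, timeout=60000)
            # Wait for the JS-rendered post titles (same selector as the Selenium version)
            await page.wait_for_selector(".title", timeout=15000)
            content = await page.content()
            await browser.close()

        soup = BeautifulSoup(content, "html.parser")
        for post in soup.find_all("div", class_="title"):
            title_element = post.find("a", class_="blog_name_link")
            if not title_element:
                continue

            title = title_element.text.strip()
            url = title_element["href"].strip()
            description_element = post.find_next("p", {"data-v-md-line": "3"})
            description = description_element.get_text(strip=True) if description_element else ""

            post_data = {
                "title": title,
                "url": url,
                "description": description,
                "crawled_time": str(datetime.now()),
            }

            try:
                validate(instance=post_data, schema=schema)
                # Deduplicate on title + url before inserting
                if not collection.find_one({"title": title, "url": url}):
                    collection.insert_one(post_data)
                    print(f"[INFO] Saved: {title}")
                else:
                    print(f"[INFO] Skipped (duplicate): {title}")
            except ValidationError as e:
                print(f"[WARNING] Validation failed: {e.message}")

    except Exception as e:
        print(f"[ERROR] blackbasta_crawler.py - crawl_page(): {e}")

async def blackbasta(db):
    """
    Run the BlackBasta crawler and store results in MongoDB (sketch)
    """
    collection = db["blackbasta"]
    base_url = "http://stniiomyjliimcgkvdszvgen3eaaoz55hreqqx6o77yvmpwt7gklffqd.onion"
    await crawl_page(f"{base_url}/", collection)

if __name__ == "__main__":
    mongo_client = MongoClient("mongodb://localhost:27017/")
    db = mongo_client["your_database_name"]
    asyncio.run(blackbasta(db))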

darkleak

import os
import asyncio
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
from jsonschema import validate, ValidationError
from datetime import datetime
from motor.motor_asyncio import AsyncIOMotorClient  # 비동기 MongoDB 클라이언트

# JSON Schema 정의
SCHEMA = {
    "type": "object",
    "properties": {
        "file_name": {"type": "string"},
        "url": {"type": ["string", "null"]},
        "crawled_time": {"type": "string"}
    },
    "required": ["file_name", "url"]
}

# TOR Proxy 설정
TOR_PROXY = "socks5://127.0.0.1:9050"

async def fetch_page(page, url):
    """
    Playwright를 사용해 페이지를 가져오는 비동기 함수
    """
    try:
        print(f"[INFO] Fetching URL: {url}")
        await page.goto(url, timeout=60000)
        await asyncio.sleep(3)  # 페이지 로드 대기
        return await page.content()
    except Exception as e:
        print(f"[ERROR] darkleak_crawler.py - fetch_page(): {e}")
        return None

async def process_page(db, html, base_url):
    """
    HTML을 파싱하고 데이터를 MongoDB에 저장하는 함수
    """
    collection = db["darkleak"]
    try:
        soup = BeautifulSoup(html, "html.parser")
        rows = soup.find_all("tr", onclick=True)

        for row in rows:
            try:
                # 파일 이름 추출
                file_name = row.find("strong").text.strip()

                # onclick 속성에서 URL 추출
                onclick_attr = row.get("onclick")
                if onclick_attr and "window.location='" in onclick_attr:
                    relative_url = onclick_attr.split("'")[1]
                    full_url = f"{base_url}/{relative_url}"
                else:
                    full_url = None

                # 데이터 생성
                post_data = {
                    "file_name": file_name,
                    "url": full_url,
                    "crawled_time": str(datetime.now())
                }

                # JSON Schema 검증
                validate(instance=post_data, schema=SCHEMA)

                # 중복 확인 및 데이터 저장
                if not await collection.find_one({"file_name": file_name, "url": full_url}):
                    await collection.insert_one(post_data)
                    print(f"[INFO] Saved: {file_name}")
            except ValidationError as e:
                print(f"[ERROR] darkleak_crawler.py - process_page(): {e.message}")
            except Exception as e:
                print(f"[ERROR] darkleak_crawler.py - process_page(): {e}")

    except Exception as e:
        print(f"[ERROR] darkleak_crawler.py - process_page(): {e}")

async def darkleak(db):
    """
    DarkLeak 크롤러 실행 (비동기)
    """
    base_url = "http://darkleakyqmv62eweqwy4dnhaijg4m4dkburo73pzuqfdumcntqdokyd.onion"
    category_url = f"{base_url}/index.html"

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True, proxy={"server": TOR_PROXY})
            page = await browser.new_page()

            # 페이지 가져오기
            html = await fetch_page(page, category_url)
            if html:
                # 페이지 처리 및 데이터 저장
                await process_page(db, html, base_url)

            await browser.close()
    except Exception as e:
        print(f"[ERROR] darkleak_crawler.py - darkleak(): {e}")

if __name__ == "__main__":
    # 비동기 MongoDB 연결
    MONGO_URI = "mongodb://localhost:27017"
    mongo_client = AsyncIOMotorClient(MONGO_URI)
    db = mongo_client["your_database_name"]

    # 비동기 실행
    asyncio.run(darkleak(db))