개발 세션 - 정보 추가 수집

goldenGlow_21·2024년 12월 20일
0

Blackbasta 크롤러

import json
import os
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup

# ChromeDriver 경로 (프로젝트 폴더 내에 위치한 chromedriver.exe)
current_dir = os.path.dirname(os.path.abspath(__file__))
chromedriver_path = os.path.join(current_dir, "chromedriver.exe")

# Tor 프록시 설정
proxy_address = "127.0.0.1:9050"  # Tor SOCKS5 프록시 주소

# Selenium WebDriver 옵션 설정
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument(f"--proxy-server=socks5://{proxy_address}")  # Tor 프록시 사용
chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36")

# WebDriver 초기화
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)

# 크롤링 대상 URL
base_url = "http://stniiomyjliimcgkvdszvgen3eaaoz55hreqqx6o77yvmpwt7gklffqd.onion"
category_url = f"{base_url}/"

# 데이터 저장용 리스트
all_data = []

# 크롤링 함수
def crawl_blackbasta():
    try:
        # Selenium으로 페이지 열기
        driver.get(category_url)

        # JavaScript 로딩 대기 (명시적으로 요소가 로드될 때까지 대기)
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "title"))
        )

        # BeautifulSoup으로 HTML 파싱
        soup = BeautifulSoup(driver.page_source, "html.parser")
        posts = soup.find_all("div", class_="title")

        for post in posts:
            try:
                title_element = post.find("a", class_="blog_name_link")
                if not title_element:
                    continue

                title = title_element.text.strip()
                url = title_element["href"].strip()

                # Description 추출 (p 태그에서 data-v-md-line="3"만 선택)
                description_element = post.find_next("p", {"data-v-md-line": "3"})
                description = description_element.get_text(strip=True) if description_element else ""

                # 데이터 저장
                post_data = {
                    "title": title,
                    "url": url,
                    "description": description,
                }

                all_data.append(post_data)
                print(f"추출 완료: {title}")

            except Exception as e:
                print(f"데이터 추출 중 오류 발생: {e}")

        # JSON 파일로 저장
        with open("blackbasta.json", "w", encoding="utf-8") as f:
            json.dump(all_data, f, ensure_ascii=False, indent=4)
        print("blackbasta.json 파일 저장 완료.")

    except Exception as e:
        print(f"크롤링 중 오류 발생: {e}")

    finally:
        driver.quit()

if __name__ == "__main__":
    crawl_blackbasta()

크롤러 데이터 반환 형태 수정 및 JSON Schema 적용

abyss

import json
import os
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import time
from jsonschema import validate, ValidationError

# ChromeDriver 경로
current_dir = os.path.dirname(os.path.abspath(__file__))
chromedriver_path = os.path.join(current_dir, "chromedriver.exe")

# TOR 프록시 설정
proxy_address = "127.0.0.1:9050"  # TOR SOCKS5 프록시 주소

# Selenium WebDriver 옵션 설정
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument(f"--proxy-server=socks5://{proxy_address}")  # TOR 프록시 사용

# WebDriver 초기화
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)

# 크롤링 대상 URL
base_url = "http://3ev4metjirohtdpshsqlkrqcmxq6zu3d7obrdhglpy5jpbr7whmlfgqd.onion"

# JSON Schema 정의
schema = {
    "type": "array",
    "items": {
        "type": "object",
        "properties": {
            "title": {"type": "string"},
            "description": {"type": "string"}
        },
        "required": ["title", "description"]
    }
}

def crawl_posts():
    # Selenium으로 페이지 열기
    driver.get(base_url)
    time.sleep(5)  # 페이지 로딩 대기

    # BeautifulSoup으로 HTML 파싱
    soup = BeautifulSoup(driver.page_source, "html.parser")
    cards = soup.find_all("div", class_="card-body")  # 카드 뉴스 데이터 추출

    results = []

    for card in cards:
        try:
            title = card.find("h5", class_="card-title").text.strip()  # 제목 추출
            description = card.find("p", class_="card-text").text.strip()  # 설명 추출
            post_data = {"title": title, "description": description}

            results.append(post_data)
            print(f"추출 완료: {title}")

        except Exception as e:
            print(f"크롤링 중 오류 발생: {e}")

    # JSON Schema 검증
    try:
        validate(instance=results, schema=schema)
        print("데이터 검증 성공!")
    except ValidationError as ve:
        print(f"데이터 검증 실패: {ve}")

    # JSON 파일 저장 (테스트용, 실제 사용 시 반환만 수행)
    # with open("abyss.json", "w", encoding="utf-8") as f:
    #     json.dump(results, f, ensure_ascii=False, indent=4)
    # print("abyss.json 파일 저장 완료.")

    return results

if __name__ == "__main__":
    try:
        data = crawl_posts()
        print(json.dumps(data, ensure_ascii=False, indent=4))  # 결과 출력
    finally:
        # WebDriver 종료
        driver.quit()

blackbasta

import json
import os
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from jsonschema import validate, ValidationError

# ChromeDriver 경로 (프로젝트 폴더 내에 위치한 chromedriver.exe)
current_dir = os.path.dirname(os.path.abspath(__file__))
chromedriver_path = os.path.join(current_dir, "chromedriver.exe")

# Tor 프록시 설정
proxy_address = "127.0.0.1:9050"  # Tor SOCKS5 프록시 주소

# Selenium WebDriver 옵션 설정
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument(f"--proxy-server=socks5://{proxy_address}")  # Tor 프록시 사용
chrome_options.add_argument(
    "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
)

# WebDriver 초기화
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)

# 크롤링 대상 URL
base_url = "http://stniiomyjliimcgkvdszvgen3eaaoz55hreqqx6o77yvmpwt7gklffqd.onion"
category_url = f"{base_url}/"

# JSON Schema 정의
schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "url": {"type": "string", "format": "uri"},
        "description": {"type": "string"},
    },
    "required": ["title", "url", "description"],
}

# 크롤링 함수
def crawl_blackbasta():
    all_data = []
    try:
        # Selenium으로 페이지 열기
        driver.get(category_url)

        # JavaScript 로딩 대기 (명시적으로 요소가 로드될 때까지 대기)
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CLASS_NAME, "title"))
        )

        # BeautifulSoup으로 HTML 파싱
        soup = BeautifulSoup(driver.page_source, "html.parser")
        posts = soup.find_all("div", class_="title")

        for post in posts:
            try:
                title_element = post.find("a", class_="blog_name_link")
                if not title_element:
                    continue

                title = title_element.text.strip()
                url = title_element["href"].strip()

                # Description 추출 (p 태그에서 data-v-md-line="3"만 선택)
                description_element = post.find_next("p", {"data-v-md-line": "3"})
                description = (
                    description_element.get_text(strip=True)
                    if description_element
                    else ""
                )

                # 데이터 저장
                post_data = {
                    "title": title,
                    "url": url,
                    "description": description,
                }

                # JSON Schema 검증
                try:
                    validate(instance=post_data, schema=schema)
                    all_data.append(post_data)
                    print(f"추출 완료: {title}")
                except ValidationError as e:
                    print(f"데이터 검증 실패: {e.message}")

            except Exception as e:
                print(f"데이터 추출 중 오류 발생: {e}")

        # JSON 파일로 저장 (주석 처리)
        # with open("blackbasta.json", "w", encoding="utf-8") as f:
        #     json.dump(all_data, f, ensure_ascii=False, indent=4)
        # print("blackbasta.json 파일 저장 완료.")

        return all_data

    except Exception as e:
        print(f"크롤링 중 오류 발생: {e}")
        return []

    finally:
        driver.quit()


if __name__ == "__main__":
    result = crawl_blackbasta()
    print(result)

breachdetector

from telethon import TelegramClient
import json
import os
from jsonschema import validate, ValidationError

# 해당 크롤러는 환경 변수에서 API 정보를 가져옵니다.
# 환경 변수 설정 방법:
# Windows:
#   set TELEGRAM_API_ID=<Your API ID>
#   set TELEGRAM_API_HASH=<Your API Hash>
#
# Linux/macOS:
#   export TELEGRAM_API_ID=<Your API ID>
#   export TELEGRAM_API_HASH=<Your API Hash>

# 1. 텔레그램 정보
api_id = os.getenv("TELEGRAM_API_ID")  # TELEGRAM_API_ID 환경 변수에서 가져오기
api_hash = os.getenv("TELEGRAM_API_HASH")  # TELEGRAM_API_HASH 환경 변수에서 가져오기
channel_username = "breachdetector"  # 채널 이름

if not api_id or not api_hash:
    raise EnvironmentError(
        "API ID 또는 API Hash가 설정되지 않았습니다. "
        "환경 변수 TELEGRAM_API_ID와 TELEGRAM_API_HASH를 설정해주세요."
    )

# 2. JSON Schema 정의
schema = {
    "type": "object",
    "properties": {
        "content": {"type": "string"},
        "date": {"type": "string", "format": "date-time"},
        "sender_id": {"type": ["string", "number"]}
    },
    "required": ["content", "date", "sender_id"]
}

# 3. Telegram Client 초기화
client = TelegramClient("session_name", api_id, api_hash)

async def fetch_messages():
    await client.start()  # 클라이언트 시작
    messages = await client.get_messages(channel_username, limit=100)  # 메시지 가져오기

    data = []  # 데이터를 저장할 리스트

    for message in messages:
        try:
            text = message.text
            if text:
                # JSON 형식으로 저장
                entry = {
                    "content": text,
                    "date": str(message.date),  # 메시지 작성 시간
                    "sender_id": message.sender_id  # 작성자 ID
                }

                # JSON Schema 검증
                try:
                    validate(instance=entry, schema=schema)
                    data.append(entry)
                    print(f"메시지 저장 완료: {entry}")
                except ValidationError as e:
                    print(f"데이터 검증 실패: {e.message}")

        except Exception as e:
            print(f"오류 발생: {e}")

    # JSON 파일로 저장 (주석 처리)
    # with open('breachdetector.json', 'w', encoding='utf-8') as f:
    #     json.dump(data, f, ensure_ascii=False, indent=4)
    # print("데이터 저장 완료: breachdetector.json")

    return data

# 비동기 함수 실행
def crawl_breachdetector():
    with client:
        return client.loop.run_until_complete(fetch_messages())

if __name__ == "__main__":
    result = crawl_breachdetector()
    print(result)
  • telethon 사용을 위한 api id / 해쉬 값은 사용자의 실행 환경에서 환경변수로 지정해놓아야 함

ctifeeds

import json
import os
import requests
from jsonschema import validate, ValidationError

# JSON 데이터 URL 목록 및 카테고리 이름
json_sources = [
    {"url": "https://ctifeeds.andreafortuna.org/dataleaks.json", "categories": "dataleaks"},
    {"url": "https://ctifeeds.andreafortuna.org/cybercrime_on_telegram.json", "categories": "cybercrime_on_telegram"},
    {"url": "https://ctifeeds.andreafortuna.org/phishing_sites.json", "categories": "phishing_sites"},
    {"url": "https://ctifeeds.andreafortuna.org/datamarkets.json", "categories": "datamarkets"},
    {"url": "https://ctifeeds.andreafortuna.org/ransomware_victims.json", "categories": "ransomware_victims"},
    {"url": "https://ctifeeds.andreafortuna.org/recent_defacements.json", "categories": "recent_defacements"},
]

# JSON Schema 정의
schema = {
    "type": "object",
    "properties": {
        "categories": {"type": "string"},
        "name": {"type": "string"},
        "url": {"type": "string"},
        "source": {"type": "string"},
        "screenshot": {"type": ["string", "null"]},
        "urlscan": {"type": ["string", "null"]}
    },
    "required": ["categories", "name", "url", "source"]
}

# 데이터 저장용 리스트
all_data = []

def fetch_json_data():
    for source in json_sources:
        try:
            response = requests.get(source["url"], timeout=10)
            response.raise_for_status()
            
            # JSON 데이터를 로드
            data = response.json()

            # 카테고리 항목 추가 및 데이터 처리
            for item in data:
                item["categories"] = source["categories"]

                # JSON Schema 검증
                try:
                    validate(instance=item, schema=schema)
                    all_data.append(item)
                except ValidationError as e:
                    print(f"데이터 검증 실패 ({source['categories']}): {e.message}")

            print(f"데이터 수집 완료: {source['categories']}")
        except Exception as e:
            print(f"데이터 수집 중 오류 발생 ({source['categories']}): {e}")

    # 결과 데이터를 JSON 파일로 저장 (주석 처리)
    # with open("ctifeeds_data.json", "w", encoding="utf-8") as f:
    #     json.dump(all_data, f, ensure_ascii=False, indent=4)
    # print("ctifeeds_data.json 파일 저장 완료.")

    return all_data

if __name__ == "__main__":
    result = fetch_json_data()
    print(result)

darkleak

import json
import os
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
from jsonschema import validate, ValidationError
import time

# ChromeDriver 경로
current_dir = os.path.dirname(os.path.abspath(__file__))
chromedriver_path = os.path.join(current_dir, "chromedriver.exe")

# TOR 프록시 설정
proxy_address = "127.0.0.1:9050"  # TOR SOCKS5 프록시 주소

# Selenium WebDriver 옵션 설정
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument(f"--proxy-server=socks5://{proxy_address}")  # TOR 프록시 사용

# WebDriver 초기화
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)

# 크롤링 대상 URL
base_url = "http://darkleakyqmv62eweqwy4dnhaijg4m4dkburo73pzuqfdumcntqdokyd.onion"
category_url = f"{base_url}/index.html"

# JSON Schema 정의
schema = {
    "type": "object",
    "properties": {
        "file_name": {"type": "string"},
        "url": {"type": ["string", "null"]}
    },
    "required": ["file_name", "url"]
}

# 데이터 저장용 리스트
all_data = []

def crawl_files():
    try:
        # Selenium으로 페이지 열기
        driver.get(category_url)
        time.sleep(5)  # 페이지 로딩 대기

        # BeautifulSoup으로 HTML 파싱
        soup = BeautifulSoup(driver.page_source, "html.parser")
        rows = soup.find_all("tr", onclick=True)  # 클릭 가능한 행 추출

        for row in rows:
            try:
                file_name = row.find("strong").text.strip()  # 파일 이름 추출

                # onclick 속성에서 URL 추출
                onclick_attr = row.get("onclick")
                if onclick_attr and "window.location='" in onclick_attr:
                    relative_url = onclick_attr.split("'")[1]  # URL 경로 추출
                    full_url = f"{base_url}/{relative_url}"  # 전체 URL 생성
                else:
                    full_url = None  # URL이 없는 경우

                # 데이터 저장
                post_data = {
                    "file_name": file_name,
                    "url": full_url
                }

                # JSON Schema 검증
                try:
                    validate(instance=post_data, schema=schema)
                    all_data.append(post_data)
                    print(f"추출 완료: {file_name}, URL: {full_url}")
                except ValidationError as e:
                    print(f"데이터 검증 실패: {e.message}")

            except Exception as e:
                print(f"데이터 추출 중 오류 발생: {e}")

        # JSON 파일로 저장 (주석 처리)
        # with open("darkleak.json", "w", encoding="utf-8") as f:
        #     json.dump(all_data, f, ensure_ascii=False, indent=4)
        # print("darkleak.json 파일 저장 완료.")

        return all_data

    finally:
        driver.quit()

if __name__ == "__main__":
    result = crawl_files()
    print(result)

island

import json
import os
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
from jsonschema import validate, ValidationError
import time

# ChromeDriver 경로 (프로젝트 폴더 내에 위치한 chromedriver.exe)
current_dir = os.path.dirname(os.path.abspath(__file__))
chromedriver_path = os.path.join(current_dir, "chromedriver.exe")

# TOR 프록시 설정
proxy_address = "127.0.0.1:9050"  # TOR SOCKS5 프록시 주소

# Selenium WebDriver 옵션 설정
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument(f"--proxy-server=socks5://{proxy_address}")  # TOR 프록시 사용

# WebDriver 초기화
service = Service(chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)

# 크롤링 대상 URL (onion 사이트)
base_url = "https://crackingisland.net/"  # onion 사이트 URL로 변경
category_url = f"{base_url}/categories/combolists"

# JSON Schema 정의
schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "url": {"type": "string"},
        "type": {"type": "string"},
        "dateCreated": {"type": "string"},
        "description": {"type": "string"}
    },
    "required": ["title", "url", "type", "dateCreated", "description"]
}

# 데이터 저장용 리스트
all_data = []

def crawl_combolists():
    try:
        # Selenium으로 페이지 열기
        driver.get(category_url)
        # JavaScript 로딩 대기
        time.sleep(5)

        # BeautifulSoup으로 HTML 파싱
        soup = BeautifulSoup(driver.page_source, "html.parser")
        posts = soup.find_all("a", itemprop="url")

        for post in posts:
            try:
                title = post.find("h2", itemprop="headline").text.strip()
                post_url = base_url + post["href"]
                post_type = post.find("span", itemprop="about").text.strip()
                post_date = post.find("span", itemprop="dateCreated").text.strip()
                description = post.find("p", itemprop="text").text.strip()

                # 데이터 저장
                post_data = {
                    "title": title,
                    "url": post_url,
                    "type": post_type,
                    "dateCreated": post_date,
                    "description": description,
                }

                # JSON Schema 검증
                try:
                    validate(instance=post_data, schema=schema)
                    all_data.append(post_data)
                    print(f"추출 완료: {title}")
                except ValidationError as e:
                    print(f"데이터 검증 실패: {e.message}")

            except Exception as e:
                print(f"크롤링 중 오류 발생: {e}")

        # JSON 파일로 저장 (주석 처리)
        # with open("test.json", "w", encoding="utf-8") as f:
        #     json.dump(all_data, f, ensure_ascii=False, indent=4)
        # print("test.json 파일 저장 완료.")

        return all_data

    finally:
        driver.quit()

if __name__ == "__main__":
    result = crawl_combolists()
    print(result)
profile
안드로이드는 리눅스의 꿈을 꾸는가

0개의 댓글