2024 개인 프로젝트

틈메이러·2024년 10월 4일

포트폴리오

목록 보기
9/11
post-thumbnail

🍅주제

간단한 패킷 분석 프로그램 제작


1. 개발환경

프로그래밍 언어: Python
개발 환경: Visual Studio Code
사용 모듈: Scapy


2. 개발배경

이번 프로젝트를 통해 네트워크 패킷 분석 도구를 직접 개발함으로써 네트워크 프로토콜과 트래픽 분석에 대한 실무 능력을 기르고자 하였다.
특히, Wireshark와 같은 전문 패킷 분석 도구의 기본 원리를 이해하고, 이를 토대로 간단한 Packet Sniffer 애플리케이션을 구현하는 것을 목표로 했다.


3. 기능

  1. 실시간 패킷 캡처: 네트워크 인터페이스를 통해 실시간으로 패킷을 캡처합니다.
  2. 패킷 정보 표시: 캡처한 패킷의 번호, 도착 시간, 소스 IP, 목적지 IP, 프로토콜, 패킷 길이 등을 표 형식으로 GUI에 표시합니다.
  3. 프로토콜 통계: 캡처된 패킷의 프로토콜별 통계를 제공하여, 특정 프로토콜의 패킷 수를 보여줍니다.
  4. 시작 및 종료 기능: 버튼을 통해 손쉽게 패킷 캡처를 시작하고 종료할 수 있습니다.
  5. 디자인 요소: GUI의 배경색, 글꼴, 버튼 스타일 등을 설정하여 사용자가 보기 편한 인터페이스를 제공합니다.

4. 애플리케이션 실행 화면


5. 코드

import tkinter as tk
from tkinter import ttk
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from scapy.all import sniff, ARP, IP, TCP
import time
import threading
import queue
import scapy.layers.l2 as l2

# 프로토콜 번호 정의
protocol_map = {
    1: 'ICMP',
    6: 'TCP',
    17: 'UDP'
}

class PacketSnifferApp:
    def __init__(self, root):
        self.root = root
        self.root.title("Packet Sniffer")
        self.root.geometry("800x600")

        # 스타일 설정
        self.root.configure(bg="#e0f2f1")
        self.root.option_add("*Font", "Helvetica 12")
        self.root.option_add("*Label.Font", "Helvetica 18 bold")
        self.root.option_add("*Button.Background", "white")
        self.root.option_add("*Button.Foreground", "black")
        self.root.option_add("*Button.Font", "Helvetica 14 bold")
        self.root.option_add("*Treeview.Background", "white")
        self.root.option_add("*Treeview.Foreground", "black")
        self.root.option_add("*Treeview.Font", "Helvetica 12")
        self.root.option_add("*Treeview.Heading.Font", "Helvetica 14 bold")

        # GUI 요소 생성
        self.create_widgets()

        # 패킷 관련 변수 초기화
        self.packet_count = 0
        self.protocol_count = {'ICMP': 0, 'TCP': 0, 'UDP': 0}

        # 패킷 큐 및 스레드 초기화
        self.packet_queue = queue.Queue()
        self.sniffing_thread = None
        self.stop_sniffing = threading.Event()

        # Matplotlib 그래프 초기화
        self.fig, self.ax = plt.subplots(figsize=(6, 4))
        self.canvas = FigureCanvasTkAgg(self.fig, master=self.root)
        self.canvas.get_tk_widget().pack(pady=10, padx=20, fill=tk.BOTH, expand=True)

    def create_widgets(self):
        # 제목 레이블
        self.label_title = tk.Label(self.root, text="Packet Sniffer", bg="#e0f2f1")
        self.label_title.pack(pady=20)

        # 패킷 로그 표
        self.tree_packets = ttk.Treeview(self.root, columns=("No.", "Time", "Source IP", "Destination IP", "Source Port", "Destination Port", "Protocol", "Length"), selectmode="browse")
        self.tree_packets.heading("#0", text="내용")
        self.tree_packets.heading("#1", text="No.")
        self.tree_packets.heading("#2", text="Time")
        self.tree_packets.heading("#3", text="Source IP")
        self.tree_packets.heading("#4", text="Destination IP")
        self.tree_packets.heading("#5", text="Source Port")
        self.tree_packets.heading("#6", text="Destination Port")
        self.tree_packets.heading("#7", text="Protocol")
        self.tree_packets.heading("#8", text="Length")
        self.tree_packets.column("#0", width=50, stretch=tk.NO, anchor=tk.W)
        self.tree_packets.column("#1", width=50, stretch=tk.NO, anchor=tk.W)
        self.tree_packets.column("#2", width=150, stretch=tk.NO, anchor=tk.W)
        self.tree_packets.column("#3", width=150, stretch=tk.NO, anchor=tk.W)
        self.tree_packets.column("#4", width=150, stretch=tk.NO, anchor=tk.W)
        self.tree_packets.column("#5", width=100, stretch=tk.NO, anchor=tk.W)
        self.tree_packets.column("#6", width=100, stretch=tk.NO, anchor=tk.W)
        self.tree_packets.column("#7", width=100, stretch=tk.NO, anchor=tk.W)
        self.tree_packets.column("#8", width=100, stretch=tk.NO, anchor=tk.W)
        self.tree_packets.pack(pady=(0, 10), padx=20, fill=tk.BOTH, expand=True)

        # 통계 제목 레이블
        self.label_statistics = tk.Label(self.root, text="Statistics", bg="#e0f2f1")
        self.label_statistics.pack(pady=10)

        # 통계 내용 표
        self.tree_statistics = ttk.Treeview(self.root, columns=("Protocol", "Count"), show="headings", height=3)
        self.tree_statistics.heading("Protocol", text="Protocol")
        self.tree_statistics.heading("Count", text="Count")
        self.tree_statistics.column("Protocol", width=100, stretch=tk.NO, anchor=tk.W)
        self.tree_statistics.column("Count", width=100, stretch=tk.NO, anchor=tk.W)
        self.tree_statistics.pack(pady=10, padx=20, fill=tk.BOTH)

        # 시작 버튼
        self.button_start = tk.Button(self.root, text="Start Sniffing", command=self.start_sniffing)
        self.button_start.pack(pady=10, padx=20)

        # 중지 버튼
        self.button_stop = tk.Button(self.root, text="Stop Sniffing", command=self.stop_sniffing_func)
        self.button_stop.pack(pady=10)

    def start_sniffing(self):
        self.packet_count = 0
        self.protocol_count = {'ICMP': 0, 'TCP': 0, 'UDP': 0}
        self.tree_packets.delete(*self.tree_packets.get_children())
        self.tree_statistics.delete(*self.tree_statistics.get_children())
        self.stop_sniffing.clear()

        # ARP 스캔 스레드 시작
        arp_thread = threading.Thread(target=self.arp_scan_then_sniff)
        arp_thread.start()

        # GUI 업데이트 스케줄링
        self.update_packets_in_gui()

        # 통계 그래프 업데이트 스케줄링
        self.root.after(1000, self.update_graph)

    def arp_scan_then_sniff(self):
        net = '192.168.146.188/24'   # 핫스팟을 통해 연결된 네트워크 대역을 설정해주세요

        ans, noans = l2.arping(net, timeout=1, verbose=True)

        # ARP 스캔이 완료되면 패킷 스니핑 시작
        self.sniff_packets()

    def sniff_packets(self):
        def packet_callback(packet):
            self.packet_queue.put(packet)

        sniff(prn=packet_callback, stop_filter=lambda x: self.stop_sniffing.is_set(), iface="Wi-Fi")

    def update_packets_in_gui(self):
        while not self.packet_queue.empty():
            packet = self.packet_queue.get()
            self.packet_count += 1

            if IP in packet:
                ip_layer = packet[IP]
                ip_protocol = ip_layer.proto

                if ip_protocol in protocol_map:
                    protocol_name = protocol_map[ip_protocol]
                    self.protocol_count[protocol_name] += 1

                    time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
                    length = len(packet)
                    source_port = ""
                    destination_port = ""

                    # TCP 패킷인 경우에만 소스 포트와 목적지 포트를 추출
                    if TCP in packet:
                        tcp_layer = packet[TCP]
                        source_port = tcp_layer.sport
                        destination_port = tcp_layer.dport

                    self.tree_packets.insert("", "end", values=(self.packet_count, time_str, ip_layer.src, ip_layer.dst, source_port, destination_port, protocol_name, length))

                    self.update_statistics()

        # 스레드가 종료되지 않았으면 일정 시간 후 다시 호출
        if not self.stop_sniffing.is_set():
            self.root.after(1000, self.update_packets_in_gui)

    def update_statistics(self):
        self.tree_statistics.delete(*self.tree_statistics.get_children())
        for protocol, count in self.protocol_count.items():
            self.tree_statistics.insert("", "end", values=(protocol, count))

    def update_graph(self):
        labels = list(self.protocol_count.keys())
        counts = list(self.protocol_count.values())

        self.ax.clear()
        self.ax.bar(labels, counts, color=['blue', 'green', 'orange'])
        self.ax.set_xlabel('Protocol')
        self.ax.set_ylabel('Count')
        self.ax.set_title('Protocol Distribution')
        self.canvas.draw()

        # 일정 시간마다 그래프 업데이트
        self.root.after(1000, self.update_graph)

    def stop_sniffing_func(self):
        self.stop_sniffing.set()
        # 스레드가 종료될 때까지 기다림
        if self.sniffing_thread and self.sniffing_thread.is_alive():
            self.sniffing_thread.join()

if __name__ == "__main__":
    root = tk.Tk()
    app = PacketSnifferApp(root)
    root.mainloop()

    # 사용자가 Enter를 누를 때까지 대기
    input("Press Enter to exit...")


🍅문제점 및 해결방법

1. 권한 문제

문제점: 네트워크 패킷을 캡처하기 위해서는 관리자 권한이 필요하다.
일반 사용자 권한으로 애플리케이션을 실행할 경우 패킷을 제대로 캡처하지 못하거나, 특정 인터페이스에 접근할 수 없는 문제가 발생했다.

해결 방법: 이를 해결하기 위해 Windows 환경에서 애플리케이션을 관리자 권한으로 실행했다


2. 성능 문제

문제점: 실시간으로 많은 패킷을 캡처하고 이를 GUI에 업데이트하는 과정에서 성능 이슈가 발생했다. 특히, 많은 패킷이 빠르게 들어오는 경우, GUI가 느려지거나 응답하지 않는 문제가 생겼다.

이는 패킷 캡처와 GUI 업데이트가 동일한 스레드에서 처리되기 때문이었다.

해결방법
스레딩: 패킷 캡처 작업을 별도의 스레드에서 처리하고, GUI 업데이트는 메인 스레드에서 수행하도록 구현했다. 이를 통해 패킷 캡처의 성능을 유지하면서도 GUI가 원활하게 동작하도록 했다.

큐 사용: 패킷 캡처 스레드와 GUI 스레드 간의 데이터 교환을 위해 큐(Queue)를 사용했다. 캡처된 패킷을 큐에 저장하고, 메인 스레드에서는 주기적으로 큐에서 패킷을 가져와 GUI를 업데이트하도록 했다. 이렇게 하여 패킷 캡처와 GUI 업데이트 간의 동기화 문제를 해결할 수 있었다

패킷 필터링: 모든 패킷을 캡처하고 처리하는 대신, 사용자가 관심 있는 특정 프로토콜이나 조건에 맞는 패킷만 캡처하도록 필터링 기능을 추가했다.
예를 들어, TCP 또는 UDP 패킷만 캡처하도록 설정할 수 있다.
이를 통해 불필요한 패킷 처리로 인한 성능 저하를 방지할 수 있습니다.


3. 패킷 손실 문제

문제점: 패킷 캡처 중 일부 패킷이 손실되는 문제가 발생했다.
이는 특히 고속 네트워크 환경에서 빈번하게 발생했다.

해결 방법: 패킷 캡처 버퍼 크기를 늘리고, 패킷 처리 속도를 최적화했습니다.


4. 무선 네트워크 인터페이스 선택 문제

문제점: 시스템에 여러 네트워크 인터페이스가 있는 경우, 올바른 인터페이스를 선택하지 못해 패킷을 캡처하지 못하는 문제가 발생했다.

해결 방법: 사용자가 네트워크 인터페이스를 선택할 수 있는 기능을 추가했다.
초기화 단계에서 시스템의 네트워크 인터페이스 목록을 불러와 사용자가 선택하도록 하여, 원하는 인터페이스에서 패킷을 캡처할 수 있게 했다.

profile
나는야 멋쟁이 토마토

0개의 댓글