간단한 패킷 분석 프로그램 제작
프로그래밍 언어:
Python
개발 환경:Visual Studio Code
사용 모듈:Scapy
이번 프로젝트를 통해 네트워크 패킷 분석 도구를 직접 개발함으로써 네트워크 프로토콜과 트래픽 분석에 대한 실무 능력을 기르고자 하였다.
특히, Wireshark와 같은 전문 패킷 분석 도구의 기본 원리를 이해하고, 이를 토대로 간단한 Packet Sniffer 애플리케이션을 구현하는 것을 목표로 했다.
- 실시간 패킷 캡처: 네트워크 인터페이스를 통해 실시간으로 패킷을 캡처합니다.
- 패킷 정보 표시: 캡처한 패킷의
번호,도착 시간,소스 IP,목적지 IP,프로토콜,패킷 길이등을 표 형식으로 GUI에 표시합니다.- 프로토콜 통계: 캡처된 패킷의 프로토콜별 통계를 제공하여, 특정 프로토콜의 패킷 수를 보여줍니다.
- 시작 및 종료 기능: 버튼을 통해 손쉽게 패킷 캡처를 시작하고 종료할 수 있습니다.
- 디자인 요소: GUI의 배경색, 글꼴, 버튼 스타일 등을 설정하여 사용자가 보기 편한 인터페이스를 제공합니다.

import tkinter as tk
from tkinter import ttk
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from scapy.all import sniff, ARP, IP, TCP
import time
import threading
import queue
import scapy.layers.l2 as l2
# 프로토콜 번호 정의
protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
class PacketSnifferApp:
def __init__(self, root):
self.root = root
self.root.title("Packet Sniffer")
self.root.geometry("800x600")
# 스타일 설정
self.root.configure(bg="#e0f2f1")
self.root.option_add("*Font", "Helvetica 12")
self.root.option_add("*Label.Font", "Helvetica 18 bold")
self.root.option_add("*Button.Background", "white")
self.root.option_add("*Button.Foreground", "black")
self.root.option_add("*Button.Font", "Helvetica 14 bold")
self.root.option_add("*Treeview.Background", "white")
self.root.option_add("*Treeview.Foreground", "black")
self.root.option_add("*Treeview.Font", "Helvetica 12")
self.root.option_add("*Treeview.Heading.Font", "Helvetica 14 bold")
# GUI 요소 생성
self.create_widgets()
# 패킷 관련 변수 초기화
self.packet_count = 0
self.protocol_count = {'ICMP': 0, 'TCP': 0, 'UDP': 0}
# 패킷 큐 및 스레드 초기화
self.packet_queue = queue.Queue()
self.sniffing_thread = None
self.stop_sniffing = threading.Event()
# Matplotlib 그래프 초기화
self.fig, self.ax = plt.subplots(figsize=(6, 4))
self.canvas = FigureCanvasTkAgg(self.fig, master=self.root)
self.canvas.get_tk_widget().pack(pady=10, padx=20, fill=tk.BOTH, expand=True)
def create_widgets(self):
# 제목 레이블
self.label_title = tk.Label(self.root, text="Packet Sniffer", bg="#e0f2f1")
self.label_title.pack(pady=20)
# 패킷 로그 표
self.tree_packets = ttk.Treeview(self.root, columns=("No.", "Time", "Source IP", "Destination IP", "Source Port", "Destination Port", "Protocol", "Length"), selectmode="browse")
self.tree_packets.heading("#0", text="내용")
self.tree_packets.heading("#1", text="No.")
self.tree_packets.heading("#2", text="Time")
self.tree_packets.heading("#3", text="Source IP")
self.tree_packets.heading("#4", text="Destination IP")
self.tree_packets.heading("#5", text="Source Port")
self.tree_packets.heading("#6", text="Destination Port")
self.tree_packets.heading("#7", text="Protocol")
self.tree_packets.heading("#8", text="Length")
self.tree_packets.column("#0", width=50, stretch=tk.NO, anchor=tk.W)
self.tree_packets.column("#1", width=50, stretch=tk.NO, anchor=tk.W)
self.tree_packets.column("#2", width=150, stretch=tk.NO, anchor=tk.W)
self.tree_packets.column("#3", width=150, stretch=tk.NO, anchor=tk.W)
self.tree_packets.column("#4", width=150, stretch=tk.NO, anchor=tk.W)
self.tree_packets.column("#5", width=100, stretch=tk.NO, anchor=tk.W)
self.tree_packets.column("#6", width=100, stretch=tk.NO, anchor=tk.W)
self.tree_packets.column("#7", width=100, stretch=tk.NO, anchor=tk.W)
self.tree_packets.column("#8", width=100, stretch=tk.NO, anchor=tk.W)
self.tree_packets.pack(pady=(0, 10), padx=20, fill=tk.BOTH, expand=True)
# 통계 제목 레이블
self.label_statistics = tk.Label(self.root, text="Statistics", bg="#e0f2f1")
self.label_statistics.pack(pady=10)
# 통계 내용 표
self.tree_statistics = ttk.Treeview(self.root, columns=("Protocol", "Count"), show="headings", height=3)
self.tree_statistics.heading("Protocol", text="Protocol")
self.tree_statistics.heading("Count", text="Count")
self.tree_statistics.column("Protocol", width=100, stretch=tk.NO, anchor=tk.W)
self.tree_statistics.column("Count", width=100, stretch=tk.NO, anchor=tk.W)
self.tree_statistics.pack(pady=10, padx=20, fill=tk.BOTH)
# 시작 버튼
self.button_start = tk.Button(self.root, text="Start Sniffing", command=self.start_sniffing)
self.button_start.pack(pady=10, padx=20)
# 중지 버튼
self.button_stop = tk.Button(self.root, text="Stop Sniffing", command=self.stop_sniffing_func)
self.button_stop.pack(pady=10)
def start_sniffing(self):
self.packet_count = 0
self.protocol_count = {'ICMP': 0, 'TCP': 0, 'UDP': 0}
self.tree_packets.delete(*self.tree_packets.get_children())
self.tree_statistics.delete(*self.tree_statistics.get_children())
self.stop_sniffing.clear()
# ARP 스캔 스레드 시작
arp_thread = threading.Thread(target=self.arp_scan_then_sniff)
arp_thread.start()
# GUI 업데이트 스케줄링
self.update_packets_in_gui()
# 통계 그래프 업데이트 스케줄링
self.root.after(1000, self.update_graph)
def arp_scan_then_sniff(self):
net = '192.168.146.188/24' # 핫스팟을 통해 연결된 네트워크 대역을 설정해주세요
ans, noans = l2.arping(net, timeout=1, verbose=True)
# ARP 스캔이 완료되면 패킷 스니핑 시작
self.sniff_packets()
def sniff_packets(self):
def packet_callback(packet):
self.packet_queue.put(packet)
sniff(prn=packet_callback, stop_filter=lambda x: self.stop_sniffing.is_set(), iface="Wi-Fi")
def update_packets_in_gui(self):
while not self.packet_queue.empty():
packet = self.packet_queue.get()
self.packet_count += 1
if IP in packet:
ip_layer = packet[IP]
ip_protocol = ip_layer.proto
if ip_protocol in protocol_map:
protocol_name = protocol_map[ip_protocol]
self.protocol_count[protocol_name] += 1
time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
length = len(packet)
source_port = ""
destination_port = ""
# TCP 패킷인 경우에만 소스 포트와 목적지 포트를 추출
if TCP in packet:
tcp_layer = packet[TCP]
source_port = tcp_layer.sport
destination_port = tcp_layer.dport
self.tree_packets.insert("", "end", values=(self.packet_count, time_str, ip_layer.src, ip_layer.dst, source_port, destination_port, protocol_name, length))
self.update_statistics()
# 스레드가 종료되지 않았으면 일정 시간 후 다시 호출
if not self.stop_sniffing.is_set():
self.root.after(1000, self.update_packets_in_gui)
def update_statistics(self):
self.tree_statistics.delete(*self.tree_statistics.get_children())
for protocol, count in self.protocol_count.items():
self.tree_statistics.insert("", "end", values=(protocol, count))
def update_graph(self):
labels = list(self.protocol_count.keys())
counts = list(self.protocol_count.values())
self.ax.clear()
self.ax.bar(labels, counts, color=['blue', 'green', 'orange'])
self.ax.set_xlabel('Protocol')
self.ax.set_ylabel('Count')
self.ax.set_title('Protocol Distribution')
self.canvas.draw()
# 일정 시간마다 그래프 업데이트
self.root.after(1000, self.update_graph)
def stop_sniffing_func(self):
self.stop_sniffing.set()
# 스레드가 종료될 때까지 기다림
if self.sniffing_thread and self.sniffing_thread.is_alive():
self.sniffing_thread.join()
if __name__ == "__main__":
root = tk.Tk()
app = PacketSnifferApp(root)
root.mainloop()
# 사용자가 Enter를 누를 때까지 대기
input("Press Enter to exit...")
문제점: 네트워크 패킷을 캡처하기 위해서는 관리자 권한이 필요하다.
일반 사용자 권한으로 애플리케이션을 실행할 경우 패킷을 제대로 캡처하지 못하거나, 특정 인터페이스에 접근할 수 없는 문제가 발생했다.해결 방법: 이를 해결하기 위해 Windows 환경에서 애플리케이션을 관리자 권한으로 실행했다
문제점: 실시간으로 많은 패킷을 캡처하고 이를 GUI에 업데이트하는 과정에서 성능 이슈가 발생했다. 특히, 많은 패킷이 빠르게 들어오는 경우, GUI가 느려지거나 응답하지 않는 문제가 생겼다.
이는 패킷 캡처와 GUI 업데이트가 동일한 스레드에서 처리되기 때문이었다.
해결방법
스레딩: 패킷 캡처 작업을 별도의 스레드에서 처리하고, GUI 업데이트는 메인 스레드에서 수행하도록 구현했다. 이를 통해 패킷 캡처의 성능을 유지하면서도 GUI가 원활하게 동작하도록 했다.큐 사용: 패킷 캡처 스레드와 GUI 스레드 간의 데이터 교환을 위해
큐(Queue)를 사용했다. 캡처된 패킷을 큐에 저장하고, 메인 스레드에서는 주기적으로 큐에서 패킷을 가져와 GUI를 업데이트하도록 했다. 이렇게 하여 패킷 캡처와 GUI 업데이트 간의동기화 문제를 해결할 수 있었다패킷 필터링: 모든 패킷을 캡처하고 처리하는 대신, 사용자가 관심 있는 특정 프로토콜이나 조건에 맞는 패킷만 캡처하도록 필터링 기능을 추가했다.
예를 들어,TCP또는UDP패킷만 캡처하도록 설정할 수 있다.
이를 통해 불필요한 패킷 처리로 인한 성능 저하를 방지할 수 있습니다.
문제점: 패킷 캡처 중 일부 패킷이 손실되는 문제가 발생했다.
이는 특히 고속 네트워크 환경에서 빈번하게 발생했다.해결 방법: 패킷 캡처 버퍼 크기를 늘리고, 패킷 처리 속도를 최적화했습니다.
문제점: 시스템에 여러 네트워크 인터페이스가 있는 경우, 올바른 인터페이스를 선택하지 못해 패킷을 캡처하지 못하는 문제가 발생했다.
해결 방법: 사용자가 네트워크 인터페이스를 선택할 수 있는 기능을 추가했다.
초기화 단계에서 시스템의 네트워크 인터페이스 목록을 불러와 사용자가 선택하도록 하여, 원하는 인터페이스에서 패킷을 캡처할 수 있게 했다.