黑帽python第二版(Black Hat Python 2nd Edition)讀書筆記 之 第三章 網絡工程-原始套接字與嗅探(3)解碼ICMP
文章目錄
- 黑帽python第二版(Black Hat Python 2nd Edition)讀書筆記 之 第三章 網絡工程-原始套接字與嗅探(3)解碼ICMP
- 解碼ICMP
- 小試牛刀
-
- 在linux上運作
- 在windows主機上運作
- 關于ipaddress子產品
- 總結
解碼ICMP
現在我們可以完全解碼任何嗅探到的資料包的IP層,接下來我們必須能夠解碼ICMP響應,掃描工具将從發送UDP資料報到關閉的端口中擷取ICMP響應。ICMP消息的内容可能有很大差異,但每條消息都包含三個保持一緻的元素:類型、代碼、校驗和字段。類型和代碼字段告訴接收主機ICMP消息的類型,以便能夠正确解碼。
對于掃描工具,我們希望查找類型值為3和代碼值為3的包。類型值為3對應于ICMP消息的Destination Unreachable類,代碼值為3表示收到了端口不可達錯誤。下圖描述了Destination Unreachable的ICMP消息的示意。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLjlTOjlDO1QjZ2EjYhNTM5ATNwQzMmZmYmhTZyUmMkJ2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
正如讀者所看到的,前8位是類型,後8位包含我們的ICMP代碼。需要注意的一件有趣的事情是,當主機發送其中一條ICMP消息時,它實際上包括生成響應的原始消息的IP頭。我們還可以看到,我們将對發送的原始資料報的8個位元組進行雙重檢查,以確定掃描工具生成ICMP響應。為此,我們隻需将接收到的緩沖區的最後8個位元組切去,以提取掃描工具發送的魔法字元串。
讓我們在之前的嗅探器中添加更多代碼,使其具備解碼ICMP資料包的能力。讓我們将上一個檔案另存為sniffer_with_icmp.py并添加以下代碼:
import ipaddress
import os
import socket
import struct
import sys
from typing import Protocol
class IP:
def __init__(self, buff=None):
header = struct.unpack('<BBHHHBBH4s4s', buff)
self.ver = header[0] >> 4
self.ihl = header[0] & 0xF
self.tos = header[1]
self.len = header[2]
self.id = header[3]
self.offset = header[4]
self.ttl = header[5]
self.protocol_num = header[6]
self.sum = header[7]
self.src = header[8]
self.dst = header[9]
# human readable IP addresses
self.src_address = ipaddress.ip_address(self.src)
self.dst_address = ipaddress.ip_address(self.dst)
# map protocol constants to their names
self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}
try:
self.protocol = self.protocol_map[self.protocol_num]
except Exception as e:
print('%s No protocol for %s' % (e, self.protocol_num))
self.protocol = str(self.protocol_num)
class ICMP:
def __init__(self, buff):
header = struct.unpack('<BBHHH', buff)
self.type = header[0]
self.code = header[1]
self.sum = header[2]
self.id = header[3]
self.seq = header[4]
def sniff(host):
# should look familiar from previous example
if os.name == 'nt':
socket_protocol = socket.IPPROTO_IP
else:
socket_protocol = socket.IPPROTO_ICMP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind((host, 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
if os.name == 'nt':
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
try:
while True:
# read a packet
raw_buffer = sniffer.recvfrom(65535)[0]
# create an IP header from the first 20 bytes
ip_header = IP(raw_buffer[0:20])
# if it's ICMP, we want it
if ip_header.protocol == 'ICMP':
print('Protocol: %s %s -> %s' % (ip_header.protocol, ip_header.src_address, ip_header.dst_address))
print(f'Version: {ip_header.ver}')
print(f'Header Length: {ip_header.ihl} TTL: {ip_header.ttl}')
# calculate where out ICMP packet starts
offset = ip_header.ihl * 4
buf = raw_buffer[offset:offset + 8]
# create our ICMP structure
icmp_header = ICMP(buf)
print('ICMP -> Type: %s Code: %s\n' % (icmp_header.type, icmp_header.code))
# print the detected protocol and hosts
# print('Protocol: %s %s -> %s' % (ip_header.protocol, ip_header.src_address, ip_header.dst_address))
except KeyboardInterrupt:
# if we're on Windows, turn off proniscuous mode
if os.name == 'nt':
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
sys.exit()
if __name__ == '__main__':
if len(sys.argv) == 2:
host = sys.argv[1]
else:
host = '192.168.65.141'
sniff(host)
上面這段簡單的代碼在我們已有的IP結構下建立了一個ICMP的結構。當在擷取包的主循環中識别出我們收到的是ICMP的包時,我們計算ICMP消息體在原始包中的偏移量,然後建立buffer并列印出ICMP的type屬性和code屬性。長度計算基于IP頭的ihl字段,ihl屬性訓示了IP頭部包含的32位字(4位元組塊)的數量。是以,通過将該字段的值乘以4,我們可以知道IP頭的大小,進而知道下一個網絡層(本例中為ICMP)資料的開始位置。
如果我們使用典型的ping測試快速運作這段代碼,輸出内容應該略有不同:
說明:這裡跟原書中有點不太一樣,原書中隻列印出了上圖中的第一行和最後一行,沒有列印出中間的兩行。原書中的代碼不可能運作出如下所示的結果。
因為上述代碼中明顯包含了列印version和header length的語句。
這說明ping(ICMP Echo)指令的響應被正确接收并解析。接下來我們将會實施本章的最後一部分,發出UDP資料報并解析其結果。
下面,我們添加ipaddress子產品的适用,這樣我們就可以通過主機發現掃描整個子網。将上述代碼另存為scanner.py,然後添加下面的代碼。
import ipaddress
import os
import socket
import struct
import sys
import threading
import time
# subnet to target
SUBNET = '192.168.0.1/24'
# magic string we'll check ICMP responses for
MESSAGE = 'PYTHONRULES!'
class IP:
def __init__(self, buff=None):
header = struct.unpack('<BBHHHBBH4s4s', buff)
self.ver = header[0] >> 4
self.ihl = header[0] & 0xF
self.tos = header[1]
self.len = header[2]
self.id = header[3]
self.offset = header[4]
self.ttl = header[5]
self.protocol_num = header[6]
self.sum = header[7]
self.src = header[8]
self.dst = header[9]
# human readable IP addresses
self.src_address = ipaddress.ip_address(self.src)
self.dst_address = ipaddress.ip_address(self.dst)
# map protocol constants to their names
self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}
try:
self.protocol = self.protocol_map[self.protocol_num]
except Exception as e:
print('%s No protocol for %s' % (e, self.protocol_num))
self.protocol = str(self.protocol_num)
class ICMP:
def __init__(self, buff):
header = struct.unpack('<BBHHH', buff)
self.type = header[0]
self.code = header[1]
self.sum = header[2]
self.id = header[3]
self.seq = header[4]
# this sprays out UDP datagrams with our magic message
def udp_sender():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
for ip in ipaddress.ip_network(SUBNET).hosts():
sender.sendto(bytes(MESSAGE, 'utf8'), (str(ip), 65212))
class Scanner:
def __init__(self, host):
self.host = host
if os.name == 'nt':
socket_protocol = socket.IPPROTO_IP
else:
socket_protocol = socket.IPPROTO_ICMP
self.socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
self.socket.bind((host, 0))
self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
if os.name == 'nt':
self.socket.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
def sniff(self):
hosts_up = set([f'{str(self.host)} *'])
try:
while True:
# read a packet
raw_buffer = self.socket.recvfrom(65535)[0]
# create an IP header from the first 20 bytes
ip_header = IP(raw_buffer[0:20])
# if it's ICMP, we want it
if ip_header.protocol == 'ICMP':
offset = ip_header.ihl * 4
buf = raw_buffer[offset:offset + 8]
icmp_header = ICMP(buf)
# check for TYPE 3 and CODE
if ipaddress.ip_address(ip_header.src_address) in ipaddress.IPv4Network(SUBNET):
# make sure it has our magic message
if raw_buffer[len(raw_buffer) - len(MESSAGE):] == bytes(MESSAGE, 'utf8'):
tgt = str(ip_header.src_address)
if tgt != self.host and tgt not in hosts_up:
hosts_up.add(str(ip_header.src_address))
print(f'Host Up: {tgt}')
# handle CTRL-C
except KeyboardInterrupt:
if os.name == 'nt':
self.socket.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
print('\nUser interrupted.')
if hosts_up:
print(f'\n\nSummary: Hosts up on {SUBNET}')
for host in stored(hosts_up):
print('f{host}')
print('')
sys.exit()
if __name__ == '__main__':
if len(sys.argv) == 2:
host = sys.argv[1]
else:
host = '192.168.65.141'
s = Scanner(host)
time.sleep(5)
t = threading.Thread(target=udp_sender)
t.start()
s.sniff()
最後這段代碼很容易了解。我們定義了一個簡單的字元串簽名,以便測試響應是否來自我們最初發送的UDP資料包。我們的udp_sender函數隻接受我們在腳本頂部指定的子網,周遊該子網中的所有IP位址,并向其發送udp資料報。然後我們定義了Scanner類,為了初始化Scanner類,我們向它傳遞一個主機作為參數。在初始化時,我們建立一個socket,如果運作Windows,則打開混雜模式,并使socket成為Scanner類的屬性。
sniff方法嗅探網絡,具體邏輯跟前一個示例相同,隻是這次它會記錄哪些主機在運作。如果我們檢測到預定義的ICMP消息,首先檢查以確定ICMP響應來自我們的目标子網。然後,我們執行最後一次檢查,以確定ICMP響應中包含我們的魔法字元串。如果所有這些檢查都通過,我們将列印出ICMP消息産生的主機的IP位址。當我們使用CTRL-C結束嗅探過程時,我們處理鍵盤中斷。如果在Windows上,我們會關閉混雜模式,并列印出一個經過排序的實時主機清單。
__main__代碼塊完成設定工作:它建立Scanner對象,隻休眠幾秒鐘,然後調用sniff方法之前,在單獨的線程中生成udp_sender,以確定我們不會幹擾嗅探響應的能力。接下來我們試試看。
小試牛刀
現在讓我們把掃描工具放到本地網絡上運作。可以使用Linux或Windows進行操作,因為結果是相同的。如果運作掃描工具時輸出太多噪聲,隻需注釋掉所有列印語句,最後一個語句除外,它告訴您主機正在響應什麼。
在linux上運作
在本例中,包括kali在内的三台虛拟機都是運作在192.168.65.0/24網段上,并且windows虛拟機開啟了防火牆。我們現在kali上運作一下試試看,結果如下圖,準确命中另一台linux主機。
當關閉windows虛拟機的防火牆以後,準确命中windows虛拟機,如下圖。
在windows主機上運作
接下來,在安裝VMware的windows主機上運作,掃描一下整個家裡的網絡看看。得到結果如下,除了本機木有開啟的主機。
開啟家裡的NAS試試看,結果還是沒有找到,但是從運作scanner腳本的主機上ping主機NAS是可以通的,好奇怪。
突然想到,會不會是windows主機防火牆的原因,先把windows主機的防火牆關閉,然後再次運作,準确命中,如下圖。
在windows虛拟機上掃描另外兩台linux主機的時候能夠準确命中,如下圖所示。
但是這裡也碰到了問題,當我從windows主機上試圖掃描三台虛拟機的時候,無論如何都掃描不到,以後再解決這問題,初步懷疑是公司的監控軟體給屏蔽了。
關于ipaddress子產品
在上述示例中,我們使用了一個叫做ipaddress的庫,它允許我們輸入一個帶有掩碼的子網,例如192.168.0.0/24。
ipaddress子產品使得處理位址和子網非常友善。比如,可以使用Ipv4Network對象直接執行如下的代碼示例:
ip_address = "192.168.112.3"
if ip_address in Ipv4Network("192.168.112.0/24"):
print True
或者,如果要将資料包發送到整個網絡,我們可以建立簡單的疊代器:
for ip in Ipv4Network("192.168.112.1/24"):
s = socket.socket()
s.connect((ip, 25))
# send mail packets
在一次性處理整個網絡時,這将大大縮短我們的編碼時間,非常适合我們的主機發現工具:
python.exe scanner.py
Host Up: 192.168.0.1
Host Up: 192.168.0.190
Host Up: 192.168.0.192
Host Up: 192.168.0.195
總結
對于我們執行的快速掃描,隻需幾秒鐘即可獲得結果。通過将這些IP位址與家庭路由器中的DHCP表交叉驗證,可以驗證結果是否準确。我們可以輕松擴充本章所學内容,以解碼TCP和UDP資料包,并圍繞掃描工具建構其他的工具。這個掃描工具對于我們将在第7章開始建構的特洛伊木馬架構也很有用。将允許部署的特洛伊爾木馬掃描本地網絡,尋找其他目标。
既然我們已經了解了網絡如何在高層次和低層次上工作的基本知識,接下來我們将探索一個非常成熟的Python庫Scapy。