Scapy 是一款使用純Python編寫的跨平臺網路數據包操控工具,它能夠處理和嗅探各種網路數據包。能夠很容易的創建,發送,捕獲,分析和操作網路數據包,包括TCP,UDP,ICMP等協議,此外它還提供了許多有用的功能,例如嗅探網路流量,創建自定義協議和攻擊網路的安全測試工具。使用Scapy可以通過P... ...
Scapy 是一款使用純Python編寫的跨平臺網路數據包操控工具,它能夠處理和嗅探各種網路數據包。能夠很容易的創建,發送,捕獲,分析和操作網路數據包,包括TCP,UDP,ICMP等協議,此外它還提供了許多有用的功能,例如嗅探網路流量,創建自定義協議和攻擊網路的安全測試工具。使用Scapy可以通過Python腳本編寫自定義網路協議和攻擊工具,這使得網路安全測試變得更加高效和精確。
讀者可自行安裝Scapy
第三方庫,其次該工具依賴於PCAP介面,讀者可自行安裝npcap
驅動工具包,具體的安裝細節此處就不再贅述。
- 安裝Scapy工具:pip install PyX matplotlib scapy
- 安裝Npcap驅動:https://npcap.com/dist/
21.2.1 埠掃描基礎
網路埠掃描用於檢測目標主機上開放的網路埠。埠掃描可以幫助安全專業人員識別存在的網路漏洞,以及識別網路上的服務和應用程式。在進行埠掃描時,掃描程式會發送特定的網路數據包,嘗試與目標主機的每個埠進行通信。如果埠處於打開狀態,則掃描程式將能夠成功建立連接。否則,掃描程式將收到一條錯誤消息,表明目標主機上的該埠未開放。
常見的埠掃描技術包括TCP連接掃描、SYN掃描、UDP掃描和FIN掃描等。其中,TCP連接掃描是最常用的一種技術,它通過建立TCP連接來識別開放的埠。SYN掃描則利用TCP協議的三次握手過程來判斷埠是否開放,而UDP掃描則用於識別UDP埠是否開放。FIN掃描則是利用TCP FIN數據包來探測目標主機上的埠是否處於開放狀態。
在動手開發掃描軟體之前,我們還是要重點複習一下協議相關的內容,這樣才能真正理解不同埠掃描方式的原理,首先是TCP協議,TCP(Transmission Control Protocol)傳輸控制協議是一種面向連接的、可靠的、基於位元組流的傳輸層協議。下圖是TCP報文格式:
TCP報文分為頭部和數據兩部分,其中頭部包含以下欄位:
-
源埠(Source Port):占用2個位元組,表示發送端使用的埠號,範圍是0-65535。
-
目的埠(Destination Port):占用2個位元組,表示接收端使用的埠號,範圍是0-65535。
-
序列號(Sequence Number):占用4個位元組,表示數據段中第一個位元組的序號。TCP協議中採用序號對數據進行分段,從而實現可靠傳輸。
-
確認號(Acknowledgement Number):占用4個位元組,表示期望收到的下一個位元組的序號。TCP協議中採用確認號對接收到的數據進行確認,從而實現可靠傳輸。
-
數據偏移(Data Offset):占用4個位,表示TCP頭部的長度。由於TCP頭部長度是可變的,該欄位用於指示數據段從哪裡開始。
-
保留位(Reserved):占用6個位,保留用於將來的擴展。
-
控制位(Flags):占用6個位,共有6個標誌位,分別為URG、ACK、PSH、RST、SYN和FIN。其中,URG、ACK、PSH和RST標誌的長度均為1位,SYN和FIN標誌的長度為1位。
-
視窗大小(Window Size):占用2個位元組,表示發送方可接受的位元組數量,用於流量控制。
-
校驗和(Checksum):占用2個位元組,用於檢驗數據的完整性。
-
緊急指針(Urgent Pointer):占用2個位元組,表示該報文的緊急數據在數據段中的偏移量。
-
選項(Options):可變長度,用於協商TCP參數,如最大報文長度、時間戳等。
對於埠掃描來說我們需要重點關註控制位
中所提到的6個標誌位,這些標誌是我們實現掃描的關鍵,如下是這些標誌位的說明;
- URG:緊急指針標誌位,當URG=1時,表明緊急指針欄位有效。它告訴系統中有緊急數據,應當儘快傳送,這時不會按照原來的排隊序列來傳送,而會將緊急數據插入到本報文段數據的最前面。
- ACK:當ACK=1時,我們的確認序列號ack才有效,當ACK=0時,確認序號ack無效,在TCP協議中規定所有建立連接的ACK必須全部置為1。
- PSH:當該標誌位被設置時,表示TCP數據包需要立即發送給接收方,而無需等待緩衝區填滿。這個操作被稱為推送操作,即將緩衝區的數據立即推送給接收方。
- RST:當RST=1時,表明TCP連接出現嚴重錯誤,此時必須釋放連接,之後重新連接,該標誌又叫重置位。
- SYN:同步序列號標誌位,tcp三次握手中,第一次會將SYN=1,ACK=0,此時表示這是一個連接請求報文段,對方會將SYN=1,ACK=1,表示同意連接,連接完成之後將SYN=0。
- FIN:在tcp四次揮手時第一次將FIN=1,表示此報文段的發送方數據已經發送完畢,這是一個釋放鏈接的標誌。
接著我們來具體看一下在TCP/IP
協議中,TCP是如何採用三次握手四次揮手實現數據包的通信功能的,如下是一個簡單的通信流程圖;
(1) 第一次握手:建立連接時,客戶端A發送SYN
包(SYN=j)到伺服器B,以及初始序號X,保存在包頭的序列號(Sequence Number)欄位里,併進入SYN_SEND
狀態,等待伺服器B確認。
(2)第二次握手:伺服器B收到SYN
包,必須確認客戶A的SYN
(ACK=j+1),同時自己也發送一個SYN
包(SYN=k),即SYN+ACK
包,此時伺服器B進入SYN_RECV
狀態。
(3) 第三次握手:客戶端A收到伺服器B的SYN+ACK
包,向伺服器B發送確認包ACK
(ACK=k+1),此包發送完畢,客戶端A和伺服器B進入ESTABLISHED
狀態,完成三次握手。至此服務端與客戶端之間就可以傳輸數據了。
21.2.2 ICMP構建與發送
首先我們先來構建並實現一個ICMP
數據包,在之前的文章中筆者已經通過C語言實現了數據包的構建,當然使用C語言構建數據包是一件非常繁瑣的實現,通過運用Scapy
則可以使數據包的構建變得很容易,ICMP數據包上層是IP
頭部,所以在構造數據包時應先構造IP
包頭,然後再構造ICMP
包頭,如下我們先使用ls(IP)
查詢一下IP
包頭的結構定義,然後再分別構造參數。
>>> from scapy.all import *
>>> from random import randint
>>> import time
>>>
>>> uuid = randint(1,65534)
>>> ls(IP) # 查詢IP頭部定義
version : BitField (4 bits) = (4)
ihl : BitField (4 bits) = (None)
tos : XByteField = (0)
len : ShortField = (None)
id : ShortField = (1)
flags : FlagsField (3 bits) = (<Flag 0 ()>)
frag : BitField (13 bits) = (0)
ttl : ByteField = (64)
proto : ByteEnumField = (0)
chksum : XShortField = (None)
src : SourceIPField = (None)
dst : DestIPField = (None)
options : PacketListField = ([])
>>>
>>> ip_header = IP(dst="192.168.1.1",ttl=64,id=uuid) # 構造IP數據包頭
>>> ip_header.show() # 輸出構造好的包頭
###[ IP ]###
version = 4
ihl = None
tos = 0x0
len = None
id = 64541
flags =
frag = 0
ttl = 64
proto = ip
chksum = None
src = 192.168.1.101
dst = 192.168.1.1
\options \
>>> ip_header.summary()
'192.168.1.101 > 192.168.1.1 ip'
上述代碼中我們已經構造了一個IP
包頭,接著我們還需要構造一個ICMP
包頭,該包頭的構造可以使用ICMP()
並傳入兩個參數,如下則是構造好的一個ICMP
包頭。
>>> ICMP()
<ICMP |>
>>>
>>> icmp_header = ICMP(id=uuid, seq=uuid)
>>>
>>> icmp_header
<ICMP id=0xfc1d seq=0xfc1d |>
>>> icmp_header.show()
###[ ICMP ]###
type = echo-request
code = 0
chksum = None
id = 0xfc1d
seq = 0xfc1d
接著我們需要將上述兩個包頭粘貼在一起,通過使用/
將來給你這進行拼接,併在icmp_header
後面增加我們所有發送的數據包字元串,最終將構造好的數據包存儲至packet
變數內,此時輸入packet.summary()
即可查看構造的數據包字元串。
>>> packet = ip_header / icmp_header / "hello lyshark"
>>>
>>> packet
<IP id=64541 frag=0 ttl=64 proto=icmp dst=192.168.1.1 |<ICMP id=0xfc1d seq=0xfc1d |<Raw load='hello lyshark' |>>>
>>>
>>> packet.summary()
'IP / ICMP 192.168.1.101 > 192.168.1.1 echo-request 0 / Raw'
>>>
當我們構造好一個數據包後,下一步則是需要將該數據包發送出去,對於發送數據包Scapy
中提供了多種發送函數,如下則是不同的幾種發包方式,當我們呢最常用的還是sr1()
該函數用於發送數據包並只接受回顯數據。
- send(pkt):發送三層數據包,但不會受到返回的結果
- sr(pkt):發送三層數據包,返回兩個結果,分別是接收到響應的數據包和未收到響應的數據包
- sr1(pkt):發送三層數據包,僅僅返回接收到響應的數據包
- sendp(pkt):發送二層數據包
- srp(pkt):發送二層數據包,並等待響應
- srp1(pkt):發送第二層數據包,並返迴響應的數據包
此處我們就以sr1()
函數作為演示目標,通過構造數據包並調用sr1()
將該數據包發送出去,並等待返迴響應數據到respon
變數內,此時通過對該變數進行解析即可得到當前ICMP
的狀態。
>>> respon = sr1(packet,timeout=3,verbose=0)
>>> respon
<IP version=4 ihl=5 tos=0x0 len=41 id=26086 flags= frag=0 ttl=64 proto=icmp chksum=0x9137 src=192.168.1.1 dst=192.168.1.101 |<ICMP type=echo-reply code=0 chksum=0x177d id=0xfc1d seq=0xfc1d |<Raw load='hello lyshark' |<Padding load='\x00\x00\x00\x00\x00' |>>>>
>>>
>>> respon[IP].src
'192.168.1.1'
>>>
>>> respon[IP].ttl
64
>>> respon.fields
{'options': [], 'version': 4, 'ihl': 5, 'tos': 0, 'len': 41, 'id': 26086, 'flags': <Flag 0 ()>, 'frag': 0, 'ttl': 64, 'proto': 1, 'chksum': 37175, 'src': '192.168.1.1', 'dst': '192.168.1.101'}
上述流程就是一個簡單的ICMP
的探測過程,我們可以將這段代碼進行組合封裝實現ICMP_Ping
函數,該函數只需要傳入一個IP
地址即可返回特定地址是否線上,同時我們使用ipaddress.ip_network
則可生成一整個C
段中的地址信息,並配合threading
啟用多線程,則可實現一個簡單的主機存活探測工具,完整代碼如下所示;
from scapy.all import *
from random import randint
import time,ipaddress,threading
import argparse
import logging
def ICMP_Ping(addr):
RandomID=randint(1,65534)
packet = IP(dst=addr, ttl=64, id=RandomID) / ICMP(id=RandomID, seq=RandomID) / "hello lyshark"
respon = sr1(packet,timeout=3,verbose=0)
if respon:
print("[+] 存活地址: {}".format(str(respon[IP].src)))
if __name__== "__main__":
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
#net = ipaddress.ip_network("192.168.1.0/24")
parser = argparse.ArgumentParser()
parser.add_argument("-a","--addr",dest="addr",help="指定一個IP地址或範圍")
args = parser.parse_args()
if args.addr:
net = ipaddress.ip_network(str(args.addr))
for item in net:
t = threading.Thread(target=ICMP_Ping,args=(str(item),))
t.start()
else:
parser.print_help()
讀者可自行運行上述程式片段,並傳入main.py -a 192.168.9.0/24
表示掃描整個C段,並輸出存活主機列表,其中logging
模塊則用於指定只有錯誤提示才會輸出,其他的警告忽略。掃描結果如下圖所示;
接著我們繼續實現路由追蹤功能,跟蹤路由原理是IP
路由每經過一個路由節點TTL
值會減一,假設TTL
值為0
時數據包還沒有到達目標主機,那麼該路由則會回覆給目標主機一個數據包不可達,由此我們就可以獲取到目標主機的IP
地址,我們首先構造一個數據包,並設置TTL
值為1
,將該數據包發送出去即可看到回顯主機的IP信息。
>>> from scapy.all import *
>>> from random import randint
>>> import time
>>>
>>> RandomID=randint(1,65534)
>>> packet = IP(dst="104.193.88.77", ttl=1, id=RandomID) / ICMP(id=RandomID, seq=RandomID) / "hello"
>>> respon = sr1(packet,timeout=3,verbose=0)
>>>
>>> respon
<IP version=4 ihl=5 tos=0xc0 len=61 id=14866 flags= frag=0 ttl=64 proto=icmp chksum=0xbc9a src=192.168.1.1 dst=192.168.1.2 |<ICMP type=time-exceeded code=ttl-zero-during-transit chksum=0xf4ff reserved=0 length=0 unused=None |<IPerror version=4 ihl=5 tos=0x0 len=33 id=49588 flags= frag=0 ttl=1 proto=icmp chksum=0x4f79 src=192.168.1.2 dst=104.193.88.77 |<ICMPerror type=echo-request code=0 chksum=0x30c4 id=0xc1b4 seq=0xc1b4 |<Raw load='hello' |>>>>>
關於如何實現路由跟蹤,具體來說一開始發送一個TTL
為1
的數據包,這樣到達第一個路由器的時候就已經超時了,第一個路由器就會返回一個ICMP
通知,該通知包含了對端的IP
地址,這樣就能夠記錄下所經過的第一個路由器的地址。接著將TTL
值加1,讓其能夠安全的通過第一個路由器,而第二個路由器的的處理過程會自動丟包,發通包超時通知,這樣記錄下第二個路由器IP
,由此能夠一直進行下去,直到這個數據包到達目標主機,由此列印出全部經過的路由器。
將上述跟蹤過程自動化,就可以完成數據包的跟蹤,其Python
代碼如下所示。
from scapy.all import *
from random import randint
import time,ipaddress,threading
from optparse import OptionParser
import logging
def TraceRouteTTL(addr):
for item in range(1,128):
RandomID=randint(1,65534)
packet = IP(dst=addr, ttl=item, id=RandomID) / ICMP(id=RandomID, seq=RandomID)
respon = sr1(packet,timeout=3,verbose=0)
if respon != None:
ip_src = str(respon[IP].src)
if ip_src != addr:
print("[+] --> {}".format(str(respon[IP].src)))
else:
print("[+] --> {}".format(str(respon[IP].src)))
return 1
else:
print("[-] --> TimeOut")
time.sleep(1)
if __name__== "__main__":
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
parser = OptionParser()
parser.add_option("-a","--addr",dest="addr",help="指定一個地址或範圍")
(options,args) = parser.parse_args()
if options.addr:
TraceRouteTTL(str(options.addr))
else:
parser.print_help()
讀者可自行運行上述程式片段,並傳入main.py --addr 104.193.88.77
表示跟蹤從本機到目標104.193.88.77
主機所經過的路由器地址信息,並將掃描結果輸出如下圖所示;
21.2.3 TCP全連接掃描
TCP Connect 掃描又叫做全連接掃描,它是一種常用的埠掃描技術。在這種掃描中,掃描程式向目標主機發送TCP
連接請求包(SYN包
),如果目標主機回應了一個TCP
連接確認包(SYN-ACK
包),則說明該埠處於開放狀態。否則,如果目標主機回應了一個TCP
複位包(RST
包)或者沒有任何響應,則說明該埠處於關閉狀態。這種掃描技術的優點是準確性高,因為它可以在不建立實際連接的情況下確定目標主機的埠狀態。但是,缺點是這種掃描技術很容易被目標主機的防火牆或入侵檢測系統檢測到。
全連接掃描需要客戶端與伺服器之間直接建立一次完整的握手,該方式掃描速度慢效率低,我們需要使用Scapy
構造完整的全連接來實現一次探測,在使用該工具包時讀者應該註意工具包針對flags
所代指的標識符RA/AR/SA
含義,這些標誌是Scapy
框架中各種數據包的簡寫,此外針對數據包的定義有以下幾種;
- F:FIN 結束,結束會話
- S:SYN 同步,表示開始會話請求
- R:RST 複位,中斷一個連接
- P:PUSH 推送,數據包立即發送
- A:ACK 應答
- U:URG 緊急
- E:ECE 顯式擁塞提醒回應
- W:CWR 擁塞視窗減少
實現全鏈接掃描我們封裝並實現一個tcpScan()
函數,該函數接收兩個參數一個掃描目標地址,一個掃描埠列表,通過對數據包的收發判斷即可獲取特定主機開放狀態;
from scapy.all import *
import argparse
import logging
def tcpScan(target,ports):
for port in ports:
# S 代表發送SYN報文
send=sr1(IP(dst=target)/TCP(dport=port,flags="S"),timeout=2,verbose=0)
if (send is None):
continue
# 如果是TCP數據包
elif send.haslayer("TCP"):
# 是否是 SYN+ACK 應答
if send["TCP"].flags == "SA":
# 發送ACK+RST數據包完成三次握手
send_1 = sr1(IP(dst=target) / TCP(dport=port, flags="AR"), timeout=2, verbose=0)
print("[+] 掃描主機: %-13s 埠: %-5s 開放" %(target,port))
elif send["TCP"].flags == "RA":
print("[+] 掃描主機: %-13s 埠: %-5s 關閉" %(target,port))
if __name__ == "__main__":
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
# 使用方式: main.py -H 192.168.1.10 -p 80,8080,443,445
parser = argparse.ArgumentParser()
parser.add_argument("-H","--host",dest="host",help="輸入一個被攻擊主機IP地址")
parser.add_argument("-p","--port",dest="port",help="輸入埠列表 [80,443,135]")
args = parser.parse_args()
if args.host and args.port:
tcpScan(args.host,eval(args.port))
else:
parser.print_help()
運行上述代碼片段,並傳入59.110.117.109
地址以及,埠80,8080,443,445
程式將依次掃描這些埠,並輸出如下圖所示;
21.2.4 SYN半開放掃描
TCP SYN掃描又稱半開式掃描,該過程不會和服務端建立完整的連接,其原理是利用了TCP
協議中的一個機制,即在TCP
三次握手過程中,客戶端發送SYN
包到服務端,服務端回應SYN+ACK
包給客戶端,最後客戶端回應ACK
包給服務端。如果服務端回應了SYN+ACK
包,說明該埠是開放的;如果服務端回應了RST
包,說明該埠是關閉的。
TCP SYN掃描的優點是不會像TCP Connect
掃描那樣建立完整的連接,因此不會留下大量的日誌,可以有效地隱藏掃描行為。缺點是在掃描過程中會產生大量的半連接,容易被IDS/IPS
等安全設備檢測到,而且可能會對目標主機造成負擔。
SYN掃描不會和服務端建立完整的連接,從而能夠在一定程度上提高掃描器的效率,該掃描方式在代碼實現上和全連接掃描區別不大,只是在結束到服務端響應數據包之後直接發送RST
包結束連接,上述代碼只需要進行簡單修改,將send_1
處改為R
標誌即可;
from scapy.all import *
import argparse
import logging
def tcpSynScan(target,ports):
for port in ports:
# S 代表發送SYN報文
send=sr1(IP(dst=target)/TCP(dport=port,flags="S"),timeout=2,verbose=0)
if (send is None):
continue
# 如果是TCP數據包
elif send.haslayer("TCP"):
# 是否是 SYN+ACK 應答
if send["TCP"].flags == "SA":
# 發送ACK+RST數據包完成三次握手
send_1 = sr1(IP(dst=target) / TCP(dport=port, flags="AR"), timeout=2, verbose=0)
print("[+] 掃描主機: %-13s 埠: %-5s 開放" %(target,port))
elif send["TCP"].flags == "RA":
print("[+] 掃描主機: %-13s 埠: %-5s 關閉" %(target,port))
else:
print("[+] 掃描主機: %-13s 埠: %-5s 關閉" %(target,port))
if __name__ == "__main__":
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
# 使用方式: main.py -H 192.168.1.10 -p 80,8080,443,445
parser = argparse.ArgumentParser()
parser.add_argument("-H","--host",dest="host",help="輸入一個被攻擊主機IP地址")
parser.add_argument("-p","--port",dest="port",help="輸入埠列表 [80,443,135]")
args = parser.parse_args()
if args.host and args.port:
tcpSynScan(args.host,eval(args.port))
else:
parser.print_help()
同理,我們分別傳入被掃描主機IP地址以及需要掃描的埠號列表,當掃描結束後即可輸出如下圖所示的結果;
21.2.5 UDP無狀態掃描
UDP 無狀態掃描是一種常見的網路掃描技術,其基本原理與TCP SYN
掃描類似。不同之處在於UDP
協議是無連接的、無狀態的,因此無法像TCP
一樣建立連接併進行三次握手。
UDP 無狀態掃描的基本流程如下:
- 客戶端向伺服器發送帶有埠號的
UDP
數據包,如果伺服器回覆了UDP
數據包,則目標埠是開放的。 - 如果伺服器返回了一個
ICMP
目標不可達的錯誤和代碼3
,則意味著目標埠處於關閉狀態。 - 如果伺服器返回一個
ICMP
錯誤類型3
且代碼為1,2,3,9,10
或13
的數據包,則說明目標埠被伺服器過濾了。 - 如果伺服器沒有任何相應客戶端的
UDP
請求,則可以斷定目標埠可能是開放或被過濾的,無法判斷埠的最終狀態。
讀者需要註意,UDP 無狀態掃描可能會出現誤報或漏報的情況。由於UDP
協議是無連接的、無狀態的,因此UDP
埠不可達錯誤消息可能會被目標主機過濾掉或者由於網路延遲等原因無法到達掃描主機,從而導致掃描結果不准確。
from scapy.all import *
import argparse
import logging
def udpScan(target,ports):
for port in ports:
udp_scan_resp = sr1(IP(dst=target)/UDP(dport=port),timeout=5,verbose=0)
if (str(type(udp_scan_resp))=="<class 'NoneType'>"):
print("[+] 掃描主機: %-13s 埠: %-5s 關閉" %(target,port))
elif(udp_scan_resp.haslayer(UDP)):
if(udp_scan_resp.getlayer(TCP).flags == "R"):
print("[+] 掃描主機: %-13s 埠: %-5s 開放" %(target,port))
elif(udp_scan_resp.haslayer(ICMP)):
if(int(udp_scan_resp.getlayer(ICMP).type)==3 and int(udp_scan_resp.getlayer(ICMP).code) in [1,2,3,9,10,13]):
print("[+] 掃描主機: %-13s 埠: %-5s 關閉" %(target,port))
if __name__ == "__main__":
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
parser = argparse.ArgumentParser()
parser.add_argument("-H","--host",dest="host",help="")
parser.add_argument("-p","--port",dest="port",help="")
args = parser.parse_args()
if args.host and args.port:
udpScan(args.host,eval(args.port))
else:
parser.print_help()
運行上述代碼,並傳入不同的參數,即可看到如下圖所示的掃描結果,讀者需要註意因為UDP
的特殊性導致埠掃描無法更精確的判定;
本文作者: 王瑞
本文鏈接: https://www.lyshark.com/post/1a03a021.html
版權聲明: 本博客所有文章除特別聲明外,均採用 BY-NC-SA 許可協議。轉載請註明出處!
版權聲明:本博客所有文章除特別聲明外,均採用 BY-NC-SA 許可協議。轉載請註明出處!