Scapy 是一款使用纯Python编写的跨平台网络数据包操控工具,它能够处理和嗅探各种网络数据包。能够很容易的创建,发送,捕获,分析和操作网络数据包,包括TCP,UDP,ICMP等协议,此外它还提供了许多有用的功能,例如嗅探网络流量,创建自定义协议和攻击网络的安全测试工具。使用Scapy可以通过Python脚本编写自定义网络协议和攻击工具,这使得网络安全测试变得更加高效和精确。
读者可自行安装Scapy
第三方库,其次该工具依赖于PCAP接口,读者可自行安装npcap
驱动工具包,具体的安装细节此处就不再赘述。
21.2.1 端口扫描基础 网络端口扫描用于检测目标主机上开放的网络端口。端口扫描可以帮助安全专业人员识别存在的网络漏洞,以及识别网络上的服务和应用程序。在进行端口扫描时,扫描程序会发送特定的网络数据包,尝试与目标主机的每个端口进行通信。如果端口处于打开状态,则扫描程序将能够成功建立连接。否则,扫描程序将收到一条错误消息,表明目标主机上的该端口未开放。
常见的端口扫描技术包括TCP连接扫描、SYN扫描、UDP扫描和FIN扫描等。其中,TCP连接扫描是最常用的一种技术,它通过建立TCP连接来识别开放的端口。SYN扫描则利用TCP协议的三次握手过程来判断端口是否开放,而UDP扫描则用于识别UDP端口是否开放。FIN扫描则是利用TCP FIN数据包来探测目标主机上的端口是否处于开放状态。
在动手开发扫描软件之前,我们还是要重点复习一下协议相关的内容,这样才能真正理解不同端口扫描方式的原理,首先是TCP协议,TCP(Transmission Control Protocol)传输控制协议是一种面向连接的、可靠的、基于字节流的传输层协议。下图是TCP报文格式:
TCP报文分为头部和数据两部分,其中头部包含以下字段:
源端口(Source Port):占用2个字节,表示发送端使用的端口号,范围是0-65535。
目的端口(Destination Port):占用2个字节,表示接收端使用的端口号,范围是0-65535。
序列号(Sequence Number):占用4个字节,表示数据段中第一个字节的序号。TCP协议中采用序号对数据进行分段,从而实现可靠传输。
确认号(Acknowledgement Number):占用4个字节,表示期望收到的下一个字节的序号。TCP协议中采用确认号对接收到的数据进行确认,从而实现可靠传输。
数据偏移(Data Offset):占用4个位,表示TCP头部的长度。由于TCP头部长度是可变的,该字段用于指示数据段从哪里开始。
保留位(Reserved):占用6个位,保留用于将来的扩展。
控制位(Flags):占用6个位,共有6个标志位,分别为URG、ACK、PSH、RST、SYN和FIN。其中,URG、ACK、PSH和RST标志的长度均为1位,SYN和FIN标志的长度为1位。
窗口大小(Window Size):占用2个字节,表示发送方可接受的字节数量,用于流量控制。
校验和(Checksum):占用2个字节,用于检验数据的完整性。
紧急指针(Urgent Pointer):占用2个字节,表示该报文的紧急数据在数据段中的偏移量。
选项(Options):可变长度,用于协商TCP参数,如最大报文长度、时间戳等。
对于端口扫描来说我们需要重点关注控制位
中所提到的6个标志位,这些标志是我们实现扫描的关键,如下是这些标志位的说明;
URG:紧急指针标志位,当URG=1时,表明紧急指针字段有效。它告诉系统中有紧急数据,应当尽快传送,这时不会按照原来的排队序列来传送,而会将紧急数据插入到本报文段数据的最前面。
ACK:当ACK=1时,我们的确认序列号ack才有效,当ACK=0时,确认序号ack无效,在TCP协议中规定所有建立连接的ACK必须全部置为1。
PSH:当该标志位被设置时,表示TCP数据包需要立即发送给接收方,而无需等待缓冲区填满。这个操作被称为推送操作,即将缓冲区的数据立即推送给接收方。
RST:当RST=1时,表明TCP连接出现严重错误,此时必须释放连接,之后重新连接,该标志又叫重置位。
SYN:同步序列号标志位,tcp三次握手中,第一次会将SYN=1,ACK=0,此时表示这是一个连接请求报文段,对方会将SYN=1,ACK=1,表示同意连接,连接完成之后将SYN=0。
FIN:在tcp四次挥手时第一次将FIN=1,表示此报文段的发送方数据已经发送完毕,这是一个释放链接的标志。
接着我们来具体看一下在TCP/IP
协议中,TCP是如何采用三次握手四次挥手实现数据包的通信功能的,如下是一个简单的通信流程图;
(1) 第一次握手:建立连接时,客户端A发送SYN
包(SYN=j)到服务器B,以及初始序号X,保存在包头的序列号(Sequence Number)字段里,并进入SYN_SEND
状态,等待服务器B确认。
(2)第二次握手:服务器B收到SYN
包,必须确认客户A的SYN
(ACK=j+1),同时自己也发送一个SYN
包(SYN=k),即SYN+ACK
包,此时服务器B进入SYN_RECV
状态。
(3) 第三次握手:客户端A收到服务器B的SYN+ACK
包,向服务器B发送确认包ACK
(ACK=k+1),此包发送完毕,客户端A和服务器B进入ESTABLISHED
状态,完成三次握手。至此服务端与客户端之间就可以传输数据了。
21.2.2 ICMP构建与发送 首先我们先来构建并实现一个ICMP
数据包,在之前的文章中笔者已经通过C语言实现了数据包的构建,当然使用C语言构建数据包是一件非常繁琐的实现,通过运用Scapy
则可以使数据包的构建变得很容易,ICMP数据包上层是IP
头部,所以在构造数据包时应先构造IP
包头,然后再构造ICMP
包头,如下我们先使用ls(IP)
查询一下IP
包头的结构定义,然后再分别构造参数。
>>> from scapy.all import *>>> from random import randint>>> import time>>> >>> uuid = randint(1 ,65534 )>>> ls(IP) version : BitField (4 bits) = (4 ) ihl : BitField (4 bits) = (None ) tos : XByteField = (0 ) len : ShortField = (None )id : ShortField = (1 )flags : FlagsField (3 bits) = (<Flag 0 ()>) frag : BitField (13 bits) = (0 ) ttl : ByteField = (64 ) proto : ByteEnumField = (0 ) chksum : XShortField = (None ) src : SourceIPField = (None ) dst : DestIPField = (None ) options : PacketListField = ([]) >>> >>> ip_header = IP(dst="192.168.1.1" ,ttl=64 ,id =uuid) >>> ip_header.show() version = 4 ihl = None tos = 0x0 len = None id = 64541 flags = frag = 0 ttl = 64 proto = ip chksum = None src = 192.168 .1 .101 dst = 192.168 .1 .1 \options \ >>> ip_header.summary()'192.168.1.101 > 192.168.1.1 ip'
上述代码中我们已经构造了一个IP
包头,接着我们还需要构造一个ICMP
包头,该包头的构造可以使用ICMP()
并传入两个参数,如下则是构造好的一个ICMP
包头。
>>> ICMP()<ICMP |> >>> >>> icmp_header = ICMP(id =uuid, seq=uuid)>>> >>> icmp_header<ICMP id =0xfc1d seq=0xfc1d |> >>> icmp_header.show() type = echo-request code = 0 chksum = None id = 0xfc1d seq = 0xfc1d
接着我们需要将上述两个包头粘贴在一起,通过使用/
将来给你这进行拼接,并在icmp_header
后面增加我们所有发送的数据包字符串,最终将构造好的数据包存储至packet
变量内,此时输入packet.summary()
即可查看构造的数据包字符串。
>>> packet = ip_header / icmp_header / "hello lyshark" >>> >>> packet<IP id =64541 frag=0 ttl=64 proto=icmp dst=192.168 .1 .1 |<ICMP id =0xfc1d seq=0xfc1d |<Raw load='hello lyshark' |>>> >>> >>> packet.summary()'IP / ICMP 192.168.1.101 > 192.168.1.1 echo-request 0 / Raw' >>>
当我们构造好一个数据包后,下一步则是需要将该数据包发送出去,对于发送数据包Scapy
中提供了多种发送函数,如下则是不同的几种发包方式,当我们呢最常用的还是sr1()
该函数用于发送数据包并只接受回显数据。
send(pkt):发送三层数据包,但不会受到返回的结果
sr(pkt):发送三层数据包,返回两个结果,分别是接收到响应的数据包和未收到响应的数据包
sr1(pkt):发送三层数据包,仅仅返回接收到响应的数据包
sendp(pkt):发送二层数据包
srp(pkt):发送二层数据包,并等待响应
srp1(pkt):发送第二层数据包,并返回响应的数据包
此处我们就以sr1()
函数作为演示目标,通过构造数据包并调用sr1()
将该数据包发送出去,并等待返回响应数据到respon
变量内,此时通过对该变量进行解析即可得到当前ICMP
的状态。
>>> respon = sr1(packet,timeout=3 ,verbose=0 )>>> respon<IP version=4 ihl=5 tos=0x0 len =41 id =26086 flags= frag=0 ttl=64 proto=icmp chksum=0x9137 src=192.168 .1 .1 dst=192.168 .1 .101 |<ICMP type =echo-reply code=0 chksum=0x177d id =0xfc1d seq=0xfc1d |<Raw load='hello lyshark' |<Padding load='\x00\x00\x00\x00\x00' |>>>> >>> >>> respon[IP].src'192.168.1.1' >>> >>> respon[IP].ttl64 >>> respon.fields{'options' : [], 'version' : 4 , 'ihl' : 5 , 'tos' : 0 , 'len' : 41 , 'id' : 26086 , 'flags' : <Flag 0 ()>, 'frag' : 0 , 'ttl' : 64 , 'proto' : 1 , 'chksum' : 37175 , 'src' : '192.168.1.1' , 'dst' : '192.168.1.101' }
上述流程就是一个简单的ICMP
的探测过程,我们可以将这段代码进行组合封装实现ICMP_Ping
函数,该函数只需要传入一个IP
地址即可返回特定地址是否在线,同时我们使用ipaddress.ip_network
则可生成一整个C
段中的地址信息,并配合threading
启用多线程,则可实现一个简单的主机存活探测工具,完整代码如下所示;
from scapy.all import *from random import randintimport time,ipaddress,threadingimport argparseimport loggingdef ICMP_Ping (addr ): RandomID=randint(1 ,65534 ) packet = IP(dst=addr, ttl=64 , id =RandomID) / ICMP(id =RandomID, seq=RandomID) / "hello lyshark" respon = sr1(packet,timeout=3 ,verbose=0 ) if respon: print ("[+] 存活地址: {}" .format (str (respon[IP].src))) if __name__== "__main__" : logging.getLogger("scapy.runtime" ).setLevel(logging.ERROR) parser = argparse.ArgumentParser() parser.add_argument("-a" ,"--addr" ,dest="addr" ,help ="指定一个IP地址或范围" ) args = parser.parse_args() if args.addr: net = ipaddress.ip_network(str (args.addr)) for item in net: t = threading.Thread(target=ICMP_Ping,args=(str (item),)) t.start() else : parser.print_help()
读者可自行运行上述程序片段,并传入main.py -a 192.168.9.0/24
表示扫描整个C段,并输出存活主机列表,其中logging
模块则用于指定只有错误提示才会输出,其他的警告忽略。扫描结果如下图所示;
接着我们继续实现路由追踪功能,跟踪路由原理是IP
路由每经过一个路由节点TTL
值会减一,假设TTL
值为0
时数据包还没有到达目标主机,那么该路由则会回复给目标主机一个数据包不可达,由此我们就可以获取到目标主机的IP
地址,我们首先构造一个数据包,并设置TTL
值为1
,将该数据包发送出去即可看到回显主机的IP信息。
>>> from scapy.all import *>>> from random import randint>>> import time>>> >>> RandomID=randint(1 ,65534 )>>> packet = IP(dst="104.193.88.77" , ttl=1 , id =RandomID) / ICMP(id =RandomID, seq=RandomID) / "hello" >>> respon = sr1(packet,timeout=3 ,verbose=0 )>>> >>> respon<IP version=4 ihl=5 tos=0xc0 len =61 id =14866 flags= frag=0 ttl=64 proto=icmp chksum=0xbc9a src=192.168 .1 .1 dst=192.168 .1 .2 |<ICMP type =time-exceeded code=ttl-zero-during-transit chksum=0xf4ff reserved=0 length=0 unused=None |<IPerror version=4 ihl=5 tos=0x0 len =33 id =49588 flags= frag=0 ttl=1 proto=icmp chksum=0x4f79 src=192.168 .1 .2 dst=104.193 .88 .77 |<ICMPerror type =echo-request code=0 chksum=0x30c4 id =0xc1b4 seq=0xc1b4 |<Raw load='hello' |>>>>>
关于如何实现路由跟踪,具体来说一开始发送一个TTL
为1
的数据包,这样到达第一个路由器的时候就已经超时了,第一个路由器就会返回一个ICMP
通知,该通知包含了对端的IP
地址,这样就能够记录下所经过的第一个路由器的地址。接着将TTL
值加1,让其能够安全的通过第一个路由器,而第二个路由器的的处理过程会自动丢包,发通包超时通知,这样记录下第二个路由器IP
,由此能够一直进行下去,直到这个数据包到达目标主机,由此打印出全部经过的路由器。
将上述跟踪过程自动化,就可以完成数据包的跟踪,其Python
代码如下所示。
from scapy.all import *from random import randintimport time,ipaddress,threadingfrom optparse import OptionParserimport loggingdef TraceRouteTTL (addr ): for item in range (1 ,128 ): RandomID=randint(1 ,65534 ) packet = IP(dst=addr, ttl=item, id =RandomID) / ICMP(id =RandomID, seq=RandomID) respon = sr1(packet,timeout=3 ,verbose=0 ) if respon != None : ip_src = str (respon[IP].src) if ip_src != addr: print ("[+] --> {}" .format (str (respon[IP].src))) else : print ("[+] --> {}" .format (str (respon[IP].src))) return 1 else : print ("[-] --> TimeOut" ) time.sleep(1 ) if __name__== "__main__" : logging.getLogger("scapy.runtime" ).setLevel(logging.ERROR) parser = OptionParser() parser.add_option("-a" ,"--addr" ,dest="addr" ,help ="指定一个地址或范围" ) (options,args) = parser.parse_args() if options.addr: TraceRouteTTL(str (options.addr)) else : parser.print_help()
读者可自行运行上述程序片段,并传入main.py --addr 104.193.88.77
表示跟踪从本机到目标104.193.88.77
主机所经过的路由器地址信息,并将扫描结果输出如下图所示;
21.2.3 TCP全连接扫描 TCP Connect 扫描又叫做全连接扫描,它是一种常用的端口扫描技术。在这种扫描中,扫描程序向目标主机发送TCP
连接请求包(SYN包
),如果目标主机回应了一个TCP
连接确认包(SYN-ACK
包),则说明该端口处于开放状态。否则,如果目标主机回应了一个TCP
复位包(RST
包)或者没有任何响应,则说明该端口处于关闭状态。这种扫描技术的优点是准确性高,因为它可以在不建立实际连接的情况下确定目标主机的端口状态。但是,缺点是这种扫描技术很容易被目标主机的防火墙或入侵检测系统检测到。
全连接扫描需要客户端与服务器之间直接建立一次完整的握手,该方式扫描速度慢效率低,我们需要使用Scapy
构造完整的全连接来实现一次探测,在使用该工具包时读者应该注意工具包针对flags
所代指的标识符RA/AR/SA
含义,这些标志是Scapy
框架中各种数据包的简写,此外针对数据包的定义有以下几种;
F:FIN 结束,结束会话
S:SYN 同步,表示开始会话请求
R:RST 复位,中断一个连接
P:PUSH 推送,数据包立即发送
A:ACK 应答
U:URG 紧急
E:ECE 显式拥塞提醒回应
W:CWR 拥塞窗口减少
实现全链接扫描我们封装并实现一个tcpScan()
函数,该函数接收两个参数一个扫描目标地址,一个扫描端口列表,通过对数据包的收发判断即可获取特定主机开放状态;
from scapy.all import *import argparseimport loggingdef tcpScan (target,ports ): for port in ports: send=sr1(IP(dst=target)/TCP(dport=port,flags="S" ),timeout=2 ,verbose=0 ) if (send is None ): continue elif send.haslayer("TCP" ): if send["TCP" ].flags == "SA" : send_1 = sr1(IP(dst=target) / TCP(dport=port, flags="AR" ), timeout=2 , verbose=0 ) print ("[+] 扫描主机: %-13s 端口: %-5s 开放" %(target,port)) elif send["TCP" ].flags == "RA" : print ("[+] 扫描主机: %-13s 端口: %-5s 关闭" %(target,port)) if __name__ == "__main__" : logging.getLogger("scapy.runtime" ).setLevel(logging.ERROR) parser = argparse.ArgumentParser() parser.add_argument("-H" ,"--host" ,dest="host" ,help ="输入一个被攻击主机IP地址" ) parser.add_argument("-p" ,"--port" ,dest="port" ,help ="输入端口列表 [80,443,135]" ) args = parser.parse_args() if args.host and args.port: tcpScan(args.host,eval (args.port)) else : parser.print_help()
运行上述代码片段,并传入59.110.117.109
地址以及,端口80,8080,443,445
程序将依次扫描这些端口,并输出如下图所示;
21.2.4 SYN半开放扫描 TCP SYN扫描又称半开式扫描,该过程不会和服务端建立完整的连接,其原理是利用了TCP
协议中的一个机制,即在TCP
三次握手过程中,客户端发送SYN
包到服务端,服务端回应SYN+ACK
包给客户端,最后客户端回应ACK
包给服务端。如果服务端回应了SYN+ACK
包,说明该端口是开放的;如果服务端回应了RST
包,说明该端口是关闭的。
TCP SYN扫描的优点是不会像TCP Connect
扫描那样建立完整的连接,因此不会留下大量的日志,可以有效地隐藏扫描行为。缺点是在扫描过程中会产生大量的半连接,容易被IDS/IPS
等安全设备检测到,而且可能会对目标主机造成负担。
SYN扫描不会和服务端建立完整的连接,从而能够在一定程度上提高扫描器的效率,该扫描方式在代码实现上和全连接扫描区别不大,只是在结束到服务端响应数据包之后直接发送RST
包结束连接,上述代码只需要进行简单修改,将send_1
处改为R
标志即可;
from scapy.all import *import argparseimport loggingdef tcpSynScan (target,ports ): for port in ports: send=sr1(IP(dst=target)/TCP(dport=port,flags="S" ),timeout=2 ,verbose=0 ) if (send is None ): continue elif send.haslayer("TCP" ): if send["TCP" ].flags == "SA" : send_1 = sr1(IP(dst=target) / TCP(dport=port, flags="AR" ), timeout=2 , verbose=0 ) print ("[+] 扫描主机: %-13s 端口: %-5s 开放" %(target,port)) elif send["TCP" ].flags == "RA" : print ("[+] 扫描主机: %-13s 端口: %-5s 关闭" %(target,port)) else : print ("[+] 扫描主机: %-13s 端口: %-5s 关闭" %(target,port)) if __name__ == "__main__" : logging.getLogger("scapy.runtime" ).setLevel(logging.ERROR) parser = argparse.ArgumentParser() parser.add_argument("-H" ,"--host" ,dest="host" ,help ="输入一个被攻击主机IP地址" ) parser.add_argument("-p" ,"--port" ,dest="port" ,help ="输入端口列表 [80,443,135]" ) args = parser.parse_args() if args.host and args.port: tcpSynScan(args.host,eval (args.port)) else : parser.print_help()
同理,我们分别传入被扫描主机IP地址以及需要扫描的端口号列表,当扫描结束后即可输出如下图所示的结果;
21.2.5 UDP无状态扫描 UDP 无状态扫描是一种常见的网络扫描技术,其基本原理与TCP SYN
扫描类似。不同之处在于UDP
协议是无连接的、无状态的,因此无法像TCP
一样建立连接并进行三次握手。
UDP 无状态扫描的基本流程如下:
客户端向服务器发送带有端口号的UDP
数据包,如果服务器回复了UDP
数据包,则目标端口是开放的。
如果服务器返回了一个ICMP
目标不可达的错误和代码3
,则意味着目标端口处于关闭状态。
如果服务器返回一个ICMP
错误类型3
且代码为1,2,3,9,10
或13
的数据包,则说明目标端口被服务器过滤了。
如果服务器没有任何相应客户端的UDP
请求,则可以断定目标端口可能是开放或被过滤的,无法判断端口的最终状态。
读者需要注意,UDP 无状态扫描可能会出现误报或漏报的情况。由于UDP
协议是无连接的、无状态的,因此UDP
端口不可达错误消息可能会被目标主机过滤掉或者由于网络延迟等原因无法到达扫描主机,从而导致扫描结果不准确。
from scapy.all import *import argparseimport loggingdef udpScan (target,ports ): for port in ports: udp_scan_resp = sr1(IP(dst=target)/UDP(dport=port),timeout=5 ,verbose=0 ) if (str (type (udp_scan_resp))=="<class 'NoneType'>" ): print ("[+] 扫描主机: %-13s 端口: %-5s 关闭" %(target,port)) elif (udp_scan_resp.haslayer(UDP)): if (udp_scan_resp.getlayer(TCP).flags == "R" ): print ("[+] 扫描主机: %-13s 端口: %-5s 开放" %(target,port)) elif (udp_scan_resp.haslayer(ICMP)): if (int (udp_scan_resp.getlayer(ICMP).type )==3 and int (udp_scan_resp.getlayer(ICMP).code) in [1 ,2 ,3 ,9 ,10 ,13 ]): print ("[+] 扫描主机: %-13s 端口: %-5s 关闭" %(target,port)) if __name__ == "__main__" : logging.getLogger("scapy.runtime" ).setLevel(logging.ERROR) parser = argparse.ArgumentParser() parser.add_argument("-H" ,"--host" ,dest="host" ,help ="" ) parser.add_argument("-p" ,"--port" ,dest="port" ,help ="" ) args = parser.parse_args() if args.host and args.port: udpScan(args.host,eval (args.port)) else : parser.print_help()
运行上述代码,并传入不同的参数,即可看到如下图所示的扫描结果,读者需要注意因为UDP
的特殊性导致端口扫描无法更精确的判定;