900字范文,内容丰富有趣,生活中的好帮手!
900字范文 > Python 抓取并解码原始数据包

Python 抓取并解码原始数据包

时间:2019-11-09 03:14:27

相关推荐

Python 抓取并解码原始数据包

本文介绍了如何使用Python中的混杂模式来捕获流经网卡的数据包,并对其中的IP和ICMP数据包进行解析和打印所需字段信息的操作。文章提供了获取本机MAC地址和IP地址的方法,以便确定网络接口和IP地址的绑定。接着通过使用原始套接字创建一个原始数据包捕获器,并将其绑定到本机的IP地址上。在Windows平台上,需要启用IOCTL混杂模式以捕获所有流经网卡的数据包。最后通过读取单个数据包并打印其内容,实现了捕获数据包的功能。

针对IP数据包,文章提供了解析IP包头的方法。通过定义一个IP头部的结构体,并根据特定的包头格式解析数据包,获取其中的字段信息,如协议类型、源地址、目标地址和生存周期等,并将其打印出来。

对于ICMP数据包,文章提供了解析ICMP包头的方法。与解析IP数据包类似,先判断数据包是否为ICMP协议,然后根据ICMP包头的格式解析数据包,获取字段信息,并将其打印出来。

抓取原始数据包

Python中默认的Socket模块就可以实现对原始数据包的解包操作,如下代码所示,需要注意这段代码只能在Windows平台使用,因为我们需要开启网卡的IOCTL混杂模式,这是Win平台特有的。

import socketimport uuid# 获取本机MAC地址def GetHostMAC():mac=uuid.UUID(int = uuid.getnode()).hex[-12:]this_mac = ":".join([mac[e:e+2] for e in range(0,11,2)])this_name = socket.getfqdn(socket.gethostname())print("本机名: {} --> 本机MAC: {}".format(this_name,this_mac))# 获取本机IP地址def GetHostAddress():try:sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)sock.connect(('8.8.8.8',80))address =sock.getsockname()[0]finally:return addressaddress.close()# 开始跟踪原始数据包def SnifferIOSock(address):# 创建原始套接字,然后绑定在公开接口上socket_protocol = socket.IPPROTO_IP# 开启原始数据包模式sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)sniffer.bind((address,0))# 设置在捕获的数据包中包含IP头sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)# Windows平台需要设置IOCTL以启动混杂模式sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)# 读取单个数据包while True:print(sniffer.recvfrom(65565))# 退出时,关闭混杂模式sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)if __name__ == "__main__":GetHostMAC()address = GetHostAddress()print("本机IP: {}".format(address))SnifferIOSock(address)

解码IP数据包头

解码方法同样运用的是上方的方法,只不过这里我们需要找到完整的IP地址的包头封装格式,然后根据特定的包头格式对数据包进行解包操作即可.

import socketimport osimport structfrom ctypes import *# 定义IP头部结构体class IP(Structure):_fields_ = [("ihl", c_ubyte, 4),("version", c_ubyte, 4),("tos", c_ubyte),("len", c_ushort),("id", c_ushort),("offset", c_ushort),("ttl", c_ubyte),("protocol_num", c_ubyte),("sum", c_ushort),("src", c_ulong),# linux 需要变为 c_uint32("dst", c_ulong) # linux 需要变为 c_uint32]def __new__(self,socket_buffer=None):return self.from_buffer_copy(socket_buffer)def __init__(self, socket_buffer=None):# 定义协议序号与名称对应关系self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}# 将数据包解包为地址self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))self.this_ttl = self.ttl# self.src_address = socket.inet_ntoa(struct.pack("@I",self.src))# self.dst_address = socket.inet_ntoa(struct.pack("@I",self.dst))# 协议类型try:self.protocol = self.protocol_map[self.protocol_num]except:self.protocol = str(self.protocol_num)# 获取本机IP地址def GetHostAddress():try:sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)sock.connect(('8.8.8.8',80))address =sock.getsockname()[0]finally:return addressaddress.close()# 执行解包过程,并输出def SnifferIPAddress(address):# 平台选择,nt代表Windowsif os.name == "nt":socket_protocol = socket.IPPROTO_IPelse:socket_protocol = socket.IPPROTO_ICMP# 开启原始套接字模式sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)sniffer.bind((address,0))sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)if os.name == "nt":sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)# 循环接受数据包并解包try:while True:# 读取数据包raw_buffer = sniffer.recvfrom(65565)[0]# 将缓冲区的前20个字节按IP头进行解析ip_header = IP(raw_buffer[0:20])# 输出协议和通信双方IP地址print("传输协议: {} --> 原地址: {} --> 传输到: {} --> 生存周期: {}".format(ip_header.protocol,ip_header.src_address,ip_header.dst_address,ip_header.this_ttl))# 如果按下Ctrl+C则退出except KeyboardInterrupt:# 关闭混杂模式if os.name == "nt":sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)if __name__ == "__main__":address = GetHostAddress()SnifferIPAddress(address)

解码ICMP数据包头

原理与解包IP头相同,但需要注意,由于ICMP头在IP头的下方,所以我们需要先解析出IP头数据包,然后根据IP头中的protocol_num判断如果是ICMP则将其传入ICMP结构做进一步解包即可.

import socketimport osimport structfrom ctypes import *# 定义IP头部结构体class IP(Structure):_fields_ = [("ihl", c_ubyte, 4),("version", c_ubyte, 4),("tos", c_ubyte),("len", c_ushort),("id", c_ushort),("offset", c_ushort),("ttl", c_ubyte),("protocol_num", c_ubyte),("sum", c_ushort),("src", c_ulong),# linux 需要变为 c_uint32("dst", c_ulong) # linux 需要变为 c_uint32]def __new__(self,socket_buffer=None):return self.from_buffer_copy(socket_buffer)def __init__(self, socket_buffer=None):# 定义协议序号与名称对应关系self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}# 将数据包解包为地址self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))self.this_ttl = self.ttl# self.src_address = socket.inet_ntoa(struct.pack("@I",self.src))# self.dst_address = socket.inet_ntoa(struct.pack("@I",self.dst))# 协议类型try:self.protocol = self.protocol_map[self.protocol_num]except:self.protocol = str(self.protocol_num)# 定义ICMP结构包头class ICMP(Structure):_fields_ = [("type", c_ubyte),("code", c_ubyte),("checksum", c_ushort),("unused",c_ushort),("next_hop_mtu", c_ushort)]def __new__(self,socket_buffer=None):return self.from_buffer_copy(socket_buffer)def __init__(self, socket_buffer=None):self.icmp_type = self.typeself.icmp_code = self.codeself.icmp_checksum = self.checksum# 获取本机IP地址def GetHostAddress():try:sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)sock.connect(('8.8.8.8',80))address =sock.getsockname()[0]finally:return addressaddress.close()# 执行解包过程,并输出def SnifferIPAddress(address):# 平台选择,nt代表Windowsif os.name == "nt":socket_protocol = socket.IPPROTO_IPelse:socket_protocol = socket.IPPROTO_ICMP# 开启原始套接字模式sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)sniffer.bind((address,0))sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)if os.name == "nt":sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)# 循环接受数据包并解包try:while True:# 读取数据包raw_buffer = sniffer.recvfrom(65565)[0]# 将缓冲区的前20个字节按IP头进行解析ip_header = IP(raw_buffer[0:20])# 判断协议类型是否为ICMPif ip_header.protocol == "ICMP":# 计算ICMP包的起始位置offset = ip_header.ihl * 4buf = raw_buffer[offset:offset + sizeof(ICMP)]#解析ICMP数据icmp_header = ICMP(buf)print("原地址: {} --> 发送到: {} --> 解包协议: {} --> 解包代码: {} --> 校验和: {}".format(ip_header.src_address, ip_header.dst_address, icmp_header.icmp_type,icmp_header.icmp_code,icmp_header.icmp_checksum))# 如果按下Ctrl+C则退出except KeyboardInterrupt:# 关闭混杂模式if os.name == "nt":sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)if __name__ == "__main__":address = GetHostAddress()SnifferIPAddress(address)

我们运行上方的代码,然后在本机Ping测试一下此时即可看到,本机与对端的来往数据包.

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。