python构造icmp echo请求和实现网络探测器功能代码分享_python

python发送icmp echo requesy请求

复制代码 代码如下:

import socket
import struct

def checksum(source_string):
    sum = 0
    countTo = (len(source_string)/2)*2
    count = 0
    while count<countTo:
        thisVal = ord(source_string[count + 1])*256 + ord(source_string[count])
        sum = sum + thisVal
        sum = sum & 0xffffffff
        count = count + 2
    if countTo<len(source_string):
        sum = sum + ord(source_string[len(source_string) - 1])
        sum = sum & 0xffffffff
    sum = (sum >> 16)  +  (sum & 0xffff)
    sum = sum + (sum >> 16)
    answer = ~sum
    answer = answer & 0xffff
    answer = answer >> 8 | (answer << 8 & 0xff00)
    return answer

def ping(ip):
    s = socket.socket(socket.AF_INET, socket.SOCK_RAW, 1)
    packet = struct.pack(
            "!BBHHH", 8, 0, 0, 0, 0
    )
    chksum=checksum(packet)
    packet = struct.pack(
            "!BBHHH", 8, 0, chksum, 0, 0
    )
    s.sendto(packet, (ip, 1))

if __name__=='__main__':
    ping('192.168.41.56')

扫描探测网络功能(网络探测器)

复制代码 代码如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

'''
探测网络主机存活。
'''

import os
import struct
import array
import time
import socket
import IPy
import threading

class SendPingThr(threading.Thread):
    '''
    发送ICMP请求报文的线程。

    参数:
        ipPool      -- 可迭代的IP地址池
        icmpPacket  -- 构造的icmp报文
        icmpSocket  -- icmp套字接
        timeout     -- 设置发送超时
    '''
    def __init__(self, ipPool, icmpPacket, icmpSocket, timeout=3):
        threading.Thread.__init__(self)
        self.Sock = icmpSocket
        self.ipPool = ipPool
        self.packet = icmpPacket
        self.timeout = timeout
        self.Sock.settimeout( timeout + 3 )

    def run(self):
        time.sleep(0.01)  #等待接收线程启动
        for ip in self.ipPool:
            try:
                self.Sock.sendto(self.packet, (ip, 0))
            except socket.timeout:
                break
        time.sleep(self.timeout)

class Nscan:
    '''
    参数:
        timeout    -- Socket超时,默认3秒
        IPv6       -- 是否是IPv6,默认为False
    '''
    def __init__(self, timeout=3, IPv6=False):
        self.timeout = timeout
        self.IPv6 = IPv6

        self.__data = struct.pack('d', time.time())   #用于ICMP报文的负荷字节(8bit)
        self.__id = os.getpid()   #构造ICMP报文的ID字段,无实际意义

    @property   #属性装饰器
    def __icmpSocket(self):
        '''创建ICMP Socket'''
        if not self.IPv6:
            Sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp"))
        else:
            Sock = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.getprotobyname("ipv6-icmp"))
        return Sock

    def __inCksum(self, packet):
        '''ICMP 报文效验和计算方法'''
        if len(packet) & 1:
            packet = packet + '\0'
        words = array.array('h', packet)
        sum = 0
        for word in words:
            sum += (word & 0xffff)
        sum = (sum >> 16) + (sum & 0xffff)
        sum = sum + (sum >> 16)

        return (~sum) & 0xffff

    @property
    def __icmpPacket(self):
        '''构造 ICMP 报文'''
        if not self.IPv6:
            header = struct.pack('bbHHh', 8, 0, 0, self.__id, 0) # TYPE、CODE、CHKSUM、ID、SEQ
        else:
            header = struct.pack('BbHHh', 128, 0, 0, self.__id, 0)

        packet = header + self.__data     # packet without checksum
        chkSum = self.__inCksum(packet) # make checksum

        if not self.IPv6:
            header = struct.pack('bbHHh', 8, 0, chkSum, self.__id, 0)
        else:
            header = struct.pack('BbHHh', 128, 0, chkSum, self.__id, 0)

        return header + self.__data   # packet *with* checksum

    def isUnIP(self, IP):
        '''判断IP是否是一个合法的单播地址'''
        IP = [int(x) for x in IP.split('.') if x.isdigit()]
        if len(IP) == 4:
            if (0 < IP[0] < 223 and IP[0] != 127 and IP[1] < 256 and IP[2] < 256 and 0 < IP[3] < 255):
                return True
        return False

    def makeIpPool(self, startIP, lastIP):
        '''生产 IP 地址池'''
        IPver = 6 if self.IPv6 else 4
        intIP = lambda ip: IPy.IP(ip).int()
        ipPool = {IPy.intToIp(ip, IPver) for ip in range(intIP(startIP), intIP(lastIP)+1)}

        return {ip for ip in ipPool if self.isUnIP(ip)}

    def mPing(self, ipPool):
        '''利用ICMP报文探测网络主机存活

        参数:
            ipPool  -- 可迭代的IP地址池
        '''
        Sock = self.__icmpSocket
        Sock.settimeout(self.timeout)
        packet = self.__icmpPacket
        recvFroms = set()   #接收线程的来源IP地址容器

        sendThr = SendPingThr(ipPool, packet, Sock, self.timeout)
        sendThr.start()

        while True:
            try:
                recvFroms.add(Sock.recvfrom(1024)[1][0])
            except Exception:
                pass
            finally:
                if not sendThr.isAlive():
                    break
        return recvFroms & ipPool

if __name__=='__main__':
    s = Nscan()
    ipPool = s.makeIpPool('192.168.0.1', '192.168.0.254')
    print( s.mPing(ipPool) )

时间: 2024-09-10 02:11:58

python构造icmp echo请求和实现网络探测器功能代码分享_python的相关文章

Python实现爬取知乎神回复简单爬虫代码分享_python

看知乎的时候发现了一个 "如何正确地吐槽" 收藏夹,里面的一些神回复实在很搞笑,但是一页一页地看又有点麻烦,而且每次都要打开网页,于是想如果全部爬下来到一个文件里面,是不是看起来很爽,并且随时可以看到全部的,于是就开始动手了. 工具 1.Python 2.7 2.BeautifulSoup 分析网页 我们先来看看知乎上该网页的情况 网址:,容易看到,网址是有规律的,page慢慢递增,这样就能够实现全部爬取了. 再来看一下我们要爬取的内容: 我们要爬取两个内容:问题和回答,回答仅限于显示

Python获取Linux系统下的本机IP地址代码分享_python

有时候使用到获取本机IP,就采用以下方式进行. 复制代码 代码如下: #!/usr/bin/python   import socket import struct import fcntl   def getip(ethname):   s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)   return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0X8915, struct.pack('256s', e

python实现多线程暴力破解登陆路由器功能代码分享_python

运行时请在其目录下添加user.txt passwd.txt两文件.否则会报错.程序没有加异常处理.代码比较挫..... 复制代码 代码如下: #coding:utf-8- import base64 import urllib2 import Queue import threading,re,sys queue = Queue.Queue() class Rout_thread(threading.Thread):   def __init__(self,queue,passwd):    

python正则匹配抓取豆瓣电影链接和评论代码分享_python

复制代码 代码如下: import urllib.requestimport reimport time def movie(movieTag):     tagUrl=urllib.request.urlopen(url)    tagUrl_read = tagUrl.read().decode('utf-8')    return tagUrl_read def subject(tagUrl_read):     '''         这里还存在问题:        ①这只针对单独的一页

零基础写python爬虫之抓取糗事百科代码分享_python

项目内容: 用Python写的糗事百科的网络爬虫. 使用方法: 新建一个Bug.py文件,然后将代码复制到里面后,双击运行. 程序功能: 在命令提示行中浏览糗事百科. 原理解释: 首先,先浏览一下糗事百科的主页:http://www.qiushibaike.com/hot/page/1 可以看出来,链接中page/后面的数字就是对应的页码,记住这一点为以后的编写做准备. 然后,右击查看页面源码: 观察发现,每一个段子都用div标记,其中class必为content,title是发帖时间,我们只需

Python 实现 贪吃蛇大作战 代码分享_python

感觉游戏审核新政实施后,国内手游市场略冷清,是不是各家的新游戏都在排队等审核.媒体们除了之前竞相追捧<Pokemon Go>热闹了一把,似乎也听不到什么声音了.直到最近几天,突然听见好几人都提到同一个游戏,网上还有人表示朋友圈被它刷屏了.(不过现在微信已经悍然屏蔽了它的分享) 这个游戏就是现在iOS免费榜排名第一的<贪吃蛇大作战>.一个简单到不行的游戏,也不知道怎么就火了.反正一款游戏火了,各路媒体.专家总能说出种种套路来,所以我就不发表意见了.不过这实在是一个挺好实现的游戏,于是

Python复制目录结构脚本代码分享_python

引言 有个需要,需要把某个目录下的目录结构进行复制,不要文件,当目录结构很少的时候可以手工去建立,当目录结构复杂,目录层次很深,目录很多的时候,这个时候要是还是手动去建立的话,实在不是一种好的方法,弄不好会死人的.写一个python脚本来处理吧. 首先了解 写python脚本前,先了解几个东西 复制代码 代码如下: #!/usr/bin/python 这个东西写过脚本的人都知道,用来标明该脚本的执行器,类似的还有 复制代码 代码如下: #!/bin/bash       通过bash来执行 #!

从零开始学Python第八周:详解网络编程基础(socket)_python

一,Socket编程 (1)Socket方法介绍 Socket是网络编程的一个抽象概念.通常我们用一个Socket表示"打开了一个网络链接",而打开一个Socket需要知道目标计算机的IP地址和端口号,再指定协议类型即可. 套接字是一个双向的通信信道的端点.套接字可能在沟通过程,进程之间在同一台机器上,或在不同的计算机之间的进程 要创建一个套接字,必须使用Socket模块的socket.socket()方法 在socket模块中的一般语法: s = socket.socket(sock

python 网络编程常用代码段_python

服务器端代码: # -*- coding: cp936 -*- import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)#初始化socket sock.bind(("127.0.0.1", 8001))#绑定本机地址,8001端口 sock.listen(5)#等待客户连接 while True: print "waiting client connection..." connec