0x00 文档路径:

0x01 模块简述:

添加于python3.2

提供更加高效的接口来实现异步执行

通过具体实现来剖析

0x02 具体实现

参考官方文档给出的例子

from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
  1. 通过ThreadPoolExecutor来生成一个Executor对象

    源码位置

    Lib\concurrent\futures\thread.py 83行

    一共两个参数max_workers=None、thread_name_profix=''

    max_workers

    用来指定最大线程数

    if max_workers is None:
        # Use this number because ThreadPoolExecutor is often
        # used to overlap I/O instead of CPU work.
        max_workers = (os.cpu_count() or 1) * 5

    先判断是否是None,即未指定时,默认是cpu数量*5,比如你是四核的cpu,那么默认最大线程20

    再判断是否小于0,若小于则抛出ValueError

  2. 调用ThreadPoolExecutor对象的submit方法

    ThreadPoolExecutor继承了Executor对象,并实现了部分重写
    源码位置

    Lib\concurrent\futures\thread.py 106行

    一共三个参数 fn , rags , *kwargs
    submit方法执行 fn(args,*kwargs) , 然后返回一个Future对象

  3. 调用Future对象的result方法

    返回被执行函数的结果
    源码位置

    Lib\concurrent\futures\_base.py 378行

    一个参数timeout = None

    不指定时,将不会限制等待时间,即一直等到函数完成
    如果在指定时间内函数未完成,则抛出异常TimtoutError

  4. 因为with上下文管理器的原因,自动调用Executor对象的shutdown方法来释放资源

    直接使用with 语句即可

另一个例子

from telnetlib import Telnet
def detect_port(port):
    try:
        print('{} testing .. '.format(port))
        Telnet('127.0.0.1',port,timeout=5)
        print('{} opened '.format(port))
    except:
        pass
with ThreadPoolExecutor(100)as executor:
  executor.map(detect_port,range(500))

map方法

完全继承于Executor

源码位置

Lib\concurrent\futures\_base.py

一共四个参数 fn , *iterables , timeout = None , chunksize = 1

chunsize用于ProcessPoolExecutor,其他的参数同上面的submit

def map(self, fn, *iterables, timeout=None, chunksize=1):

    if timeout is not None:
        end_time = timeout + time.time()

    fs = [self.submit(fn, *args) for args in zip(*iterables)]

    def result_iterator():
        try:
            for future in fs:
                if timeout is None:
                    yield future.result()
                else:
                    yield future.result(end_time - time.time())
        finally:
            for future in fs:
                future.cancel()
    return result_iterator()

map里面调用了内置函数zip,zip函数的特点是

In [1]: print(*zip('abc',[1,2,3,4]))
('a', 1) ('b', 2) ('c', 3)

举例说

def detect_port(ip,port):
    检测端口
with ThreadPoolExecutor()as ecexutor:
    executor.map(detect_port,['127.0.0.1','192.168.1.10'],['21','22','23'])

原本是打算逐个ip扫描这三个端口是否开放

但是此时只会扫描127.0.0.1的21端口,以及192.168.1.10的23端口

def detect_port(ip,port):
    检测端口
    
def detect_all_ip(ip):
    ports = [xx,xx,..]
    with ThreadPoolExecutor()as executor:
        [executor.submit(detect_port,ip,ports) for port in ports]

def main():
    ips = ['xxx','xxx',..]
    with ThreadPoolExecutor()as executor:
        executor.map(detect_all_ip,ips)

0x03myself

如下是之前写的通过telnet扫描端口的脚本,便是利用了ThreadPoolExecutor,可供参考
detectPortByTelnet.py下载


本文由 TEag1e 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

还不快抢沙发

添加新评论

icon_redface.gificon_idea.gificon_cool.gif2016kuk.gificon_mrgreen.gif2016shuai.gif2016tp.gif2016db.gif2016ch.gificon_razz.gif2016zj.gificon_sad.gificon_cry.gif2016zhh.gificon_question.gif2016jk.gif2016bs.gificon_lol.gif2016qiao.gificon_surprised.gif2016fendou.gif2016ll.gificon_mrgreen.pngicon_neutral.pngicon_twisted.pngicon_arrow.pngicon_eek.pngicon_smile.pngicon_confused.pngicon_cool.pngicon_evil.pngicon_biggrin.pngicon_idea.pngicon_redface.pngicon_razz.pngicon_rolleyes.pngicon_wink.pngicon_cry.pngicon_surprised.pngicon_lol.pngicon_mad.pngicon_sad.pngicon_exclaim.pngicon_question.pngicon_rolleyes.gif2016gz.gif2016kun.gif2016zhem.gif2016am.gif2016kel.gificon_twisted.gif2016lh.gificon_neutral.gif2016ka.gif2016tx.gificon_evil.gif2016bb.gif2016yun.gif2016qq.gif2016baojin.gificon_confused.gif2016kk.gif2016zk.gif2016kb.gificon_mad.gif2016yhh.gificon_exclaim.gif2016xia.gif2016gg.gif2016qd.gificon_smile.gif2016lengh.gificon_biggrin.gif2016bz.gif2016wq.gificon_eek.gificon_arrow.gificon_wink.gif2016tuu.gif