Python 线程池执行器

  • A+
所属分类:Python

摘要:在本教程中,您将学习如何使用 PythonThreadPoolExecutor开发多线程程序。

在多线程教程Thread中,您学习了如何使用模块的类来管理程序中的多个线程threadingThread您想要手动创建线程时,该类很有用。

然而,手动管理线程效率不高,因为频繁地创建和销毁许多线程在计算成本方面非常昂贵。

如果您希望在程序中运行许多临时任务,您可能希望重用线程,而不是这样做。线程池允许您实现这一点。

线程池

线程池是一种在程序中实现并发执行的模式。线程池允许您自动高效地管理线程池:

Python 线程池执行器

池中的每个线程称为工作线程或工作线程。线程池允许您在任务完成后重用工作线程。它还可以防止意外故障,例如异常

通常,线程池允许您配置工作线程的数量并为每个工作线程提供特定的命名约定。

要创建线程池,请使用模块中的ThreadPoolExecutorconcurrent.futures

线程池执行器

该类扩展ThreadPoolExecutor该类并返回一个对象。ExecutorFuture

执行者

该类Executor具有三种控制线程池的方法:

  • submit()– 调度一个要执行的函数并返回一个Future对象。submit()方法接受一个函数并异步执行它。
  • map()– 为迭代中的每个元素异步执行一个函数。
  • shutdown()– 关闭执行器。

当您创建ThreadPoolExecutor该类的新实例时,Python 会启动Executor.

一旦完成与 executor 的工作,您必须显式调用该shutdown()方法以释放 executor 持有的资源。为避免shutdown()显式调用该方法,您可以使用上下文管理器

未来对象

AFuture是表示异步操作的最终结果的对象。Future 类有两个有用的方法:

  • result()– 返回异步操作的结果。
  • exception()– 如果发生异常,则返回异步操作的异常。

Python ThreadPoolExecutor 示例

以下程序使用单个线程:

from time import sleep, perf_counter def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}' start = perf_counter() print(task(1)) print(task(2)) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")

代码语言: Python python

输出:

Starting the task 1... Done with task 1 Starting the task 2... Done with task 2 It took 2.0144479 second(s) to finish.

代码语言: Python python

这个怎么运作。

首先,定义task()需要大约一秒钟才能完成的函数。task()函数调用该sleep()函数来模拟一个延迟:

def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}'

代码语言: Python python

其次,调用该task()函数两次并打印出结果。在调用task()函数之前和之后,我们使用perf_counter()来测量开始和结束时间:

start = perf_counter() print(task(1)) print(task(2)) finish = perf_counter()

代码语言: Python python

第三,打印出程序运行的时间:

print(f"It took {finish-start} second(s) to finish.")

代码语言: Python python

因为该task()函数需要一秒钟,所以调用它两次大约需要 2 秒钟。

使用 submit() 方法示例

task()同时运行该函数,您可以使用ThreadPoolExecutor该类:

from time import sleep, perf_counter from concurrent.futures import ThreadPoolExecutor def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}' start = perf_counter() with ThreadPoolExecutor() as executor: f1 = executor.submit(task, 1) f2 = executor.submit(task, 2) print(f1.result()) print(f2.result()) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")

代码语言: Python python

输出:

Starting the task 1... Starting the task 2... Done with task 1 Done with task 2 It took 1.0177214 second(s) to finish.

代码语言: Python python

输出显示该程序大约需要 1 秒才能完成。

它是如何工作的(我们将专注于线程池部分):

ThreadPoolExecutor首先,从concurrent.futures模块中导入类:

from concurrent.futures import ThreadPoolExecutor

代码语言: Python python

ThreadPoolExecutor其次,使用上下文管理器创建线程池:

with ThreadPoolExecutor() as executor:

代码语言: Python python

第三,task()通过将函数传递给执行程序的方法来调用该函数两次submit()

with ThreadPoolExecutor() as executor: f1 = executor.submit(task, 1) f2 = executor.submit(task, 2) print(f1.result()) print(f2.result())

代码语言: Python python

submit()方法返回一个 Future 对象。在这个例子中,我们有两个 Future 对象f1f2. 为了从 Future 对象中获取结果,我们调用了它的result()方法。

使用 map() 方法示例

下面的程序使用一个ThreadPoolExecutor类。但是,它不是使用该submit()方法,而是使用该map()方法来执行一个函数:

from time import sleep, perf_counter from concurrent.futures import ThreadPoolExecutor def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}' start = perf_counter() with ThreadPoolExecutor() as executor: results = executor.map(task, [1,2]) for result in results: print(result) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")

代码语言: Python python

这个怎么运作。

首先,调用map()executor 对象的方法,为列表 [1,2] 中的每个 id 运行任务函数。map()方法返回一个包含函数调用结果的迭代器。

results = executor.map(task, [1,2])

代码语言: Python python

其次,遍历结果并打印出来:

for result in results: print(result)

代码语言: Python python

Python ThreadPoolExecutor 实例

以下程序使用线程池从 Wikipedia 下载多个图像:

from concurrent.futures import ThreadPoolExecutor from urllib.request import urlopen import time import os def download_image(url): image_data = None with urlopen(url) as f: image_data = f.read() if not image_data: raise Exception(f"Error: could not download the image from {url}") filename = os.path.basename(url) with open(filename, 'wb') as image_file: image_file.write(image_data) print(f'{filename} was downloaded...') start = time.perf_counter() urls = ['https://upload.wikimedia.org/wikipedia/commons/9/9d/Python_bivittatus_1701.jpg', 'https://upload.wikimedia.org/wikipedia/commons/4/48/Python_Regius.jpg', 'https://upload.wikimedia.org/wikipedia/commons/d/d3/Baby_carpet_python_caudal_luring.jpg', 'https://upload.wikimedia.org/wikipedia/commons/f/f0/Rock_python_pratik.JPG', 'https://upload.wikimedia.org/wikipedia/commons/0/07/Dulip_Wilpattu_Python1.jpg'] with ThreadPoolExecutor() as executor: executor.map(download_image, urls) finish = time.perf_counter() print(f'It took {finish-start} second(s) to finish.')

代码语言: Python python

这个怎么运作。

首先,定义一个download_image()从 URL 下载图像并将其保存到文件中的函数:

def download_image(url): image_data = None with urlopen(url) as f: image_data = f.read() if not image_data: raise Exception(f"Error: could not download the image from {url}") filename = os.path.basename(url) with open(filename, 'wb') as image_file: image_file.write(image_data) print(f'{filename} was downloaded...')

代码语言: Python python

download_image()函数urlopen()urllib.request模块中的函数从 URL 下载图像。

二、通过调用对象的方法,download_image()使用线程池执行函数map()ThreadPoolExecutor

with ThreadPoolExecutor() as executor: executor.map(download_image, urls)

代码语言: Python python

概括

  • 线程池是一种有效管理多个线程的模式。
  • 在 Python 中使用ThreadPoolExecutor类来管理线程池。
  • 调用 的submit()方法将ThreadPoolExecutor任务提交到线程池执行。submit()方法返回一个 Future 对象。
  • 调用to map的map()方法在线程池中执行一个函数,其中每个元素都在一个列表中。ThreadPoolExecutor
  • 我的微信
  • 这是我的微信扫一扫
  • weinxin
  • 我的微信公众号
  • 我的微信公众号扫一扫
  • weinxin