- A+
摘要:在本教程中,您将学习如何使用 PythonThreadPoolExecutor
开发多线程程序。
在多线程教程Thread
中,您学习了如何使用模块的类来管理程序中的多个线程threading
。当Thread
您想要手动创建线程时,该类很有用。
然而,手动管理线程效率不高,因为频繁地创建和销毁许多线程在计算成本方面非常昂贵。
如果您希望在程序中运行许多临时任务,您可能希望重用线程,而不是这样做。线程池允许您实现这一点。
线程池
线程池是一种在程序中实现并发执行的模式。线程池允许您自动高效地管理线程池:
池中的每个线程称为工作线程或工作线程。线程池允许您在任务完成后重用工作线程。它还可以防止意外故障,例如异常。
通常,线程池允许您配置工作线程的数量并为每个工作线程提供特定的命名约定。
要创建线程池,请使用模块中的ThreadPoolExecutor
类concurrent.futures
。
线程池执行器
该类扩展了ThreadPoolExecutor
该类并返回一个对象。Executor
Future
执行者
该类Executor
具有三种控制线程池的方法:
– 调度一个要执行的函数并返回一个submit()
Future
对象。该submit()
方法接受一个函数并异步执行它。map()
– 为迭代中的每个元素异步执行一个函数。shutdown()
– 关闭执行器。
当您创建ThreadPoolExecutor
该类的新实例时,Python 会启动Executor
.
一旦完成与 executor 的工作,您必须显式调用该shutdown()
方法以释放 executor 持有的资源。为避免shutdown()
显式调用该方法,您可以使用上下文管理器。
未来对象
AFuture
是表示异步操作的最终结果的对象。Future 类有两个有用的方法:
result()
– 返回异步操作的结果。exception()
– 如果发生异常,则返回异步操作的异常。
Python ThreadPoolExecutor 示例
以下程序使用单个线程:
代码语言: Python (python )
from time import sleep, perf_counter def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}' start = perf_counter() print(task(1)) print(task(2)) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")
输出:
代码语言: Python (python )
Starting the task 1... Done with task 1 Starting the task 2... Done with task 2 It took 2.0144479 second(s) to finish.
这个怎么运作。
首先,定义
需要大约一秒钟才能完成的函数。该task()
函数调用该task()
sleep()
函数来模拟一个延迟:
代码语言: Python (python )
def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}'
其次,调用该
函数两次并打印出结果。在调用task()
函数之前和之后,我们使用task()
perf_counter()
来测量开始和结束时间:
代码语言: Python (python )
start = perf_counter() print(task(1)) print(task(2)) finish = perf_counter()
第三,打印出程序运行的时间:
代码语言: Python (python )
print(f"It took {finish-start} second(s) to finish.")
因为该task()
函数需要一秒钟,所以调用它两次大约需要 2 秒钟。
使用 submit() 方法示例
要task()
同时运行该函数,您可以使用ThreadPoolExecutor
该类:
代码语言: Python (python )
from time import sleep, perf_counter from concurrent.futures import ThreadPoolExecutor def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}' start = perf_counter() with ThreadPoolExecutor() as executor: f1 = executor.submit(task, 1) f2 = executor.submit(task, 2) print(f1.result()) print(f2.result()) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")
输出:
代码语言: Python (python )
Starting the task 1... Starting the task 2... Done with task 1 Done with task 2 It took 1.0177214 second(s) to finish.
输出显示该程序大约需要 1 秒才能完成。
它是如何工作的(我们将专注于线程池部分):
ThreadPoolExecutor
首先,从concurrent.futures
模块中导入类:
代码语言: Python (python )
from concurrent.futures import ThreadPoolExecutor
ThreadPoolExecutor
其次,使用上下文管理器创建线程池:
代码语言: Python (python )
with ThreadPoolExecutor() as executor:
第三,task()
通过将函数传递给执行程序的方法来调用该函数两次submit()
:
代码语言: Python (python )
with ThreadPoolExecutor() as executor: f1 = executor.submit(task, 1) f2 = executor.submit(task, 2) print(f1.result()) print(f2.result())
该submit()
方法返回一个 Future 对象。在这个例子中,我们有两个 Future 对象f1
和f2
. 为了从 Future 对象中获取结果,我们调用了它的result()
方法。
使用 map() 方法示例
下面的程序使用一个ThreadPoolExecutor
类。但是,它不是使用该submit()
方法,而是使用该map()
方法来执行一个函数:
代码语言: Python (python )
from time import sleep, perf_counter from concurrent.futures import ThreadPoolExecutor def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}' start = perf_counter() with ThreadPoolExecutor() as executor: results = executor.map(task, [1,2]) for result in results: print(result) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")
这个怎么运作。
首先,调用
executor 对象的方法,为列表 [1,2] 中的每个 id 运行任务函数。该map()
方法返回一个包含函数调用结果的迭代器。map()
代码语言: Python (python )
results = executor.map(task, [1,2])
其次,遍历结果并打印出来:
代码语言: Python (python )
for result in results: print(result)
Python ThreadPoolExecutor 实例
以下程序使用线程池从 Wikipedia 下载多个图像:
代码语言: Python (python )
from concurrent.futures import ThreadPoolExecutor from urllib.request import urlopen import time import os def download_image(url): image_data = None with urlopen(url) as f: image_data = f.read() if not image_data: raise Exception(f"Error: could not download the image from {url}") filename = os.path.basename(url) with open(filename, 'wb') as image_file: image_file.write(image_data) print(f'{filename} was downloaded...') start = time.perf_counter() urls = ['https://upload.wikimedia.org/wikipedia/commons/9/9d/Python_bivittatus_1701.jpg', 'https://upload.wikimedia.org/wikipedia/commons/4/48/Python_Regius.jpg', 'https://upload.wikimedia.org/wikipedia/commons/d/d3/Baby_carpet_python_caudal_luring.jpg', 'https://upload.wikimedia.org/wikipedia/commons/f/f0/Rock_python_pratik.JPG', 'https://upload.wikimedia.org/wikipedia/commons/0/07/Dulip_Wilpattu_Python1.jpg'] with ThreadPoolExecutor() as executor: executor.map(download_image, urls) finish = time.perf_counter() print(f'It took {finish-start} second(s) to finish.')
这个怎么运作。
首先,定义一个download_image()
从 URL 下载图像并将其保存到文件中的函数:
代码语言: Python (python )
def download_image(url): image_data = None with urlopen(url) as f: image_data = f.read() if not image_data: raise Exception(f"Error: could not download the image from {url}") filename = os.path.basename(url) with open(filename, 'wb') as image_file: image_file.write(image_data) print(f'{filename} was downloaded...')
该download_image()
函数urlopen()
从urllib.request
模块中的函数从 URL 下载图像。
二、通过调用对象的方法,download_image()
使用线程池执行函数:map()
ThreadPoolExecutor
代码语言: Python (python )
with ThreadPoolExecutor() as executor: executor.map(download_image, urls)
概括
- 线程池是一种有效管理多个线程的模式。
- 在 Python 中使用
ThreadPoolExecutor
类来管理线程池。 - 调用 的
方法将submit()
ThreadPoolExecutor
任务提交到线程池执行。该
方法返回一个 Future 对象。submit()
- 调用to map的
map()
方法在线程池中执行一个函数,其中每个元素都在一个列表中。ThreadPoolExecutor
- 我的微信
- 这是我的微信扫一扫
- 我的微信公众号
- 我的微信公众号扫一扫