ThreadPoolExecutor в Python на практических примерах
В этом руководстве вы узнаете, как использовать класс ThreadPoolExecutor в Python для разработки многопоточных программ на практических примерах.
- Что такое класс ThreadPoolExecutor в Python?
- Пул потоков
- ThreadPoolExecutor
- Executor
- Future
- Примеры ThreadPoolExecutor в Python
- Использование метода submit()
- Пример с использованием метода map()
- Практический пример с ThreadPoolExecutor
Что такое класс ThreadPoolExecutor в Python?
В руководстве по многопоточности вы узнали, как управлять несколькими потоками в программе с помощью класса Thread модуля потоковой обработки. Класс Thread полезен, если вы хотите создавать потоки вручную.
Однако управление потоками вручную не всегда эффективно, поскольку частое создание и уничтожение большого количества потоков требует очень больших вычислительных затрат. Вместо этого вы можете повторно использовать потоки, если планируете запускать в программе множество специальных задач. Пул потоков позволяет добиться этого.
Пул потоков
Пул потоков — это шаблон для достижения параллельного выполнения в программе. Пул потоков позволяет автоматически эффективно управлять потоками:
Каждый поток в пуле называется рабочим потоком или worker. Пул потоков позволяет повторно использовать рабочие потоки после завершения задач. Он также защищает от непредвиденных сбоев, таких как исключения.
Обычно пул потоков позволяет настроить количество рабочих потоков и обеспечивает определенное соглашение об именах для каждого рабочего потока.
Чтобы создать пул потоков, вы используете класс ThreadPoolExecutor из модуля concurrent.futures.
ThreadPoolExecutor
Класс ThreadPoolExecutor расширяет класс Executor и возвращает объект Future.
Executor
Класс Executor имеет три метода для управления пулом потоков:
- submit() – отправляет функцию на выполнение и возвращает объект Future. Метод submit() принимает функцию и выполняет ее асинхронно.
- map() – выполняет функцию асинхронно для каждого элемента в итерации.
- Shutdown() – выключает Executor.
Когда вы создаете новый экземпляр класса ThreadPoolExecutor, Python запускает Executor.
После завершения работы с Executor необходимо явно вызвать метод Shutdown(), чтобы освободить ресурс,. Чтобы избежать явного вызова метода Shutdown(), вы можете использовать контекстный менеджер.
Future
Future — это объект, который представляет конечный результат асинхронной операции. Класс Future имеет два полезных метода:
- result() – возвращает результат асинхронной операции.
- exception() — возвращает исключение асинхронной операции в случае возникновения исключения.
Примеры ThreadPoolExecutor в Python
Следующая программа использует один поток:
from time import sleep, perf_counter def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}' start = perf_counter() print(task(1)) print(task(2)) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")
Выход:
Starting the task 1... Done with task 1 Starting the task 2... Done with task 2 It took 2.0144479 second(s) to finish.
Как это работает.
- Сначала определите функцию Task(), выполнение которой занимает около одной секунды. Функция Task() вызывает функцию Sleep() для имитации задержки:
def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}'
- Во-вторых, дважды вызовите функцию Task() и распечатайте результат. До и после вызова функции Task() мы используем perf_counter() для измерения времени начала и окончания:
start = perf_counter() print(task(1)) print(task(2)) finish = perf_counter()
- В-третьих, распечатайте время, необходимое для запуска программы:
print(f"It took {finish-start} second(s) to finish.")
Поскольку функция Task() занимает одну секунду, ее двойной вызов займет около 2 секунд.
Использование метода submit()
Чтобы одновременно запустить функцию Task(), вы можете использовать класс ThreadPoolExecutor:
from time import sleep, perf_counter from concurrent.futures import ThreadPoolExecutor def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}' start = perf_counter() with ThreadPoolExecutor() as executor: f1 = executor.submit(task, 1) f2 = executor.submit(task, 2) print(f1.result()) print(f2.result()) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")
Выход:
Starting the task 1... Starting the task 2... Done with task 1 Done with task 2 It took 1.0177214 second(s) to finish.
Вывод показывает, что завершение программы заняло около 1 секунды.
Как это работает(мы сосредоточимся на пуле потоков):
- Сначала импортируйте класс ThreadPoolExecutor из модуля concurrent.futures:
from concurrent.futures import ThreadPoolExecutor
- Во-вторых, создайте пул потоков с помощью ThreadPoolExecutor с помощью диспетчера контекста:
with ThreadPoolExecutor() as executor:
- В-третьих, дважды вызывая функцию Task(), передавая ее в метод submit():
with ThreadPoolExecutor() as executor: f1 = executor.submit(task, 1) f2 = executor.submit(task, 2) print(f1.result()) print(f2.result())
Метод submit() возвращает объект Future. В этом примере у нас есть два объекта Future f1 и f2. Чтобы получить результат от объекта Future, мы вызвали его метод result().
Пример с использованием метода map()
Следующая программа использует класс ThreadPoolExecutor. Однако вместо использования метода submit() для выполнения функции используется метод map():
from time import sleep, perf_counter from concurrent.futures import ThreadPoolExecutor def task(id): print(f'Starting the task {id}...') sleep(1) return f'Done with task {id}' start = perf_counter() with ThreadPoolExecutor() as executor: results = executor.map(task, [1,2]) for result in results: print(result) finish = perf_counter() print(f"It took {finish-start} second(s) to finish.")
Как это работает.
- Сначала вызовите метод map() объекта-исполнителя, чтобы запустить функцию задачи для каждого идентификатора в списке [1,2]. Метод map() возвращает итератор, содержащий результат вызовов функций.
results = executor.map(task, [1,2])
- Во-вторых, переберите результаты и распечатайте их:
for result in results: print(result)
Практический пример с ThreadPoolExecutor
Следующая программа загружает несколько изображений из Википедии, используя пул потоков:
from concurrent.futures import ThreadPoolExecutor from urllib.request import urlopen import time import os def download_image(url): image_data = None with urlopen(url) as f: image_data = f.read() if not image_data: raise Exception(f"Error: could not download the image from {url}") filename = os.path.basename(url) with open(filename, 'wb') as image_file: image_file.write(image_data) print(f'{filename} was downloaded...') start = time.perf_counter() urls = ['https://upload.wikimedia.org/wikipedia/commons/9/9d/Python_bivittatus_1701.jpg', 'https://upload.wikimedia.org/wikipedia/commons/4/48/Python_Regius.jpg', 'https://upload.wikimedia.org/wikipedia/commons/d/d3/Baby_carpet_python_caudal_luring.jpg', 'https://upload.wikimedia.org/wikipedia/commons/f/f0/Rock_python_pratik.JPG', 'https://upload.wikimedia.org/wikipedia/commons/0/07/Dulip_Wilpattu_Python1.jpg'] with ThreadPoolExecutor() as executor: executor.map(download_image, urls) finish = time.perf_counter() print(f'It took {finish-start} second(s) to finish.')
Как это работает.
- Сначала определите функцию download_image(), которая загружает изображение по URL-адресу и сохраняет его в файл:
def download_image(url): image_data = None with urlopen(url) as f: image_data = f.read() if not image_data: raise Exception(f"Error: could not download the image from {url}") filename = os.path.basename(url) with open(filename, 'wb') as image_file: image_file.write(image_data) print(f'{filename} was downloaded...')
Функция download_image() и функция urlopen() из модуля urllib.request для загрузки изображения по URL-адресу.
- Во-вторых, выполните функцию download_image(), используя пул потоков, вызвав метод map() объекта ThreadPoolExecutor:
with ThreadPoolExecutor() as executor: executor.map(download_image, urls)