В этом руководстве вы узнаете, как создать многопоточное приложение PyQt, использующее классы QThreadPool и QRunnable.
Что такое классы QThreadPool и QRunnable?
Класс QThread позволяет вам выгрузить длительную задачу в рабочий поток, чтобы сделать приложение более отзывчивым. Класс QThread отлично работает, если приложение имеет несколько рабочих потоков.
Многопоточная программа эффективна, когда в ней имеется количество объектов QThread, соответствующее количеству ядер ЦП.
Кроме того, создание потоков довольно затратно с точки зрения ресурсов компьютера. Поэтому программа должна использовать максимально повторно созданные потоки.
Таким образом, использование класса QThread для управления рабочими потоками сопряжено с двумя основными проблемами:
- Определение идеального количества потоков для приложения на основе количества ядер ЦП.
- По возможности повторное использование потоков.
К счастью, в PyQt есть класс QThreadPool, который решает эти проблемы для вас. Класс QThreadPool часто используется с классом QRunnable.
- Класс QRunnable представляет задачу, которую вы хотите выполнить в рабочем потоке.
- QThreadPool выполняет объект QRunnable, а также автоматически управляет и перезапускает потоки.
Каждое приложение Qt имеет глобальный объект QThreadPool, доступ к которому можно получить через статический метод globalInstance() класса QThreadPool.
Чтобы использовать классы QThreadPool и QRunnable, выполните следующие действия:
- Сначала создайте класс, который наследуется от класса QRunnable, и переопределите метод run():
class Worker(QRunnable):
@Slot()
def run(self):
# place a long-running task here
pass
- Во-вторых, откройте пул потоков из главного окна и запустите рабочие потоки:
class MainWindow(QMainWindow):
# other methods
# ...
def start(self):
""" Create and execute worker threads
"""
pool = QThreadPool.globalInstance()
for _ in range(1, 100):
pool.start(Worker())
Для обновления хода выполнения рабочего процесса в основном потоке используются сигналы и слоты. Однако QRunnable не поддерживает сигнал. Поэтому вам нужно определить отдельный класс, который наследует от QObject и использует этот класс в классе Worker.
Вот как это сделать:
- Сначала определите класс Signals, который является подклассом класса QObject:
class Signals(QObject):
completed = Signal()
В классе Signals мы определяем один сигнал, который называется complete. Обратите внимание, что вы можете определить столько сигналов, сколько необходимо.
- Во-вторых, подайте сигнал о завершении работы в классе Worker:
class Runnable(QRunnable):
def __init__(self):
super().__init__()
self.signals = Signals()
@Slot()
def run(self):
# long running task
# ...
# emit the completed signal
self.signals.completed.emit()
- В-третьих, соедините сигнал рабочего потока со слотом главного окна перед отправкой рабочего потока в пул:
class MainWindow(QMainWindow):
# other methods
# ...
def start(self):
""" Create and execute worker threads
"""
pool = QThreadPool.globalInstance()
for _ in range(1, 100):
worker = Worker()
worker.signals.completed.connect(self.update)
pool.start(worker)
def update(self):
# update the worker
pass
Пример QThreadPool и QRunnable
Следующий пример иллюстрирует, как использовать классы QThreadPool и QRunnable:
import sys
import time
from PyQt6.QtWidgets import QApplication, QMainWindow, QPushButton, QGridLayout, QWidget, QProgressBar, QListWidget
from PyQt6.QtCore import QRunnable, QObject, QThreadPool, pyqtSignal as Signal, pyqtSlot as Slot
class Signals(QObject):
started = Signal(int)
completed = Signal(int)
class Worker(QRunnable):
def __init__(self, n):
super().__init__()
self.n = n
self.signals = Signals()
@Slot()
def run(self):
self.signals.started.emit(self.n)
time.sleep(self.n*1.1)
self.signals.completed.emit(self.n)
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle('QThreadPool Demo')
self.job_count = 10
self.comleted_jobs = []
widget = QWidget()
widget.setLayout(QGridLayout())
self.setCentralWidget(widget)
self.btn_start = QPushButton('Start', clicked=self.start_jobs)
self.progress_bar = QProgressBar(minimum=0, maximum=self.job_count)
self.list = QListWidget()
widget.layout().addWidget(self.list, 0, 0, 1, 2)
widget.layout().addWidget(self.progress_bar, 1, 0)
widget.layout().addWidget(self.btn_start, 1, 1)
self.show()
def start_jobs(self):
self.restart()
pool = QThreadPool.globalInstance()
for i in range(1, self.job_count+1):
worker = Worker(i)
worker.signals.completed.connect(self.complete)
worker.signals.started.connect(self.start)
pool.start(worker)
def restart(self):
self.progress_bar.setValue(0)
self.comleted_jobs = []
self.btn_start.setEnabled(False)
def start(self, n):
self.list.addItem(f'Job #{n} started...')
def complete(self, n):
self.list.addItem(f'Job #{n} completed.')
self.comleted_jobs.append(n)
self.progress_bar.setValue(len(self.comleted_jobs))
if len(self.comleted_jobs) == self.job_count:
self.btn_start.setEnabled(True)
if __name__ == '__main__':
app = QApplication(sys.argv)
window = MainWindow()
sys.exit(app.exec())
def complete(self, n):
self.list.addItem(f'Job #{n} completed.')
self.comleted_jobs.append(n)
self.progress_bar.setValue(len(self.comleted_jobs))
if len(self.comleted_jobs) == self.job_count:
self.btn_start.setEnabled(True)
Выход:

Класс Signals
Определим класс Signals, который наследует класс QObject для поддержки сигналов. В классе Signals мы определяем два сигнала:
- Сигнал запуска будет подан при запуске рабочего процесса.
- Сигнал о завершении работы будет подан, когда рабочий поток завершит свою работу.
Оба сигнала принимают целое число, идентифицирующее номер задания:
class Signals(QObject):
started = Signal(int)
completed = Signal(int)
Класс Worker
Класс Worker наследуется от класса QRunnable. Класс Worker представляет собой длительную задачу, которую мы выгружаем в рабочий поток:
class Worker(QRunnable):
def __init__(self, n):
super().__init__()
self.n = n
self.signals = Signals()
@Slot()
def run(self):
self.signals.started.emit(self.n)
time.sleep(self.n*1.1)
self.signals.completed.emit(self.n)
Сначала инициализируйте номер задания(n) и объект Signals в методе __init__().
Затем переопределите метод run() класса QRunnable. Для имитации длительной задачи мы используем функцию sleep() модуля time. Перед запуском таймера мы посылаем сигнал started; после завершения таймера мы посылаем сигнал complete.
Класс MainWindow
Класс MainWindow определяет пользовательский интерфейс приложения:
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle('QThreadPool Demo')
self.comleted_jobs = []
self.job_count = 10
widget = QWidget()
widget.setLayout(QGridLayout())
self.setCentralWidget(widget)
self.btn_start = QPushButton('Start', clicked=self.start_jobs)
self.progress_bar = QProgressBar(minimum=0, maximum=self.job_count)
self.list = QListWidget()
widget.layout().addWidget(self.list, 0, 0, 1, 2)
widget.layout().addWidget(self.progress_bar, 1, 0)
widget.layout().addWidget(self.btn_start, 1, 1)
self.show()
def start_jobs(self):
self.restart()
pool = QThreadPool.globalInstance()
for i in range(1, self.job_count+1):
runnable = Worker(i)
runnable.signals.completed.connect(self.complete)
runnable.signals.started.connect(self.start)
pool.start(runnable)
def restart(self):
self.progress_bar.setValue(0)
self.comleted_jobs = []
self.btn_start.setEnabled(False)
def start(self, n):
self.list.addItem(f'Job #{n} started...')
def complete(self, n):
self.list.addItem(f'Job #{n} completed.')
self.comleted_jobs.append(n)
self.progress_bar.setValue(len(self.comleted_jobs))
if len(self.comleted_jobs) == self.job_count:
self.btn_start.setEnabled(True)
- Сначала инициализируем количество заданий(job_count) и список завершенных заданий в методе __init__() класса MainWindow:
self.job_count = 10 self.comleted_jobs = []
- Во-вторых, определите метод start_jobs(), который будет выполняться, когда пользователь нажимает кнопку «Пуск»:
def start_jobs(self):
self.restart()
pool = QThreadPool.globalInstance()
for i in range(1, self.job_count+1):
worker = Worker(i)
worker.signals.completed.connect(self.complete)
worker.signals.started.connect(self.start)
pool.start(worker)
Функция restart() сбрасывает complete_jobs, обновляет индикатор выполнения до нуля и отключает кнопку запуска:
def restart(self):
self.progress_bar.setValue(0)
self.comleted_jobs = []
self.btn_start.setEnabled(False)
Чтобы получить объект QThreadPool, мы используем globalInstance() класса QThreadPool:
pool = QThreadPool.globalInstance()
Мы создаем несколько рабочих потоков, подключаем их сигналы к методам класса MainWindow и запускаем рабочие потоки с помощью метода start() объекта QThreadPool.
Метод start() добавляет сообщение, запускающее рабочий поток, в QListWidget:
def start(self, n):
self.list.addItem(f'Job #{n} started...')
Метод complete() запускается каждый раз, когда завершается рабочий поток. Он добавляет сообщение в QListWidget, обновляет полосу прогресса и активирует кнопку запуска, если все рабочие потоки завершены:
def complete(self, n):
self.list.addItem(f'Job #{n} completed.')
self.comleted_jobs.append(n)
self.progress_bar.setValue(len(self.comleted_jobs))
if len(self.comleted_jobs) == self.job_count:
self.btn_start.setEnabled(True)
Использование QThreadPool для получения цен на акции
Следующая программа листинга ценных бумаг считывает символы акций из файла symbols.txt и использует QThreadPool для получения цен акций с сайта Yahoo Finance:

Программа листинга акций:
import sys
from pathlib import Path
from PyQt6.QtCore import QRunnable, Qt, QObject, QThreadPool, pyqtSignal as Signal, pyqtSlot as Slot
from PyQt6.QtWidgets import QApplication, QMainWindow, QPushButton, QWidget, QGridLayout, QProgressBar, QTableWidget, QTableWidgetItem
from PyQt6.QtGui import QIcon
from lxml import html
import requests
class Signals(QObject):
completed = Signal(dict)
class Stock(QRunnable):
BASE_URL = 'https://finance.yahoo.com/quote/'
def __init__(self, symbol):
super().__init__()
self.symbol = symbol
self.signal = Signals()
@Slot()
def run(self):
stock_url = f'{self.BASE_URL}{self.symbol}'
headers = {"User-Agent": "Mozilla/5.0(Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"}
response = requests.get(stock_url, headers=headers)
if response.status_code != 200:
self.signal.completed.emit({'symbol': self.symbol, 'price': 'N/A'})
return
tree = html.fromstring(response.text)
price_text = tree.xpath(
'//*[@id="quote-header-info"]/div[3]/div[1]/div[1]/fin-streamer[1]/text()'
)
if not price_text:
self.signal.completed.emit({'symbol': self.symbol, 'price': 'N/A'})
return
price = float(price_text[0].replace(',', ''))
self.signal.completed.emit({'symbol': self.symbol, 'price': price})
class Window(QMainWindow):
def __init__(self, filename, *args, **kwargs):
super().__init__(*args, **kwargs)
self.symbols = self.read_symbols(filename)
self.results = []
self.setWindowTitle('Stock Listing')
self.setGeometry(100, 100, 400, 300)
self.setWindowIcon(QIcon('./assets/stock.png'))
widget = QWidget()
widget.setLayout(QGridLayout())
self.setCentralWidget(widget)
# set up button & progress bar
self.btn_start = QPushButton('Get Prices', clicked=self.get_prices)
self.progress_bar = QProgressBar(minimum=1, maximum=len(self.symbols))
# set up table widget
self.table = QTableWidget(widget)
self.table.setColumnCount(2)
self.table.setColumnWidth(0, 150)
self.table.setColumnWidth(1, 150)
self.table.setHorizontalHeaderLabels(['Symbol', 'Price'])
widget.layout().addWidget(self.table, 0, 0, 1, 2)
widget.layout().addWidget(self.progress_bar, 1, 0)
widget.layout().addWidget(self.btn_start, 1, 1)
# show the window
self.show()
def read_symbols(self, filename):
"""
Read symbols from a file
"""
path = Path(filename)
text = path.read_text()
return [symbol.strip() for symbol in text.split('\n')]
def reset_ui(self):
self.progress_bar.setValue(1)
self.table.setRowCount(0)
def get_prices(self):
# reset ui
self.reset_ui()
# start worker threads
pool = QThreadPool.globalInstance()
stocks = [Stock(symbol) for symbol in self.symbols]
for stock in stocks:
stock.signal.completed.connect(self.update)
pool.start(stock)
def update(self, data):
# add a row to the table
row = self.table.rowCount()
self.table.insertRow(row)
self.table.setItem(row, 0, QTableWidgetItem(data['symbol']))
self.table.setItem(row, 1, QTableWidgetItem(str(data['price'])))
# update the progress bar
self.progress_bar.setValue(row + 1)
# sort the list by symbols once completed
if row == len(self.symbols) - 1:
self.table.sortItems(0, Qt.SortOrder.AscendingOrder)
if __name__ == '__main__':
app = QApplication(sys.argv)
window = Window('symbols.txt')
sys.exit(app.exec())
Как это работает.
В классе Signals
Мы определяем класс Signals, который является подклассом QObject. Класс Signals имеет одну завершенную переменную класса, которая является экземпляром класса Signal.
Завершенный сигнал содержит словарь и выдается после того, как программа завершает получение цены акций.
class Signals(QObject):
completed = Signal(dict)
В классе Stock
Класс STock наследуется от класса QRunnable. Он переопределяет метод run(), который получает цену акций с сайта Yahoo Finance.
После завершения метод run() выдает сигнал завершения с символом акции и ценой.
Если возникает ошибка, например, символ не найден или веб-сайт меняет способ отображения цены акций, метод run() возвращает символ с ценой в виде строки N/A.
class Stock(QRunnable):
BASE_URL = 'https://finance.yahoo.com/quote/'
def __init__(self, symbol):
super().__init__()
self.symbol = symbol
self.signal = Signals()
@Slot()
def run(self):
stock_url = f'{self.BASE_URL}{self.symbol}'
headers = {"User-Agent": "Mozilla/5.0(Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"}
response = requests.get(stock_url, headers=headers)
if response.status_code != 200:
self.signal.completed.emit({'symbol': self.symbol, 'price': 'N/A'})
return
tree = html.fromstring(response.text)
price_text = tree.xpath(
'//*[@id="quote-header-info"]/div[3]/div[1]/div[1]/fin-streamer[1]/text()'
)
if not price_text:
self.signal.completed.emit({'symbol': self.symbol, 'price': 'N/A'})
return
price = float(price_text[0].replace(',', ''))
self.signal.completed.emit({'symbol': self.symbol, 'price': price})
Обратите внимание, что Yahoo Finance может изменить свою структуру. Чтобы программа заработала, вам нужно изменить XPath цены на новую:
//*[@id="quote-header-info"]/div[3]/div[1]/div[1]/fin-streamer[1]/text()
В классе MainWindow
- Сначала считаем символы из файла и присвоим их переменным self.symbols:
self.symbols = self.read_symbols(filename)
Метод read_symbols() выглядит следующим образом:
def read_symbols(self, filename):
path = Path(filename)
text = path.read_text()
return [symbol.strip() for symbol in text.split('\n')]
Текстовый файл(symbols.txt) содержит по одному символу в строке:
AAPL MSFT GOOG AMZN TSLA META NVDA BABA CRM INTC PYPL AMD ATVI EA TTD ORCL
- Во-вторых, определите get_prices, который использует QThreadPool для создания рабочих потоков для получения цен акций:
def get_prices(self):
# reset ui
self.reset_ui()
# start worker threads
pool = QThreadPool.globalInstance()
stocks = [Stock(symbol) for symbol in self.symbols]
for stock in stocks:
stock.signal.completed.connect(self.update)
pool.start(stock)
Метод reset_ui() очищает все строки QTableWidget и устанавливает индикатор выполнения на минимальное значение:
def reset_ui(self):
self.table.setRowCount(0)
self.progress_bar.setValue(1)
- В-третьих, определите метод update(), который будет вызываться после завершения каждого рабочего потока. Метод update() добавляет новую строку в таблицу, обновляет индикатор выполнения и сортирует символы после завершения всех рабочих потоков:
def update(self, data):
# add a row to the table
row = self.table.rowCount()
self.table.insertRow(row)
self.table.setItem(row, 0, QTableWidgetItem(data['symbol']))
self.table.setItem(row, 1, QTableWidgetItem(str(data['price'])))
# update the progress bar
self.progress_bar.setValue(row + 1)
# sort the list by symbols once completed
if row == len(self.symbols) - 1:
self.table.sortItems(0, Qt.SortOrder.AscendingOrder) 