PyQt QThreadPool и QRunnable в управлении потоками в Python

В этом руководстве вы узнаете, как создать многопоточное приложение PyQt, использующее классы QThreadPool и QRunnable.

Содержание

Что такое классы QThreadPool и QRunnable?

Класс QThread позволяет вам выгрузить длительную задачу в рабочий поток, чтобы сделать приложение более отзывчивым. Класс QThread отлично работает, если приложение имеет несколько рабочих потоков.

Многопоточная программа эффективна, когда в ней имеется количество объектов QThread, соответствующее количеству ядер ЦП.

Кроме того, создание потоков довольно затратно с точки зрения ресурсов компьютера. Поэтому программа должна использовать максимально повторно созданные потоки.

Таким образом, использование класса QThread для управления рабочими потоками сопряжено с двумя основными проблемами:

  • Определение идеального количества потоков для приложения на основе количества ядер ЦП.
  • По возможности повторное использование потоков.

К счастью, в PyQt есть класс QThreadPool, который решает эти проблемы для вас. Класс QThreadPool часто используется с классом QRunnable.

  • Класс QRunnable представляет задачу, которую вы хотите выполнить в рабочем потоке.
  • QThreadPool выполняет объект QRunnable, а также автоматически управляет и перезапускает потоки.

Каждое приложение Qt имеет глобальный объект QThreadPool, доступ к которому можно получить через статический метод globalInstance() класса QThreadPool.

Чтобы использовать классы QThreadPool и QRunnable, выполните следующие действия:

  • Сначала создайте класс, который наследуется от класса QRunnable, и переопределите метод run():
class Worker(QRunnable):
    @Slot()
    def run(self):
        # place a long-running task here
        pass
  • Во-вторых, откройте пул потоков из главного окна и запустите рабочие потоки:
class MainWindow(QMainWindow):
    # other methods
    # ...

    def start(self):
        """ Create and execute worker threads
        """
        pool = QThreadPool.globalInstance()
        for _ in range(1, 100):
            pool.start(Worker())

Для обновления хода выполнения рабочего процесса в основном потоке используются сигналы и слоты. Однако QRunnable не поддерживает сигнал. Поэтому вам нужно определить отдельный класс, который наследует от QObject и использует этот класс в классе Worker.

Вот как это сделать:

  • Сначала определите класс Signals, который является подклассом класса QObject:
class Signals(QObject):
    completed = Signal()

В классе Signals мы определяем один сигнал, который называется complete. Обратите внимание, что вы можете определить столько сигналов, сколько необходимо.

  • Во-вторых, подайте сигнал о завершении работы в классе Worker:
class Runnable(QRunnable):
    def __init__(self):
        super().__init__()
        self.signals = Signals()

    @Slot()
    def run(self):
        # long running task
        # ...
        # emit the completed signal
        self.signals.completed.emit()
  • В-третьих, соедините сигнал рабочего потока со слотом главного окна перед отправкой рабочего потока в пул:
class MainWindow(QMainWindow):
    # other methods
    # ...

    def start(self):
        """ Create and execute worker threads
        """
        pool = QThreadPool.globalInstance()
        for _ in range(1, 100):
            worker = Worker()
            worker.signals.completed.connect(self.update)
            pool.start(worker)

    def update(self):
        # update the worker
        pass

Пример QThreadPool и QRunnable

Следующий пример иллюстрирует, как использовать классы QThreadPool и QRunnable:

import sys
import time
from PyQt6.QtWidgets import QApplication, QMainWindow, QPushButton, QGridLayout, QWidget, QProgressBar, QListWidget
from PyQt6.QtCore import QRunnable, QObject, QThreadPool, pyqtSignal as Signal, pyqtSlot as Slot


class Signals(QObject):
    started = Signal(int)
    completed = Signal(int)


class Worker(QRunnable):
    def __init__(self, n):
        super().__init__()
        self.n = n
        self.signals = Signals()

    @Slot()
    def run(self):
        self.signals.started.emit(self.n)
        time.sleep(self.n*1.1)
        self.signals.completed.emit(self.n)


class MainWindow(QMainWindow):
    def __init__(self, parent=None):
        super().__init__(parent)

        self.setWindowTitle('QThreadPool Demo')

        self.job_count = 10
        self.comleted_jobs = []

        widget = QWidget()
        widget.setLayout(QGridLayout())
        self.setCentralWidget(widget)

        self.btn_start = QPushButton('Start', clicked=self.start_jobs)
        self.progress_bar = QProgressBar(minimum=0, maximum=self.job_count)
        self.list = QListWidget()

        widget.layout().addWidget(self.list, 0, 0, 1, 2)
        widget.layout().addWidget(self.progress_bar, 1, 0)
        widget.layout().addWidget(self.btn_start, 1, 1)

        self.show()

    def start_jobs(self):
        self.restart()
        pool = QThreadPool.globalInstance()
        for i in range(1, self.job_count+1):
            worker = Worker(i)
            worker.signals.completed.connect(self.complete)
            worker.signals.started.connect(self.start)
            pool.start(worker)

    def restart(self):
        self.progress_bar.setValue(0)
        self.comleted_jobs = []
        self.btn_start.setEnabled(False)

    def start(self, n):
        self.list.addItem(f'Job #{n} started...')

    def complete(self, n):
        self.list.addItem(f'Job #{n} completed.')
        self.comleted_jobs.append(n)
        self.progress_bar.setValue(len(self.comleted_jobs))

        if len(self.comleted_jobs) == self.job_count:
            self.btn_start.setEnabled(True)


if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = MainWindow()
    sys.exit(app.exec())


def complete(self, n):
    self.list.addItem(f'Job #{n} completed.')
    self.comleted_jobs.append(n)
    self.progress_bar.setValue(len(self.comleted_jobs))

    if len(self.comleted_jobs) == self.job_count:
        self.btn_start.setEnabled(True)

Выход:

Как использовать класс QThreadPool

Класс Signals

Определим класс Signals, который наследует класс QObject для поддержки сигналов. В классе Signals мы определяем два сигнала:

  • Сигнал запуска будет подан при запуске рабочего процесса.
  • Сигнал о завершении работы будет подан, когда рабочий поток завершит свою работу.

Оба сигнала принимают целое число, идентифицирующее номер задания:

class Signals(QObject):
    started = Signal(int)
    completed = Signal(int)

Класс Worker

Класс Worker наследуется от класса QRunnable. Класс Worker представляет собой длительную задачу, которую мы выгружаем в рабочий поток:

class Worker(QRunnable):
    def __init__(self, n):
        super().__init__()
        self.n = n
        self.signals = Signals()

    @Slot()
    def run(self):
        self.signals.started.emit(self.n)
        time.sleep(self.n*1.1)
        self.signals.completed.emit(self.n)

Сначала инициализируйте номер задания(n) и объект Signals в методе __init__().

Затем переопределите метод run() класса QRunnable. Для имитации длительной задачи мы используем функцию sleep() модуля time. Перед запуском таймера мы посылаем сигнал started; после завершения таймера мы посылаем сигнал complete.

Класс MainWindow

Класс MainWindow определяет пользовательский интерфейс приложения:

class MainWindow(QMainWindow):
    def __init__(self, parent=None):
        super().__init__(parent)

        self.setWindowTitle('QThreadPool Demo')

        self.comleted_jobs = []
        self.job_count = 10

        widget = QWidget()
        widget.setLayout(QGridLayout())
        self.setCentralWidget(widget)

        self.btn_start = QPushButton('Start', clicked=self.start_jobs)
        self.progress_bar = QProgressBar(minimum=0, maximum=self.job_count)
        self.list = QListWidget()

        widget.layout().addWidget(self.list, 0, 0, 1, 2)
        widget.layout().addWidget(self.progress_bar, 1, 0)
        widget.layout().addWidget(self.btn_start, 1, 1)

        self.show()

    def start_jobs(self):
        self.restart()

        pool = QThreadPool.globalInstance()
        for i in range(1, self.job_count+1):
            runnable = Worker(i)
            runnable.signals.completed.connect(self.complete)
            runnable.signals.started.connect(self.start)
            pool.start(runnable)

    def restart(self):
        self.progress_bar.setValue(0)
        self.comleted_jobs = []
        self.btn_start.setEnabled(False)

    def start(self, n):
        self.list.addItem(f'Job #{n} started...')

    def complete(self, n):
        self.list.addItem(f'Job #{n} completed.')
        self.comleted_jobs.append(n)
        self.progress_bar.setValue(len(self.comleted_jobs))

        if len(self.comleted_jobs) == self.job_count:
            self.btn_start.setEnabled(True)
  • Сначала инициализируем количество заданий(job_count) и список завершенных заданий в методе __init__() класса MainWindow:
self.job_count = 10
self.comleted_jobs = []
  • Во-вторых, определите метод start_jobs(), который будет выполняться, когда пользователь нажимает кнопку «Пуск»:
def start_jobs(self):
    self.restart()
    pool = QThreadPool.globalInstance()
    for i in range(1, self.job_count+1):
        worker = Worker(i)
        worker.signals.completed.connect(self.complete)
        worker.signals.started.connect(self.start)
        pool.start(worker)

Функция restart() сбрасывает complete_jobs, обновляет индикатор выполнения до нуля и отключает кнопку запуска:

def restart(self):
    self.progress_bar.setValue(0)
    self.comleted_jobs = []
    self.btn_start.setEnabled(False)

Чтобы получить объект QThreadPool, мы используем globalInstance() класса QThreadPool:

pool = QThreadPool.globalInstance()

Мы создаем несколько рабочих потоков, подключаем их сигналы к методам класса MainWindow и запускаем рабочие потоки с помощью метода start() объекта QThreadPool.

Метод start() добавляет сообщение, запускающее рабочий поток, в QListWidget:

def start(self, n):
    self.list.addItem(f'Job #{n} started...')

Метод complete() запускается каждый раз, когда завершается рабочий поток. Он добавляет сообщение в QListWidget, обновляет полосу прогресса и активирует кнопку запуска, если все рабочие потоки завершены:

def complete(self, n):
    self.list.addItem(f'Job #{n} completed.')
    self.comleted_jobs.append(n)
    self.progress_bar.setValue(len(self.comleted_jobs))

    if len(self.comleted_jobs) == self.job_count:
        self.btn_start.setEnabled(True)

Использование QThreadPool для получения цен на акции

Следующая программа листинга ценных бумаг считывает символы акций из файла symbols.txt и использует QThreadPool для получения цен акций с сайта Yahoo Finance:

Пример программы листинга ценных бумаг

Программа листинга акций:

import sys
from pathlib import Path

from PyQt6.QtCore import QRunnable, Qt, QObject, QThreadPool, pyqtSignal as Signal, pyqtSlot as Slot
from PyQt6.QtWidgets import QApplication,  QMainWindow, QPushButton, QWidget, QGridLayout, QProgressBar, QTableWidget, QTableWidgetItem
from PyQt6.QtGui import QIcon

from lxml import html
import requests


class Signals(QObject):
    completed = Signal(dict)


class Stock(QRunnable):
    BASE_URL = 'https://finance.yahoo.com/quote/'

    def __init__(self, symbol):
        super().__init__()
        self.symbol = symbol
        self.signal = Signals()

    @Slot()
    def run(self):
        stock_url = f'{self.BASE_URL}{self.symbol}'
        headers = {"User-Agent": "Mozilla/5.0(Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"}
        response = requests.get(stock_url, headers=headers)

        if response.status_code != 200:
            self.signal.completed.emit({'symbol': self.symbol, 'price': 'N/A'})
            return

        tree = html.fromstring(response.text)
        price_text = tree.xpath(
            '//*[@id="quote-header-info"]/div[3]/div[1]/div[1]/fin-streamer[1]/text()'
        )

        if not price_text:
            self.signal.completed.emit({'symbol': self.symbol, 'price': 'N/A'})
            return

        price = float(price_text[0].replace(',', ''))

        self.signal.completed.emit({'symbol': self.symbol, 'price': price})


class Window(QMainWindow):
    def __init__(self, filename, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.symbols = self.read_symbols(filename)

        self.results = []

        self.setWindowTitle('Stock Listing')
        self.setGeometry(100, 100, 400, 300)
        self.setWindowIcon(QIcon('./assets/stock.png'))

        widget = QWidget()
        widget.setLayout(QGridLayout())
        self.setCentralWidget(widget)

        # set up button & progress bar
        self.btn_start = QPushButton('Get Prices', clicked=self.get_prices)
        self.progress_bar = QProgressBar(minimum=1, maximum=len(self.symbols))

        # set up table widget
        self.table = QTableWidget(widget)
        self.table.setColumnCount(2)
        self.table.setColumnWidth(0, 150)
        self.table.setColumnWidth(1, 150)

        self.table.setHorizontalHeaderLabels(['Symbol', 'Price'])

        widget.layout().addWidget(self.table, 0, 0, 1, 2)
        widget.layout().addWidget(self.progress_bar, 1, 0)
        widget.layout().addWidget(self.btn_start, 1, 1)

        # show the window
        self.show()

    def read_symbols(self, filename):
        """ 
        Read symbols from a file
        """
        path = Path(filename)
        text = path.read_text()
        return [symbol.strip() for symbol in text.split('\n')]

    def reset_ui(self):
        self.progress_bar.setValue(1)
        self.table.setRowCount(0)

    def get_prices(self):
        # reset ui
        self.reset_ui()

        # start worker threads
        pool = QThreadPool.globalInstance()
        stocks = [Stock(symbol) for symbol in self.symbols]
        for stock in stocks:
            stock.signal.completed.connect(self.update)
            pool.start(stock)

    def update(self, data):
        # add a row to the table
        row = self.table.rowCount()
        self.table.insertRow(row)
        self.table.setItem(row, 0, QTableWidgetItem(data['symbol']))
        self.table.setItem(row, 1, QTableWidgetItem(str(data['price'])))

        # update the progress bar
        self.progress_bar.setValue(row + 1)

        # sort the list by symbols once completed
        if row == len(self.symbols) - 1:
            self.table.sortItems(0, Qt.SortOrder.AscendingOrder)


if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = Window('symbols.txt')
    sys.exit(app.exec())

Как это работает.

В классе Signals

Мы определяем класс Signals, который является подклассом QObject. Класс Signals имеет одну завершенную переменную класса, которая является экземпляром класса Signal.

Завершенный сигнал содержит словарь и выдается после того, как программа завершает получение цены акций.

class Signals(QObject):
    completed = Signal(dict)

В классе Stock

Класс STock наследуется от класса QRunnable. Он переопределяет метод run(), который получает цену акций с сайта Yahoo Finance.

После завершения метод run() выдает сигнал завершения с символом акции и ценой.

Если возникает ошибка, например, символ не найден или веб-сайт меняет способ отображения цены акций, метод run() возвращает символ с ценой в виде строки N/A.

class Stock(QRunnable):
    BASE_URL = 'https://finance.yahoo.com/quote/'

    def __init__(self, symbol):
        super().__init__()
        self.symbol = symbol
        self.signal = Signals()

    @Slot()
    def run(self):
        stock_url = f'{self.BASE_URL}{self.symbol}'
        headers = {"User-Agent": "Mozilla/5.0(Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"}
        response = requests.get(stock_url, headers=headers)
        if response.status_code != 200:
            self.signal.completed.emit({'symbol': self.symbol, 'price': 'N/A'})
            return

        tree = html.fromstring(response.text)
        price_text = tree.xpath(
            '//*[@id="quote-header-info"]/div[3]/div[1]/div[1]/fin-streamer[1]/text()'
        )

        if not price_text:
            self.signal.completed.emit({'symbol': self.symbol, 'price': 'N/A'})
            return

        price = float(price_text[0].replace(',', ''))

        self.signal.completed.emit({'symbol': self.symbol, 'price': price})

Обратите внимание, что Yahoo Finance может изменить свою структуру. Чтобы программа заработала, вам нужно изменить XPath цены на новую:

//*[@id="quote-header-info"]/div[3]/div[1]/div[1]/fin-streamer[1]/text()

В классе MainWindow

  • Сначала считаем символы из файла и присвоим их переменным self.symbols:
 self.symbols = self.read_symbols(filename)

Метод read_symbols() выглядит следующим образом:

def read_symbols(self, filename):
    path = Path(filename)
    text = path.read_text()
    return [symbol.strip() for symbol in text.split('\n')]

Текстовый файл(symbols.txt) содержит по одному символу в строке:

AAPL	
MSFT
GOOG	
AMZN
TSLA
META
NVDA
BABA
CRM
INTC
PYPL	
AMD
ATVI
EA
TTD
ORCL
  • Во-вторых, определите get_prices, который использует QThreadPool для создания рабочих потоков для получения цен акций:
def get_prices(self):
    # reset ui
    self.reset_ui()

    # start worker threads
    pool = QThreadPool.globalInstance()
    stocks = [Stock(symbol) for symbol in self.symbols]
    for stock in stocks:
        stock.signal.completed.connect(self.update)
        pool.start(stock)

Метод reset_ui() очищает все строки QTableWidget и устанавливает индикатор выполнения на минимальное значение:

def reset_ui(self):
    self.table.setRowCount(0)
    self.progress_bar.setValue(1)
  • В-третьих, определите метод update(), который будет вызываться после завершения каждого рабочего потока. Метод update() добавляет новую строку в таблицу, обновляет индикатор выполнения и сортирует символы после завершения всех рабочих потоков:
def update(self, data):
    # add a row to the table
    row = self.table.rowCount()
    self.table.insertRow(row)
    self.table.setItem(row, 0, QTableWidgetItem(data['symbol']))
    self.table.setItem(row, 1, QTableWidgetItem(str(data['price'])))

    # update the progress bar
    self.progress_bar.setValue(row + 1)

    # sort the list by symbols once completed
    if row == len(self.symbols) - 1:
        self.table.sortItems(0, Qt.SortOrder.AscendingOrder)
Похожие посты
Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *