Взаимодействие и синхронизация потоков в Python

В этом руководстве вы узнаете, как использовать объект Event модуля threading в Python для взаимодействия и синхронизации между потоками.

Содержание

Введение в объект Event в Python

Иногда вам необходимо обмениваться данными между потоками. Для этого вы можете использовать lock (mutex) и логическую переменную boolean. Однако Python предоставляет вам лучший способ взаимодействия между потоками, используя класс Event из модуля threading.

Класс Event предлагает простой, но эффективный способ координации между потоками: один поток сигнализирует о событии, в то время как другие потоки его ждут.

Объект Event содержит логический флаг, который можно установить (True) или очистить (False). Несколько потоков могут дождаться установки события, прежде чем продолжить, или могут сбросить событие обратно.

Ниже показаны шаги по использованию объекта Event:

  • Сначала импортируйте Event из модуля threading:
from threading import Event
  • Затем создайте новый объект Event:
event = Event()

По умолчанию Event не установлено (сброшено). Метод is_set() объекта Event вернет False:

if event.is_set():
   # ...
  • Затем установите Event с помощью метода set():
event.set()

Как только Event установлено, все потоки, ожидающие этого события, будут уведомлены автоматически.

  • После этого отключите Event с помощью метода clear():
event.clear()
  • Наконец, потоки могут ожидать установки Event с помощью метода wait():
event.wait()

Метод wait() блокирует выполнение потока до тех пор, пока не будет установлено событие. Другими словами, метод wait() заблокирует текущий поток до тех пор, пока другой поток не вызовет метод set() для установки Event.

Если событие установлено, функция wait() немедленно возвращает значение.

Чтобы указать, как долго поток будет ждать, вы можете использовать аргумент timeout. Например:

event.wait(timeout=5) # wait for 5 seconds

Пример threading event в Python

В следующем примере показан простой пример использования объекта Event для взаимодействия между потоками:

from threading import Thread, Event
from time import sleep

def task(event: Event, id: int) -> None:
    print(f'Thread {id} started. Waiting for the signal....')
    event.wait()
    print(f'Received signal. The thread {id} was completed.')

def main() -> None:
    event = Event()

    t1 = Thread(target=task, args=(event,1))
    t2 = Thread(target=task, args=(event,2))

    t1.start()
    t2.start()

    print('Blocking the main thread for 3 seconds...')
    sleep(3) 
    event.set()



if __name__ == '__main__':
    main()

Выход:

Thread 1 started. Waiting for the signal....
Thread 2 started. Waiting for the signal....
Blocking the main thread for 3 seconds...
Received signal. The thread 1 was completed.
Received signal. The thread 2 was completed.

Как это работает.

  • Сначала определите функцию Task(), которая принимает объект Event и целое число:
def task(event: Event, id: int) -> None:
    print(f'Thread {id} started. Wait for the signal....')
    event.wait()
    print(f'Receive signal. The thread {id} was completed.')

Внутри функции Task() мы вызываем метод wait() объекта Event, чтобы дождаться, пока событие будет установлено основным потоком.

  • Во-вторых, создайте объект Event внутри функции main():
event = Event()
  • В-третьих, создайте два дочерних потока, которые выполняют функцию Task() с одним и тем же объектом Event и разными идентификаторами 1 и 2:
t1 = Thread(target=task, args=(event,1))
t2 = Thread(target=task, args=(event,2))
  • В-четвертых, запустите оба потока, вызвав метод start():
t1.start()
t2.start()
  • В-пятых, вызовите метод Sleep(), чтобы заблокировать основной поток на три секунды:
sleep(3)

Поскольку функция Task() вызывает метод wait() объекта Event, потоки t1 и t2 будут ждать установки события, прежде чем продолжить.

  • Наконец, установите Event, вызвав метод set() из основного потока:
event.set()

Потоки t1 и t2 будут уведомлены и продолжат выполнение до конца.

Практическое использование Threading event

Следующий пример иллюстрирует, как использовать событие threading для синхронизации между двумя потоками:

  • Поток №1 загружает текстовый файл с URL-адреса https://www.ietf.org/rfc/rfc793.txt, после завершения он уведомляет второй поток о необходимости подсчитать слова из загруженного текстового файла.
  • Поток №2 запускается и ждет сигнала завершения от потока №1. Однажды, получив сигнал, он начинает считать слова из скачанного файла.

Вот полная программа:

from threading import Thread, Event
from urllib import request


def download_file(url, event):
    # Download the file form URL
    print(f"Downloading file from {url}...")
    filename, _ = request.urlretrieve(url, "rfc793.txt")

    # File download completed, set the event
    event.set()


def process_file(event):
    print("Waiting for the file to be downloaded...")
    event.wait()  # Wait for the event to be set

    # File has been downloaded, start processing it
    print("File download completed. Starting file processing...")

    # Count the number of words in the file
    word_count = 0
    with open("rfc793.txt", "r") as file:
        for line in file:
            words = line.split()
            word_count += len(words)

    # Print the word count
    print(f"Number of words in the file: {word_count}")


def main():
    # Create an Event object
    event = Event()

    # Create and start the file download thread
    download_thread = Thread(target=download_file, 
                             args=("https://www.ietf.org/rfc/rfc793.txt",  event))

    download_thread.start()

    # Create and start the file processing thread
    process_thread = Thread(target=process_file, args=(event,))
    process_thread.start()

    # Wait for both threads to complete
    download_thread.join()
    process_thread.join()

    print("Main thread finished.")

if __name__ == '__main__'    :
    main()
Похожие посты
Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *