Потокобезопасная очередь в Python с примерами

В этом руководстве вы узнаете, как использовать потокобезопасную очередь в Python для безопасного обмена данными между несколькими потоками.

Содержание

Что такое потокобезопасная очередь в Python?

Встроенный модуль queue позволяет безопасно обмениваться данными между несколькими потоками. Класс Queue в модуле queue реализует всю необходимую семантику блокировки.

Создание новой очереди

Чтобы создать новую очередь, импортируйте класс Queue из модуля queue:

from queue import Queue

и используйте конструктор Queue следующим образом:

queue = Queue()

Чтобы создать очередь с ограничением размера, вы можете использовать параметр maxsize. Например, следующий код создает очередь, которая может хранить до 10 элементов:

queue = Queue(maxsize=10)

Добавление элемента в очередь

Чтобы добавить элемент в очередь, используйте метод put() следующим образом:

queue.add(item)

Как только очередь заполнится, вы не сможете добавить в нее элемент. Кроме того, вызов метода put() будет заблокирован до тех пор, пока в очереди не появится свободное место.

Если вы не хотите, чтобы метод put() блокировался, если очередь заполнена, вы можете установить для аргумента блокировки значение False:

queue.put(item, block=False)

В этом случае метод put() вызовет исключение length.Full, если очередь заполнена:

try:
   queue.put(item, block=False)
except queue.Full as e:
   # handle exceptoin

Чтобы добавить элемент в очередь ограниченного размера и заблокировать его с таймаутом, вы можете использовать параметр timeout следующим образом:

try:
   queue.put(item, timeout=3)
except queue.Full as e:
   # handle exceptoin

Получение элемента

Чтобы получить элемент из очереди, вы можете использовать метод get():

item = queue.get()

Метод get() будет блокироваться до тех пор, пока элемент не станет доступен для извлечения из очереди.

Чтобы получить элемент из очереди без блокировки, вы можете установить для параметра блокировки значение False:

try:
   queue.get(block=False)
except queue.Empty:
   # handle exception

Чтобы получить элемент из очереди и заблокировать его с ограничением по времени, вы можете использовать метод get() с таймаутом:

try:
   item = queue.get(timeout=10)
except queue.Empty:
   # ...

Размер очереди

Метод qsize() возвращает количество элементов в очереди:

size = queue.size()

Кроме того, метод empty() возвращает True, если очередь пуста, или False в противном случае. С другой стороны, метод full() возвращает True, если очередь заполнена, и False в противном случае.

Отметить задачу как выполненную

Элемент, добавляемый в очередь, представляет собой единицу работы или задачу. Когда поток вызывает метод get(), чтобы получить элемент из очереди, ему может потребоваться обработать его, прежде чем задача будет считаться выполненной.

После завершения поток может вызвать метод очереди Task_done(), чтобы указать, что задача полностью обработана:

item = queue.get()

# process the item
# ...

# mark the item as completed
queue.task_done()

Ожидание завершения всех задач

Чтобы дождаться завершения всех задач в очереди, вы можете вызвать метод join() объекта очереди:

queue.join()

Пример потокобезопасной очереди в Python

Следующий пример иллюстрирует, как использовать потокобезопасную очередь для обмена данными между двумя потоками:

import time
from queue import Empty, Queue
from threading import Thread


def producer(queue):
    for i in range(1, 6):
        print(f'Inserting item {i} into the queue')
        time.sleep(1)
        queue.put(i)


def consumer(queue):
    while True:
        try:
            item = queue.get()
        except Empty:
            continue
        else:
            print(f'Processing item {item}')
            time.sleep(2)
            queue.task_done()


def main():
    queue = Queue()

    # create a producer thread and start it
    producer_thread = Thread(
        target=producer,
        args=(queue,)
    )
    producer_thread.start()

    # create a consumer thread and start it
    consumer_thread = Thread(
        target=consumer,
        args=(queue,),
        daemon=True
    )
    consumer_thread.start()

    # wait for all tasks to be added to the queue
    producer_thread.join()

    # wait for all tasks on the queue to be completed
    queue.join()


if __name__ == '__main__':
    main()

Как это работает.

  • Сначала определите функцию Producer(), которая добавляет в очередь числа от 1 до 11. Она задерживает одну секунду в каждой итерации:
def producer(queue):
    for i in range(1, 6):
        print(f'Inserting item {i} into the queue')
        time.sleep(1)
        queue.put(i)
  • Во-вторых, определите функцию Consumer(), которая получает элемент из очереди и обрабатывает его. Он задерживается на две секунды после обработки каждого элемента в очереди:
def consumer(queue):
    while True:
        try:
            item = queue.get()
        except Empty:
            continue
        else:
            print(f'Processing item {item}')
            time.sleep(2)
            queue.task_done()

queue.task_done() указывает, что функция обработала элемент в очереди.

  • В-третьих, определите функцию main(), которая создает два потока: один поток добавляет число в очередь каждую секунду, а другой поток обрабатывает элемент в очереди каждые две секунды:
def main():
    queue = Queue()

    # create a producer thread and start it
    producer_thread = Thread(
        target=producer,
        args=(queue,)
    )
    producer_thread.start()

    # create a consumer thread and start it
    consumer_thread = Thread(
        target=consumer,
        args=(queue,),
        daemon=True
    )
    consumer_thread.start()

    # wait for all tasks to be added to the queue
    producer_thread.join()

    # wait for all tasks on the queue to be completed
    queue.join()

Выход:

Inserting item 1 into the queue
Inserting item 2 into the queue
Processing item 1
Inserting item 3 into the queue
Processing item 2
Inserting item 4 into the queue
Inserting item 5 into the queue
Processing item 3
Processing item 4
Processing item 5

Ниже приведены шаги функции main():

  1. Создайте новую очередь, вызвав конструктор Queue().
  2. Затем новый поток с именем Producer_thread и немедленно запустите его.
  3. Создайте поток демона с именем Consumer_thread и запустите его.
  4. Подождите, пока все числа будут добавлены в очередь, используя метод потока join().
  5. Дождитесь завершения всех задач в очереди, вызвав метод join() очереди.

producer добавляет число в очередь каждую секунду, а consumer обрабатывает число из очереди каждые две секунды. Он также отображает номера в очереди каждую секунду.

Похожие посты
Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *