Потокобезопасная очередь в Python с примерами
В этом руководстве вы узнаете, как использовать потокобезопасную очередь в Python для безопасного обмена данными между несколькими потоками.
- Что такое потокобезопасная очередь в Python?
- Создание новой очереди
- Добавление элемента в очередь
- Получение элемента
- Размер очереди
- Отметить задачу как выполненную
- Ожидание завершения всех задач
- Пример потокобезопасной очереди в Python
Что такое потокобезопасная очередь в Python?
Встроенный модуль queue позволяет безопасно обмениваться данными между несколькими потоками. Класс Queue в модуле queue реализует всю необходимую семантику блокировки.
Создание новой очереди
Чтобы создать новую очередь, импортируйте класс Queue из модуля queue:
from queue import Queue
и используйте конструктор Queue следующим образом:
queue = Queue()
Чтобы создать очередь с ограничением размера, вы можете использовать параметр maxsize. Например, следующий код создает очередь, которая может хранить до 10 элементов:
queue = Queue(maxsize=10)
Добавление элемента в очередь
Чтобы добавить элемент в очередь, используйте метод put() следующим образом:
queue.add(item)
Как только очередь заполнится, вы не сможете добавить в нее элемент. Кроме того, вызов метода put() будет заблокирован до тех пор, пока в очереди не появится свободное место.
Если вы не хотите, чтобы метод put() блокировался, если очередь заполнена, вы можете установить для аргумента блокировки значение False:
queue.put(item, block=False)
В этом случае метод put() вызовет исключение length.Full, если очередь заполнена:
try: queue.put(item, block=False) except queue.Full as e: # handle exceptoin
Чтобы добавить элемент в очередь ограниченного размера и заблокировать его с таймаутом, вы можете использовать параметр timeout следующим образом:
try: queue.put(item, timeout=3) except queue.Full as e: # handle exceptoin
Получение элемента
Чтобы получить элемент из очереди, вы можете использовать метод get():
item = queue.get()
Метод get() будет блокироваться до тех пор, пока элемент не станет доступен для извлечения из очереди.
Чтобы получить элемент из очереди без блокировки, вы можете установить для параметра блокировки значение False:
try: queue.get(block=False) except queue.Empty: # handle exception
Чтобы получить элемент из очереди и заблокировать его с ограничением по времени, вы можете использовать метод get() с таймаутом:
try: item = queue.get(timeout=10) except queue.Empty: # ...
Размер очереди
Метод qsize() возвращает количество элементов в очереди:
size = queue.size()
Кроме того, метод empty() возвращает True, если очередь пуста, или False в противном случае. С другой стороны, метод full() возвращает True, если очередь заполнена, и False в противном случае.
Отметить задачу как выполненную
Элемент, добавляемый в очередь, представляет собой единицу работы или задачу. Когда поток вызывает метод get(), чтобы получить элемент из очереди, ему может потребоваться обработать его, прежде чем задача будет считаться выполненной.
После завершения поток может вызвать метод очереди Task_done(), чтобы указать, что задача полностью обработана:
item = queue.get() # process the item # ... # mark the item as completed queue.task_done()
Ожидание завершения всех задач
Чтобы дождаться завершения всех задач в очереди, вы можете вызвать метод join() объекта очереди:
queue.join()
Пример потокобезопасной очереди в Python
Следующий пример иллюстрирует, как использовать потокобезопасную очередь для обмена данными между двумя потоками:
import time from queue import Empty, Queue from threading import Thread def producer(queue): for i in range(1, 6): print(f'Inserting item {i} into the queue') time.sleep(1) queue.put(i) def consumer(queue): while True: try: item = queue.get() except Empty: continue else: print(f'Processing item {item}') time.sleep(2) queue.task_done() def main(): queue = Queue() # create a producer thread and start it producer_thread = Thread( target=producer, args=(queue,) ) producer_thread.start() # create a consumer thread and start it consumer_thread = Thread( target=consumer, args=(queue,), daemon=True ) consumer_thread.start() # wait for all tasks to be added to the queue producer_thread.join() # wait for all tasks on the queue to be completed queue.join() if __name__ == '__main__': main()
Как это работает.
- Сначала определите функцию Producer(), которая добавляет в очередь числа от 1 до 11. Она задерживает одну секунду в каждой итерации:
def producer(queue): for i in range(1, 6): print(f'Inserting item {i} into the queue') time.sleep(1) queue.put(i)
- Во-вторых, определите функцию Consumer(), которая получает элемент из очереди и обрабатывает его. Он задерживается на две секунды после обработки каждого элемента в очереди:
def consumer(queue): while True: try: item = queue.get() except Empty: continue else: print(f'Processing item {item}') time.sleep(2) queue.task_done()
queue.task_done() указывает, что функция обработала элемент в очереди.
- В-третьих, определите функцию main(), которая создает два потока: один поток добавляет число в очередь каждую секунду, а другой поток обрабатывает элемент в очереди каждые две секунды:
def main(): queue = Queue() # create a producer thread and start it producer_thread = Thread( target=producer, args=(queue,) ) producer_thread.start() # create a consumer thread and start it consumer_thread = Thread( target=consumer, args=(queue,), daemon=True ) consumer_thread.start() # wait for all tasks to be added to the queue producer_thread.join() # wait for all tasks on the queue to be completed queue.join()
Выход:
Inserting item 1 into the queue Inserting item 2 into the queue Processing item 1 Inserting item 3 into the queue Processing item 2 Inserting item 4 into the queue Inserting item 5 into the queue Processing item 3 Processing item 4 Processing item 5
Ниже приведены шаги функции main():
- Создайте новую очередь, вызвав конструктор Queue().
- Затем новый поток с именем Producer_thread и немедленно запустите его.
- Создайте поток демона с именем Consumer_thread и запустите его.
- Подождите, пока все числа будут добавлены в очередь, используя метод потока join().
- Дождитесь завершения всех задач в очереди, вызвав метод join() очереди.
producer добавляет число в очередь каждую секунду, а consumer обрабатывает число из очереди каждые две секунды. Он также отображает номера в очереди каждую секунду.