Python Thread-safe Queue

Summary: in this tutorial, you’ll learn how to use a Python thread-safe queue to exchange data safely between multiple threads.

Introduction to the Python thread-safe queue

The built-in queue module allows you to exchange data safely between multiple threads. The Queue class in the queue module implements all required locking semantics.

Creating a new queue

To create a new queue, you import the Queue class from the queue module:

from queue import Queue

and use the Queue constructor as follows:

queue = Queue()

To create a queue with a size limit, you can use the maxsize parameter. For example, the following creates a queue that can store up to 10 items:

queue = Queue(maxsize=10)

Adding an item to the queue

To add an item to the queue, you use the put() method like this:

queue.put(item)

If the queue has a size limit and is full, the call to the put() method will block until a slot becomes available.

If you don’t want the put() method to block if the queue is full, you can set the block argument to False:

queue.put(item, block=False)

In this case, the put() method will raise the Full exception, which you import from the queue module, if the queue is full:

from queue import Full

try:
    queue.put(item, block=False)
except Full:
    # handle the exception
    pass

To add an item to a size-limited queue and block for at most a given number of seconds, you can use the timeout parameter. If no slot becomes available in time, the put() method raises the Full exception:

from queue import Full

try:
    queue.put(item, timeout=3)
except Full:
    # handle the exception
    pass
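To see both behaviors in one place, here is a minimal sketch that assumes a queue limited to a single item and no consumer thread. The variable names (q, outcome, timed_outcome) and the 0.1-second timeout are chosen only for illustration:

```python
from queue import Full, Queue

# a queue limited to one item, so a second put() cannot succeed
q = Queue(maxsize=1)
q.put('first')

try:
    # non-blocking put: raises Full immediately because the queue is full
    q.put('second', block=False)
    outcome = 'added'
except Full:
    outcome = 'queue is full'

print(outcome)

try:
    # wait up to 0.1 seconds for a free slot; nothing consumes, so it times out
    q.put('second', timeout=0.1)
    timed_outcome = 'added'
except Full:
    timed_outcome = 'still full after timeout'

print(timed_outcome)
```

The sketch names the queue q rather than queue so that the variable is not confused with the queue module.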

Getting an item from the queue

To get an item from the queue, you can use the get() method:

item = queue.get()

The get() method will block until an item is available for retrieval from the queue.

To get an item from the queue without blocking, you can set the block parameter to False. If the queue is empty, the get() method raises the Empty exception from the queue module:

from queue import Empty

try:
    item = queue.get(block=False)
except Empty:
    # handle the exception
    pass

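To get an item from the queue and block for at most a given number of seconds, you can use the get() method with a timeout. If no item becomes available in time, the get() method raises the Empty exception:

from queue import Empty

try:
    item = queue.get(timeout=10)
except Empty:
    # handle the exception
    pass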
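The following sketch puts the two non-default forms of get() side by side. It assumes a queue holding a single item and no producer thread; the names q, first, and second and the 0.1-second timeout are illustrative only:

```python
from queue import Empty, Queue

q = Queue()
q.put('job-1')

# an item is available, so the non-blocking get() returns it at once
first = q.get(block=False)
print(first)

try:
    # the queue is now empty; wait 0.1 seconds, then Empty is raised
    q.get(timeout=0.1)
    second = 'got an item'
except Empty:
    second = 'nothing arrived in time'

print(second)
```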

Getting the size of the queue

The qsize() method returns the number of items in the queue:

size = queue.qsize()

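Also, the empty() method returns True if the queue is empty or False otherwise. On the other hand, the full() method returns True if the queue is full or False otherwise. When other threads are using the queue, treat all three results as approximate: another thread may add or remove an item right after the call returns, so a later get() or put() can still block.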
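As a quick illustration, the sketch below calls the three methods on a queue used by a single thread, where the values are exact. The limit of two items and the variable names are assumptions made for the example:

```python
from queue import Queue

q = Queue(maxsize=2)

# snapshot of an empty queue: (size, is empty, is full)
before = (q.qsize(), q.empty(), q.full())
print(before)

q.put('a')
q.put('b')

# snapshot after filling the queue up to its limit
after = (q.qsize(), q.empty(), q.full())
print(after)
```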

Marking a task as completed

An item that you add to the queue represents a unit of work or a task.

When a thread calls the get() method to get the item from the queue, it may need to process it before the task is considered completed.

Once completed, the thread may call the task_done() method of the queue to indicate that it has processed the task completely:

item = queue.get()

# process the item
# ...

# mark the item as completed
queue.task_done()

Waiting for all tasks on the queue to be completed

To wait for all tasks on the queue to be completed, you can call the join() method on the queue object:

queue.join()
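To show how task_done() and join() work together, here is a minimal sketch with one worker thread. The worker() function, the processed list, and the doubling step are hypothetical stand-ins for real work:

```python
from queue import Queue
from threading import Thread

q = Queue()
processed = []


def worker():
    # take items forever; the daemon flag lets the program exit anyway
    while True:
        item = q.get()
        processed.append(item * 2)  # stand-in for real processing
        q.task_done()               # one call per item retrieved


Thread(target=worker, daemon=True).start()

for n in (1, 2, 3):
    q.put(n)

# returns only after task_done() has been called for all three items
q.join()
print(processed)
```

Each item needs exactly one task_done() call. Calling it more times than there were items placed in the queue raises a ValueError.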

Python thread-safe queue example

The following example illustrates how to use the thread-safe queue to exchange data between two threads:

import time
from queue import Empty, Queue
from threading import Thread


def producer(queue):
    for i in range(1, 6):
        print(f'Inserting item {i} into the queue')
        time.sleep(1)
        queue.put(i)


def consumer(queue):
    while True:
        try:
            item = queue.get()
        except Empty:
            continue
        else:
            print(f'Processing item {item}')
            time.sleep(2)
            queue.task_done()


def main():
    queue = Queue()

    # create a producer thread and start it
    producer_thread = Thread(
        target=producer,
        args=(queue,)
    )
    producer_thread.start()

    # create a consumer thread and start it
    consumer_thread = Thread(
        target=consumer,
        args=(queue,),
        daemon=True
    )
    consumer_thread.start()

    # wait for all tasks to be added to the queue
    producer_thread.join()

    # wait for all tasks on the queue to be completed
    queue.join()


if __name__ == '__main__':
    main()

How it works.

First, define the producer() function that adds the numbers from 1 to 5 to the queue. It sleeps for one second before adding each number:

def producer(queue):
    for i in range(1, 6):
        print(f'Inserting item {i} into the queue')
        time.sleep(1)
        queue.put(i)

Second, define the consumer() function that gets an item from the queue and processes it. It sleeps for two seconds to simulate the processing of each item:

def consumer(queue):
    while True:
        try:
            item = queue.get()
        except Empty:
            continue
        else:
            print(f'Processing item {item}')
            time.sleep(2)
            queue.task_done()

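The call to queue.task_done() indicates that the consumer has finished processing the item it retrieved. Because get() is called here without a timeout, it blocks until an item arrives, so the except Empty branch never runs; it would only matter if you passed block=False or a timeout.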

Third, define the main() function that creates two threads. One thread adds a number to the queue every second, while the other processes an item from the queue every two seconds:

def main():
    queue = Queue()

    # create a producer thread and start it
    producer_thread = Thread(
        target=producer,
        args=(queue,)
    )
    producer_thread.start()

    # create a consumer thread and start it
    consumer_thread = Thread(
        target=consumer,
        args=(queue,),
        daemon=True
    )
    consumer_thread.start()

    # wait for all tasks to be added to the queue
    producer_thread.join()

    # wait for all tasks on the queue to be completed
    queue.join()

Output:

Inserting item 1 into the queue
Inserting item 2 into the queue
Processing item 1
Inserting item 3 into the queue
Processing item 2
Inserting item 4 into the queue
Inserting item 5 into the queue
Processing item 3
Processing item 4
Processing item 5

The following are steps in the main() function:

  1. Create a new queue by calling the Queue() constructor.
  2. Create a new thread called producer_thread and start it immediately.
  3. Create a daemon thread called consumer_thread and start it immediately.
  4. Wait for all the numbers to be added to the queue using the join() method of the thread.
  5. Wait for all the tasks on the queue to be completed by calling the join() method of the queue.

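The producer adds a number to the queue every second, and the consumer processes a number from the queue every two seconds. The producer prints each number before adding it, and the consumer prints each number when it starts processing it. Because the consumer is a daemon thread, it does not keep the program alive once main() returns after queue.join().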
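The same pattern extends to more than one consumer, since the queue does the locking. The following sketch is a variation on the example above, not part of it: the worker() function, the second results queue, and the choice of three workers are all assumptions made for illustration:

```python
from queue import Queue
from threading import Thread


def worker(tasks, results):
    # each worker takes a number and puts its square on the results queue
    while True:
        n = tasks.get()
        results.put(n * n)
        tasks.task_done()


tasks = Queue()
results = Queue()

# start three daemon workers that all read from the same queue
for _ in range(3):
    Thread(target=worker, args=(tasks, results), daemon=True).start()

for n in range(1, 6):
    tasks.put(n)

# block until task_done() has been called once for every item
tasks.join()

# the workers may finish in any order, so sort before printing
squares = sorted(results.get() for _ in range(5))
print(squares)
```

Each worker puts its result before calling task_done(), so all five results are already on the results queue by the time tasks.join() returns.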

Summary

  • Use the Queue class of the queue module to safely exchange data between multiple threads.