IDisposable Thoughts
I really love asynchronous programming, and the support for it in modern languages is amazing (even C++20 has coroutines now!). Personally, though, I think the biggest problem with async programming is that it is so easy to use (thanks to that support) that you start to believe you are an expert and just sprinkle it everywhere without fully understanding how it works and what is going to happen. This is especially problematic with event-loop-based approaches (like the default one in Python).
I like to think that when you learn a language, rather than just learning its keywords and constructs, you should put effort and energy into understanding the idea behind it: why does it exist? What is it trying to solve? Maybe you will never use the language in production, but if you take the time to understand its paradigm, I am pretty sure you will pick up ideas and constructs that you can adapt, or learn faster, in your day-to-day programming language.
A few years ago I started using Python asyncio, and a very common problem appeared, especially when you create a lot of background coroutines:
How to communicate between worker coroutines in an effective way
A good example of this is the following problem:
coroutine 1: produces data
coroutine 2: consumes data from coroutine 1, only when there is data available
Fortunately, I had seen this problem many times before in Go, where it is easily solved using channels. Of course, Python does not have channels… but it does have a nice structure: the Queue!
import asyncio

async def producer(channel: asyncio.Queue):
    for num in range(0, 5):
        await asyncio.sleep(1)
        await channel.put(num)

async def consumer(channel: asyncio.Queue):
    while True:
        item = await channel.get()
        print(f'Got number {item}')

async def main():
    channel = asyncio.Queue()
    cons = asyncio.create_task(consumer(channel))
    # When the producer finishes we are done
    await producer(channel)
    print('Done!')

asyncio.run(main())
In this example, asyncio.Queue is our way to communicate between the producer of items and its consumer. The consumer will await until the channel (our queue) has an item to give us, and it is all driven by the event loop used by asyncio. It is not as powerful as a channel, but it does the job. Notice that many coroutines can produce items into the channel; it is easy to modify the previous example to see this in action:
async def main():
    channel = asyncio.Queue()
    cons = asyncio.create_task(consumer(channel))
    # Wait for all producers to finish
    await asyncio.wait({
        asyncio.create_task(producer(channel, 0)),
        asyncio.create_task(producer(channel, 10)),
    })
    print('Done!')
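The calls above now pass a second argument to producer, so the coroutine needs to accept a starting number. A minimal sketch of that adjustment (the parameter name start is my assumption about what was intended):

async def producer(channel: asyncio.Queue, start: int):
    # Each producer emits its own range of numbers into the shared channel.
    for num in range(start, start + 5):
        await asyncio.sleep(1)
        await channel.put(num)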
The queue in Python is a very powerful structure, and there are different implementations depending on your concurrency model: for example, queue.Queue for threaded applications and multiprocessing.Queue for applications using the multiple-process model.
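The same producer/consumer shape carries over to threads with queue.Queue, where put() and get() block instead of awaiting. A minimal sketch (the None sentinel used for shutdown is my own addition, not part of the asyncio example above):

import queue
import threading

def producer(channel: queue.Queue):
    for num in range(5):
        channel.put(num)        # blocks if the queue is bounded and full
    channel.put(None)           # sentinel: tell the consumer we are done

def consumer(channel: queue.Queue):
    while True:
        item = channel.get()    # blocks until an item is available
        if item is None:
            break
        print(f'Got number {item}')

channel = queue.Queue()
worker = threading.Thread(target=consumer, args=(channel,))
worker.start()
producer(channel)
worker.join()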
Lesson learned: sometimes you can use the knowledge gained in one language to solve a problem in a similar way in another language.
18.5.8. Queues
The asyncio queue API was designed to be close to the classes of the queue module (Queue, PriorityQueue, LifoQueue), but it has no timeout parameter. The asyncio.wait_for() function can be used to cancel a task after a timeout.
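For example, a consumer that should give up when nothing arrives within a deadline can wrap get() in wait_for(). A minimal sketch (the helper name and the one-second timeout are illustrative):

import asyncio

async def consume_with_timeout(queue: asyncio.Queue, timeout: float = 1.0):
    try:
        # wait_for cancels the inner get() if it does not finish in time
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None

async def main():
    queue = asyncio.Queue()
    print(await consume_with_timeout(queue))  # prints None after ~1 second

asyncio.run(main())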
18.5.8.1. Queue
A queue, useful for coordinating producer and consumer coroutines.
If maxsize is less than or equal to zero, the queue size is infinite. If it is an integer greater than 0, then yield from put() will block when the queue reaches maxsize, until an item is removed by get().
Unlike the standard library queue, you can reliably know this Queue's size with qsize(), since your single-threaded asyncio application won't be interrupted between calling qsize() and doing an operation on the Queue.
Changed in version 3.4.4: New join() and task_done() methods.
empty()
Return True if the queue is empty, False otherwise.

full()
Return True if there are maxsize items in the queue.

If the Queue was initialized with maxsize=0 (the default), then full() is never True.

coroutine get()
Remove and return an item from the queue. If queue is empty, wait until an item is available.

get_nowait()
Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

coroutine join()
Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

coroutine put(item)
Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item.

put_nowait(item)
Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

qsize()
Number of items in the queue.

task_done()
Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

maxsize
Number of items allowed in the queue.
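A small sketch tying together the join() and task_done() bookkeeping described above (the worker function and the three queued items are illustrative):

import asyncio

async def worker(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        print(f'processing {item}')
        queue.task_done()       # one task_done() per get(), or join() never unblocks

async def main():
    queue = asyncio.Queue()
    for item in range(3):
        queue.put_nowait(item)  # unfinished-task count is now 3
    task = asyncio.create_task(worker(queue))
    await queue.join()          # resumes once task_done() was called 3 times
    task.cancel()

asyncio.run(main())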
18.5.8.2. PriorityQueue
A subclass of Queue ; retrieves entries in priority order (lowest first).
Entries are typically tuples of the form: (priority number, data).
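A minimal sketch of that ordering (the priority numbers and labels are illustrative):

import asyncio

async def main():
    queue = asyncio.PriorityQueue()
    await queue.put((3, 'low'))
    await queue.put((1, 'high'))
    await queue.put((2, 'medium'))
    while not queue.empty():
        priority, data = await queue.get()
        print(priority, data)   # 1 high, then 2 medium, then 3 low

asyncio.run(main())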
18.5.8.3. LifoQueue
A subclass of Queue that retrieves most recently added entries first.
18.5.8.3.1. Exceptions
exception asyncio.QueueEmpty
Exception raised when the get_nowait() method is called on a Queue object which is empty.
exception asyncio.QueueFull
Exception raised when the put_nowait() method is called on a Queue object which is full.
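A short sketch of both non-blocking calls and the exceptions they raise (the maxsize=1 bound is only there to trigger QueueFull quickly):

import asyncio

async def main():
    queue = asyncio.Queue(maxsize=1)

    try:
        queue.get_nowait()          # nothing has been put yet
    except asyncio.QueueEmpty:
        print('queue is empty')

    queue.put_nowait('first')
    try:
        queue.put_nowait('second')  # maxsize=1, so no free slot
    except asyncio.QueueFull:
        print('queue is full')

asyncio.run(main())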
13.14. AsyncIO Queue
In Python, asyncio is a module that provides tools for writing asynchronous code using coroutines. The asyncio.Queue() class is a utility class that is used to implement a queue for asynchronous programming.
The asyncio.Queue() class is similar to the queue.Queue() class in Python’s standard library, but it is designed for use with asynchronous code. It allows coroutines to put items into the queue and get items from the queue without blocking the event loop.
Here is an example of how to use asyncio.Queue():
>>> import asyncio
>>>
>>> async def producer(queue):
...     for i in range(5):
...         await asyncio.sleep(0.5)
...         item = f'item {i}'
...         await queue.put(item)
...         print(f'Produced {item}')
>>>
>>> async def consumer(queue):
...     while True:
...         item = await queue.get()
...         print(f'Consumed {item}')
...         queue.task_done()
>>>
>>> async def main():
...     queue = asyncio.Queue()
...     producer_task = asyncio.create_task(producer(queue))
...     consumer_task = asyncio.create_task(consumer(queue))
...     await producer_task
...     await queue.join()
...     consumer_task.cancel()
>>>
>>> asyncio.run(main())
In this example, the producer coroutine puts items into the queue using the put method of the asyncio.Queue() class, and the consumer coroutine gets items from the queue using the get method. The main coroutine creates a queue, creates tasks for the producer and consumer coroutines using asyncio.create_task(), waits for the producer task to finish, then waits for the queue to be fully processed using the join method, and finally cancels the consumer task.
When the program is run, the producer coroutine puts 5 items into the queue, and the consumer coroutine gets each item from the queue and prints it. The program will run until all items have been consumed from the queue.
The asyncio.Queue() class is a useful tool for implementing asynchronous communication between coroutines in Python.
13.14.1. FIFO Queue
- FIFO Queue — First In, First Out
- class asyncio.Queue(maxsize=0)
- If maxsize is less than or equal to zero, the queue size is infinite.
- Unlike the standard library threading queue, the size of the queue is always known and can be returned by calling the qsize() method.
- maxsize — Number of items allowed in the queue.
- empty() — Return True if the queue is empty, False otherwise.
- full() — Return True if there are maxsize items in the queue.
- coroutine get() — Remove and return an item from the queue. If queue is empty, wait until an item is available.
- get_nowait() — Return an item if one is immediately available, else raise QueueEmpty.
- coroutine join() — Block until all items in the queue have been received and processed.
- coroutine put(item) — Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item (as shown in the sketch after this list).
- put_nowait(item) — Put an item into the queue without blocking. If no free slot is immediately available, raise QueueFull.
- qsize() — Return the number of items in the queue.
- task_done() — Indicate that a formerly enqueued task is complete.
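A minimal sketch of a bounded queue, as referenced in the list above: put() awaits once maxsize items are queued, and qsize() stays accurate in single-threaded asyncio code (the maxsize=2 bound and the slow consumer are illustrative):

import asyncio

async def main():
    queue = asyncio.Queue(maxsize=2)

    async def slow_consumer():
        while True:
            await asyncio.sleep(0.5)
            item = await queue.get()
            print(f'consumed {item}, qsize={queue.qsize()}')
            queue.task_done()

    consumer = asyncio.create_task(slow_consumer())
    for item in range(4):
        await queue.put(item)   # awaits once the queue already holds maxsize=2 items
        print(f'produced {item}, qsize={queue.qsize()}')
    await queue.join()
    consumer.cancel()

asyncio.run(main())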
13.14.2. Priority Queue
- class asyncio.PriorityQueue
- Retrieves entries in priority order (lowest first).
- Entries are typically tuples of the form (priority_number, data).
13.14.3. LIFO Queue
- LIFO Queue — Last In, First Out
- class asyncio.LifoQueue
- Retrieves most recently added entries first.
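A minimal sketch of that last-in, first-out behaviour (the three string items are illustrative):

import asyncio

async def main():
    stack = asyncio.LifoQueue()
    for item in ['first', 'second', 'third']:
        await stack.put(item)
    while not stack.empty():
        print(await stack.get())  # third, second, first

asyncio.run(main())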
13.14.4. Exceptions
- exception asyncio.QueueEmpty — Raised when get_nowait() method is called on an empty queue.
- exception asyncio.QueueFull — Raised when put_nowait() method is called on a queue that has reached its maxsize.
13.14.5. Example
import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()

    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

# worker-0 has slept for 0.26 seconds
# worker-0 has slept for 0.41 seconds
# worker-1 has slept for 0.89 seconds
# worker-2 has slept for 0.98 seconds
# worker-0 has slept for 0.59 seconds
# worker-0 has slept for 0.09 seconds
# worker-0 has slept for 0.11 seconds
# worker-2 has slept for 0.53 seconds
# worker-1 has slept for 0.91 seconds
# worker-1 has slept for 0.21 seconds
# worker-0 has slept for 0.87 seconds
# worker-2 has slept for 0.86 seconds
# worker-2 has slept for 0.11 seconds
# worker-2 has slept for 0.23 seconds
# worker-0 has slept for 0.53 seconds
# worker-1 has slept for 0.89 seconds
# worker-0 has slept for 0.53 seconds
# worker-0 has slept for 0.10 seconds
# worker-2 has slept for 0.86 seconds
# worker-1 has slept for 0.82 seconds
# ====
# 3 workers slept in parallel for 3.74 seconds
# total expected sleep time: 10.79 seconds
© Copyright 2023, CC-BY-SA-4.0, Matt Harasymczuk, last update: 2023-07-03. Revision bde673bf.