Handling exceptions in asyncio can be tricky but is a very important concept to understand.
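The asyncio.Task.exception() method returns the exception that was raised inside a finished task, or None if the task completed without one. It does not raise the exception itself.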
The following example, inspired by Jason Brownlee’s article found here, demonstrates how to handle exceptions. In this example, we await longer_task while shorter_task raises an exception.
# ex_5_1
import asyncio

async def shorter_task():
    print('Executing the task to raise an exception!')
    await asyncio.sleep(0.1)
    raise Exception('Some exception happened!')

async def longer_task():
    print('Executing the task which will complete!')
    await asyncio.sleep(1)  # sleep more than shorter_task
    print('longer_task is done!')

async def main():
    print('Main coroutine started!')
    task1 = asyncio.create_task(shorter_task())
    task2 = asyncio.create_task(longer_task())
    await task2
    ex = task1.exception()
    print(f'Exception: {ex}')
    print('Main coroutine done!')

asyncio.run(main())
The code above works only because longer_task takes longer to complete than shorter_task, so task1 is already done when main reads its exception.
If task1.exception() is called before the task is done, it raises an error: asyncio.exceptions.InvalidStateError: Exception is not set.
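One way to avoid InvalidStateError is to check the task's state before asking for its exception. The sketch below is only an illustration: describe() and failing_task() are hypothetical names, not part of the original example, and asyncio.wait() is used so that waiting for the task does not re-raise its exception.

```python
import asyncio

async def failing_task():
    await asyncio.sleep(0.01)
    raise ValueError('Some exception happened!')

def describe(task):
    """Report a task's state without risking InvalidStateError."""
    if not task.done():
        return 'pending'
    if task.cancelled():
        # exception() would raise CancelledError on a cancelled task
        return 'cancelled'
    ex = task.exception()
    return 'ok' if ex is None else f'failed: {ex}'

async def main():
    task = asyncio.create_task(failing_task())
    before = describe(task)       # the task has not finished yet
    await asyncio.wait([task])    # wait for it without re-raising its exception
    after = describe(task)
    return before, after

before, after = asyncio.run(main())
print(before, '|', after)
```

Checking done() first means the call order no longer depends on how long each task happens to sleep.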
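Now let’s create our own exception handler and use it with the previous example, ex_5_1. To do that, we get the running event loop and set the exception handler on it. The loop calls the handler for exceptions that were never retrieved, such as the one raised by shorter_task here.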
# ex_5_2
import asyncio

def exception_handler(loop, context):
    # 'exception' is an optional key of the context dict, so fall back to the message
    ex = context.get('exception', context['message'])
    print(f'Exception: {ex}')

async def shorter_task():
    print('Executing the task to raise an exception!')
    await asyncio.sleep(0.01)
    raise Exception('Some exception happened!')

async def longer_task():
    print('Executing the task which will complete!')
    await asyncio.sleep(1)  # sleep more than shorter_task
    print('longer_task is done!')

async def main():
    print('Main coroutine started!')
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(exception_handler)
    task1 = asyncio.create_task(shorter_task())
    task2 = asyncio.create_task(longer_task())
    await task2
    print('Main coroutine done!')

asyncio.run(main())
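To see what the handler actually receives, it can help to trigger it in a predictable way. The sketch below assumes a plain callback scheduled with loop.call_soon() that raises; the loop reports such exceptions through the same handler. The names seen and broken_callback are made up for this illustration.

```python
import asyncio

seen = []  # (message, exception) pairs collected by the handler

def exception_handler(loop, context):
    # 'message' is always present; 'exception' is optional, so use .get()
    seen.append((context['message'], context.get('exception')))

def broken_callback():
    raise RuntimeError('callback failed')

async def main():
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(exception_handler)
    loop.call_soon(broken_callback)  # plain callback that raises
    await asyncio.sleep(0.01)        # give the loop a chance to run it

asyncio.run(main())
for message, ex in seen:
    print(f'{message!r} -> {ex!r}')
```

The context dict carries a human-readable message plus, when available, the exception object itself, which is why reading it with .get() is the safer habit.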
Now let’s take a look at an example of canceling a task and catching the resulting error, inspired by this example.
# ex_5_3
import asyncio

async def cancel_me():
    await asyncio.sleep(1)

async def main():
    task = asyncio.create_task(cancel_me())
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())
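Cancellation is delivered to the task as a CancelledError raised at its current await, so the coroutine itself can react to it. The following is a minimal sketch of that idea, assuming the coroutine wants to do some cleanup; the events list is only there to record the order of what happens.

```python
import asyncio

events = []

async def cancel_me():
    try:
        await asyncio.sleep(1)
    except asyncio.CancelledError:
        events.append('cleanup')  # release resources here
        raise                     # re-raise so the task ends up cancelled

async def main():
    task = asyncio.create_task(cancel_me())
    await asyncio.sleep(0.01)     # let the task start sleeping
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append('caught in main')
    events.append(f'cancelled={task.cancelled()}')

asyncio.run(main())
print(events)
```

Re-raising CancelledError after the cleanup matters: if the coroutine swallowed it, the task would finish normally and task.cancelled() would be False.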
As Jason Brownlee says in his article on Asyncio Coroutine Chaining, coroutine chaining refers to the process of linking or chaining together multiple coroutines to execute in a specific sequence. This pattern helps in organizing and managing complex asynchronous workflows. Keep in mind that a task’s done callback is invoked once the task is done. Let’s take a look at an example, inspired by Jason’s article.
# ex_5_4
import asyncio

async def task1():
    print('>task1()')
    await asyncio.sleep(1)
    return 1

async def task2(data):
    print(f'>task2() got {data}')
    await asyncio.sleep(1)

def callback2(task):
    global event
    event.set()

def callback1(task):
    result = task.result()
    second_task = asyncio.create_task(task2(result))
    second_task.add_done_callback(callback2)

async def main():
    global event
    event = asyncio.Event()
    first_task = asyncio.create_task(task1())
    first_task.add_done_callback(callback1)
    await event.wait()
    print('Main: chain is done')

asyncio.run(main())
In the example above, the main coroutine initializes an event, creates a task for task1, and calls add_done_callback with callback1, which is a regular function rather than a coroutine. After task1 finishes, callback1 is invoked; it gets the result of task1 and creates a task for task2, passing that result in and registering callback2 as its done callback. After task2 finishes, callback2 sets the event, which tells the main coroutine that it can stop waiting.
Based on that, the chain runs in this order:
main -> task1 -> callback1 -> task2 -> callback2 -> continue main
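For comparison, when the steps do not need to be driven by callbacks, the same sequence can be sketched with plain await expressions. This is an alternative shown only for contrast, not the callback technique from the article; the order list is a hypothetical helper that records the sequence, and the sleeps are shortened.

```python
import asyncio

order = []

async def task1():
    order.append('task1')
    await asyncio.sleep(0.01)
    return 1

async def task2(data):
    order.append(f'task2 got {data}')
    await asyncio.sleep(0.01)
    return data + 1

async def main():
    result = await task1()       # first link of the chain
    final = await task2(result)  # second link receives the first result
    order.append('main continues')
    return final

final = asyncio.run(main())
print(order, final)
```

Done callbacks remain useful when the code that reacts to a task's completion is not itself a coroutine, or when the task was created elsewhere.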
A queue is a first-in, first-out (FIFO) data structure with put and get operations.
This means that data can be added (put) to the queue and retrieved (get) in the order it was added, ensuring that earlier data is accessed first. In Python, the collections.deque class (for ordinary synchronous code) and the queue.Queue class (thread-safe, for multithreaded code) provide this functionality.
Additionally, for asynchronous programming within coroutines, Python offers the asyncio.Queue API.
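As a quick illustration of the FIFO behaviour described above, the sketch below pushes the same items through all three classes and reads them back. The drain_* helper names are invented for this example.

```python
import asyncio
import queue
from collections import deque

def drain_deque(items):
    d = deque(items)
    # popleft() removes from the front, giving FIFO order
    return [d.popleft() for _ in range(len(d))]

def drain_thread_queue(items):
    q = queue.Queue()
    for item in items:
        q.put(item)
    return [q.get() for _ in range(q.qsize())]

async def drain_async_queue(items):
    q = asyncio.Queue()
    for item in items:
        q.put_nowait(item)
    out = []
    while not q.empty():
        out.append(await q.get())
    return out

items = ['a', 'b', 'c']
results = (
    drain_deque(items),
    drain_thread_queue(items),
    asyncio.run(drain_async_queue(items)),
)
print(results)
```

All three return the items in insertion order; the difference lies in who is expected to use them: plain code, threads, or coroutines.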
Let’s take a look at this example from Cristian Prieto:
# ex_5_5
import asyncio

async def producer(channel: asyncio.Queue):
    for num in range(0, 5):
        await asyncio.sleep(1)
        await channel.put(num)

async def consumer(channel: asyncio.Queue):
    while True:
        item = await channel.get()
        print(f'Got number {item}')

async def main():
    channel = asyncio.Queue()
    asyncio.create_task(consumer(channel))
    await producer(channel)
    print('Done!')

asyncio.run(main())
As Cristian Prieto says, in this example asyncio.Queue is our way to communicate between the producer of items and its consumer. The consumer awaits until the queue has an item to give it.
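In ex_5_5 the consumer loops forever and is simply cancelled when asyncio.run() exits. A variant with an explicit shutdown might look like the sketch below, which assumes we want to wait until every item has been processed: it uses task_done() and join() from asyncio.Queue, and the received list and the shortened sleep are additions for illustration.

```python
import asyncio

received = []

async def producer(channel: asyncio.Queue):
    for num in range(5):
        await asyncio.sleep(0.01)
        await channel.put(num)

async def consumer(channel: asyncio.Queue):
    while True:
        item = await channel.get()
        received.append(item)
        channel.task_done()  # mark this item as processed

async def main():
    channel = asyncio.Queue()
    worker = asyncio.create_task(consumer(channel))  # keep a reference to the task
    await producer(channel)
    await channel.join()     # wait until every item has been processed
    worker.cancel()          # the consumer loops forever, so stop it explicitly
    try:
        await worker
    except asyncio.CancelledError:
        pass

asyncio.run(main())
print(received)
```

With join() in place, main only continues after the last item has been handled, so the shutdown no longer depends on scheduling order.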
Future objects are used to bridge low-level callback-based code with high-level async/await code.
In the awaitable class inheritance hierarchy, asyncio.Task is a subclass of asyncio.Future, so every task is also a future.
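To make the bridging role concrete, here is a minimal sketch in which a callback-style function hands its result to a future that a coroutine awaits. legacy_api() is a hypothetical stand-in for callback-based code, and loop.call_later() is used only to simulate the callback firing a little later.

```python
import asyncio

def legacy_api(on_done, loop):
    """Hypothetical callback-style API: calls on_done(value) a bit later."""
    loop.call_later(0.01, on_done, 42)

async def call_legacy():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # the future's set_result serves as the callback
    legacy_api(future.set_result, loop)
    # the coroutine suspends here until the callback fires
    return await future

value = asyncio.run(call_legacy())
print(value)
```

The callback side only needs to call set_result(); the async side only needs to await the future.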
Every asyncio.Future has some APIs to call; the most important ones are done(), set_result(result), and result().
Now let’s use these APIs inside an example:
# ex_5_6
import asyncio

async def main():
    future = asyncio.Future()
    print(f'future status is done: {future.done()}')
    future.set_result(10)
    print(f'future status is done: {future.done()}, future result: {future.result()}')
    result = await future
    print(f'future result after being awaited: {result}')

asyncio.run(main())
In the example above, we create a future, check whether it is done (using done()), and then set its result to 10 using set_result(10), which marks the future as done. We can also await any future to get its result.
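Also note that done() and result() are available on every asyncio.Task too. set_result() is the exception: a task does not support it, because its result comes from the coroutine it wraps.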
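A short sketch of the same calls on a task, under the assumption of a trivial coroutine (compute() is a made-up name):

```python
import asyncio

async def compute():
    await asyncio.sleep(0.01)
    return 10

async def main():
    task = asyncio.create_task(compute())
    before = task.done()  # the task has not finished yet
    value = await task    # awaiting a task returns its result
    return before, task.done(), task.result(), value

summary = asyncio.run(main())
is_subclass = issubclass(asyncio.Task, asyncio.Future)
print(summary, is_subclass)
```

Here result() returns whatever the wrapped coroutine returned, which is the same value that awaiting the task produces.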