asynchronous-python

Explore async Python concepts with code examples and tutorials. Covers asyncio and Python features. Let's learn together!

Advanced Techniques

Error handling and exception propagation in async Python

Handling exceptions in asyncio can be tricky but is a very important concept to understand. The asyncio.Task.exception API is used to raise exceptions within tasks.

The following example, inspired by Jason Brownlee’s article found here, demonstrates how to handle exceptions. In this example, we await longer_task while shorter_task raises an exception.

# ex_5_1
import asyncio


async def shorter_task():
    print('Executing the task to raise an exception!')
    await asyncio.sleep(0.1)
    raise Exception('Some exception happened!')


async def longer_task():
    print('Executing the task which will complete!')
    await asyncio.sleep(1)  # sleep more than shorter_task
    print('longer_task is done!')


async def main():
    print('Main coroutine started!')
    task1 = asyncio.create_task(shorter_task())
    task2 = asyncio.create_task(longer_task())
    await task2
    ex = task1.exception()
    print(f'Exception: {ex}')
    print('Main coroutine done!')


asyncio.run(main())

The above code works if longer_task takes more time to complete than shorter_task. Otherwise, if task1.exception() is called before shorter_task raises an exception, it will result in an error: asyncio.exceptions.InvalidStateError: Exception is not set.

Now let’s create our own exception handler and utilize it with previous ex_5_1. In order to do it, we’ve to grab event loop and set the exception handler to it.

# ex_5_2
import asyncio


def exception_handler(loop, context):
    ex = context['exception']
    print(f'Exception: {ex}')


async def shorter_task():
    print('Executing the task to raise an exception!')
    await asyncio.sleep(0.01)
    raise Exception('Some exception happened!')


async def longer_task():
    print('Executing the task which will complete!')
    await asyncio.sleep(1)  # sleep more than shorter_task
    print('longer_task is done!')


async def main():
    print('Main coroutine started!')
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(exception_handler)
    task1 = asyncio.create_task(shorter_task())
    task2 = asyncio.create_task(longer_task())
    await task2
    print('Main coroutine done!')


asyncio.run(main())


Now let’s take a look at an example of canceling a task and catching tbe error inspired by this example.

# ex_5_3
import asyncio


async def cancel_me():
    await asyncio.sleep(1)


async def main():
    task = asyncio.create_task(cancel_me())
    await asyncio.sleep(0.01)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

Chaining coroutines using callback and event to compose more complex async workflows

As Jason Brownlee says in his article on Asyncio Coroutine Chaining, coroutine chaining refers to the process of linking or chaining together multiple coroutines to execute in a specific sequence. This pattern helps in organizing and managing complex asynchronous workflows. Consider that done callback is triggered when the task is done. Let’s take a look at an example, inspired by Jason’s article.

# ex_5_4
import asyncio


async def task1():
    print('>task1()')
    await asyncio.sleep(1)
    return 1


async def task2(data):
    print(f'>task2() got {data}')
    await asyncio.sleep(1)


def callback2(task):
    global event
    event.set()


def callback1(task):
    result = task.result()
    second_task = asyncio.create_task(task2(result))
    second_task.add_done_callback(callback2)


async def main():
    global event
    event = asyncio.Event()
    first_task = asyncio.create_task(task1())
    first_task.add_done_callback(callback1)
    await event.wait()
    print('Main: chain is done')


asyncio.run(main())

In example above inside main coroutine, we initialize an event, and create task for task1 and call add_done_callback with callback1 coroutine. After task1 finishes, callback1 is triggered which gets the result of task1 and creates task for task2 with callback2 as its callback when finished. The result of task1 is passed to task2, and after task2 finishes, callback2 sets the event which lets the main coroutine know and finish waiting for that.

Based on that, we can conclude this chain of coroutines to run:
main -> task1 -> callback1 -> task2 -> callback2 -> continue main

asyncio Queue and consumer-producer workflows

A queue is a first-in, first-out (FIFO) data structure with put and get functionalities. This means that data can be added (put) to the queue and retrieved (get) in the order it was added, ensuring that earlier data is accessed first. In Python, the collections.deque class (syncornized) and queue.Queue (parallel) class provide this functionality. Additionally, for asynchronous programming within coroutines, Python offers the asyncio.Queue API.

Let’s take a look at this example from Cristian Prieto:

# ex_5_5
import asyncio


async def producer(channel: asyncio.Queue):
    for num in range(0, 5):
        await asyncio.sleep(1)
        await channel.put(num)


async def consumer(channel: asyncio.Queue):
    while True:
        item = await channel.get()
        print(f'Got number {item}')


async def main():
    channel = asyncio.Queue()
    asyncio.create_task(consumer(channel))
    await producer(channel)
    print('Done!')

asyncio.run(main())

As Cristian Prieto says, in this example, asyncio.Queue is our way to communicate between the producer of items and its consumer, it will await until the queue has an item to give us.

asyncio Future

Future objects are used to bridge low-level callback-based code with high-level async/await code.

This image shows the awaitable class inheritance hierarchy. As you can see, every task is actually of type future. Every asyncio.Future has some APIs to call, most important ones are:
done(), set_result(result), result()
Now let’s use these APIs inside an example:

# ex_5_6
import asyncio


async def main():
    future = asyncio.Future()
    print(f'future status is done: {future.done()}')
    future.set_result(10)
    print(f'future status is done: {future.done()}, future result: {future.result()}')
    result = await future
    print(f'future result after being awaited: {result}')


asyncio.run(main())

In example above, we create a future, check if it’s done or not (using done()), then set the result=10 using set_result(10), so the future is done. Moreover, we can await any futures and get the result of that future.

Also consider that you can call the same APIs on every asyncio.Task too.