asynchronous-python

Explore async Python concepts with code examples and tutorials. Covers asyncio and Python features. Let's learn together!

Introduction to AnyIO and Trio

What Are AnyIO and Trio?

Let’s imagine that our primary objective is achieving concurrency for I/O-bound tasks. While we have been exploring asyncio for this, libraries such as AnyIO and Trio offer alternatives built around the structured concurrency design pattern. Their appeal lies in friendlier APIs, additional features, and a gentler learning curve.

Why Use Them?

Structured Concurrency Design Pattern

As Jan Plesnik explains here, we need to embrace structured concurrency to avoid orphaned tasks, that is, tasks that nobody holds a reference to or awaits. Python 3.11 introduced asyncio.TaskGroup to manage this, and Trio offers nurseries, which serve the same purpose. The following case shows an orphaned task: fn awaits only task_a, so task_b is never awaited and its outcome is never observed. (Example from Jan’s article.)

# ex_8_1
import asyncio


async def coro_a():
    await asyncio.sleep(0.2)


async def coro_b():
    await asyncio.sleep(0.1)


async def fn():
    task_a = asyncio.create_task(coro_a())
    task_b = asyncio.create_task(coro_b())

    return await task_a


asyncio.run(fn())

This image illustrates what happens under the hood (from here): [image: test1]
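
To make the orphaning visible, here is a minimal sketch (not from Jan’s article) in which the un-awaited task is the slower one. The worker helper and the outcomes list are hypothetical names introduced for illustration. When asyncio.run shuts down the loop, it cancels tasks that are still pending, so task_b never completes its work.

```python
import asyncio

outcomes = []  # hypothetical log of what each task managed to do


async def worker(name, delay):
    try:
        await asyncio.sleep(delay)
        outcomes.append(f"{name} finished")
    except asyncio.CancelledError:
        # Reached when the loop shuts down while this task is still pending.
        outcomes.append(f"{name} cancelled")
        raise


async def fn():
    task_a = asyncio.create_task(worker("a", 0.05))
    task_b = asyncio.create_task(worker("b", 0.5))  # never awaited: orphaned
    return await task_a


asyncio.run(fn())
print(outcomes)
```

Nothing in fn signals that task_b was dropped; the cancellation only shows up because the worker records it explicitly.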

Now imagine this example with asyncio.TaskGroup (which is similar to example_2_8). As you can see, both task_a and task_b finish before the task-group context manager exits.

# ex_8_2
import asyncio


async def sleep_coro(delay):
    print(f"Started sleeping for {delay} seconds!")
    await asyncio.sleep(delay)
    print(f"Finished sleeping for {delay} seconds!")
    return delay


async def main():
    print('Before running task group!')
    async with asyncio.TaskGroup() as tg:
        task_a = tg.create_task(sleep_coro(2))
        task_b = tg.create_task(sleep_coro(1))
    print("After running task group!")
    print("Both tasks finished!")
    print(f"Results are: (task_a: {task_a.result()}, task_b: {task_b.result()})")

asyncio.run(main())

Now we have full control over the tasks, and this is what it looks like: [image: test2]

Exploring Trio

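Trio’s primary aim is to make concurrent code easier to write correctly and to reason about. It does not change or replace the GIL; like asyncio, it runs tasks cooperatively in a single thread. What it adds is a set of higher-level APIs built around structured concurrency, resulting in code that is easier to understand and troubleshoot.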

Now let’s look at our first example, taken from Trio’s official documentation. As you can see, a parent async function opens a nursery, spawns two child tasks in it, and waits for them to finish.

# ex_8_3
import trio


async def child1():
    print("  child1: started! sleeping now...")
    await trio.sleep(1)
    print("  child1: exiting!")


async def child2():
    print("  child2: started! sleeping now...")
    await trio.sleep(1)
    print("  child2: exiting!")


async def parent():
    print("parent: started!")
    async with trio.open_nursery() as nursery:
        print("parent: spawning child1...")
        nursery.start_soon(child1)

        print("parent: spawning child2...")
        nursery.start_soon(child2)

        print("parent: waiting for children to finish...")
        # -- we exit the nursery block here --
    print("parent: all done!")


trio.run(parent)

An interesting topic is how Trio handles the producer-consumer pattern, described in the official documentation here, and how it compares with our ex_5_5, a producer-consumer variant in asyncio. open_memory_channel is the API that creates the queue-like channel between producer and consumer. Let’s rewrite our ex_5_5 in Trio syntax:

# ex_8_4
import trio


async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)
    print("Done!")


async def producer(send_channel):
    async with send_channel:
        for value in range(5):
            await trio.sleep(1)
            await send_channel.send(value)


async def consumer(receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"Got number {value}")


trio.run(main)

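open_memory_channel returns a pair of channel objects, one for sending and one for receiving, and we pass each to the producer or consumer respectively. The argument 0 makes the channel unbuffered, so each send waits until the consumer is ready to receive. Each function also wraps its channel in an async with block, so the channel is closed on exit; once the producer closes the send channel, the consumer’s async for loop ends.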
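
For contrast, here is a rough asyncio.Queue sketch of the same pattern. It is not the actual ex_5_5, only an illustration: asyncio.Queue has no built-in close signal of this kind, so this version assumes a hypothetical STOP sentinel to tell the consumer that production is over.

```python
import asyncio

STOP = object()  # hypothetical sentinel marking the end of production


async def producer(queue):
    for value in range(5):
        await asyncio.sleep(0.01)
        await queue.put(value)
    # Without this explicit signal the consumer would wait forever.
    await queue.put(STOP)


async def consumer(queue, received):
    while True:
        value = await queue.get()
        if value is STOP:
            break
        received.append(value)


async def main():
    queue = asyncio.Queue(maxsize=1)
    received = []
    await asyncio.gather(producer(queue), consumer(queue, received))
    return received


received = asyncio.run(main())
print(received)
```

In the Trio version, closing the send channel plays the role of the sentinel.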

Getting Started with AnyIO

AnyIO is a handy library that makes async programming easier by letting you write code that works with both asyncio and Trio. It hides the differences between these frameworks, so your code runs smoothly no matter which event loop you use.

Here is the starter example from AnyIO’s official documentation:

# ex_8_5
from anyio import sleep, create_task_group, run


async def sometask(num: int) -> None:
    print('Task', num, 'running')
    await sleep(1)
    print('Task', num, 'finished')


async def main() -> None:
    async with create_task_group() as tg:
        for num in range(5):
            tg.start_soon(sometask, num)

    print('All tasks finished!')

run(main)

By default it runs on top of asyncio. To run it on top of Trio, pass the backend argument to run:

run(main, backend='trio')

Now let’s look at a typical producer-consumer pattern in AnyIO, which closely resembles the Trio example:

# ex_8_6
import anyio


async def producer(sender):
    async with sender:
        for value in range(5):
            await sender.send(value)
            await anyio.sleep(1)


async def consumer(receiver):
    async with receiver:
        async for value in receiver:
            print(f'Got number {value}')
            await anyio.sleep(1)


async def main():
    sender, receiver = anyio.create_memory_object_stream()
    async with anyio.create_task_group() as tg:
        async with sender, receiver:
            tg.start_soon(producer, sender.clone())
            tg.start_soon(consumer, receiver.clone())


anyio.run(main)

If you need to run a blocking task, the anyio.to_thread API seems to do a good job: it runs the blocking function in a worker thread so the event loop stays free. One example is listing the file names in the current directory (from this article):

# ex_8_7
from pathlib import Path

import anyio
from anyio import to_thread


def print_dir_content():
    current_dir = Path.cwd()
    for file in current_dir.iterdir():
        print(file)


async def display_dir_content(event):
    await to_thread.run_sync(print_dir_content)
    event.set()


async def wait_print_finished(event):
    print('wait dir printing is finished')
    await event.wait()
    print('finished!')


async def main():
    event = anyio.Event()
    async with anyio.create_task_group() as tg:
        tg.start_soon(display_dir_content, event)
        tg.start_soon(wait_print_finished, event)


anyio.run(main)

When to Choose AnyIO or Trio for Your Projects

Choosing between AnyIO and Trio often depends on the goals of your project:

If you’re already familiar with asyncio, AnyIO offers a smooth transition to explore Trio’s advantages. If you’re starting fresh, Trio’s user-friendly design makes it a great first choice.