Explore async Python concepts with code examples and tutorials. Covers asyncio and Python features. Let's learn together!
Let’s imagine that our primary objective is achieving concurrency for I/O bound tasks.
While we have been exploring asyncio
in Python to support this goal,
additional tools such as anyio
and trio
offer valuable alternatives for enhancing I/O parallelism
which leverage the benefits of the Structured Concurrency
design pattern.
Their appeal lies in their user-friendly nature, enhanced features, and reduced learning curve,
making them attractive options for improving task efficiency.
Improved Developer Productivity: Simplified APIs and tools reduce the learning curve and development time.
Cross-Framework Compatibility: AnyIO ensures that our code can run on different async backends without modification.
Enhanced Safety: Trio’s structured concurrency model reduces the likelihood of subtle bugs in concurrent code.
As Jan Plesnik explains here,
to avoid orphaned tasks—tasks that lose their reference—we need to embrace structured concurrency.
In Python 3.11, the introduction of asyncio.TaskGroup
provides a way to manage this effectively.
Similarly, Trio offers nurseries, which serve the same purpose.
An example of an orphaned task can be seen in the following case, where task_b
loses its reference.
(Example from Jan’s article.)
# ex_8_1
import asyncio
async def coro_a():
await asyncio.sleep(0.2)
async def coro_b():
await asyncio.sleep(0.1)
async def fn():
task_a = asyncio.create_task(coro_a())
task_b = asyncio.create_task(coro_b())
return await task_a
asyncio.run(fn())
This image illustrates what happens under the hood (from here):
And now imagine this example with asyncio.TaskGroup
(which is similar to example_2_8).
As you see, both task_a
and test_b
will finish and then the task-group context manager exits.
# ex_8_2
import asyncio
async def sleep_coro(delay):
print(f"Started sleeping for {delay} seconds!")
await asyncio.sleep(delay)
print(f"Finished sleeping for {delay} seconds!")
return delay
async def main():
print('Before running task group!')
async with asyncio.TaskGroup() as tg:
task_a = tg.create_task(sleep_coro(2))
task_b = tg.create_task(sleep_coro(1))
print("After running task group!")
print("Both tasks finished!")
print(f"Results are: (task_a: {task_a.result()}, task_b: {task_b.result()})")
asyncio.run(main())
And now we have full control over tasks and this is how it will look like.
Trio
’s primary aim is to simplify the comprehension and enhance the performance of concurrency operations.
It offers a more efficient GIL and higher-level APIs, resulting in code that is easier to understand and troubleshoot.
Now let’s see our first example from trio’s official document and get familiar with it. As you can see, there is a parent async function which uses nurseries to spawn two child tasks and run them.
# ex_8_3
import trio
async def child1():
print(" child1: started! sleeping now...")
await trio.sleep(1)
print(" child1: exiting!")
async def child2():
print(" child2: started! sleeping now...")
await trio.sleep(1)
print(" child2: exiting!")
async def parent():
print("parent: started!")
async with trio.open_nursery() as nursery:
print("parent: spawning child1...")
nursery.start_soon(child1)
print("parent: spawning child2...")
nursery.start_soon(child2)
print("parent: waiting for children to finish...")
# -- we exit the nursery block here --
print("parent: all done!")
trio.run(parent)
A very interesting topic would be to see how consumer-producer
is handled in trio
, described in official document
here
and compare it with our ex_5_5
, a consumer-producer variant in asyncio
.
open_memory_channel
is an API used to create queue and manage consumer-producer in trio
.
Let’s rewrite our ex_5_5
in trio
syntax:
# ex_8_4
import trio
async def main():
async with trio.open_nursery() as nursery:
send_channel, receive_channel = trio.open_memory_channel(0)
nursery.start_soon(producer, send_channel)
nursery.start_soon(consumer, receive_channel)
print("Done!")
async def producer(send_channel):
async with send_channel:
for value in range(5):
await trio.sleep(1)
await send_channel.send(value)
async def consumer(receive_channel):
async with receive_channel:
async for value in receive_channel:
print(f"Got number {value}")
trio.run(main)
By utilizing open_memory_channel
, we get two channels initiated and then call consumer/producer with their respective channels.
Also, the consumer
and producer
async functions contain a context manager to signal the completion of the production/consumption processes.
AnyIO is a handy library that makes async programming easier by letting you write code that works with both asyncio
and Trio
.
It hides the differences between these frameworks, so your code runs smoothly no matter which event loop you use.
The starter example from AnyIO official document is provided here:
# ex_8_5
from anyio import sleep, create_task_group, run
async def sometask(num: int) -> None:
print('Task', num, 'running')
await sleep(1)
print('Task', num, 'finished')
async def main() -> None:
async with create_task_group() as tg:
for num in range(5):
tg.start_soon(sometask, num)
print('All tasks finished!')
run(main)
It runs on top of asyncio
and if you want to run it on top of Trio
just do this in run section:
run(main, backend='trio')
And let’s see a typical consumer-producer
pattern in AnyIO
which looks like Trio
’s example:
# ex_8_6
import anyio
async def producer(sender):
async with sender:
for value in range(5):
await sender.send(value)
await anyio.sleep(1)
async def consumer(receiver):
async with receiver:
async for value in receiver:
print(f'Got number {value}')
await anyio.sleep(1)
async def main():
sender, receiver = anyio.create_memory_object_stream()
async with anyio.create_task_group() as tg:
async with sender, receiver:
tg.start_soon(producer, sender.clone())
tg.start_soon(consumer, receiver.clone())
anyio.run(main)
If you want to do some blocking task, anyio.to_thread
API seem do to a good job.
One example would be to read the file names in the current path.
from this article
# ex_8_7
from pathlib import Path
import anyio
from anyio import to_thread
def print_dir_content():
current_dir = Path.cwd()
for file in current_dir.iterdir():
print(file)
async def display_dir_content(event):
await to_thread.run_sync(print_dir_content)
event.set()
async def wait_print_finished(event):
print('wait dir printing is finished')
await event.wait()
print('finished!')
async def main():
event = anyio.Event()
async with anyio.create_task_group() as tg:
tg.start_soon(display_dir_content, event)
tg.start_soon(wait_print_finished, event)
anyio.run(main)
Choosing between AnyIO and Trio often depends on the goals of your project:
asyncio
and Trio
.If you’re already familiar with asyncio
, AnyIO
offers a smooth transition to explore Trio
’s advantages.
If you’re starting fresh, Trio
’s user-friendly design makes it a great first choice.