Task scheduling and message queues are essential for handling long-running tasks, periodic jobs, and inter-component communication in modern applications. They help improve scalability, resilience, and efficiency by decoupling workflows and managing workloads asynchronously.
Libraries like Celery and APScheduler are popular tools for implementing task scheduling and queuing in Python.
They allow developers to offload tasks, schedule recurring jobs, and handle retries seamlessly.
In this section, we’ll explore these tools and learn how to design scalable, asynchronous task systems.
First, install Celery on your system with pip.
Then install RabbitMQ on your system, or set it up using Docker, and make sure it is running.
Celery’s official documentation, in particular the pages "First Steps with Celery" and "Backends and Brokers - RabbitMQ", explains how to set up and use Celery and RabbitMQ together.
Then we create three Python files. In the first one, we configure Celery’s result backend and broker and define a task.
# ex_9_1
from celery import Celery

# RabbitMQ (amqp://) is the broker; results are returned through the rpc:// backend
app = Celery('tasks', backend='rpc://', broker='amqp://')


@app.task
def add(x, y):
    return x + y
In the second one, ex_9_2, we put add(4, 4) on a Celery queue to run.
Then we fetch its result and check whether it is ready.
# ex_9_2
from ex_9_1 import add
result = add.delay(4, 4)
print(f"Is ready? {result.ready()}") # Expected to get False at first
print(f"result: {result.get(timeout=1)}") # Expected to get the sum 4 + 4 = 8
print(f"Is ready? {result.ready()}") # Expected to get True at the end
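The ready()/get() flow above resembles the Future objects in Python’s standard library. As a rough analogy only (this is not how Celery works internally, and no broker or worker is involved), the same flow can be sketched with concurrent.futures. The gate event and the gated_add wrapper are hypothetical names, added here only so that the "not ready yet" state can be observed reliably.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def add(x, y):
    return x + y


# Hypothetical gate: holds the task back until its state has been checked once.
gate = threading.Event()


def gated_add(x, y):
    gate.wait(timeout=5)  # wait for the caller's signal (give up after 5 s)
    return add(x, y)


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(gated_add, 4, 4)  # roughly analogous to add.delay(4, 4)
    ready_before = future.done()           # analogous to result.ready()
    gate.set()                             # let the task finish
    value = future.result(timeout=1)       # analogous to result.get(timeout=1)
    ready_after = future.done()

print(f"Is ready? {ready_before}")
print(f"result: {value}")
print(f"Is ready? {ready_after}")
```

The main difference is that a Future lives inside one Python process, while a Celery result is produced by a separate worker and travels back through the result backend.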
In the third file, ex_9_3, we put the task on the queue using the delay function (apply_async can be used too), then sleep briefly before checking whether the result is ready and fetching it.
# ex_9_3
from ex_9_1 import add
import time
result = add.delay(4, 4)
print(f"Is ready? {result.ready()}") # Expected to get False at first
time.sleep(0.01)
print(f"Is ready? {result.ready()}") # Expected to get True at the end
print(f"Result: {result.get(timeout=1)}")
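A fixed sleep of 0.01 seconds may not be long enough if the worker is busy, so the second ready() check can still return False. One possible alternative is to poll until a deadline. The sketch below assumes only that the result object has a ready() method, as Celery’s result object does; wait_until_ready and FakeResult are hypothetical names, and FakeResult is a stand-in so the example runs without a broker.

```python
import time


def wait_until_ready(result, timeout=1.0, interval=0.01):
    """Poll result.ready() until it returns True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if result.ready():
            return True
        time.sleep(interval)
    return result.ready()  # one last check after the deadline


class FakeResult:
    """Stand-in for a task result: becomes ready after a number of polls."""

    def __init__(self, polls_needed):
        self.polls_left = polls_needed

    def ready(self):
        if self.polls_left > 0:
            self.polls_left -= 1
        return self.polls_left == 0


finished = wait_until_ready(FakeResult(3))
timed_out = wait_until_ready(FakeResult(10**9), timeout=0.05)
print(f"Is ready? {finished}")
print(f"Is ready? {timed_out}")
```

With a real task this would be called as wait_until_ready(add.delay(4, 4)). Note that result.get(timeout=1) already blocks until the result arrives, so polling is mainly useful when you want to do other work between checks.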
To run the Celery examples above, go to the chapter9 directory and start a Celery worker with the command below:
celery -A ex_9_1 worker --loglevel=INFO
Then run the ex_9_2.py and ex_9_3.py modules separately (in another shell):
python ex_9_2.py
python ex_9_3.py
If you want the task to run after a non-blocking delay of t seconds, use apply_async this way:
# ex_9_4
from ex_9_1 import add
import time
t = 2 # Expected to run add function after 2 seconds
result = add.apply_async((4, 4), countdown=t)
print(f"Is ready? {result.ready()}") # Expected to get False at first
time.sleep(2.1)
print(f"Is ready? {result.ready()}") # Expected to get True at the end
print(f"Result: {result.get(timeout=1)}")
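In Celery, countdown is a shortcut for setting an ETA a given number of seconds into the future. The sketch below computes the equivalent eta value with the standard library; the apply_async call itself is left as a comment so the snippet runs without a broker.

```python
from datetime import datetime, timedelta, timezone

t = 2  # same delay as in ex_9_4
now = datetime.now(timezone.utc)
eta = now + timedelta(seconds=t)  # earliest time at which the task may run

print(f"now: {now.isoformat()}")
print(f"eta: {eta.isoformat()}")

# With a worker and broker running, the task could then be queued like this:
# from ex_9_1 import add
# result = add.apply_async((4, 4), eta=eta)
```

In both forms the task runs no earlier than the given time, but not necessarily at that exact moment; it still has to wait for a free worker.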
ulimit -n
The number printed by ulimit -n is the maximum number of file descriptors that a process can have open at any one time.
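The same limit can also be read from Python. This is a small sketch using the standard library’s resource module, which is available on Unix-like systems only; the soft limit should normally match what ulimit -n prints in the shell that started the script.

```python
import resource  # Unix-only standard library module

# RLIMIT_NOFILE is the limit on open file descriptors for this process.
soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)

print(f"soft limit (the value ulimit -n reports): {soft_limit}")
print(f"hard limit (ceiling for the soft limit): {hard_limit}")
```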
When you start Celery’s worker with celery -A ex_9_1 worker, it forks/spawns n child processes by default, where n is the number of CPU cores.
You can check this by running the pgrep celery command (which returns the PIDs of the Celery processes, including the main worker process).
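To see what n is likely to be on your machine, you can ask Python for the number of CPU cores. This is only a quick check; it assumes the count the worker sees matches what os.cpu_count() reports.

```python
import os

# os.cpu_count() may return None on unusual platforms, so fall back to 1.
cpu_cores = os.cpu_count() or 1

print(f"CPU cores reported by Python: {cpu_cores}")
```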
Two worker options control this number, concurrency and autoscale:
concurrency sets the number of processes forked/spawned by the Celery worker (which is bounded above by the maximum number of file descriptors under the hood).
autoscale keeps the number of forked/spawned processes between two limits and adjusts it automatically based on the incoming load.
It is also worth noting that I have drawn inspiration and borrowed a great deal from Daksh Gupta’s tutorial on Celery.
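The idea behind the autoscale bounds can be illustrated with a toy model. This is not Celery’s actual autoscaler: autoscaled_pool_size is a hypothetical function, the "one process per pending task" rule is a simplifying assumption, and the limits 10 and 3 are arbitrary example values.

```python
def autoscaled_pool_size(pending_tasks, max_procs, min_procs):
    """Toy model: one process per pending task, clamped to [min_procs, max_procs]."""
    return max(min_procs, min(max_procs, pending_tasks))


for load in (0, 5, 50):
    size = autoscaled_pool_size(load, max_procs=10, min_procs=3)
    print(f"pending tasks: {load:2d} -> processes: {size}")
```

However light or heavy the load, the pool never shrinks below the lower limit or grows beyond the upper one.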
APScheduler is a relatively simple scheduler library for Python. It provides cron-style scheduling and some interval-based scheduling.
As the official APScheduler documentation states, APScheduler has four kinds of components: triggers, job stores, executors, and schedulers.
Now let’s see an example of a simple periodic task using APScheduler, from Keshav Manglore, which runs every five seconds:
# ex_9_5
from apscheduler.schedulers.background import BackgroundScheduler
import time


def my_periodic_task():
    print("Periodic task executed!")


scheduler = BackgroundScheduler()
scheduler.add_job(my_periodic_task, 'interval', seconds=5)
scheduler.start()

# Keep the script running to allow the scheduled task to execute
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    # Stop the scheduler cleanly when the script is interrupted (e.g. Ctrl+C)
    scheduler.shutdown()
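To get a feel for what a background scheduler does, here is a rough standard-library analogue: a job runs at a fixed interval in a background thread while the main thread stays free. This is a simplified sketch and not how APScheduler is implemented; run_every is a hypothetical helper, and the interval is shortened to 0.05 seconds so the example finishes quickly.

```python
import threading
import time


def run_every(interval, job, stop_event):
    """Call job() every `interval` seconds until stop_event is set."""
    # Event.wait() returns False on timeout and True once the event is set.
    while not stop_event.wait(interval):
        job()


runs = []  # timestamps of each execution


def my_periodic_task():
    runs.append(time.monotonic())


stop = threading.Event()
worker = threading.Thread(
    target=run_every, args=(0.05, my_periodic_task, stop), daemon=True
)
worker.start()

time.sleep(0.3)  # the main thread is free to do other work here
stop.set()       # ask the background loop to stop
worker.join(timeout=1)

print(f"Periodic task executed {len(runs)} times")
```

Unlike this sketch, APScheduler also handles triggers other than fixed intervals, job persistence, and running jobs in thread or process pools.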
The Rocketry documentation provides a very useful comparison of different scheduling tools that is worth reading.