I don’t know about other programming languages, but if you are using Python or Django, you must have heard about Celery quite a few times, and if not, you better look into it. As stated on the project Celery website:
Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.
In case of a web service (most common use-case), asynchronous task queues are utilities to push (time-consuming) tasks in background while timely sending back the response for a user request. These delegated tasks can be anything from sending few notifications, dispatching emails, update system logs, or update internal ERP. Having the aforementioned tasks in line with the request processing, can delay the response back to the user to a large extent.
In simples words, async tasks queues are used to quickly answer the users. Celery helps delegating the long running tasks to a separate process i.e. Celery workers (explained later), across threads or even network nodes. Celery makes doing it effortless, all our application has to do is to push messages to a broker i.e. RabbitMQ, and celery workers will consume and execute them.
The internal working of Celery can easily be stated as the Producer/Consumer model. Producers place the jobs in a queue, and consumers are ready them from the queue. Given this, at high-level Celery has 3 main components:
Producers – are commonly the ‘web nodes’, the web service process, handling the web request. During the request processing, tasks are delegated to Celery i.e. pushed into the task queue.
Queue – it is the broker, which basically helps passing tasks from web application to Celery worker(s). Celery has full support for RabbitMQ and Redis, also supports Amazon SQS and Zookeeper but with limited capabilities (RabbitMQ docs).
Consumers – consumers are ‘worker nodes’, listening to queue head, whenever a task is published, they consume and execute it. Workers can also publish back to the queue, triggering another tasks, hence they can also behave as producers.
The most basic setup is to have both producer (web/REST service), and consumer (workers) running on the same machine, and all the code and configuration in a single repository. The web service gets the user requests, place the tasks in the queue to be processed asynchronously by the worker(s), running independently of the web service. This separation of logic and process, makes it highly scaleable, we can have web service and task workers running separately on different nodes.
A task example:
First we need to have the tasks ready.
# The `worker node` part - the task will run asynchronously from celery import Celery app = Celery(‘app_name') @app.task def update_participants(event, n): event.participants_number = n event.save()
Once we have tasks, we can execute them asynchronously.
# The `web node` part - executes the task event = Event.objects.get(name=‘tech_offsite') update_participants.delay(event, 23) # puts task in the queue, and moves on
In the above example, when executing the task we are passing the complete event object, which is not a good practice, for several reasons. When passing to the broker objects are serialized, and then deserialized before task execution, passing complex objects to a task i.e. model instance as an argument can cause problems. The current version uses JSON by default to encode tasks, while older versions has Pickle which is still supported, Pickle has security vulnerabilities, hence increasing the chances of getting exposed. More common pitfall is, during the flow the passed model instance can have changes, so we’ll be executing the task with an outdated instance. So, it’s recommended to pass on the object Id, so the task can fetch a fresh copy of the instance.
@app.task def update_participants(event_id, n): e = Event.objects.get(id=id_event) e.participants_number = n e.save() e = Event.objects.get(name='tech_offsite') update_participants.delay(e.id, 23)
To keep things simple, I have missed on one of the components of the Celery architecture, which is the ‘Result Backend’. Result backend is used to store task results, if any. By default the transport backend (broker) is used to store results, but we can configure Celery to use some other tech just for the Celery Result backend.
Using the result backend
# `worker node` part from celery import Celery app = Celery(‘app_name') @app.task def add(a, b): return a + b
# `web node` part from tasks import add r = add.delay(1, 2).get() # will block the execution till the result backend has the return from task. print(r) # 3
In the web node part, calling .get will place the add task to queue and wait until the results are available. So, using .get() will kill the purpose of asynchronous tasks in most of the cases. Point was to share the available component, and it may be useful in any very rare use-case, where you need to update some status on getting the task results.