Asynchronous Task Scheduling with Celery

Celery is one of the best open-source task queues for running tasks in the background. It is a good fit for automating processes that need to run after some event has taken place. Celery uses a message broker to pass messages between the application and the workers, and the workers can even run on different servers.

Celery supports many message brokers, among them RabbitMQ, Redis, Amazon SQS, and Zookeeper. Of these, RabbitMQ is usually the better choice as a broker, but it is not useful as a result store; if the application needs results to be stored, Redis can be used instead.
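For instance, if Redis is chosen as the result store, the broker and the backend are both given as URLs. This is only a sketch; the hosts, ports, and credentials below are placeholder assumptions, not values from this tutorial:

CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'   # RabbitMQ broker
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'          # Redis result store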

Here are some steps to easily get started with Celery and RabbitMQ in a Django project.

sudo pip install django

sudo pip install celery

sudo apt install rabbitmq-server
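On Debian/Ubuntu the RabbitMQ service usually starts automatically after installation; on a systemd-based system (an assumption about your setup), its status can be checked with:

sudo systemctl status rabbitmq-server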

Celery has good support for Django. Refer to First Steps with Django in the Celery documentation to get started with Django and Celery.

proj/proj/__init__.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

proj/proj/celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
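This debug task simply prints its own request context. Once a worker is running (started in a later step below), it can be queued as a quick sanity check, for example from a Django shell:

>>> from proj.celery import debug_task
>>> debug_task.delay()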

proj/proj/settings.py:

The following lines should be added to the Django settings file (settings.py).

# Celery Configuration
CELERY_BROKER_URL = 'amqp://localhost'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
CELERY_TASK_SERIALIZER = 'json'

This is the initial configuration needed to get started. Now a task can be created in a few simple steps.

djangoapp/tasks.py:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y
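In a real project the task would usually be queued from application code rather than a shell. As a hypothetical sketch (the view and its wiring are assumptions, not part of the original setup), a Django view could enqueue it like this:

# djangoapp/views.py (hypothetical example)
from django.http import JsonResponse

from .tasks import add

def enqueue_add(request):
    # .delay() pushes the task onto the broker and returns immediately
    result = add.delay(7, 9)
    # the task id can be used later to look up the result
    return JsonResponse({'task_id': result.id})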

In order to test the task, we need to start the worker daemon.

celery -A proj worker -l info

To set the concurrency of a worker:

celery -A proj worker --loglevel=INFO --concurrency=2 -n worker1@%h  # two concurrent worker processes

Now it's time to run our task. In a terminal:

$ python manage.py shell

>>> from djangoapp.tasks import add

>>> add.delay(7, 9)

This adds the task to the queue, where it is consumed by the worker. In the worker's log output we can see the task being received and completed.
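Since a result backend was configured in settings.py, the return value can also be retrieved. A minimal sketch, assuming the worker is running and the backend is reachable:

>>> result = add.delay(7, 9)
>>> result.get(timeout=10)  # blocks until the worker finishes
16
>>> result.status
'SUCCESS'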

Conclusion:

Tasks like machine learning jobs may take a long time to complete and are resource-intensive. For scheduling such tasks without dropping any of them, while making proper use of the server, Celery is a great tool to start off with.
