Celery is a popular distributed task queue. It is often used with the Django framework to process heavy computational tasks in the background. You can add a single task to the queue or define periodic tasks. Periodic tasks are automatically scheduled for execution at a set time; the most common approach is to define them before the Celery worker is started, for example a daily cleanup of the database. But what if we would like to define periodic tasks dynamically? Recently, I’ve been working on a web app for uptime monitoring. The service continuously monitors a server and sends an email notification when the server is down. In the web app, the user can add a server address to be monitored and select the interval between requests to the server. How do you dynamically add periodic tasks in Celery? I describe my approach in this article.
Overview
The uptime monitoring idea is simple. The user defines the server address, for example https://github.com, and an interval. At every interval, a request is made to the server, and we store information about the response time and status in the database. Multiple servers can be monitored, each at a different interval. Moreover, the user can add or delete monitored addresses at any time. Here is a sequence diagram of my approach.
Start Django project
In this article, I will create a Django application with simple monitoring functionality to show you my approach to dynamic periodic tasks in Celery and Django. All code for this article is available in a public GitHub repository.
Please set up a new Python virtual environment:
virtualenv dynenv --python=python3.8
source dynenv/bin/activate
We need to install the required packages:
- django for web app development,
- djangorestframework, markdown, django-filter for Django REST Framework,
- celery, django-celery-beat, gevent, sqlalchemy for background task processing,
- requests to make HTTP requests.
Please install the following packages:
pip install django djangorestframework markdown django-filter celery django-celery-beat gevent sqlalchemy requests
It is a good practice to add the required packages to the requirements.txt file.
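For this project, the file could simply list the packages we just installed (versions unpinned here; you may also want to pin them):

django
djangorestframework
markdown
django-filter
celery
django-celery-beat
gevent
sqlalchemy
requests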
We need to initialize the Django project with the django-admin
command line tool:
django-admin startproject backend
Please change the directory to backend and create a new app:
python manage.py startapp monitors
We need to set up Django to use the newly generated monitors app. Please update INSTALLED_APPS in the backend/settings.py file:
# the rest of the code ...
INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    # 3rd party
    "rest_framework",
    "django_celery_beat",
    # apps
    "monitors",
]
# the rest of the code ...
We added to INSTALLED_APPS:
- rest_framework - a package for faster REST API development,
- django_celery_beat - a package that provides the PeriodicTask model,
- monitors - our new app.
Monitor database model
We will need two database models:
- Monitor - a model for storing information about the monitored address and the time interval between checks,
- MonitorRequest - a model to keep the response time and status.
The backend/monitors/models.py
file content:
from django.db import models
from django_celery_beat.models import PeriodicTask


class Monitor(models.Model):
    # monitored endpoint
    endpoint = models.CharField(max_length=1024, blank=False)
    # interval in seconds
    # endpoint will be checked every specified interval time period
    interval = models.IntegerField(blank=False)
    task = models.OneToOneField(
        PeriodicTask, null=True, blank=True, on_delete=models.SET_NULL
    )
    created_at = models.DateTimeField(auto_now_add=True)


class MonitorRequest(models.Model):
    # endpoint response time in milliseconds
    response_time = models.IntegerField(blank=False)
    response_status = models.IntegerField(blank=False)
    monitor = models.ForeignKey(Monitor, on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)
Please notice that Monitor has a one-to-one relationship with PeriodicTask.
task = models.OneToOneField(
    PeriodicTask, null=True, blank=True, on_delete=models.SET_NULL
)
The PeriodicTask object will be used to inform Celery about the task execution and its frequency. When the end user adds a new server address for monitoring, a Monitor object will be created in the database. The server address will be stored in the endpoint field; the interval field is in seconds.
Each request to the server will be saved in the database as a MonitorRequest
object. We will store response time (in milliseconds) and status.
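To get a feel for the models, you can create objects in the Django shell (python manage.py shell) after applying the migrations described later. Note that a Monitor created this way is just a database row; no task is scheduled yet, because the PeriodicTask wiring happens in the view below:

from monitors.models import Monitor, MonitorRequest

# a Monitor alone does not schedule anything; the view wires up the PeriodicTask
monitor = Monitor.objects.create(endpoint="https://github.com", interval=60)

# a sample check result, stored the same way the background task will store it
MonitorRequest.objects.create(response_time=120, response_status=200, monitor=monitor)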
We will need to add serializers, views, and URLs to have a REST API available for Monitor and MonitorRequest.
Please add a new file serializers.py
in the backend/monitors
directory:
from rest_framework import serializers

from monitors.models import Monitor, MonitorRequest


class MonitorSerializer(serializers.ModelSerializer):
    class Meta:
        model = Monitor
        read_only_fields = ("id", "created_at")
        fields = (
            "id",
            "created_at",
            "endpoint",
            "interval",
        )


class MonitorRequestSerializer(serializers.ModelSerializer):
    monitor_endpoint = serializers.SerializerMethodField()

    def get_monitor_endpoint(self, obj):
        return obj.monitor.endpoint

    class Meta:
        model = MonitorRequest
        read_only_fields = ("id", "created_at")
        fields = (
            "id",
            "created_at",
            "response_time",
            "response_status",
            "monitor_endpoint",
        )
The serializers will just return the available fields. The next step is to edit backend/monitors/views.py:
import json

from django.db import transaction
from django_celery_beat.models import IntervalSchedule, PeriodicTask
from rest_framework import viewsets
from rest_framework.exceptions import APIException

from monitors.models import Monitor, MonitorRequest
from monitors.serializers import MonitorRequestSerializer, MonitorSerializer


class MonitorViewSet(viewsets.ModelViewSet):
    serializer_class = MonitorSerializer
    queryset = Monitor.objects.all()

    def perform_create(self, serializer):
        try:
            with transaction.atomic():
                instance = serializer.save()
                schedule, created = IntervalSchedule.objects.get_or_create(
                    every=instance.interval,
                    period=IntervalSchedule.SECONDS,
                )
                task = PeriodicTask.objects.create(
                    interval=schedule,
                    name=f"Monitor: {instance.endpoint}",
                    task="monitors.tasks.task_monitor",
                    kwargs=json.dumps(
                        {
                            "monitor_id": instance.id,
                        }
                    ),
                )
                instance.task = task
                instance.save()
        except Exception as e:
            raise APIException(str(e))

    def perform_destroy(self, instance):
        if instance.task is not None:
            instance.task.delete()
        return super().perform_destroy(instance)


class MonitorRequestViewSet(viewsets.ModelViewSet):
    serializer_class = MonitorRequestSerializer
    queryset = MonitorRequest.objects.all()
We have two views. The MonitorRequestViewSet derives from ModelViewSet and doesn't override any functions; it is a simple CRUD view for MonitorRequest objects.
The MonitorViewSet overrides perform_create(self, serializer) and perform_destroy(self, instance). During monitor creation, a PeriodicTask instance is created. The PeriodicTask instance requires an IntervalSchedule object. The IntervalSchedule defines the time period between task executions.
with transaction.atomic():
    instance = serializer.save()
    # create `IntervalSchedule` object
    schedule, created = IntervalSchedule.objects.get_or_create(
        every=instance.interval,
        period=IntervalSchedule.SECONDS,
    )
    # create `PeriodicTask` object
    task = PeriodicTask.objects.create(
        interval=schedule,
        name=f"Monitor: {instance.endpoint}",
        task="monitors.tasks.task_monitor",
        kwargs=json.dumps(
            {
                "monitor_id": instance.id,
            }
        ),
    )
    # save the task in the monitor object
    instance.task = task
    instance.save()
During the PeriodicTask object creation, we pass the path to the task function, monitors.tasks.task_monitor, and define its kwargs. We will implement task_monitor in a moment.
The last step is to define urls.py in the backend/monitors directory:
from django.urls import re_path
from rest_framework.routers import DefaultRouter
from monitors.views import MonitorRequestViewSet, MonitorViewSet
router = DefaultRouter()
router.register(r"monitors", MonitorViewSet)
router.register(r"requests", MonitorRequestViewSet)
monitors_urlpatterns = router.urls
We used the DefaultRouter from Django REST Framework. A basic browsable API for monitors and requests will be generated by DRF.
We need to add monitors_urlpatterns to backend/backend/urls.py to make them available in the Django application.
from django.contrib import admin
from django.urls import path
from monitors.urls import monitors_urlpatterns
urlpatterns = [
    path("admin/", admin.site.urls),
]
urlpatterns += monitors_urlpatterns
Please make migrations and apply them:
# please run in the backend directory
python manage.py makemigrations
python manage.py migrate
Celery configuration
We need to configure the Celery framework. Please add a new file celery.py
in the backend/backend
directory:
import os
import sys
from celery import Celery
CURRENT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, CURRENT_DIR)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")
app = Celery("backend")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
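One optional addition: the Celery documentation for Django also recommends re-exporting the app in backend/backend/__init__.py, so that the @shared_task decorator is bound to it when Django starts. It is not strictly required here, because celery -A backend also finds the app in the backend.celery submodule, but it is a common pattern:

# backend/backend/__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)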
In the backend/backend/settings.py
please add the Celery configuration:
# the rest of the code ...
# celery broker and results in sqlite
CELERY_BROKER_URL = "sqla+sqlite:///celery.sqlite"
CELERY_RESULT_BACKEND = "db+sqlite:///celery.sqlite"
We will use SQLite as the broker and results backend. This is an example project showing how to use PeriodicTask, so broker and result backend performance is out of scope for this article.
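For anything beyond a demo, you would likely use a dedicated broker. As an illustration (an assumption on my part, not part of this project), the same settings with a local Redis instance could look like this:

# backend/backend/settings.py -- hypothetical Redis configuration
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"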
We have the Celery configuration completed.
Background task
Please add the tasks.py
file in the backend/monitors
directory:
import requests
from celery import shared_task

from monitors.models import Monitor, MonitorRequest


@shared_task(bind=True)
def task_monitor(self, monitor_id):
    try:
        monitor = Monitor.objects.get(pk=monitor_id)
        response = requests.get(monitor.endpoint, timeout=60)
        MonitorRequest.objects.create(
            response_time=int(response.elapsed.total_seconds() * 1000),
            response_status=response.status_code,
            monitor=monitor,
        )
    except Exception as e:
        print(str(e), type(e))
The task_monitor(self, monitor_id) function has the @shared_task decorator. It accepts monitor_id as an argument, which is passed via the kwargs field when the PeriodicTask is created.
The task_monitor sends a GET request to the monitored server and saves the response time and status. It is a simplified version of the task_monitor used in my uptime monitoring service.
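As a rough sketch of what a fuller version might do (the failure handling and the alert helper below are my own illustration, not code from this project's repository), network errors could be recorded as downtime and trigger a notification:

import requests
from celery import shared_task

from monitors.models import Monitor, MonitorRequest


def send_alert(monitor):
    # placeholder for a real notification (e.g., an email); illustration only
    print(f"ALERT: {monitor.endpoint} seems to be down")


@shared_task(bind=True)
def task_monitor_extended(self, monitor_id):
    monitor = Monitor.objects.get(pk=monitor_id)
    try:
        response = requests.get(monitor.endpoint, timeout=60)
        status = response.status_code
        elapsed_ms = int(response.elapsed.total_seconds() * 1000)
    except requests.RequestException:
        # timeouts, DNS errors, refused connections: record as downtime
        status, elapsed_ms = 0, 0
    MonitorRequest.objects.create(
        response_time=elapsed_ms,
        response_status=status,
        monitor=monitor,
    )
    if status == 0 or status >= 400:
        send_alert(monitor)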
Run Django and Celery
We are ready to play with our web application. You will need three terminals. Please start the Django development server in the first one:
python manage.py runserver
In the second terminal, please start the Celery worker:
celery -A backend worker --loglevel=info -P gevent --concurrency 1 -E
In the third terminal, please start Celery beat:
celery -A backend beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler --max-interval 10
The Celery beat service uses the DatabaseScheduler from the django-celery-beat package. The beat service reads the scheduled tasks from the database. Tasks defined with PeriodicTask are persistent; they will still be available after the Celery worker and beat restart.
Please open your (favorite) web browser and enter the address http://127.0.0.1:8000. You should see the browsable API automatically generated by DRF:
If you have problems or need help, please create a GitHub issue. We will try to help you! You won’t be alone.
Please open the monitors API at http://127.0.0.1:8000/monitors/. The list of monitors should be empty; let's add the first monitor. Please fill out the form and click the POST button.
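If you prefer the command line to the browsable form, here is a minimal sketch of the same request with the requests package we installed earlier (the endpoint and interval values are just examples):

import requests

# create a monitor that checks https://github.com every 60 seconds
response = requests.post(
    "http://127.0.0.1:8000/monitors/",
    json={"endpoint": "https://github.com", "interval": 60},
)
print(response.status_code, response.json())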
Please wait some time for results to accumulate; you should see lines like these in the Celery beat terminal:
[2022-10-17 12:05:59,887: INFO/MainProcess] Scheduler: Sending due task Monitor: https://github.com (monitors.tasks.task_monitor)
[2022-10-17 12:06:59,887: INFO/MainProcess] Scheduler: Sending due task Monitor: https://github.com (monitors.tasks.task_monitor)
[2022-10-17 12:07:59,909: INFO/MainProcess] Scheduler: Sending due task Monitor: https://github.com (monitors.tasks.task_monitor)
[2022-10-17 12:08:59,909: INFO/MainProcess] Scheduler: Sending due task Monitor: https://github.com (monitors.tasks.task_monitor)
[2022-10-17 12:09:59,914: INFO/MainProcess] Scheduler: Sending due task Monitor: https://github.com (monitors.tasks.task_monitor)
Example output for Celery worker terminal:
[2022-10-17 12:08:00,644: INFO/MainProcess] Task monitors.tasks.task_monitor[e8ea92d4-e834-4bdf-8f4a-af6ee0601b55] received
[2022-10-17 12:08:01,304: INFO/MainProcess] Task monitors.tasks.task_monitor[e8ea92d4-e834-4bdf-8f4a-af6ee0601b55] succeeded in 0.65664959300193s: None
[2022-10-17 12:09:00,101: INFO/MainProcess] Task monitors.tasks.task_monitor[8c597c74-4917-4f3e-894f-a6ae117fc0f3] received
[2022-10-17 12:09:01,909: INFO/MainProcess] Task monitors.tasks.task_monitor[8c597c74-4917-4f3e-894f-a6ae117fc0f3] succeeded in 1.8063794959998631s: None
[2022-10-17 12:10:00,604: INFO/MainProcess] Task monitors.tasks.task_monitor[739f9227-4b63-409d-9d72-57720c2da5f0] received
[2022-10-17 12:10:00,873: INFO/MainProcess] Task monitors.tasks.task_monitor[739f9227-4b63-409d-9d72-57720c2da5f0] succeeded in 0.2661438309987716s: None
Please open the requests API at http://127.0.0.1:8000/requests/. At the beginning, you will see only an empty list:
After some time, it will be filled with requests data:
You can stop the monitoring task by deleting the monitor. Please open http://127.0.0.1:8000/monitors/1/ (where 1 is the monitor ID) and click the Delete button.
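The same can be done programmatically; a minimal sketch with requests:

import requests

# delete the monitor with ID 1; perform_destroy() also removes its PeriodicTask
response = requests.delete("http://127.0.0.1:8000/monitors/1/")
print(response.status_code)  # expect 204 No Content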
You should see that no more requests are produced. The monitor and its associated PeriodicTask object have been removed. The code for this article is available in the GitHub repository.
To check periodic tasks, you can open the Django Admin Panel at
http://127.0.0.1:8000/admin
.
Summary
Celery is a great task queue. With the django-celery-beat package, you can dynamically create or delete periodic tasks without restarting Celery. What is more, the PeriodicTask object allows you to dynamically change the interval and pause the task (not covered in detail in this article; a sketch follows below). These features were very helpful to me while implementing the uptime monitoring service.
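As a hedged sketch of those last two features (using the task name pattern from perform_create above; this is my illustration, not code from the article's repository):

from django_celery_beat.models import IntervalSchedule, PeriodicTask

# pause a monitor's task: beat skips disabled tasks
task = PeriodicTask.objects.get(name="Monitor: https://github.com")
task.enabled = False
task.save()

# change the interval to 120 seconds and resume
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=120, period=IntervalSchedule.SECONDS
)
task.interval = schedule
task.enabled = True
task.save()

With the DatabaseScheduler, such changes are picked up from the database without restarting the worker or beat.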