Scheduling Periodic Tasks with Celery 2.3.3 and Django 1.4

Django    Celery    2013-04-17

There are going to be times in the life of your application when you'll need to perform tasks - making requests to APIs, sending emails, or storing large amounts of data, for example. To do that, you'll most likely want to implement some sort of job queue - that is, a system that lines these tasks up and executes them.

Celery is a task queue commonly used with Django - it's open source, easy to integrate with many other languages and frameworks, and it can execute tasks either asynchronously (in the background) or synchronously (wait until ready).

This article is going to talk about a very basic Celery implementation for Django - you'll need to have Django installed already, and we'll discuss installing and configuring celery, django-celery (Celery integration for Django), and RabbitMQ (the recommended message broker for celery).

If you're using an older version of Celery, it may be a little challenging finding the right docs - for 2.3.3, start here: http://docs.celeryproject.org/en/v2.3.3/

And you may want to bookmark the Celery account on GitHub (https://github.com/celery) for future reference - it contains repositories for the official celeryproject, as well as django-celery and kombu.

Installing and configuring

Before you get started, you'll need to make sure that you have a few requirements installed.

  1. First thing, install celery itself - you can do that easily with pip:

        $ pip install celery

  2. Celery integration for Django comes from the django-celery package - that can also be installed with pip:

        $ pip install django-celery

    These lines need to be added to your settings file:

        import djcelery
        djcelery.setup_loader()
    

    Along with an addition to your INSTALLED_APPS:

        'djcelery',
    

    Once django-celery is installed and your settings.py is updated, you'll also need to create the celery database tables - run python manage.py syncdb (or, if you're using South for migrations, python manage.py migrate djcelery).

  3. Configure celery. Because we're using django-celery, we won't need the celeryconfig.py described in the Celery documentation.

    Those settings should, instead, go into your settings.py. Here's an example of what you'd need initially (a consolidated sketch of all of these settings follows this list):

        # The backend used to store task results - because we're going to be
        # using RabbitMQ as a broker, this sends results back as AMQP messages
        CELERY_RESULT_BACKEND = "amqp"
        # Modules to import when the worker starts
        CELERY_IMPORTS = ("tasks", )
        # Run tasks locally and synchronously instead of queueing them -
        # useful while testing, but remove this in production
        CELERY_ALWAYS_EAGER = True
    

  4. Finally, you'll need a broker - that's the software that sends and receives messages. RabbitMQ is the default broker and probably the simplest to use, since it does not require any additional dependencies or initial configuration. Installing RabbitMQ is simple, but methods vary across operating systems, so you'll want to check the RabbitMQ installation docs for guidance.

    Once RabbitMQ is installed, it needs a little setup as well. Add these to your settings.py:

        BROKER_HOST = "localhost"
        BROKER_PORT = 5672
        BROKER_USER = "myuser"
        BROKER_PASSWORD = "mypassword"
        BROKER_VHOST = "myvhost"
        # Or, equivalently, the settings above can be collapsed into a single
        # connection URL (note that the vhost goes at the end):
        # BROKER_URL = "amqp://myuser:mypassword@localhost:5672/myvhost"
    

    And run these simple setup commands:

        $ sudo rabbitmqctl add_user myuser mypassword
        $ sudo rabbitmqctl add_vhost myvhost
        $ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
    

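Putting it all together, here's a sketch of the celery-related portion of your settings.py - "myuser", "mypassword" and "myvhost" are the placeholder values from the steps above, so substitute your own:

    ## settings.py (excerpt)

    import djcelery
    djcelery.setup_loader()

    INSTALLED_APPS = (
        # ... your other apps ...
        'djcelery',
    )

    # RabbitMQ connection settings
    BROKER_HOST = "localhost"
    BROKER_PORT = 5672
    BROKER_USER = "myuser"
    BROKER_PASSWORD = "mypassword"
    BROKER_VHOST = "myvhost"

    # Store task results as AMQP messages
    CELERY_RESULT_BACKEND = "amqp"
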
Creating callable tasks

Let's assume you have a task that needs to be run periodically - a good example might be a scraper that needs to be run nightly to update some data for your web site.

Here's a quick example of how something like that might be set up (keep in mind that this example is totally arbitrary - there are many other types of tasks you could use, and many other ways that you could implement a scraper):

    ## /project_name/app_name/utils/scraper.py

    import json
    import urllib2

    from utils import storage  # hypothetical helper module that saves results

    # Placeholder - point this at the API you're scraping
    OFFICIALS_API_URL = 'http://example.com/api/officials/'


    def scrape_officials():
        """Scrape for a list of current officeholders using an officials API"""

        data = json.loads(urllib2.urlopen(OFFICIALS_API_URL).read())['objects']
        storage.store_officials(data)
        return data
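
You can sanity-check the scraper on its own before wiring it into Celery - for example, from a Django shell (assuming the placeholder URL above points at a real API):

    $ python manage.py shell
    >>> from app_name.utils import scraper
    >>> officials = scraper.scrape_officials()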

You've got your callable command, now you need to create a Celery task out of it. Per the docs: "Tasks are the building blocks of Celery applications. A task is a class that can be created out of any callable."

Create a tasks.py file inside your application. Our simple example only calls for two imports: the celery task decorator, and our scraper module:

    ## /project_name/app_name/tasks.py

    from celery.decorators import task

    from utils import scraper


    @task
    def scrape_officials(*args):
        return scraper.scrape_officials(*args)

In this tasks.py, we've created a new function that calls the scraper, returns its result, and is simply wrapped in the task decorator. (Things can get a lot more complicated than this, of course - for some of the other options you have available, check out the documentation on creating tasks: http://docs.celeryproject.org/en/v2.3.3/userguide/tasks.html)
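
As one example, here's a hypothetical variation of the task above with an explicit name and a simple retry policy - name, max_retries and default_retry_delay are standard task options, but the values here are arbitrary:

    ## tasks.py - a variation with an explicit name and retries

    from celery.decorators import task

    from utils import scraper


    @task(name="app_name.tasks.scrape_officials", max_retries=3,
          default_retry_delay=60)
    def scrape_officials(*args):
        try:
            return scraper.scrape_officials(*args)
        except Exception as exc:
            # Re-queue the task for another attempt, up to max_retries times
            scrape_officials.retry(exc=exc)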

Scheduling periodic tasks

And that's the easy part done. Next up, you'll want to schedule this task to run at a periodic interval.

To do that, you'll use celery.beat, celery's scheduler. It kicks off tasks at intervals which you define, so that those tasks can then be executed by worker nodes available in the cluster.

In your settings.py, you'll want to import the crontab schedule and define your celerybeat settings:

    from celery.schedules import crontab

    # The default Django db scheduler
    CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
    CELERYBEAT_SCHEDULE = {
        "scrape-officials": {
            "task": "bills.tasks.scrape_officials",
            # Every Sunday at 4:30AM
            "schedule": crontab(hour=4, minute=30, day_of_week=0),
            "args": (),
        },
    }

If you've ever scheduled a cron job, you can schedule a celery task. And if you haven't, there's a detailed description of the configuration syntax in the periodic tasks guide: http://docs.celeryproject.org/en/v2.3.3/userguide/periodic-tasks.html
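
A few more crontab examples, to give a sense of the syntax (the fields mirror standard cron):

    from celery.schedules import crontab

    crontab()                                        # every minute
    crontab(minute=0, hour='*/3')                    # every three hours, on the hour
    crontab(minute=0, hour=0, day_of_week='sunday')  # every Sunday at midnight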

Another example of a CELERYBEAT_SCHEDULE entry, this one with arguments and a different timetable (it would sit alongside the first entry in the same dictionary):

        "scrape-bills-senate": {
            "task": "bills.tasks.scrape_bills",
            # Daily at midnight
            "schedule": crontab(minute=0, hour=0),
            "args": ('83R', 'senate'),
        },

Testing and troubleshooting

Go ahead and start the celery worker with this command:

    $ python manage.py celeryd -l INFO

If you see entries like these in the log:

    [2013-04-16 22:10:36,702: ERROR/MainProcess] Consumer: Connection Error: [Errno 61] Connection refused. Trying again in 2 seconds...

    [2013-04-16 22:54:36,823: ERROR/MainProcess] Consumer: Connection Error: Socket closed. Trying again in 2 seconds...

RabbitMQ might not be running - to start it, navigate to:

    /your/path/to/rabbitmq/3.0.2

(replace the version in the path with whatever version number you have installed)

And run:

    $ sbin/rabbitmq-server -detached

In another terminal, open an interpreter and test out some simple task commands (remember that with CELERY_ALWAYS_EAGER = True, tasks run immediately in-process rather than being queued):

    $ python manage.py shell
    >>> from bills.tasks import scrape_officials
    >>> result = scrape_officials.delay()
    >>> # If the task took arguments, they would be passed here
    >>> # e.g., result = scrape_bill.delay('83R', 'HB25')
    >>> result.ready() # returns True if the task has finished processing
    >>> result.result # task is not ready, so no return value yet
    >>> result.get()   # waits until the task is done and returns a value
    >>> result.result # direct access to result, doesn't re-raise errors
    >>> result.successful() # returns True if the task didn't end in failure

Now test celery again, this time with the scheduler:

    $ python manage.py celeryd -B -l DEBUG

Adding the '-B' option runs the celerybeat periodic task scheduler along with celery. ('-l DEBUG' just increases the verbosity of the logging output.)

You'll see a similar launch screen, plus the list of scheduled tasks:

    [2013-03-07 13:42:30,518: DEBUG/Beat] Current schedule:
    <ModelEntry: scrape-officials bills.tasks.scrape_officials(*[], **{}) {<crontab: 30 4 0 (m/h/d)>}>
    <ModelEntry: celery.backend_cleanup celery.backend_cleanup(*[], **{}) {<crontab: 0 4 * (m/h/d)>}>

When beat sends a scheduled task off to the queue, you'll see a log entry like this:

    [2013-03-07 13:45:00,577: DEBUG/Beat] bills.tasks.scrape_officials sent. id->8feaf52e-3de7-421d-9fe1-6d509f228444

Keep in mind that if the schedule for any task changes, you must restart celeryd.

An example log result:

    [2013-03-07 14:34:57,972: DEBUG/Beat] Celerybeat: Waking up in 2.03 seconds.
    [2013-03-07 14:35:00,004: DEBUG/Beat] Scheduler: Sending due task bills.tasks.scrape_bill
    [2013-03-07 14:35:00,022: INFO/Beat] Starting new HTTP connection (1): www.capitol.state.tx.us
    [2013-03-07 14:35:00,605: DEBUG/Beat] "GET /BillLookup/History.aspx?LegSess=83R&Bill=HB25 HTTP/1.1" 200 15658
    [2013-03-07 14:35:00,725: INFO/Beat] Starting new HTTP connection (1): www.capitol.state.tx.us
    [2013-03-07 14:35:01,263: DEBUG/Beat] "GET /BillLookup/Text.aspx?LegSess=83R&Bill=HB25 HTTP/1.1" 200 17667
    [2013-03-07 14:35:01,415: WARNING/Beat] storing bill HB25
    [2013-03-07 14:35:01,416: INFO/Beat] RETRIEVED DATA: {'session': '83R', 'versions': [u'Introduced'], ...}
    [2013-03-07 14:35:01,495: DEBUG/Beat] Celerybeat: Synchronizing schedule...
    [2013-03-07 14:35:01,496: DEBUG/Beat] Writing dirty entries...
    [2013-03-07 14:35:01,504: DEBUG/Beat] bills.tasks.scrape_bill sent. id->ebf66d70-589d-41b7-b5e8-0e56ca9c0ceb
    [2013-03-07 14:35:01,505: DEBUG/Beat] Celerybeat: Waking up in 5.00 seconds.

If you're using a Procfile in your project, you can include a line configured like this:

    celery: python manage.py celeryd --events --loglevel=INFO -c 5 --settings=settings -B

Just be sure to add '-B' so that celerybeat starts with celery, and the scheduler will handle things from there.

Additional reading

First steps with Django (Celery documentation)
Up and Running with Celery and Django
Django, Celerybeat and Celery with MongoDB as the Broker
Using celerybeat and Django
Using Celery to Handle Asynchronous Processes
Basic django, celery and rabbitmq example