
What is celery beat and how to use it?

Celery is a widely recognized distributed task queue for Python projects. Its sole purpose is to reduce the load on web servers by delegating time-consuming tasks to separate processes. A web server should handle each request in no more than a fraction of a second, so running long tasks synchronously substantially reduces the application's throughput.

So we are able to delegate and queue hard work elsewhere. Obviously, this has to be done explicitly. But sometimes we have to run tasks periodically. Let's say we have a business requirement:

Users pay for access to our service. Payment can be made only for one month at a time. Doing so postpones the account's expiration by 31 days. Expired accounts should be deactivated within 4 hours. Eventually, an e-mail with the appropriate information should be sent.

To fulfill this requirement, we would have to scan the application's user list for expired accounts at least every 4 hours. That is where celery beat comes in. It's simply a scheduler for periodic celery tasks.

Why should I use celery beat? I have crontab

That's a strong argument against celery beat. After all, why would anyone care about some lesser-known solution that is meant to replace the battle-tested crond service present in all modern unix distros? However, using crontab requires additional knowledge and usually root access on the server. AFAIK, there is no comparably widespread service on Windows.

Portability

The first argument in favour of celery beat is its portability. Every environment that can run Python will also be sufficient for celery beat. Taking development and test environments into consideration, this is a serious advantage. Celery beat is just another part of your application, so a new version can easily be deployed locally every time the codebase changes. This speeds up the whole process and makes one headache go away.

No more "glue" scripts

Using celery beat eliminates the need for writing little "glue" scripts with one purpose: run some checks and then enqueue tasks for regular celery workers. Usually these scripts would be run periodically by crond, so the crond configuration would effectively tie the application to a certain run environment. For comparison, a sketch of this crond-based approach follows.
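
Here is a minimal sketch of such a glue script; the script, module, and helper names are all hypothetical:

# scan_expired.py - a little "glue" script, run by crond every 4 hours:
#   0 */4 * * * /usr/bin/python /opt/app/scan_expired.py
from myapp.tasks import process_expired_account  # a regular celery task (hypothetical)
from myapp.users import get_expired_users        # hypothetical helper

for user in get_expired_users():
    process_expired_account.delay(user)  # queue the heavy work for a celery worker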

It is in Python!

An obvious advantage. No administration skills are required to have a full-fledged periodic task scheduler :)

How to properly use celery beat in the aforementioned scenario?

Let's break our problem into smaller ones first. We have to:

  • Scan the user list, looking for expired accounts
  • For every expired account:
    • deactivate it
    • send an e-mail

We know for sure that the first step should be done periodically, so we'll be running it directly from celery beat:

from celery import Celery

app = Celery('tasks', broker='redis://redis:6379/0')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # the first argument is the interval in seconds: 3600 * 4 means every 4 hours
    sender.add_periodic_task(3600 * 4, scan_for_expired_users.s(), name='scan for expired accounts every 4 hours')

@app.task
def scan_for_expired_users():
    """
    TODO
    """
    pass
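
To try this out, you need a running broker (Redis at redis:6379 in this example) and, assuming the snippet above is saved as tasks.py, two processes: the beat scheduler and a regular worker. Something along these lines should do:

celery -A tasks beat --loglevel=info     # the scheduler, sends tasks on schedule
celery -A tasks worker --loglevel=info   # the worker, executes the tasks

During development, both can be combined into a single process with celery -A tasks worker -B.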

1:0 for the scheduler. We could implement the whole thing in this task, but that approach is strongly discouraged. First of all, the resulting list of accounts can be large, so deactivating all the accounts and sending the e-mails would take a very long time. Furthermore, a failure while processing one account would affect the whole batch.

Fortunately, we can easily trigger new tasks from inside other tasks using apply_async:

from celery import Celery

app = Celery('tasks', broker='redis://redis:6379/0')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(3600 * 4, scan_for_expired_users.s(), name='scan for expired accounts every 4 hours')

@app.task
def scan_for_expired_users():
    """
    TODO
    """
    for user in expired_users:
        deactivate_account_and_send_email.apply_async((user, ))

@app.task
def deactivate_account_and_send_email(user):
    # do some stuff
    pass
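
As a side note: when no extra options are needed, delay() is a convenient shortcut for apply_async() that takes the task's arguments directly:

# the two calls below are equivalent
deactivate_account_and_send_email.apply_async((user, ))
deactivate_account_and_send_email.delay(user)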

We could stop here, but deactivate_account_and_send_email can still be split into two smaller tasks. However, we can't just fire both of them using apply_async, because they would run independently and we could end up sending an expiration e-mail to an account that wasn't deactivated due to some failure :)

So we need to link these tasks together somehow. Fortunately, Celery has an appropriate mechanism called chains. Tasks in a chain are executed sequentially and, what is more, each task receives the result of the previous one as its first argument.
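
To illustrate the mechanism on its own, here is a minimal sketch using a hypothetical add task (the classic example from Celery's documentation), assuming the app object from the previous snippets:

from celery import chain

@app.task
def add(x, y):
    return x + y

# add.s(4) has only one argument bound; the result of the previous task
# becomes its first argument, so this chain computes (2 + 2) + 4
result = chain(add.s(2, 2), add.s(4))()
# with a result backend configured, result.get() would return 8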

Working example:

from celery import Celery, chain


app = Celery('tasks', broker='redis://redis:6379/0')


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # NOTE: 10 seconds here so the example is easy to observe; in real life use 3600 * 4
    sender.add_periodic_task(10, scan_for_expired_users.s(), name='scan for expired accounts')


@app.task
def scan_for_expired_users():
    for user in get_expired_users():
        deactivating_process = chain(deactivate_account.s(user), send_expiration_email.s())
        deactivating_process()


@app.task
def deactivate_account(user):
    # do some stuff
    print(f'deactivating account: {user}')
    return user + '_deactivated'


@app.task
def send_expiration_email(user):
    # do some other stuff
    print(f'sending expiration email to: {user}')
    return True


def get_expired_users():
    return (f'user_{i}' for i in range(5))

The whole magic takes place inside scan_for_expired_users. First, a chain is created (note the absence of an explicit argument in send_expiration_email.s()! The result of the previous task in the chain will be passed instead). Then, calling deactivating_process() schedules the chain to run asynchronously.
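
With the stub get_expired_users above, a worker running this module should print something roughly like this every 10 seconds (the exact interleaving may vary, since the chains run concurrently):

deactivating account: user_0
sending expiration email to: user_0_deactivated
deactivating account: user_1
sending expiration email to: user_1_deactivated
...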

Summary

Celery once again proves to be a powerful tool. Celery beat is a great addition to the passive celery worker and saves programmers from having to learn crond.

