What is celery beat and how to use it?
Celery is a widely recognized distributed task queue for Python projects. Its sole purpose is to reduce the load on web servers by delegating time-consuming tasks to separate processes. A web server should handle each request in no more than a fraction of a second, so running long tasks synchronously substantially reduces the application's throughput.
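To give a rough idea before we dive in (the task below is made up for illustration), delegating work to celery boils down to defining a task and enqueueing it from the request handler:

```python
from celery import Celery

app = Celery('tasks', broker='redis://redis:6379/0')

@app.task
def generate_thumbnails(image_id):
    # the time-consuming work happens in a separate worker process
    ...

# inside a request handler we merely enqueue the task and respond immediately
generate_thumbnails.delay(42)
```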
So we are able to delegate and queue hard work elsewhere. Obviously, this has to be done explicitly. Sometimes, however, we need to run tasks periodically. Let's say we have a business requirement:
Users pay for access to our service. Payment can be made only for one month at a time. Doing so postpones the account's expiration by 31 days. Expired accounts should be deactivated within 4 hours, and finally an e-mail with appropriate information should be sent.
To fulfill such a requirement, we would have to scan the application's user list for expired accounts at least every 4 hours. That is where celery beat comes in. It's simply a periodic task scheduler for celery.
Why should I use celery beat? I have crontab
That's a strong argument against celery beat. After all, why would anyone care about a less-known solution that is meant to replace the battle-tested crond service present in all modern unix distros? However, using crontab requires additional knowledge and usually root access on the server. AFAIK, there is no comparably widespread service on Windows.
Portability
The first argument in favour of celery beat is its portability. Every environment that can run Python will also be sufficient for celery beat. Taking development and test environments into consideration, this is a serious advantage. Celery beat is just another part of your application, so a new version can easily be deployed locally every time the codebase changes. This speeds up the whole process and makes one headache go away.
No more "glue" scripts
Using celery beat eliminates the need for writing little glue scripts whose sole purpose is to run some checks and then send tasks to a regular celery worker. Usually these would be run periodically by crond, so the crond configuration would effectively tie the application to a certain run environment.
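Just to show what we are getting rid of, such a glue script (all names here are hypothetical) and its crontab entry might look like this:

```python
# check_expired.py - hypothetical glue script run by crond, e.g. with:
#   0 */4 * * * python /app/scripts/check_expired.py
from tasks import scan_for_expired_users

if __name__ == '__main__':
    # the script itself does no work; it only enqueues a task
    # for a regular celery worker to pick up
    scan_for_expired_users.delay()
```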
It is in Python!
An obvious advantage. No administration skills are required to have a full-fledged periodic task scheduler :)
How to properly use celery beat in aforementioned scenario?
Let's break our problem into smaller ones first. We have to:
- Scan the user list, looking for expired accounts
- For every expired account:
  - deactivate it
  - send an e-mail
We know for sure that the first item should be done periodically, so we'll be running it directly from celery beat:
```python
from celery import Celery

app = Celery('tasks', broker='redis://redis:6379/0')


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        3600 * 4,
        scan_for_expired_users.s(),
        name='scan for expired accounts every 4 hours',
    )


@app.task
def scan_for_expired_users():
    """ TODO """
    pass
```
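To see the schedule in action you need two processes (assuming the code above lives in `tasks.py`): a regular worker started with `celery -A tasks worker` and the scheduler itself started with `celery -A tasks beat`. Beat only sends task messages to the broker on schedule; the worker does the actual processing.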
1:0 for the scheduler. We might implement the whole thing in this single task, but that approach is strongly discouraged. First of all, the resulting list of accounts can be large, so deactivating all of them and sending the e-mails would take a very long time. Furthermore, a failure while processing one account would abort the processing of all the remaining ones.
Fortunately, we can easily trigger new tasks inside others using apply_async:
```python
from celery import Celery

app = Celery('tasks', broker='redis://redis:6379/0')


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        3600 * 4,
        scan_for_expired_users.s(),
        name='scan for expired accounts every 4 hours',
    )


@app.task
def scan_for_expired_users():
    """ TODO """
    # 'expired_users' is a placeholder; fetching them is left as a TODO
    for user in expired_users:
        deactivate_account_and_send_email.apply_async((user, ))


@app.task
def deactivate_account_and_send_email(user):
    # do some stuff
    pass
```
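A side note: `deactivate_account_and_send_email.apply_async((user, ))` is equivalent to the shorthand `deactivate_account_and_send_email.delay(user)`. The longer form pays off when you need execution options, e.g. `apply_async((user, ), countdown=60)` to postpone the run by a minute.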
We could stop here, but deactivate_account_and_send_email can still be split into two smaller tasks. However, we can't just fire both using apply_async, because they would run independently and we could end up sending an expiration e-mail to an account that wasn't deactivated due to some failure :)
So we need to link these tasks together somehow. Fortunately, Celery has an appropriate mechanism called chains. Chained tasks are executed sequentially and, what is more, each task receives the result of the previous one as its first argument.
Working example:
```python
from celery import Celery, chain

app = Celery('tasks', broker='redis://redis:6379/0')


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 10-second interval so the example is easy to observe;
    # in production this would be 3600 * 4
    sender.add_periodic_task(
        10,
        scan_for_expired_users.s(),
        name='scan for expired accounts every 4 hours',
    )


@app.task
def scan_for_expired_users():
    for user in get_expired_users():
        deactivating_process = chain(
            deactivate_account.s(user),
            send_expiration_email.s(),
        )
        deactivating_process()


@app.task
def deactivate_account(user):
    # do some stuff
    print(f'deactivating account: {user}')
    return user + '_deactivated'


@app.task
def send_expiration_email(user):
    # do some other stuff
    print(f'sending expiration email to: {user}')
    return True


def get_expired_users():
    return (f'user_{i}' for i in range(5))
```
The whole magic takes place inside scan_for_expired_users. First, a chain is created (note the absence of an explicit argument to send_expiration_email! The result of the previous task in the chain will be passed there). Then, calling deactivating_process() schedules the whole chain to run asynchronously.
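By the way, the same chain can be written with Celery's `|` operator, which reads a bit like a unix pipeline:

```python
# equivalent to chain(deactivate_account.s(user), send_expiration_email.s())
deactivating_process = deactivate_account.s(user) | send_expiration_email.s()
deactivating_process.delay()  # schedules the chain, just like calling it
```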
Summary
Celery once again proves to be a powerful tool. Celery beat is a great addition to the passive celery worker and saves programmers from learning crond.
Further info:
- celery beat manual
- designing tasks workflows
- working example, ready to use with docker-compose https://github.com/Enforcer/celery-beat-example