What is Celery beat and how to use it – part 2, patterns and caveats

Celery beat is a handy Celery add-on for automatically scheduling periodic tasks (e.g. every hour). For more basic information, see part 1 – What is Celery beat and how to use it.

In this part, we’re gonna talk about common applications of Celery beat, recurring patterns and the pitfalls waiting for you.

Ensuring a task is only executed one at a time

There are tasks we want to run periodically, but never more than one instance at a time. Examples include data synchronization with an external service or calculations on our own data. In both cases, it does not make sense to have two identical tasks running at the same time. It may either result in a considerable load spike or lead to data corruption.

The former will be a nightmare from a maintenance and performance point of view, since someone has to manually kill the duplicated tasks. For example, two or more tasks doing heavy operations on a relational database may consume so many resources that the database will not be able to serve the usual requests from clients.

When it comes to data corruption, the consequences are even more dreadful. Such a situation may lead to money loss and potentially time-consuming manual intervention in the data to undo the damage. For example, imagine we synchronize payments from an external service in a periodic task. If our implementation does not protect us against processing the same payment twice, we could end up giving someone twice as much as they should have received.

Luckily, there are at least two ways to secure against such unpleasant situations:

  • a separate queue with a dedicated Celery worker with a single worker process (--concurrency 1)
  • using a lock

Separate queue with a dedicated Celery worker with a single process

This solution requires no serious code changes in Celery tasks. Instead, we have to:

  • configure task routing so our task goes to a separate queue
  • run another worker for that queue
  • give that worker only one process (--concurrency 1)

How does it work? Well, a single worker process ensures that at most one task is processed at a time. If for some reason the task takes longer than anticipated (e.g. longer than the interval configured in the Celery beat scheduler), another one is not started. Scheduled tasks may still pile up in the broker, though.
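The three steps above could be sketched as a Celery configuration module like the one below. The task name `tasks.synchronize_payments`, the queue name `exclusive` and the hourly interval are all assumptions made up for illustration; `task_routes` and `beat_schedule` are regular Celery settings.

```python
# celeryconfig.py - a minimal sketch; task and queue names are hypothetical.

# Route the periodic task to its own queue, away from the default one.
task_routes = {
    "tasks.synchronize_payments": {"queue": "exclusive"},
}

# Let Celery beat enqueue the task once per hour (interval in seconds).
beat_schedule = {
    "synchronize-payments-hourly": {
        "task": "tasks.synchronize_payments",
        "schedule": 3600.0,
    },
}

# The dedicated worker is then started with a single process, e.g.:
#   celery -A proj worker -Q exclusive --concurrency=1
# ("proj" stands for your own application module.)
```

The other workers should not consume from the `exclusive` queue, otherwise the single-process guarantee is gone.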

For the complete example, see the GitHub repo – Celery Beat Example.

On the bright side, this solution is very reliable and trivial. No extra code for handling locks is needed. No code changes in tasks are required. This is a natural solution by design.

However, this solution also has serious disadvantages. One of them is the maintenance of an additional Celery worker. If it is idle most of the time, it is pure waste. On the other hand, if we have more tasks that need one-at-a-time execution, we may reuse the same worker. It may still require a bit of fine-tuning plus monitoring to see whether we are under- or over-utilizing our dedicated worker.

Use a lock to guarantee only a single task execution at a time

The second solution uses a classic pessimistic lock. When a Celery worker gets a task from the queue, it needs to acquire a lock first. If that fails, we abort. If we acquired the lock successfully, we apply a timeout to it (so the lock automatically disappears if the worker crashes) and start work. When we are finished, we release the lock. This guarantees that only one worker at a time processes a given task.

Recipe from Celery Task Cookbook (and why it is not something you want in production)

The Celery Task Cookbook provides an explanation and a code snippet for achieving that, but continue reading before you put this in production! This implementation is flawed for two reasons:

  • it uses the Django cache
  • it does not handle a situation when a task takes longer than the lock timeout

Using the cache is problematic because it is designed for a different application – caching data. The cache can be unavailable (and this is a 100% normal situation). As far as I remember, Django’s cache.add does not tell you whether it failed because a given key exists (the lock has already been acquired) or because the cache server (Memcached in the example) is not working. To mitigate this, we need monitoring & alerting set up to see if our cache is up and running. Otherwise, we would be seeing in the logs that processing is already taking place, which is obviously false. Another problem with the cache is that it does not provide an appropriate API for locking. Note that the author of the recipe mentions one has to use a cache backend that implements add as an atomic operation. What is more, a cache is supposed to be safe to delete at any time. All of this is a sign that it is a hacky and brittle solution. In the example to come, I will show you how one can leverage Redis to achieve reliable locking.

Before jumping into Redis locking, let’s talk about a scenario when a lock-protected task takes longer than the lock timeout. With the code in the example, the lock would expire, and if another such task was waiting in the queue, it would start executing before the previous one finished, simply because the lock had expired. That’s a catastrophe. Removing the timeout from the lock is not an option, because every time a task crashed it would require manual intervention to remove the lock. We may also approach this by introducing a very long expiry time on the lock. That’s an option provided we can guarantee the task will finish in a shorter time. When I say guarantee, I do not mean guessing (Oh, it SHOULD last for 4 minutes). Either use timeouts internally or use Celery’s Time Limits feature.
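To make the "guarantee" concrete, here is a minimal sketch of how the numbers could relate to each other. `task_soft_time_limit` and `task_time_limit` are Celery settings; `LOCK_TIMEOUT_SECONDS` is a hypothetical constant of our own that the locking code would use, and all values are made up for illustration.

```python
# A sketch of time limits that stay below the lock timeout (values are assumptions).

# Celery setting: after this many seconds SoftTimeLimitExceeded is raised
# inside the task, so it still has a chance to clean up and release the lock.
task_soft_time_limit = 240

# Celery setting: after this many seconds the process running the task is killed.
task_time_limit = 270

# Hypothetical constant for our own locking code: the lock must outlive
# the hard time limit, so it can never expire while the task is still running.
LOCK_TIMEOUT_SECONDS = 300
```

With such values, a task that hangs is stopped by Celery before its lock expires, so a second task never starts in parallel.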

Tailored Redis-based lock

Knowing that using the Django cache is not really a reliable option, let’s explore an alternative. Redis is much better in terms of locking because a) it has an atomic command to set a value only if a key does not exist and apply a timeout at the same time, and b) one may use a Lua script to atomically release the lock, but only if you own it. If you are just interested in the algorithm, see Distributed locks with Redis.
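A minimal sketch of those two building blocks is shown below. It assumes `client` is a redis-py `redis.Redis` instance; the function names and the idea of passing the work as a callable are hypothetical and are not taken from the example repository.

```python
import uuid

# Lua script executed atomically by Redis: delete the key only if it still
# holds our token, i.e. only if we are the owner of the lock.
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""


def acquire_lock(client, name, timeout_seconds):
    """Return a unique token if the lock was acquired, otherwise None."""
    token = uuid.uuid4().hex
    # SET with nx=True and ex=... sets the key only if it does not exist
    # and applies the expiry in the same atomic command.
    if client.set(name, token, nx=True, ex=timeout_seconds):
        return token
    return None


def release_lock(client, name, token):
    """Release the lock only if it is still held with our token."""
    return client.eval(RELEASE_SCRIPT, 1, name, token) == 1


def run_exclusively(client, name, timeout_seconds, work):
    """Run work() under the lock; return False if someone else holds it."""
    token = acquire_lock(client, name, timeout_seconds)
    if token is None:
        return False  # another task is already running, abort
    try:
        work()
    finally:
        release_lock(client, name, token)
    return True
```

Inside a Celery task, one could call something like `run_exclusively(client, "lock:sync-payments", 300, do_sync)`, where the key name, the timeout and `do_sync` are again placeholders. The timeout still has to be longer than the task can possibly run.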

Celery Beat tasks running very often (e.g. every few seconds)

Now, for tasks that are scheduled to run every few seconds, we must be very cautious. The solution with a dedicated Celery worker does not work well there, because tasks will quickly pile up in the queue, ultimately leading to broker failure. The situation is a bit better for lock-protected tasks, because multiple workers can quickly empty the queue if tasks ever pile up. It will just make your CPU cores red for a moment.

If you, however, think about using the expiration feature to prevent tasks from piling up in a queue, beware the scenario with a single dedicated worker. Tasks do not magically disappear from the queue once they expire. They stay there until the worker gets them, and only then are they revoked. In other words, if you have tasks piling up in queues but you applied expiry times, you are not safe yet. If tasks are produced faster than you consume them, it will lead to broker failure.

It may happen if you have created a dedicated worker for a calculation task that should run every few seconds. This is a very short time frame and it is easy to exceed. Celery beat will keep on putting tasks in the queue until it fills up completely and bang, the broker is down.
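For reference, this is roughly what applying an expiry time to a Celery beat entry looks like. The task name and the five-second values are hypothetical; `options` is passed on to the task when it is enqueued, and `expires` is the standard Celery expiration option.

```python
# A sketch of a beat entry with an expiry time (names and values are assumptions).
beat_schedule = {
    "recalculate-every-5-seconds": {
        "task": "tasks.recalculate",
        "schedule": 5.0,  # enqueue a new task every 5 seconds
        # A worker discards the task if it picks it up more than 5 seconds
        # after it was sent. The message still sits in the broker until then.
        "options": {"expires": 5},
    },
}
```

As described above, this only saves the worker from doing stale work. It does not shrink the queue by itself.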

Let me share with you one last recipe for dealing with the scenario – create a separate Python script (completely outside Celery) that will be doing that work. Its code will look as follows:
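A minimal sketch of such a script, assuming a hypothetical `calculate()` function and a five-second interval (both are placeholders, not taken from the example repository):

```python
import time

INTERVAL_SECONDS = 5.0  # assumed interval between runs


def calculate():
    """Hypothetical placeholder for the real periodic calculation."""


def run_periodically(work, interval, max_runs=None, sleep=time.sleep):
    """Call work() in a loop, one run at a time, roughly every `interval` seconds.

    max_runs limits the number of iterations (None means loop forever).
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        started = time.monotonic()
        work()
        runs += 1
        elapsed = time.monotonic() - started
        # Sleep only for the remaining part of the interval. If the work took
        # longer than the interval, start the next run immediately.
        sleep(max(0.0, interval - elapsed))
    return runs


# Typical usage (runs until the process is stopped):
# run_periodically(calculate, INTERVAL_SECONDS)
```

Because the loop is sequential, two runs can never overlap and nothing piles up anywhere. A slow run simply delays the next one.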

For such frequent calculations, it makes no sense to stress Celery and put extra work into securing this against concurrent executions.

Summary

Even though Celery and Celery Beat may look trivial to use, there are many potential pitfalls you have to avoid.

It is not Celery’s fault, though. It is just that queues are not simple 😉 If you are interested in what may go wrong in such systems, have a read about queueing theory. It is applicable not only to computer programs!

PS: Don’t forget to check out the repository with working examples from this post: Celery Beat Example
