Post

asyncio - choosing the right executor

During application development with asyncio you will inevitably encounter situation when there is no asyncio-compatible library to use. It may be an API client for our business partner built with excellent requests library (that naturally doesn't work well with asyncio) or a simpler example - a Celery. Rewriting problematic dependency may be your first thought, although this is almost never a feasible option. At first, it would involve some reverse - engineering to quickly turn into mimicking existing API against all odds. Not to mention about need of maintenance and keeping compatibility later. Can you imagine writing your own Celery's client for tasks scheduling? A little confession here -  I have written limited client twice. I don't recommend it though.

So, shall we abandon asyncio in such a case?

No!

asyncio has a remedy for that. It's called executors. These are a pool of threads or processes dedicated for dealing with stuff incompatible with asyncio that would block event loop if executed in any other way. Whether we'll use processes or threads is completely up to us (developers). We can also create and use several different pools of both kinds whenever necessary.

Threads, when?

So, which kind of executors should we choose for aforementioned cases dealing with incompatible I/O?

We'll use threads for two reasons. Firstly, there is no problem even in "standard" Python when several threads wait for I/O operation to complete. Python program can still run on CPU if any other thread is has such a work to do. Threads are also lightweight compared to processes.

asyncio creates a pool of threads by default. To use it, we simply pass an executable to loop's run_in_executor with None as the first argument:

import asyncio

import requests


def blocking_function():
    response = requests.get('https://breadcrumbscollector.tech/feed/')
    print(f'Got data of length: {len(response.content)} in just {response.elapsed}')


loop = asyncio.get_event_loop()
coro = loop.run_in_executor(None, blocking_function)  # will use default threads executor!
loop.run_until_complete(coro)

If our function accepts positional arguments then we can just pass them as subsequent arguments to run_in_executor :

def blocking_function(arg_one, arg_two):
    ...

coro = loop.run_in_executor(None, blocking_function, some_var, some_other_var)

run_in_executor doesn't support keyword arguments, so to pass them we need a little trick using functools.partial or lambdas:

def blocking_function(keyword_one, another_keyword):
    ...

fun_with_keywords = functools.partial(blocking_function, another_keyword=1, keyword_one=2)
fun_with_keywords = lambda: blocking_function(another_keyword=1, keyword_one=2)
coro = loop.run_in_executor(None, fun_with_keywords)

This isn't anything extraordinarily complicated.

Processes, when?

Use processes for any other cases, especially when dealing with CPU-bound tasks like decoding large XML/JSON documents, converting images etc. But at least consider delegating such stuff to queue like Celery first using threads!

Creating and using own executors pools

As I mentioned there is a possiblity to create own pool with executors. Let's see how to use it to get some process-based workers:

import asyncio
from concurrent.futures import ProcessPoolExecutor

import requests


def blocking_function():
    response = requests.get('https://breadcrumbscollector.tech/feed/')
    print(f'Got data of length: {len(response.content)} in just {response.elapsed}')


loop = asyncio.get_event_loop()

with ProcessPoolExecutor() as process_executor:
    coro = loop.run_in_executor(process_executor, blocking_function)
    loop.run_until_complete(coro)

This is the same example as the first one - we use run_in_executor to get some data from an URL using a library that is not compatbile with asyncio.

Summary

To sum up, use threads (default) pool whenever you deal with blocking I/O and processes for CPU bound tasks.

This post is licensed under CC BY 4.0 by the author.

Comments powered by Disqus.