Post

Implementing Event Sourcing in Python – part 4, efficient read model with projections

This post is a part of Implementing Event Sourcing series. It consists of code snippets, thoughts and practical advice how to implement ES in your own project. The contents of this post will probably make the most sense if you also read all other parts. Then you should be ready to use it in your own projects.

What have we done so far?

In the part one of Implementing Event Sourcing series I described how to express your logic with event sourcing aggregates. Parts two and three discuss things one has to take into consideration while dealing with saving and loading state of aggregates (events, to be exact). So we are already able to write our entire logic. That's great, but that's not the end of Event Sourcing yet.

Read models/projections - what is all the fuss about?

There is an approach called CQRS. This acronym stands for Command Query Responsibility Segregation. It amounts to create and maintain two separate models of your data. One of them will be for saving, another one for reading. In other words, we will keep our aggregates in more than one way. Why? Because saving and reading operations are efficient for different data formats. That is especially true when we use Event Sourcing. Just imagine querying events table to get all orders with certain criteria met. This can not perform well. Data format we want for write model is something similar to what many people are familiar with - 3rd normal form. In short, that's the way we store data using Django's best practices. We split our data into different models, create relationships via foreign keys etc. Ideally, there is no duplicated data on the write side. Read model, on the other hand, likes data to be denormalized. Ideally, there should not be any JOINS as the data needed for certain view is available to query from one table. So we save the same data at least twice. Once for saving, once or more (depending on how many specialized view models you need) for reading.

But wait... Data duplication is bad, isn't?

As long as you apply certain standards of hygiene it is not a problem at all:

  • write model is superior to read model
  • every change in write model is reflected in reading model
  • read model can not be changed in any way except as a result of altering write model
  • read model is very easy as all read operations are safe for the system
  • the whole complexity lies in write model - it is where all business rules have to be enforced, where we need validation etc
  • once one needs to change read model format (we add/delete some columns, for example) it is possible to recreate it using save model's current state

What is in it for me?

Updating read model can be done asynchronously (we deal with Eventual Consistency here) what makes event sourcing architecture sooo scalable. We have to sacrifice consistency for the sake of scalability. Even though some clients may see out-dated information for a very short while, the whole system is protected against saving operations as write model is always consistent. This is guaranteed by proper implementation of an event store.

So how do we build read model from write model?

This is where another keyword comes onstage - projections. Projection is a piece of code that translates a stream of events into data most convenient for reading.

Let's say we deal with banking. Our most precious aggregate is called Account. Consider following events used to reconstitute account's state:

class CashOperation(Event):

    account_uuid: uuid.UUID = attr.ib(
        validator=[attr.validators.instance_of(uuid.UUID)],
        converter=uuid.UUID
    )
    amount: decimal.Decimal = attr.ib(
        validator=[attr.validators.instance_of(decimal.Decimal)],
        converter=decimal.Decimal
    )

    @amount.validator
    def check(self, _attribute, value):
        if not value > 0:
            raise ValueError

    def as_dict(self):
        return {'amount': str(self.amount), 'name': self.__class__.__name__}


class CashDeposited(CashOperation):
    pass


class CashWithdrawn(CashOperation):
    pass

We can easily imagine that we will need data saved somewhere to actually get the proper balance without having to iterate through the whole history of cash operations. In other words, we need our projection to maintain rows in such format:

--------------------------------------------------
|                 uuid                 | balance |
--------------------------------------------------
| 4e931a5d-212d-4347-aca4-e0f435b1f37d |  155.00 |
--------------------------------------------------
| 3b98b4ec-083a-498d-b8eb-9f68a06eca85 | 6155.00 |
--------------------------------------------------

Code for doing that kind of acrobatics is quite straightforward:

class SqlAlchemyProjection(metaclass=ABCMeta):  # 1
    def __init__(self, connection: Connection):  # 2
        # Beginning/closing transaction is not a business of SQL projection
        self._connection = connection

    @abstractmethod
    def handle(self, event: Event):  # 3
        pass


class AccountProjection(SqlAlchemyProjection):  # 4

    def handle(self, event: Event):

        @singledispatch  # 5
        def handle(_event: Event):
            pass

        @handle.register(CashDeposited)  # 6
        def _(event: CashDeposited):
            stmt = accounts.update().where(  # 7
                accounts.c.uuid == str(event.account_uuid)
            ).values(balance=accounts.c.balance + event.amount)
            res = self._connection.execute(stmt)

            if res.rowcount == 0:  # 8
                self._connection.execute(
                    accounts.insert().values(uuid=str(event.account_uuid), balance=event.amount)
                )

        @handle.register(CashWithdrawn)  # 9
        def _(event: CashWithdrawn):
            stmt = accounts.update().where(  # 10
                accounts.c.uuid == str(event.account_uuid)
            ).values(balance=accounts.c.balance - event.amount)
            self._connection.execute(stmt)

        handle(event)
  1. We start with a base class for all projections that will actually persist our read model in RDBMS. We will use excellent SQLAlchemy ORM for this, but instead of defining models, we'll go for SQLAlchemy Core API, what will give us a bit of performance boost.
  2. We need a Connection object to use SQLAlchemy Core. One can be trivially obtained from a Session or Engine.
  3. Every projection will have handle method accepting single argument - Event subclass instance
  4. AccountProjection will take care of inserting/updating rows inside accounts table
  5. We can leverage singledispatch to avoid writing error-prone if-elif-else block or dictionary handling logic routing. Note that default implementation does nothing as it means we don't support an event that got there
  6. The handler of CashDeposited...
  7. ...firstly tries to update existing account, adding balance to it
  8. If we updated zero rows in the previous step it means we don't have an account yet, so we'll create one
  9. The handler of CashWithdrawn...
  10. ...simply updates the row using account_id by decreasing the balance

We don't really have to handle situations when for example we try to withdraw cash from a nonexistent account since consistency is handled elsewhere (write model). We just have to make sure that read model gets events in a correct order. This can be dealt with using queuing project, like RabbitMQ.

Transactions?

A comment in SqlAlchemyProjection class says that projection should not touch transaction at all. There are two different situations that require a different approach:

  • projecting single event - in this case, we can BEGIN/COMMIT transaction after each projection's handle method call. Every event is projected in its own transaction
  • projecting all events since the beginning - one BEGIN/COMMIT for the entire operation. This gives a performance boost, see this blog post for more information and charts

Of course, this is applicable only when we store read model in some RDBMS.

Should I query for additional data inside projection once I try to denormalize an event?

Usually, you should not need to. If you need more data from an aggregate itself, then just add it to the event.

How to put projections in my architecture?

You will need new service for projecting events that will take care of projecting one event after another. In the beginning single-threaded solution that will dequeue events from your Event Bus (RabbitMQ or something similar) will suffice. Later you can scale your solution and provide separate thread/worker by every projection. Do not try to add more workers to the same projection. It would only allow for ugly race conditions and will not increase the speed at all.

An important thing to note is that every projection that may be interested in a certain event has to get it. In a simple implementation, we can just send all events to all projections. We might want to optimize it a bit by introducing some routing on Event Bus, but this may be postponed. Different projections change different read models. This will mean updating different tables or even different databases.

Projecting in Event Sourcing is a very powerful technique!

This post is a part of Implementing Event Sourcing series. It consists of code snippets, thoughts and practical advice how to implement ES in your own project. The contents of this post will probably make the most sense if you also read all other parts. Then you should be ready to use it in your own projects.

This post is licensed under CC BY 4.0 by the author.

Comments powered by Disqus.