
Implementing Event Sourcing in Python - part 1, aggregates

This post is a part of the Implementing Event Sourcing series. It consists of code snippets, thoughts and practical advice on how to implement ES in your own project. The contents of this post will probably make the most sense if you also read all the other parts. Then you should be ready to use it in your own projects.

What is Event Sourcing, actually?

Recall any entity/model/being from a piece of software you recently worked on. You probably thought of User, but let's try a bit harder. Consider an e-commerce Order. It might hold the current status (new, confirmed, shipped, etc.) and summaries - total price, shipping and taxes. Naturally, an Order does not exist on its own. We usually wire it with another entity, OrderLine, that refers to a single ordered product together with its quantity. This structure could be represented in a relational database in the following way:

Orders
------------------------------
| id | status | total_price |
------------------------------
|  1 |    new |      169.99 |
------------------------------

OrderLines

-----------------------------------------
| id | order_id | product_id | quantity |
-----------------------------------------
|  1 |        1 |        512 |        1 |
-----------------------------------------
|  2 |        1 |        614 |        3 |
-----------------------------------------

By storing data this way we can always cheaply get the CURRENT state of our Order. We store a dump of the serialized object after the latest changes. Changing anything, for example switching the status from new to shipped, overwrites the data. We irreversibly lose the old state. What if we need to track all changes? Let's see how that could fit into another database table:

OrdersHistory
--------------------------------------------------------------------------------------------
| id | order_id |    event_name |            datetime |                               data |
--------------------------------------------------------------------------------------------
|  1 |        1 |  OrderCreated | 2018-01-20 18:33:12 |                                    |
--------------------------------------------------------------------------------------------
|  2 |        1 |     LineAdded | 2018-01-20 18:33:44 | {"product_id": 512, "quantity": 1} |
--------------------------------------------------------------------------------------------
|  3 |        1 | StatusChanged | 2018-01-20 18:42:59 |            {"status": "confirmed"} |
--------------------------------------------------------------------------------------------

Such a representation enables us to tell precisely what was changed and when. But this OrdersHistory plays second fiddle. It is merely an auxiliary record of the Order, added just to fulfill some business requirement. We still reach for the original Orders table whenever we want to know the exact state of any Order in all other scenarios.

Please note that OrdersHistory is just as good as the Orders table when we have to get the current Order state. We just have to fetch all entries for a given Order and 'replay' them from the start, as shown in the sketch below. In the end we'll get exactly the same information that is saved in the Orders table. So should we still treat the Orders table as our source of truth? Event Sourcing denies such a claim. We can safely get rid of the table, or at least no longer rely on it in any situation that would actually mutate the Order.
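
A minimal sketch of that replay, assuming the OrdersHistory rows above were already fetched as plain dicts (the row format and the rebuild_order function are illustrative only, not code from the rest of this post):

import json

# Hypothetical rows already fetched from OrdersHistory, ordered by id
history_rows = [
    {'event_name': 'OrderCreated', 'data': None},
    {'event_name': 'LineAdded', 'data': '{"product_id": 512, "quantity": 1}'},
    {'event_name': 'StatusChanged', 'data': '{"status": "confirmed"}'},
]


def rebuild_order(rows: list) -> dict:
    """Replay history rows from the start to rebuild the current Order state."""
    order = {}
    for row in rows:
        data = json.loads(row['data']) if row['data'] else {}
        if row['event_name'] == 'OrderCreated':
            order = {'status': 'new', 'lines': []}
        elif row['event_name'] == 'LineAdded':
            order['lines'].append(data)
        elif row['event_name'] == 'StatusChanged':
            order['status'] = data['status']
    return order


print(rebuild_order(history_rows))
# {'status': 'confirmed', 'lines': [{'product_id': 512, 'quantity': 1}]}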

To sum up, Event Sourcing comes down to:

  • Keeping your business objects (called aggregates) as a series of replayable events. This is often called an event stream
  • Never deleting any events from a system, only appending new ones
  • Using events as the only reliable way of telling what state a given aggregate is in
  • If you need to query data or present it in a table-like format, keep a copy of it in a denormalized form. This is called a projection (see the sketch right after this list)
  • Designing your aggregates to protect certain vital business invariants, for example an Order encapsulating its costs summary. A good rule of thumb is to keep aggregates as small as possible
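
A minimal sketch of such a projection, consuming events one by one as they are appended (the OrdersProjection class and its handle method are hypothetical names, used only to show the idea):

class OrdersProjection:
    """Keeps a denormalized, query-friendly copy of Orders, updated from events."""

    def __init__(self):
        self.rows = {}  # order_id -> row; in practice a real table or document store

    def handle(self, order_id: int, event_name: str, data: dict):
        if event_name == 'OrderCreated':
            self.rows[order_id] = {'status': 'new'}
        elif event_name == 'StatusChanged':
            self.rows[order_id]['status'] = data['status']


projection = OrdersProjection()
projection.handle(1, 'OrderCreated', {})
projection.handle(1, 'StatusChanged', {'status': 'confirmed'})
print(projection.rows)  # {1: {'status': 'confirmed'}}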

If any point sounds a bit unclear, don't worry. It will all be clarified within the next few paragraphs and code snippets.

In return you get:

  • A complete history of what was changed, when and by whom (if you enclose such information in an event)
  • Time-travel debugging, allowing you to recreate the state of the system at any given moment
  • The possibility of creating specialized read models of your data for high performance
  • An append-only write model that is also easier to scale

I strongly recommend watching Greg Young's talk if you have not seen it before.

Talk is cheap - show me some code!

Consider the following Order class:

class Order:
    def __init__(self, user_id: int, status: str = 'new'):
        self.user_id = user_id
        self.status = status

    def set_status(self, new_status: str):
        if new_status not in ('new', 'paid', 'confirmed', 'shipped'):
            raise ValueError(f'{new_status} is not a correct status!')

        self.status = new_status

To create an Order we have to provide user_id. Status is equal to new by default. The only thing we can do is change the status. This may look like a trivial example, but I kept it simple on purpose. Let's rewrite the class using Event Sourcing. First, we need events that will represent all state mutations:

class OrderCreated:
    def __init__(self, user_id: int):
        self.user_id = user_id

class StatusChanged:
    def __init__(self, new_status: str):
        self.new_status = new_status

In such a simple example there are only two events. The first one, OrderCreated, is a standard way of starting any event stream. We know that status will be equal to new, so there is no point in adding such a field to the OrderCreated event. The second event, StatusChanged, represents any mutation of the status field. Again, we just need one field to fully represent what's going on. Consider the following order mutations:

order = Order(user_id=1) # 1
order.set_status('confirmed')  # 2
order.set_status('paid')  # 3
order.set_status('shipped')  # 4

A corresponding event stream looks like this:

events_stream = [
    OrderCreated(user_id=1),  # 1
    StatusChanged('confirmed'),  # 2
    StatusChanged('paid'),  # 3
    StatusChanged('shipped'),  # 4
]

So now we need a way to restore the Order's state using these events...

class Order:
    def __init__(self, events: list):  # 1
        for event in events:
            self.apply(event)  # 2

        self.changes = []  # 3

    def apply(self, event):  # 4
        if isinstance(event, OrderCreated):
            self.user_id = event.user_id
            self.status = 'new'
        elif isinstance(event, StatusChanged):
            self.status = event.new_status
        else:
            raise ValueError('Unknown event!')

    def set_status(self, new_status: str):  # 5
        if new_status not in ('new', 'paid', 'confirmed', 'shipped'):
            raise ValueError(f'{new_status} is not a correct status!')

        event = StatusChanged(new_status)  # 6
        self.apply(event)
        self.changes.append(event)  # 7
  1. Now the only way to instantiate an Order is to give it a list of events it should be initialized with
  2. Inside __init__, we apply every event, causing state mutations
  3. We need to keep an append-only list of state mutations done after Order initialization, because to save changes we just need to persist the new events (see the event store sketch after this list). Old ones are already saved and we will never delete them
  4. The heart of an event-sourced aggregate is the apply method. Inside it we mutate state. I will show a bit more clever implementation later, without the if-elif-else block
  5. The crucial change is inside the set_status method. We still validate input, but instead of modifying the object's fields directly...
  6. ...we prepare a StatusChanged event, put it through apply...
  7. ...and finally append the new event to our changes list
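
A minimal, hypothetical sketch of that persistence step, assuming a simple in-memory event store (the InMemoryEventStore class and its methods are not part of the post's code, just an illustration of appending only the new events):

class InMemoryEventStore:
    """Append-only storage of events, keyed by order id."""

    def __init__(self):
        self.streams = {}  # order_id -> list of events; we only ever append

    def append(self, order_id: int, new_events: list):
        self.streams.setdefault(order_id, []).extend(new_events)

    def load(self, order_id: int) -> list:
        return list(self.streams[order_id])


event_store = InMemoryEventStore()
event_store.append(order_id=1, new_events=[OrderCreated(user_id=1)])  # the stream already exists

order = Order(event_store.load(order_id=1))  # rebuild the aggregate from its stream
order.set_status('confirmed')
event_store.append(order_id=1, new_events=order.changes)  # persist only the new events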

Wait, how do I create a new Order then? We cannot simply construct OrderCreated and pass it to a newly-created Order, because this would not include OrderCreated in the changes list. I use a classmethod that encapsulates what is going on:

class Order:
    
    @classmethod
    def create(cls, user_id: int):
        initial_event = OrderCreated(user_id)
        instance = cls([initial_event])
        instance.changes = [initial_event]
        return instance

Is testing aggregates hard? You basically create an Order with events, perform some actions and see if the expected events were appended to changes. Some examples:

import unittest


class OrderAggregateTest(unittest.TestCase):

    def test_should_create_order(self):
        order = Order.create(user_id=1)

        self.assertEqual(order.changes, [OrderCreated(user_id=1)])

    def test_should_emit_set_status_event(self):
        order = Order([OrderCreated(user_id=1)])

        order.set_status('confirmed')

        self.assertEqual(order.changes, [StatusChanged('confirmed')])

Of course for this to work, we need to support __eq__ in our events:

class StatusChanged:
    def __init__(self, new_status: str):
        self.new_status = new_status

    def __eq__(self, other):
        if isinstance(self, other.__class__):
            return self.__dict__ == other.__dict__
        return False

That's basically everything about event sourcing aggregates. Now let's see how we can implement this in a slightly more clever way.

Implementation improvements

Events

We had to manually write classes and implement the __eq__ operation. We could use namedtuples or the attrs library instead:

import attr


@attr.s(frozen=True)
class OrderCreated:
    user_id: int = attr.ib()


@attr.s(frozen=True)
class StatusChanged:
    new_status: str = attr.ib()

We've got __init__, __eq__ and several other goodies for free. Please take a look at the attrs docs for more information.

Data Classes that will be introduced in Python 3.7 are also worth mentioning here.
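
For completeness, here is a sketch of how the same events could look as frozen Data Classes once Python 3.7 is available:

from dataclasses import dataclass


@dataclass(frozen=True)
class OrderCreated:
    user_id: int


@dataclass(frozen=True)
class StatusChanged:
    new_status: str

Just like with attrs, __init__ and __eq__ are generated for us.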

Aggregate's apply method

Probably some of you have already heard about singledispatch, which was introduced in Python 3.4.

import functools


@functools.singledispatch
def some_fun(arg):
    raise NotImplementedError

@some_fun.register(int)
def _(arg: int):
    print('I got integer here!')


@some_fun.register(str)
def _(arg: str):
    print('Hey, you gave me str!')

 
some_fun(1)  # this will invoke version for integer
some_fun('2')   # this will call str version

Unfortunately, singledispatch does not support objects' methods (sic!). But there is a workaround:

import functools


def method_dispatch(func):
    dispatcher = functools.singledispatch(func)

    def wrapper(*args, **kw):
        return dispatcher.dispatch(args[1].__class__)(*args, **kw)

    wrapper.register = dispatcher.register
    functools.update_wrapper(wrapper, func)
    return wrapper


class Order:
    def __init__(self, events: list):  # 1
        for event in events:
            self.apply(event)  # 2

        self.changes = []  # 3

    @method_dispatch
    def apply(self, event):
        raise ValueError('Unknown event!')

    @apply.register(OrderCreated)
    def _(self, event: OrderCreated):
        self.user_id = event.user_id
        self.status = 'new'

    @apply.register(StatusChanged)
    def _(self, event: StatusChanged):
        self.status = event.new_status

As you can see, we split one apply method into three much smaller (and cleaner!) ones.
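
A quick usage check of the rewritten aggregate (not from the original post, just to see the dispatch in action with the attrs-based events):

order = Order([
    OrderCreated(user_id=1),
    StatusChanged(new_status='confirmed'),
])

print(order.status)   # confirmed
print(order.changes)  # [] - nothing new to persist yet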

This is the end of the first part devoted to implementing Event Sourcing in Python. New week = new post. Hold tight, guys!


This post is licensed under CC BY 4.0 by the author.
