
Implementing Event Sourcing in Python – part 3, robust event store atop RethinkDB

This post is a part of the Implementing Event Sourcing series. It consists of code snippets, thoughts and practical advice on how to implement ES in your own project. The contents of this post will probably make the most sense if you also read all the other parts. Then you should be ready to use it in your own projects.

WARNING: a lot of information regarding the concerns of implementing an event store is in the previous part. Please read it first if you have not. In this post I write only about RethinkDB-specific things.

Choice number 2 – RethinkDB

This mature, open source, document-oriented database is a second great candidate for implementing an event store. Like any document-oriented DB, it does not require users to define tables' structure upfront, yet it has many of the advantages of an RDBMS - strong consistency guarantees, support for joins (we don't need them here, but it is still a nice feature), indexes, etc. RethinkDB was designed with sharding and replication in mind, so these especially helpful features are at hand once we need to scale. RethinkDB also has a very nice, informative admin dashboard. None of these is the coolest feature, though - the greatest thing RethinkDB offers is streaming changes. In other words, once we save an event, the database engine sends it to every interested party. And that's a killer feature!

[Image: RethinkDB admin dashboard]

Client library

RethinkDB has an excellent official Python client. It supports writing both synchronous and asynchronous code. The latter is compatible with asyncio and a few other async libraries. See the 10-minute guide with RethinkDB and Python for an introduction.
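
For illustration only (this snippet is not from the original post), connecting and running a trivial query in both styles might look roughly like this, assuming a local RethinkDB instance on the default port:

import asyncio

import rethinkdb

# synchronous style - blocking calls, convenient for scripts and simple apps
connection = rethinkdb.connect('localhost', 28015)
print(rethinkdb.db_list().run(connection))  # list databases on the server

# asynchronous style - switch the driver to asyncio before connecting
rethinkdb.set_loop_type('asyncio')

async def show_databases():
    conn = await rethinkdb.connect('localhost', 28015)
    print(await rethinkdb.db_list().run(conn))

asyncio.get_event_loop().run_until_complete(show_databases())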

Table design

Unlike MongoDB (the most popular document-oriented database), RethinkDB calls a single set of entities of the same kind a table, not a collection. In document databases there are two possible approaches to saving aggregates. One of them is to save the aggregate along with all its events as a single document:

{
  "uuid": "75e60528-e222-4e9e-bbd8-a040ce3fb654",
  "version": 2,
  "events": [
    {
      "name": "OrderCreated",
      "datetime": "2018-02-03 21:06:00",
      "user_id": 1
    },
    {
      "name": "StatusChanged",
      "datetime": "2018-02-03 12:54:27",
      "new_status": "confirmed"
    }
  ]
}
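
For illustration (this snippet is not part of the original design), appending a new event in this single-document layout means updating the embedded array. Assuming such documents live in a hypothetical orders table, it could look roughly like this:

import rethinkdb

conn = rethinkdb.connect('localhost', 28015, db='event_sourcing')

# bump the version and append the new event to the embedded array;
# even though the append is expressed server-side, the engine still
# rewrites the whole document on every such update
rethinkdb.table('orders').filter(
    {'uuid': '75e60528-e222-4e9e-bbd8-a040ce3fb654'}
).update({
    'version': rethinkdb.row['version'] + 1,
    'events': rethinkdb.row['events'].append({
        'name': 'StatusChanged',
        'datetime': '2018-02-03 12:54:27',
        'new_status': 'confirmed'
    })
}).run(conn)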

Although this seems to be an easy and quick solution, it will not be optimal with RethinkDB, as every update to a document requires saving the entire thing again. The typical operation for an event store is appending events, which means we would gradually lose performance as the number of events within a single aggregate grows - all events have to be saved again every time we add a new one. The second approach is to create two separate tables - one for aggregates, the second for events:

# aggregates
{
  "uuid": "75e60528-e222-4e9e-bbd8-a040ce3fb654",
  "version": 1,
}

# events
[
  {
    "aggregate_id": "75e60528-e222-4e9e-bbd8-a040ce3fb654",
    "name": "OrderCreated",
    "datetime": "2018-02-03 21:06:00",
    "user_id": 1
  },
  {
    "aggregate_id": "75e60528-e222-4e9e-bbd8-a040ce3fb654",
    "name": "StatusChanged",
    "datetime": "2018-02-03 12:54:27",
    "new_status": "confirmed"
  }
]


This looks very similar to the design proposed in part 2 - implementing an event store atop PostgreSQL. Performance will not degrade as events accumulate, as it would with the single-document design.

Code for creating both tables:

import rethinkdb
conn = rethinkdb.connect('localhost', 28015)  # connect to RethinkDB using defaults
rethinkdb.db_create('event_sourcing').run(conn)  # create database for event sourcing. Name is arbitrary
conn.use('event_sourcing')  # set default database for connection

rethinkdb.table_create('aggregates').run(conn)  # create table for aggregates

rethinkdb.table_create('events').run(conn)  # create table for events
rethinkdb.table('events').index_create('aggregate_id').run(conn)  # add index on aggregate_id for the events table
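
As a side note (not part of the original snippet), secondary indexes in RethinkDB are built asynchronously and have to be referenced explicitly. A minimal sketch of waiting for the index and then using it to fetch one aggregate's events:

rethinkdb.table('events').index_wait('aggregate_id').run(conn)  # block until the index has finished building

# fetch all events of a single aggregate via the secondary index instead of a full table scan
events = rethinkdb.table('events').get_all(
    '75e60528-e222-4e9e-bbd8-a040ce3fb654', index='aggregate_id'
).run(conn)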

Since we already have the tables and an abstract EventStore class to implement:

import abc
import typing
import uuid

# Event and EventsStream are defined in part 2 of the series


class EventStore(metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def load_stream(self, aggregate_uuid: uuid.UUID) -> EventsStream:
        pass

    @abc.abstractmethod
    def append_to_stream(
            self,
            aggregate_uuid: uuid.UUID,
            expected_version: typing.Optional[int],
            events: typing.List[Event]
    ) -> None:
        pass

we can begin with the first method - load_stream:

class RethinkDbEventStore(EventStore):

    AGGREGATES_TABLE_NAME = 'aggregates'
    EVENTS_TABLE_NAME = 'events'

    def __init__(self, connection: rethinkdb.Connection):
        self._connection = connection

    def load_stream(self, aggregate_uuid: uuid.UUID) -> EventsStream:
        events_documents = rethinkdb.table(self.EVENTS_TABLE_NAME).filter(
            {'aggregate_id': str(aggregate_uuid)}
        ).eq_join(
            'aggregate_id', rethinkdb.table(self.AGGREGATES_TABLE_NAME)
        ).zip().run(self._connection)  # 1

        listed_events = list(events_documents)
        if not listed_events:  # 2
            raise NotFound
        else:
            version = listed_events[0]['version']  # 3

        events = [  # 4
            self._translate_to_object(raw_event)
            for raw_event in listed_events
        ]

        return EventsStream(events=events, version=version)

    def _translate_to_object(self, event_document: dict) -> Event:
        """This method is responsible for translating models to event classes instances"""
        event_document.pop('id')
        event_document.pop('aggregate_id')
        event_document.pop('version')

        class_name = event_document.pop('name')
        kwargs = event_document
        # assuming `events` is a module containing all events classes we can easily get
        # desired class by its name saved along with event data
        event_class: typing.Type[Event] = getattr(events, class_name)
        return event_class(**kwargs)
  1. Query RethinkDB for events joined with the aggregate (we need the join to get the aggregate's version) by the given aggregate uuid
  2. If the result set is empty, it means the aggregate does not exist
  3. The version is added to every row by the join, so we can obtain it from the first one
  4. Deserialize events
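
load_stream (and append_to_stream below) relies on a few definitions introduced in part 2 of the series: the Event base class, the EventsStream container and the NotFound / ConcurrentStreamWriteError exceptions. A minimal sketch of what they might look like (the exact implementation in part 2 may differ):

import typing


class NotFound(Exception):
    """Raised when no events exist for the requested aggregate."""


class ConcurrentStreamWriteError(Exception):
    """Raised when another writer modified the aggregate in the meantime."""


class Event:
    """Base class for domain events; subclasses carry event-specific fields."""

    def as_dict(self) -> dict:
        # naive serialization - stores the class name alongside the event's data
        return {'name': type(self).__name__, **vars(self)}


class EventsStream(typing.NamedTuple):
    events: typing.List[Event]
    version: int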

Things get a bit more complicated in append_to_stream:

    def append_to_stream(
            self,
            aggregate_uuid: uuid.UUID,
            expected_version: typing.Optional[int],
            events: typing.List[Event]
    ) -> None:

        str_aggregate_id = str(aggregate_uuid)

        if expected_version is None:
            operation_result = rethinkdb.table(self.AGGREGATES_TABLE_NAME).insert({  # 1
                'id': str_aggregate_id,
                'version': 1 
            }).do(  
                lambda result: rethinkdb.branch(
                    result['inserted'] == 1,  # 2
                    rethinkdb.table(self.EVENTS_TABLE_NAME).insert([  # 3
                        dict(
                            aggregate_id=str_aggregate_id,
                            **event.as_dict()
                        ) for event in events
                    ]),
                    {'inserted': 0}
                )
            ).run(self._connection)
        else:
            operation_result = rethinkdb.table(self.AGGREGATES_TABLE_NAME).get(
                str_aggregate_id
            ).update({
                'version': rethinkdb.branch(
                    rethinkdb.row['version'].eq(expected_version),  # 4
                    expected_version + 1,  # 5
                    rethinkdb.row['version']  # 6
                )
            }).do(
                lambda result: rethinkdb.branch(
                    result['replaced'] == 1,  # 7
                    rethinkdb.table(self.EVENTS_TABLE_NAME).insert([  # 8
                        dict(
                            aggregate_id=str_aggregate_id,
                            **event.as_dict()
                        ) for event in events
                    ]),
                    {'inserted': 0}
                )
            ).run(self._connection)

        if operation_result['inserted'] == 0:  # 9
            raise ConcurrentStreamWriteError
  1. Insert the aggregate if it does not exist yet (expected_version is None)
  2. Provided the insert was successful...
  3. ...insert all initial events passed in
  4. If the aggregate already exists, then we must check the expected version to protect against concurrent updates. If no one has bumped it since we read it...
  5. ...then we increase it by one
  6. ...otherwise we leave it unchanged.
  7. Provided the aggregate's update was successful...
  8. ...insert all new events in one batch
  9. If nothing was inserted, it means someone changed the aggregate in the meantime, so we raise a ConcurrentStreamWriteError
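
To see how the two methods play together, here is a usage sketch (not from the original post; OrderCreated and StatusChanged stand for event classes like the ones in the sample documents above):

import uuid

import rethinkdb

connection = rethinkdb.connect('localhost', 28015, db='event_sourcing')
event_store = RethinkDbEventStore(connection)

order_uuid = uuid.uuid4()

# first write - the aggregate does not exist yet, so expected_version is None
event_store.append_to_stream(order_uuid, None, [OrderCreated(user_id=1)])

# subsequent write - load the stream first and pass its version back
stream = event_store.load_stream(order_uuid)
try:
    event_store.append_to_stream(
        order_uuid,
        expected_version=stream.version,
        events=[StatusChanged(new_status='confirmed')]
    )
except ConcurrentStreamWriteError:
    pass  # someone else bumped the version in the meantime - reload and retry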

Realtime changes streaming

Finally, the best part - getting live updates on what just happened.

import asyncio
import rethinkdb


rethinkdb.set_loop_type('asyncio')


async def print_changefeed():
    conn = await rethinkdb.connect('localhost', 28015)
    feed = await rethinkdb.db('event_sourcing').table('events').changes()['new_val'].run(conn)  # 1
    while (await feed.fetch_next()):
        change = await feed.next()  # 2
        print(change)

loop = asyncio.get_event_loop()
loop.run_until_complete(print_changefeed())

This example works on Python 3.6 and leverages asyncio.

  1. Watch for changes (inserts, updates, etc.) on the events table
  2. Print every obtained event
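
Changefeeds are not limited to whole tables - a consumer can also subscribe to a narrower selection. A sketch (assuming the same asyncio setup as above) of watching the events of a single aggregate:

async def print_aggregate_changefeed(aggregate_id: str):
    conn = await rethinkdb.connect('localhost', 28015)
    # only changes whose new value matches the filter are pushed to this feed
    feed = await rethinkdb.db('event_sourcing').table('events').filter(
        {'aggregate_id': aggregate_id}
    ).changes()['new_val'].run(conn)
    while (await feed.fetch_next()):
        print(await feed.next())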

That's it, folks. The next part will be devoted to projections - highly optimized read models that are easily recreatable and disposable by design.


This post is licensed under CC BY 4.0 by the author.
