This post is a part of the Implementing Event Sourcing series. It consists of code snippets, thoughts and practical advice on how to implement ES in your own project. The contents of this post will probably make the most sense if you also read all the other parts. Then you should be ready to use it in your own projects.

WARNING: a lot of information about the concerns involved in implementing an event store is in the previous part. Please read it first if you have not. In this post I write only about RethinkDB-specific things.

Choice number 2: RethinkDB

This mature, open source document-oriented database is the second great candidate for implementing an event store. Like any document-oriented DB, it does not require users to define the structure of tables, yet it has many advantages of an RDBMS: strong consistency guarantees, support for joins (we don't need them here, but it is still a nice feature), indexes etc. RethinkDB was designed with sharding and replication in mind, so we have those especially helpful features at hand once we need to scale. RethinkDB also has a very nice, informative admin dashboard. None of these is the coolest feature though. The greatest thing RethinkDB offers is streaming changes. In other words, once we save an event, the database engine sends it to every interested party. And that's a killer feature!

RethinkDB admin dashboard

Client library

RethinkDB has an excellent official Python client. It supports writing both synchronous and asynchronous code. The latter is compatible with asyncio and a few other async libraries. For an introduction, see the official "10 minute guide with RethinkDB and Python".

Table design

Unlike in MongoDB (the most popular document-oriented database), a set of entities of the same kind is called a table, not a collection. In document databases there are two possible approaches to saving aggregates. One of them is to save an aggregate along with all of its events as a single document.
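A minimal sketch of that first approach, with the document written as a plain Python dict. The field names (`uuid`, `version`, `events`, `name`, `data`), the example event names and the `with_appended_event` helper are assumptions made for illustration, not the schema used in this series:

```python
import uuid

# Hypothetical single-document layout: the aggregate and its whole
# event history live in one document.
aggregate_document = {
    "uuid": str(uuid.uuid4()),
    "version": 2,
    "events": [
        {"name": "OrderCreated", "data": {"user_id": 1}},
        {"name": "StatusChanged", "data": {"new_status": "confirmed"}},
    ],
}


def with_appended_event(document: dict, event: dict) -> dict:
    """Return the full document that has to be saved after adding one event."""
    return {
        **document,
        "version": document["version"] + 1,
        "events": document["events"] + [event],
    }
```

Note that the helper returns the complete document, history included, even though only one event is new.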

Although this seems to be an easy and quick solution, it is not optimal with RethinkDB, because every update to a document requires saving the entire thing again. The typical operation for an event store is appending events, so we would gradually lose performance as the number of events within a single aggregate grows: every time we add a new event, we have to save all of them at once. The second approach is to create two separate tables, one for aggregates and one for events.
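As a rough sketch of the two-table layout, here are the documents that would go into each table. Again, the field names and both helper functions are illustrative assumptions rather than the exact schema of this series:

```python
import uuid
from typing import Any, Dict


def new_aggregate_row(aggregate_uuid: str) -> Dict[str, Any]:
    """Document for the "aggregates" table: identity and version only."""
    return {"uuid": aggregate_uuid, "version": 1}


def new_event_row(aggregate_uuid: str, name: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Document for the "events" table, pointing back at its aggregate."""
    return {
        "uuid": str(uuid.uuid4()),
        "aggregate_uuid": aggregate_uuid,
        "name": name,
        "data": data,
    }


aggregate = new_aggregate_row(str(uuid.uuid4()))
first_event = new_event_row(aggregate["uuid"], "OrderCreated", {"user_id": 1})
```

Appending an event now means inserting one small document, plus a version bump on the aggregate.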

This looks very similar to the design proposed in part 2, where we implemented an event store atop PostgreSQL. Performance will not suffer as the number of events grows, as it would with the single-table design.

Both tables have to be created before the event store can be used.
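A possible way to create them with the official Python driver is sketched below. The table names, the `uuid` primary key, the `aggregate_uuid` secondary index and the `test` database are assumptions. The function takes the driver's query namespace `r` as a parameter, so it works with either import style mentioned in the docstring:

```python
def create_tables(r, conn, db: str = "test") -> None:
    """Create both tables plus an index for looking up events by aggregate.

    `r` is the driver's query namespace: `RethinkDB()` from the `rethinkdb`
    package in driver 2.4+, or the `rethinkdb` module itself in older releases.
    """
    database = r.db(db)
    database.table_create("aggregates", primary_key="uuid").run(conn)
    database.table_create("events", primary_key="uuid").run(conn)

    events = database.table("events")
    # Secondary index (an assumption), so events of one aggregate can be
    # fetched without scanning the whole table.
    events.index_create("aggregate_uuid").run(conn)
    events.index_wait("aggregate_uuid").run(conn)


# Usage with driver 2.4+ (needs a running RethinkDB server):
#   from rethinkdb import RethinkDB
#   r = RethinkDB()
#   conn = r.connect("localhost", 28015)
#   create_tables(r, conn)
```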

Since we already have the tables and an abstract EventStore class to implement, we can begin with the first method, load_stream. It works as follows:

  1. Query RethinkDB for the events of the given aggregate uuid, joined with the aggregate itself (we need it to get the aggregate's version)
  2. If the result set is empty, the aggregate does not exist
  3. The version is added to every row, so we can read it from the first one
  4. Deserialize the events
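The four steps above can be sketched roughly as follows. This is not the original implementation: the `Event`, `EventsStream` and `NotFound` names, the field names and the `aggregate_uuid` index are assumptions, and `r` is the driver's query namespace passed in by the caller. Row ordering is not addressed here.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


class NotFound(Exception):
    """Raised when no rows come back for the given aggregate uuid."""


@dataclass
class Event:
    uuid: str
    name: str
    data: Dict[str, Any]


@dataclass
class EventsStream:
    events: List[Event]
    version: int


def fetch_rows(r, conn, aggregate_uuid: str) -> List[Dict[str, Any]]:
    # 1. Events of one aggregate, each joined with the aggregate document.
    #    eq_join yields rows shaped {"left": <event>, "right": <aggregate>}.
    query = (
        r.table("events")
        .get_all(aggregate_uuid, index="aggregate_uuid")
        .eq_join("aggregate_uuid", r.table("aggregates"), index="uuid")
    )
    return list(query.run(conn))


def build_stream(rows: List[Dict[str, Any]]) -> EventsStream:
    # 2. An empty result set means the aggregate does not exist.
    if not rows:
        raise NotFound
    # 3. Every row carries the aggregate, so the first one is enough.
    version = rows[0]["right"]["version"]
    # 4. Deserialize the event documents.
    events = [
        Event(row["left"]["uuid"], row["left"]["name"], row["left"]["data"])
        for row in rows
    ]
    return EventsStream(events, version)


def load_stream(r, conn, aggregate_uuid: str) -> EventsStream:
    return build_stream(fetch_rows(r, conn, aggregate_uuid))
```

Keeping the query and the row handling in separate functions makes the second part easy to check without a running database.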

Things are getting a bit more complicated in append_to_stream:

  1. Insert the aggregate if it does not exist (expected_version is None)
  2. Provided the insert was successful…
  3. …insert all initial events passed in
  4. If the aggregate already exists, we must check the expected version to protect against concurrent updates. If no one has bumped it up since we read it…
  5. …then we increase it by one
  6. otherwise we leave it unchanged.
  7. Provided the aggregate's update was successful…
  8. …insert all new events in one batch
  9. If nothing was inserted, it means someone changed the aggregate in the meantime, so we raise a ConcurrentStreamWriteError
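One way to express these steps in ReQL is sketched below, using `branch` for the version check and `do` to chain the event insert onto the aggregate write. This is an illustration under assumptions, not the original code: field and table names are made up, events are passed as ready-to-insert documents, and `r` is the driver's query namespace.

```python
from typing import Any, Dict, List, Optional


class ConcurrentStreamWriteError(RuntimeError):
    """Raised when another writer changed the aggregate first."""


def build_append_query(r, aggregate_uuid: str, expected_version: Optional[int],
                       event_rows: List[Dict[str, Any]]):
    aggregates = r.table("aggregates")
    insert_events = r.table("events").insert(event_rows)

    if expected_version is None:
        # 1. New aggregate; the insert reports an error if the uuid is taken.
        write = aggregates.insert({"uuid": aggregate_uuid, "version": 1})
        counter = "inserted"
    else:
        # 4-6. Bump the version only if it still equals expected_version,
        #      otherwise apply an empty change and leave it as it is.
        write = aggregates.get(aggregate_uuid).update(
            lambda aggregate: r.branch(
                aggregate["version"].eq(expected_version),
                {"version": expected_version + 1},
                {},
            )
        )
        counter = "replaced"

    # 2-3 and 7-8. Insert the events only if the aggregate write went through.
    return write.do(
        lambda result: r.branch(
            result[counter].eq(1), insert_events, {"inserted": 0}
        )
    )


def append_to_stream(r, conn, aggregate_uuid: str, expected_version: Optional[int],
                     event_rows: List[Dict[str, Any]]) -> None:
    query = build_append_query(r, aggregate_uuid, expected_version, event_rows)
    result = query.run(conn)
    # 9. Nothing inserted means someone else changed the aggregate meanwhile.
    if result["inserted"] == 0:
        raise ConcurrentStreamWriteError
```

The version check runs inside the update of a single document, so two writers holding the same expected_version cannot both bump it.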

Realtime changes streaming

Finally, the best part – getting live updates on what just happened.

This example works on Python 3.6 and it leverages asyncio.

  1. Watch for changes (inserts, updates etc.) on the events table
  2. Print every obtained event
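A minimal sketch of such a consumer with the driver's asyncio loop type. The `describe_change` helper and the `handle` callback are additions made for illustration, and `r` is the driver's query namespace passed in by the caller. Each changefeed document carries `old_val` and `new_val`, which is enough to tell inserts, updates and deletes apart. The commented usage relies on `asyncio.run`, which needs Python 3.7+; on 3.6 the coroutine can be run with `loop.run_until_complete` instead.

```python
import asyncio


def describe_change(change: dict) -> str:
    """Classify one changefeed document by its old_val/new_val pair."""
    if change.get("old_val") is None:
        return "insert"
    if change.get("new_val") is None:
        return "delete"
    return "update"


async def watch_events(r, handle=print, **connect_kwargs) -> None:
    r.set_loop_type("asyncio")
    conn = await r.connect(**connect_kwargs)
    # 1. Subscribe to every change on the events table.
    feed = await r.table("events").changes().run(conn)
    # 2. Hand over each change as it arrives (printed by default).
    while await feed.fetch_next():
        change = await feed.next()
        handle(describe_change(change), change)


# Usage with driver 2.4+ (needs a running RethinkDB server):
#   from rethinkdb import RethinkDB
#   asyncio.run(watch_events(RethinkDB(), host="localhost", port=28015))
```

Since events are only ever appended, a consumer would normally see just the "insert" kind on this table.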

That’s it, folks. The next part will be devoted to projections: highly optimized read models that are easily recreatable and disposable by design.
