Implementing Event Sourcing in Python - part 2, robust event store atop PostgreSQL
This post is a part of the Implementing Event Sourcing series. It consists of code snippets, thoughts, and practical advice on how to implement ES in your own project. The contents of this post will probably make the most sense if you also read the other parts. Then you should be ready to use it in your own projects.
Where do all those events go?
Being able to restore an aggregate's state from an event stream is a good starting point. The next step is to think about how we are going to store and retrieve event streams. So far, we can imagine an abstract base class:
class EventStore(metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def load_stream(self, aggregate_uuid: uuid.UUID) -> EventStream:
        pass

    @abc.abstractmethod
    def append_to_stream(
            self,
            aggregate_uuid: uuid.UUID,
            expected_version: typing.Optional[int],
            events: typing.List[Event]
    ) -> None:
        pass
The load_stream method will return an EventStream instance, a simple data structure with the list of events needed for the aggregate's initialization and the aggregate's current version:
class EventStream:
    events: typing.List[Event]
    version: int

    def __init__(self, events, version):
        ...
We will use version to protect against concurrent updates using optimistic locking. The append_to_stream method accepts aggregate_uuid, expected_version (the one we obtained from load_stream) and a list of events our aggregate produced. Please note that we do not have to store old events, only new ones. This is possible because we are never allowed to delete any events, so the stream is an append-only structure.
The aforementioned expected_version parameter serves as protection against concurrent updates. If such a situation occurs, this method should raise an exception:
class ConcurrentStreamWriteError(RuntimeError): pass
Implementation details depend on chosen database engine.
Another important question: which database should be used? Some people state that almost any is fine. I hardly find that a valid answer. Is a transactional database like MySQL a good choice? What should we pay attention to? To answer this question precisely, one has to consider the nature of event streams and how they are used to reconstruct aggregates.
Retrieve strategy
To rebuild an aggregate we need all events that were ever emitted by it. Our aggregates will usually have a unique ID, a UUID. In other words, we should be able to query our event store by the aggregate's ID. This requirement can easily be met by many substantially different database engines. A few examples:
Redis
This is a popular data-structure DB that can store data in a few handy structures, such as:
- strings
- lists
- sets
- sorted sets
- hashes (also known as dictionaries or maps)
Assuming Redis is our choice, we would store events using lists and use the aggregate's UUID as part of the key name. To retrieve all events for a given UUID, we would use the following query:
LRANGE event_stream_f42d9a33-81da-45ba-a066-32de5e747067 0 -1
WARNING: Redis' implementation of lists causes such queries to slow down linearly as the number of events for the given aggregate grows. See the time complexity section of the documentation.
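To make it concrete, appending and reading a stream with the redis-py client could look more or less like below. This is only a sketch; the key naming and the JSON payload shape are my assumptions here:

import json
import uuid

import redis  # assuming the redis-py client


r = redis.Redis()


def append_events(aggregate_uuid: uuid.UUID, serialized_events: list) -> None:
    key = f'event_stream_{aggregate_uuid}'
    # every event lands at the end of the list as a JSON-encoded entry
    r.rpush(key, *[json.dumps(event) for event in serialized_events])


def load_events(aggregate_uuid: uuid.UUID) -> list:
    key = f'event_stream_{aggregate_uuid}'
    # LRANGE 0 -1 returns the whole list - see the performance warning above
    return [json.loads(raw) for raw in r.lrange(key, 0, -1)]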
RDBMS (PostgreSQL, MySQL etc)
A natural way of modelling an event stream in an RDBMS is to create one table for all events. Although we are going to store many types of events with different fields, it is not feasible to create a separate table for each one. Firstly, query performance would suffer. Secondly, it would make queries more complicated to implement and maintain. Thirdly, events may evolve over time to include additional data. Of course we cannot change events from the past, but we would somehow have to store new events with an altered structure alongside old ones. So staying with the one-table-for-all-events design is the right thing to do:
Events
---------------------------------------
| uuid | aggregate_uuid | name | data |
---------------------------------------
Each event also has its own uuid. aggregate_uuid is a column that allows us to query by aggregate easily; we should put an index on it. name is self-explanatory (OrderCreated, CashDeposited etc). Finally, we have a flexible part, data, where we will store JSON-encoded event details. Depending on the chosen database engine, we would use a dedicated data type (PostgreSQL supports JSON columns) or simply TEXT.
Querying itself is trivial:
SELECT * FROM events WHERE aggregate_uuid = 'f42d9a33-81da-45ba-a066-32de5e747067';
Document-oriented (MongoDB, RethinkDB etc)
Analogously to an RDBMS, we should be able to use a single collection (MongoDB) or table (RethinkDB) for all events. Besides the JSON data part, we would also add aggregate_uuid and put an index on it for faster reads.
Querying is also very simple:
db.events.find(
    {aggregate_uuid: 'f42d9a33-81da-45ba-a066-32de5e747067'}
)  # MongoDB

r.table('events').get_all(
    'f42d9a33-81da-45ba-a066-32de5e747067', index='aggregate_uuid'
).run()  # RethinkDB
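As a rough sketch of the write side with pymongo (the collection name and document shape are my assumptions, mirroring the table design described later):

import uuid

from pymongo import MongoClient


client = MongoClient()
db = client.event_sourcing  # hypothetical database name

# the index on aggregate_uuid mentioned above
db.events.create_index('aggregate_uuid')


def append_events(aggregate_uuid: uuid.UUID, events: list) -> None:
    # every event document carries the aggregate_uuid it belongs to
    db.events.insert_many([
        {
            'uuid': str(uuid.uuid4()),
            'aggregate_uuid': str(aggregate_uuid),
            'name': event.__class__.__name__,
            'data': event.as_dict(),
        }
        for event in events
    ])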
Store strategy
As I have already mentioned, we never delete events. Event Sourcing does not limit the number of events per aggregate, so we should be prepared for our events table/collection to grow indefinitely. Therefore, we need a database that is able to scale and maintain approximately constant read/write times regardless of the number of events (up to some extent, of course).
Events are our source of truth, so we cannot afford any data loss. Thus, we need a database with strong consistency guarantees.
Event Sourcing assumes that only one aggregate should be saved within one business operation. Saving a single aggregate has to be atomic. We do not need a full-fledged all-or-nothing guarantee spanning an entire HTTP request, as relational SQL databases provide. We just need to make sure that once we attempt to save changes to an aggregate and bump up its version, it is an all-or-nothing operation.
Protection against concurrent updates is not something I can pass over in silence. Frankly, I find it bizarre that most articles about implementing Event Sourcing do not say a word about these issues and possible solutions. As though the projects their authors have been working on are not subject to concurrency issues (seriously?). There is a common misconception that a READ - MODIFY - WRITE approach within an RDBMS transaction is always safe. Well, no. Read this excellent article to understand why (and to see ways of coping with it): PostgreSQL anti-patterns: read-modify-write cycles. Getting back to Event Sourcing: we need protection, and I dare say it should be provided by the database engine. Of course, we could work around the problem using pessimistic locking and Redis, yet that makes the implementation harder and I find it hardly satisfying. That is also the reason why you will not find anything about Kafka in this article.
Events are not only stored. We usually propagate them throughout our system to let different components react accordingly. There are mature solutions (brokers) doing just this; among the available options are RabbitMQ and Kafka, just to name a few. Even Redis can sometimes suffice. Nevertheless, it is nice to have such functionality within the database itself to get rid of another headache: what if saving the event succeeds (we cannot undo this) but sending it via the broker fails?
Requirements wrap-up:
- Efficient access to all events for a given aggregate
- Scalability
- Strong consistency guarantees
- Possibility of implementing optimistic locking or other protection without external services
- (nice to have) built-in message passing functionality
Choice number 1 - PostgreSQL
This should not be a surprise. A mature, battle-proven open source solution can be a perfect base for an event sourcing datastore. One-table-for-all-events is a good approach, providing efficient reads and writes. It can be scaled using table partitioning or sharding (more or less manually, but still within reach). PostgreSQL has strong consistency guarantees, so this requirement is also met. Protection against concurrent updates using optimistic locking is quite easy to implement. Finally, PostgreSQL also has message passing with NOTIFY/LISTEN. It seems like a perfect choice!
Table design
Nothing has changed since the first proposition; we still want to keep all events in one table:
Events
---------------------------------------
| uuid | aggregate_uuid | name | data |
---------------------------------------
However, to get protection against concurrent updates we will use another table:
Aggregates
------------------
| uuid | version |
------------------
The version will be bumped up by one every time we have some events to save. Using an additional condition in the UPDATE query and the returned number of affected rows, we can easily tell whether we won the race or not:
-- 1 is the expected version
UPDATE "aggregates"
SET version = 2
WHERE "uuid" = 'f42d9a33-81da-45ba-a066-32de5e747067'
    AND "version" = 1  -- expected version check
If this query reports one affected row, we are good to go. Otherwise it means someone else changed history in the meantime, and we should raise ConcurrentStreamWriteError.
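To illustrate the mechanics before the full implementation below, a minimal sketch of that check with a raw DB-API cursor (psycopg2 assumed) could look like this:

# expected_version was obtained together with the events (optimistic locking)
cursor.execute(
    'UPDATE aggregates SET version = %s WHERE uuid = %s AND version = %s',
    (expected_version + 1, str(aggregate_uuid), expected_version),
)
if cursor.rowcount != 1:  # somebody else bumped the version in the meantime
    raise ConcurrentStreamWriteError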
Code for creating both tables:
CREATE TABLE "aggregates" (
    uuid VARCHAR(36) NOT NULL PRIMARY KEY,
    version int NOT NULL DEFAULT 1
);

CREATE TABLE "events" (
    uuid VARCHAR(36) NOT NULL PRIMARY KEY,
    aggregate_uuid VARCHAR(36) NOT NULL REFERENCES "aggregates" ("uuid"),
    name VARCHAR(50) NOT NULL,
    data json
);

-- Do not forget about the index!
CREATE INDEX aggregate_uuid_idx ON "events" ("aggregate_uuid");
There are numerous ways of getting data from these tables into Python. One of them is using an ORM. A mapping in the almighty SQLAlchemy can look like this:
class AggregateModel(Base):
    __tablename__ = 'aggregates'

    uuid = Column(VARCHAR(36), primary_key=True)
    version = Column(Integer, default=1)


class EventModel(Base):
    __tablename__ = 'events'

    uuid = Column(VARCHAR(36), primary_key=True)
    aggregate_uuid = Column(VARCHAR(36), ForeignKey('aggregates.uuid'))
    name = Column(VARCHAR(50))
    data = Column(JSON)

    aggregate = relationship(AggregateModel, uselist=False, backref='events')
Relying on this code, we can quite easily implement the load_stream method of the EventStore:
class PostgreSQLEventStore(EventStore):

    def __init__(self, session: Session):
        self.session = session  # we rely on SQLAlchemy, so a Session has to be passed in for future usage

    def load_stream(self, aggregate_uuid: uuid.UUID) -> EventStream:
        try:
            # we query for the aggregate together with its events
            aggregate: AggregateModel = self.session.query(
                AggregateModel
            ).options(
                joinedload('events')
            ).filter(
                AggregateModel.uuid == str(aggregate_uuid)
            ).one()
        except exc.NoResultFound:
            # do not let a SQLAlchemy-specific exception reach code a level higher
            raise NotFound

        # translate all event models to proper event objects (see part 1)
        events_objects = [self._translate_to_object(model) for model in aggregate.events]
        version = aggregate.version

        return EventStream(events_objects, version)

    def _translate_to_object(self, event_model: EventModel) -> Event:
        """This method is responsible for translating models to event class instances"""
        class_name = event_model.name
        kwargs = event_model.data
        # assuming `events` is a module containing all event classes, we can easily get
        # the desired class by the name saved along with the event data
        event_class: typing.Type[Event] = getattr(events, class_name)
        return event_class(**kwargs)
There is nothing tricky inside besides translating models to event class instances.
WARNING: one should consider abandoning the ORM in favour of SQLAlchemy Core or even raw queries to gain some performance. The ORM does not add much value here and is definitely less efficient than the alternatives.
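For the curious, a Core-based load_stream might look roughly like this (a sketch against the same tables, not the implementation used in the rest of this post):

def load_stream(self, aggregate_uuid: uuid.UUID) -> EventStream:
    connection = self.session.connection()

    aggregate_row = connection.execute(
        AggregateModel.__table__.select().where(
            AggregateModel.uuid == str(aggregate_uuid)
        )
    ).fetchone()
    if aggregate_row is None:
        raise NotFound

    event_rows = connection.execute(
        EventModel.__table__.select().where(
            EventModel.aggregate_uuid == str(aggregate_uuid)
        )
    ).fetchall()

    # same trick as in _translate_to_object: look the class up by its stored name
    events_objects = [
        getattr(events, row.name)(**row.data) for row in event_rows
    ]
    return EventStream(events_objects, aggregate_row.version)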
append_to_stream is a bit more complicated:
def append_to_stream(
        self,
        aggregate_uuid: uuid.UUID,
        expected_version: typing.Optional[int],
        events: typing.List[Event]
) -> None:
    # returns connection within session (same transaction state)
    connection = self.session.connection()

    if expected_version:  # an update
        stmt = AggregateModel.__table__.update().values(
            version=expected_version + 1
        ).where(
            (AggregateModel.version == expected_version) &
            (AggregateModel.uuid == str(aggregate_uuid))
        )
        result_proxy = connection.execute(stmt)
        if result_proxy.rowcount != 1:  # 1
            raise ConcurrentStreamWriteError
    else:  # new aggregate
        stmt = AggregateModel.__table__.insert().values(
            uuid=str(aggregate_uuid),
            version=1
        )
        connection.execute(stmt)

    for event in events:
        connection.execute(
            EventModel.__table__.insert().values(
                uuid=str(uuid.uuid4()),
                aggregate_uuid=str(aggregate_uuid),
                name=event.__class__.__name__,
                data=event.as_dict()
            )
        )
The implementation is quite straightforward and self-explanatory. We use SQLAlchemy Core here because of its lower-level API, which allows us to check how many rows were affected by an update (1). Protection against concurrent updates is achieved thanks to the conditional update of version in the aggregates table. Eventually, we insert all events one by one.
Please note that managing the transaction is not the business of PostgreSQLEventStore.
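In practice this means that transaction boundaries live a level higher, e.g. in an application service or a unit-of-work wrapper. A sketch:

# sketch: commit/rollback is decided outside of the event store
event_store = PostgreSQLEventStore(session)
try:
    event_store.append_to_stream(order.uuid, order.version, order.changes)
    session.commit()  # only now do the INSERTs and the UPDATE become visible
except Exception:
    session.rollback()
    raise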
In the end, we would like to inform all interested parties about new events. We can do this by executing NOTIFY for each inserted event:
for event in events:
    aggregate_uuid_str = str(aggregate_uuid)
    event_as_dict = event.as_dict()

    connection.execute(
        EventModel.__table__.insert().values(
            uuid=str(uuid.uuid4()),
            aggregate_uuid=aggregate_uuid_str,
            name=event.__class__.__name__,
            data=event_as_dict
        )
    )

    payload = json.dumps(event_as_dict)
    connection.execute(
        f'NOTIFY events, \'{aggregate_uuid_str}_{event.__class__.__name__}_{payload}\''
    )
Bear in mind that notifications will not be sent until the transaction is committed. Also, listeners should operate in a non-transactional mode. Read more about the caveats in the NOTIFY and LISTEN docs.
Example listener code using aiopg and asyncio:
import asyncio
import aiopg

dsn = 'dbname=XXX user=XXX password=XXX host=127.0.0.1'  # change me


async def listen(conn):
    async with conn.cursor() as cur:
        await cur.execute('LISTEN events')
        while True:
            msg = await conn.notifies.get()
            print('Receive <-', msg.payload)


async def main():
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn1:
            listener = listen(conn1)
            await listener


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Putting it all together
Load an aggregate
Whenever you want to load an aggregate from EventStore:
# GIVEN:
# session - SQLAlchemy Session instance
# aggregate_uuid - UUID of the loaded Order

event_store = PostgreSQLEventStore(session)  # 1
event_stream = event_store.load_stream(aggregate_uuid)  # 2
order = Order(event_stream)  # 3
- Initialize PostgreSQLEventStore with a SQLAlchemy Session instance. The event store probably should not create the session on its own, since usually we want control over session begin/commit a level (or a few) higher
- Get an EventStream instance using aggregate_uuid
- Create an Order instance, initializing it with the freshly obtained event_stream
The implementation of the Order class has changed slightly since part 1 and now looks like this:
class Order:
    def __init__(self, event_stream: EventStream):
        self.version = event_stream.version

        for event in event_stream.events:
            self.apply(event)

        self.changes = []

    # the rest remains unchanged
Now it accepts an event_stream and saves the version as an instance property for future use (this is a bit handier).
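For reference, one possible shape of apply and a mutator method; this is a rough sketch with hypothetical OrderCreated and OrderCancelled classes, not the exact code from part 1:

# illustrative only - the event classes below are assumptions
def apply(self, event: Event):
    if isinstance(event, OrderCreated):
        self.uuid = event.uuid
        self.status = 'new'
    elif isinstance(event, OrderCancelled):
        self.status = 'cancelled'

def cancel(self):
    event = OrderCancelled()
    self.apply(event)           # mutate in-memory state
    self.changes.append(event)  # remember the event for append_to_stream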
Save changes
# GIVEN:
# session - SQLAlchemy Session instance
# order - Order instance

event_store = PostgreSQLEventStore(session)

new_events = order.changes  # 1
expected_version = order.version  # 2

try:
    event_store.append_to_stream(order.uuid, expected_version, new_events)  # 3
except ConcurrentStreamWriteError:
    # what now?
- Get the events emitted in the aggregate since the last fetch
- Get the version the aggregate had when we fetched it
- Append new events to the stream
What to do when ConcurrentStreamWriteError is raised?
It depends on the business requirements and the scenario. In the case of an Order instance, we may imagine that someone accepts the order while some other guy concurrently tries to cancel it. We could fetch the aggregate again and resolve the conflict manually, but a much simpler (and in most cases sufficient!) approach is to just retry the entire operation that touches our aggregate. This is an example using the retrying library:
@retry(retry_on_exception=lambda exc: isinstance(exc, ConcurrentStreamWriteError))
def cancel_order(event_store: EventStore, order_uuid: UUID):
    event_stream = event_store.load_stream(order_uuid)
    order = Order(event_stream)
    order.cancel()  # 5
    event_store.append_to_stream(order.uuid, order.version, order.changes)  # 6
Assume there is a race condition between setting two statuses, cancelled and confirmed. If the latter wins, the code above will raise ConcurrentStreamWriteError in line 6. The @retry decorator will take care of retrying the whole thing, loading the entire aggregate once again, this time with the most recent version. Provided there are no more concurrent updates, we are finally able to cancel our newly confirmed order, OR raise another exception in line 5 if our business rules do not allow cancelling a confirmed order. A quite easy and safe solution, adhering to the Zen of Python's
Explicit is better than implicit.
Further optimizations
Repository pattern
Code responsible for loading and saving aggregates will probably look the same in many places. To avoid repetition, I suggest using the Repository pattern and hiding this logic behind get/save methods:
class AggregatesRepository:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    def get(self, aggregate_uuid: UUID) -> Order:
        event_stream = self.event_store.load_stream(aggregate_uuid)
        return Order(event_stream)

    def save(self, order: Order):
        self.event_store.append_to_stream(order.uuid, order.version, order.changes)
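Hypothetical usage in an application-level function then reads quite nicely:

def cancel_order(repository: AggregatesRepository, order_uuid: UUID):
    order = repository.get(order_uuid)   # load_stream + Order(...) hidden behind get
    order.cancel()
    repository.save(order)               # append_to_stream hidden behind save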
Simplify append_to_stream signature
Since all parameters of the append_to_stream method are taken from an aggregate, we could just as well make it accept an aggregate instance. No harm should come of it:
class Aggregate(metaclass=ABCMeta):  # we introduce an abstract base class

    def __init__(self, event_stream: EventStream):
        self._version = event_stream.version

        for event in event_stream.events:
            self.apply(event)

        self._changes = []

    @abstractmethod
    def apply(self, event: Event):
        pass

    @property
    def version(self):
        return self._version

    @property
    def changes(self):
        return self._changes


class Order(Aggregate):
    ...


class EventStore(metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def load_stream(self, aggregate_uuid: uuid.UUID) -> EventStream:
        pass

    @abc.abstractmethod
    def append_to_stream(self, aggregate: Aggregate) -> None:
        pass
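With the simplified signature, saving changes could then read like this (a sketch):

# version and pending changes are now read off the aggregate itself
order = Order(event_store.load_stream(order_uuid))
order.cancel()
event_store.append_to_stream(order)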
The majority of the information in this post comes from the PostgreSQL docs and the Appendix to Implementing Domain Driven Design.
This is the end of the second part devoted to implementing Event Sourcing in Python. New week = new post. Next time I will show an event store implementation based on RethinkDB, a modern NoSQL document-oriented database with message passing capabilities. Hold tight, guys!