Event Storage in Postgres

Kasey Speakman - Mar 25 '18 - Dev Community

One of the critical pieces of any infrastructure is storage. Compared to traditional relational models, storing events in a log is quite simple. However, when you experience the good fortune of a successful product, even log-style storage has to evolve to keep up.

Naive Implementation

When I started using Event Sourcing, I wanted to go as simple as possible. To be quite honest, I could not wrap my head around many of the trappings of common event sourcing database implementations. So I started with a simple table.

CREATE TABLE Event
(
    SequenceNum bigserial NOT NULL,
    StreamId uuid NOT NULL,
    Data jsonb NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    LogDate timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (SequenceNum)
);

CREATE INDEX idx_event_streamid ON Event (StreamId);

I used a serialized writer. (Translation: all requests were queued and processed one at a time, no concurrency.) I used full consistency. (Translation: saving events and updating relational/document models were performed in the same transaction in the same database.)

This configuration has worked quite well for us, has good performance (even on t2.micro instances), and is relatively simple to understand. If this were an internal system, I would likely stop there. But for a multi-tenant app, at some point this is going to hit a scaling brick wall.

💡 Aside - event sourcing rocks
When we change our relational models, our "data migration" consists of dropping the affected tables, recreating them with the updated schema, and replaying events back onto them. The process happens automatically during deployment. We never have to write migration scripts. <3

Growing the implementation

The next revision of event storage includes some optimizations to support further scaling capabilities. Most of this was described 7 years ago in Building an Event Storage. However, at the time I did not fully understand some of the nuances of the description. Rather than shoot myself in the foot with an unfamiliar weapon, I chose to implement the naive version first. Now I'm getting around to the optimizations.

The Event table

Let's start with the event table itself. Even though this updated table is almost identical, I'll go ahead and explain the purpose of each field and index.

CREATE TABLE IF NOT EXISTS Event
(
    SequenceNum bigserial NOT NULL,
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Data jsonb NOT NULL,
    Type text NOT NULL,
    Meta jsonb NOT NULL,
    LogDate timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (SequenceNum),
    UNIQUE (StreamId, Version),
    FOREIGN KEY (StreamId)
        REFERENCES Stream (StreamId)
);

Fields

The only two absolutely required fields are StreamId and Data. Everything else supports additional features.

SequenceNum
This is used by event listeners to keep track of their current position in the stream. The only thing that really matters here is to preserve the order in which events were saved, which an auto-increment big integer does nicely.
StreamId
An identifier for a stream. I chose uuid but there are many viable choices for identity.
Version
This field was not present in the previous incarnation, but was added as an optimization. If you think of the stream as an array, this is the index number of the event. Saving it with the event helps avoid manual counting.
Data
This is the serialized data for the event. I chose jsonb format, but there are other options. If I had more resources to develop my own introspection tools, I would probably use a binary serialization like Protocol Buffers for speed.
Type
This is the type of the event. Storing it supports filtering, so a listener can avoid fetching and deserializing events it does not use. I also use it for deserialization.
Meta
This supports auditing and tracing. I put things in here like user, execute permission, correlation id, etc.
LogDate
In addition to auditing and tracing, this could also be used for cross-shard sorting. Although it is tempting to use LogDate in business logic, it should only be used for infrastructural purposes. Events should contain their own timestamps if temporal context is needed. But I admit I have used LogDate for reports. :S
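
To make the fields concrete, here is a rough sketch of appending a single event. The @-style placeholders follow the same convention as the query snippet below, and the Type and payload values are purely illustrative.

INSERT INTO Event (StreamId, Version, Data, Type, Meta)
VALUES
(
    @StreamId,                        -- uuid of the stream
    @Version,                         -- next index within that stream
    '{"orderId": 123}'::jsonb,        -- serialized event data (illustrative)
    'OrderPlaced',                    -- event type (illustrative)
    '{"userId": "abc123"}'::jsonb     -- auditing / tracing metadata (illustrative)
);
-- SequenceNum and LogDate are filled in by their column defaults.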

Indexes / Keys

SequenceNum Primary Key
Allows efficient queries based on position. i.e. WHERE SequenceNum > @LastSeenSeqNum
StreamId, Version Unique Key
Allows efficient loading of a specific stream in Version order. Postgres can use this index to process ORDER BY Version without an extra sort step.
StreamId Foreign Key
This is primarily "training wheels" to ensure data integrity in the face of bugs. Once the code is battle-proven, this could be removed to increase performance.
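
For illustration, these are the two read patterns those keys are meant to serve (the @-placeholders stand in for application-supplied values):

-- Listener catch-up, served by the SequenceNum primary key
SELECT SequenceNum, StreamId, Version, Data, Type, Meta, LogDate
  FROM Event
 WHERE SequenceNum > @LastSeenSeqNum
 ORDER BY SequenceNum;

-- Rebuilding one stream's state, served by the (StreamId, Version) unique key
SELECT Version, Data, Type
  FROM Event
 WHERE StreamId = @StreamId
 ORDER BY Version;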

⚠️ Note from the future
It turns out that using bigserial here is problematic for concurrency. Due to the way PG sequences work, rows can become visible out of order. For example, the event with SequenceNum 7 commits before SequenceNum 6. This can cause active listeners (through LISTEN/NOTIFY) to miss events. I ended up having to use a regular bigint, with a separate single-row table to track what has been issued. See comments for a deeper dive. The StreamId foreign key was also removed since the table below was not used.
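
For reference, here is a minimal sketch of that workaround, assuming a single-row table (the names are mine, not the exact implementation from the comments) whose row lock serializes appenders so that SequenceNum values become visible in order:

CREATE TABLE IF NOT EXISTS EventSequence
(
    SequenceNum bigint NOT NULL
);

INSERT INTO EventSequence VALUES (0);

-- Inside the append transaction: claim the next number, then use the returned
-- value for the new Event row. The row lock is held until commit, so numbers
-- cannot leapfrog each other the way raw sequence values can.
UPDATE EventSequence
   SET SequenceNum = SequenceNum + 1
RETURNING SequenceNum;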

The Stream table

One of the features we wanted to add to the event storage is support for concurrent writers. Concurrent writers means: I can deploy multiple copies of my business services (those things which generate events, aka command handlers) without requiring locks or coordination between them. We could even run these on "serverless" architectures like Lambda to auto-scale compute resources on demand. This is accomplished through the use of Optimistic Concurrency. You can think of this like a merge conflict in git -- two different branches independently made changes to the same line of code. In this case, two independent users tried to make changes to the same entity at the same time. This is the table we use to detect that.

⚠️ Note from the future
My analysis of Lambda command handlers is that they are a bad idea, if only because the initial load performance of a Lambda is abysmal, which affects user experience. Even if I keep the lambdas warm with a script, or if AWS provided an always-on lambda, the startup problem is still encountered when the function scales up to handle load. Unlike regular services, a function only handles one request at a time. It queues up to a certain number and then scales. We did not find this suitable for our user-facing services.

CREATE TABLE IF NOT EXISTS Stream
(
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Type text NOT NULL,
    PRIMARY KEY (StreamId)
);

CREATE INDEX IF NOT EXISTS idx_stream_type ON Stream (Type);

At its core, the table simply tracks the current version of each stream. Here is the specific process to follow using this table.

  • Before executing business logic, get the current Version of the stream.
  • Execute the business logic, generating new events.
  • Save the events (in a transaction) ONLY IF
    • the Version from the Stream table is the same as Version when you started
    • then also update the Stream table to the last saved Version
    • indicate whether the events were saved. e.g. return true/false

This ensures that writers know when they tried to save conflicting events, and they can either return an error, retry, or perform some other conflict-resolution process as defined by the business rules. We will probably just error by default until there is reason to do otherwise.
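
A minimal sketch of that save step, assuming a single new event and @-style placeholders for the values captured before running the business logic:

BEGIN;

-- Bump the stream version only if nobody else has moved it in the meantime.
UPDATE Stream
   SET Version = @ExpectedVersion + 1
 WHERE StreamId = @StreamId
   AND Version = @ExpectedVersion;

-- If the UPDATE touched zero rows, another writer won the race:
-- ROLLBACK and report the conflict (e.g. return false).
-- Otherwise append the new event and COMMIT (e.g. return true).
INSERT INTO Event (StreamId, Version, Data, Type, Meta)
VALUES (@StreamId, @ExpectedVersion + 1, @Data, @Type, @Meta);

COMMIT;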

⚠️ Note from the future
This table actually did not make it into our implementation. Concurrent writers are already handled by the unique key on StreamId + Version. Conflicting writers will calculate the same version, and only one of them will succeed. More info in the comments.
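
In other words, the writer can lean entirely on the Event table's unique key. Roughly:

-- Each writer computes the next Version from the events it loaded and inserts.
-- If another writer already claimed that Version, the unique key on
-- (StreamId, Version) raises a unique_violation, which the application treats
-- as a concurrency conflict (error or retry).
INSERT INTO Event (StreamId, Version, Data, Type, Meta)
VALUES (@StreamId, @ExpectedVersion + 1, @Data, @Type, @Meta);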

Concurrent services vs concurrent users

At a minimum, we need to know that the stream hasn't changed from the time we started processing business logic until the time we tried to save the events. Most of the time the stream is loaded before processing anyway -- to rebuild the current state of the domain. So we can use the Version from the last loaded event as the expected version, then verify it is still the same when we save new events. This will detect conflicts between concurrent services writing to the same stream.

However, we can take this a step further. The client can send us the version of the data they were working with when they submitted the request. Using this as the expected Version when saving events not only detects write conflicts between services, but also detects when a user's changes will accidentally overwrite another user's.

What's this Type then?

The Type is the type of Stream. For example, the Stream Type may be "Ordering" while one of the stream's Events may have a Type "OrderPlaced". It is entirely optional, but it can be used to help generate snapshots (discussed below). It can also support filtering events down to streams of a certain type. The index on Type should have no write-performance impact, since the only updates to the table are on Version.
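
As an example of that filtering (an illustrative query, assuming the Stream table is in use):

-- All events belonging to streams of a given type
SELECT e.SequenceNum, e.StreamId, e.Version, e.Data, e.Type
  FROM Event e
  JOIN Stream s ON s.StreamId = e.StreamId
 WHERE s.Type = 'Ordering'
 ORDER BY e.SequenceNum;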

The Snapshot table

When a stream gets very large (maybe >1000 events?), loading and replaying the stream to get current state might become too slow. The common mitigation to that is using snapshots. Rather than rebuilding the domain model from scratch every time, once in a while we rebuild up to the latest version of the domain model's state and save that to the database. Afterward, to load the stream we first get the snapshot and then only the events since the snapshot version. Here is the table to support that.

CREATE TABLE IF NOT EXISTS Snapshot
(
    StreamId uuid NOT NULL,
    Version int NOT NULL,
    Data jsonb NOT NULL,
    Revision int NOT NULL,
    PRIMARY KEY (StreamId)
);

Application code should create a snapshot when a stream grows "too large" (something that should be determined by metrics like request time). The snapshot should also be recreated from time to time (after 1000 more events) and when structural changes are made to the snapshot -- see Revision below.

What is Revision?

Domain models change over time. Sometimes the changes are such that the last snapshot won't deserialize back into the new domain model. A relatively simple way to avoid that situation is to keep a Revision constant somewhere on your domain code, and increment it when you make structural changes. When loading the snapshot, the query can make sure to ignore it if the Revision is incompatible.
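
For example, a snapshot-aware load might look roughly like this, assuming the application passes in the Revision constant it currently understands:

-- Fetch the snapshot only if its Revision is still compatible.
SELECT Version, Data
  FROM Snapshot
 WHERE StreamId = @StreamId
   AND Revision = @CurrentRevision;

-- Then fetch only the events after the snapshot
-- (or the whole stream if no usable snapshot was found).
SELECT Version, Data, Type
  FROM Event
 WHERE StreamId = @StreamId
   AND Version > @SnapshotVersion
 ORDER BY Version;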

⚡ I hate adding easily-forgettable steps to the development process -- such as bumping a Revision number on structural changes. If you have an idea for a better way to handle this, I would love to hear it!

How do snapshots get saved?

Probably the best way is to have a separate Snapshot service which runs on a schedule. It can use the Stream table to identify streams with a large number of events. It can also check the existing snapshot versions to find ones that need updating. The query to accomplish both of these checks at once is described in Building an Event Storage. Additionally, Snapshot revisions can be checked to recreate a snapshot for structural changes.

Now the streams in need of snapshots are identified. The next step is to call the appropriate domain model code to replay the stream up to the current state. Then that state gets saved to the Snapshot table. The Stream.Type can be used to determine which stream replay code to call. If you didn't opt to include the Type column in the Stream table, then you could also read the stream's first event. Usually those are indicative of the type of stream it is.
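
The exact query is in the linked article, but a rough sketch of the check might look like this, using 1000 events as the illustrative threshold from earlier:

-- Streams that are well past their snapshot (or have none yet),
-- plus streams whose existing snapshot was built from an old Revision.
SELECT s.StreamId, s.Type, s.Version
  FROM Stream s
  LEFT JOIN Snapshot sn ON sn.StreamId = s.StreamId
 WHERE s.Version - COALESCE(sn.Version, 0) > 1000
    OR (sn.StreamId IS NOT NULL AND sn.Revision <> @CurrentRevision);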

You might not need snapshots.

For many types of applications, well-designed streams do not accumulate large numbers of events over their lifetime. It certainly does not hurt to use snapshots, but they take dev time to implement. And they can always be added later.

What did we accomplish?

These changes allow services which use the event storage to scale in a shared-nothing fashion. They can even be stateless and run on serverless platforms. They also provide consistent loading performance as stream sizes grow.

What about scaling the database itself?

We haven't really addressed horizontally scaling the event database itself. In a multi-tenant app, it would be pretty easy here to add a TenantId to these tables and use a sharding solution such as Citus Data.
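
A rough sketch of that direction, assuming Citus is installed; the column name is illustrative, and in practice the primary and unique keys would also need TenantId added, which is not shown:

-- Add a tenant discriminator (existing rows would need a backfill)
ALTER TABLE Event ADD COLUMN TenantId uuid;

-- Distribute the table across the cluster by tenant
SELECT create_distributed_table('event', 'tenantid');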

My current approach to multi-tenancy is schema-based isolation. It would not be difficult to convert into a sharded approach, and I may end up doing just that. However I really like schema isolation, so I'm working through some ideas on using a simple directory (e.g. a file on S3) which maps tenants to database instances.

Update 2019-01-23

The comments below are worth the read, especially the discussions with @damiensawyer. I came to the conclusion that the only necessary table was the Event table. Then the Snapshot table could be added later if needed. The Stream table wasn't really necessary after all. And in fact it complicates the implementation quite a bit -- I had to create a not-so-small stored procedure to keep Stream updated as part of appending events. So I do not consider it worthwhile.
