Event Sourcing: Eventual Consistency and Responding to Events

In my last post, about CQRS and how to keep your read model synchronized, I mentioned how you could ‘broadcast’ your events to other parts of the system in order to keep read-optimized representations of your application’s state in sync with the actual state. You use an ‘event bus’ that will distribute an event to all interested parties.

This event bus can be stupidly simple; it could be a list of every read model in the system and publishing could consist of broadcasting every event to every read model in the system. You could make it a little more intelligent by only distributing an event to a read model that is interested in it. What if updating your read-optimized representation means running some heavy queries against a database and then inserting or updating a ton of records? You don’t want to make your users wait for those updates, right? That’s why it’s often a good idea to make your event bus asynchronous. This doesn’t mean the method broadcasting your events returns a Task, it means ‘broadcasting’ the events after you save them is actually putting them on a queue to be broadcast. Your save immediately returns, and the events will get processed in the background.
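To make the idea concrete, here is a minimal sketch in Python of what such an asynchronous event bus might look like. All the names (AsyncEventBus, PostCreated, drain) are made up for illustration; this is not a reference implementation, just one way of showing that publishing only enqueues the event and a background worker does the actual delivery.

```python
import queue
import threading


class AsyncEventBus:
    """Hypothetical event bus: publish() only enqueues; a worker thread delivers."""

    def __init__(self):
        self._queue = queue.Queue()
        self._subscribers = {}  # event type -> list of handler callables
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def subscribe(self, event_type, handler):
        # Only read models that are interested in an event type receive it.
        self._subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event):
        # Returns immediately; the event is handled in the background.
        self._queue.put(event)

    def _run(self):
        while True:
            event = self._queue.get()
            try:
                for handler in self._subscribers.get(type(event), []):
                    handler(event)
            finally:
                self._queue.task_done()

    def drain(self):
        # Block until everything published so far has been handled (handy in tests).
        self._queue.join()


class PostCreated:
    def __init__(self, post_id, title):
        self.post_id = post_id
        self.title = title


overview = []  # stand-in for a read model's storage
bus = AsyncEventBus()
bus.subscribe(PostCreated, lambda event: overview.append(event.title))
bus.publish(PostCreated(1, "Hello"))  # returns before the read model is updated
bus.drain()
print(overview)
```

Between publish() returning and the worker picking up the event, the overview list is out of date. That window is where the rest of this post comes in.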

Eventual Consistency

If your ‘event bus’ is asynchronous, you will have to come to grips with the realities of Eventual Consistency.

This is a statement I made in my previous post. What does Eventual Consistency mean? Consider the following: you are writing a blogging application (because at one point or another, we all write a blogging application). One of your requirements is that after the user has finished writing a blog post, they are redirected to the overview of blog posts. However, for whatever reason, it takes a couple of seconds to update the list of blog posts (or maybe just a single second, or several hundred milliseconds). That is probably far longer than it takes for the user to be redirected and for the application to request the list of blog posts from the read model, which is still updating the database.

The result, in the end, is that the user’s blog post is not in the overview. What the hell? The user spends several moments in confusion. Did the blog post get created or not? They press F5. Lo and behold, there’s the blog post.

In the moments after creating the blog post, the system was ‘inconsistent’; that is, not all views of the system (or read models, if you will) were returning the same data. A read model that is eventually consistent will regularly ‘fall out of sync’ with the underlying data, as changes are being processed in the background. Given enough time, it will catch up with all the changes, and it will be consistent again. Eventually, it will be consistent.

Observant readers will notice I wrote ‘a read model that is eventually consistent’. If you use an asynchronous event bus, then yes, technically your entire application is eventually consistent. However, if updating a read model takes less time than it takes for the user to round-trip a request, then the read model will never appear to be out of sync. For all intents and purposes, it is not eventually consistent but immediately consistent. Hence the nuance: sometimes eventual consistency applies only to some read models, while the rest of the application is not affected.

What to do about it

There are a number of things you can do when a view or read model is Eventually Consistent.

Ignore it

This means making it the user’s problem. In general this will annoy the hell out of users. This might be a perfectly valid solution, depending on the application. If the user’s action was creating a blog post and it doesn’t show up in their list of blog posts, then I’m sure you agree that it’s not a very good solution.

Notify the user

Your blog post has been created and is being processed. It may take several minutes for it to become available.

This is the ‘fastest’ solution. It requires the fewest modifications and your application remains responsive. It also requires the user to trust your application. If the read model encounters a problem when processing events, there is, by default, nothing that notifies the user. They check after a few minutes, but there is still nothing there.

If this happens often enough, users will stop trusting the application. Which is obviously a bad thing.

If you do go this route, it would probably be a good idea to add a system which will notify users (and possibly administrators) if a read model encounters an error, possibly by sending them an e-mail.

Wait for the view or read model to become consistent again

After the blog post has been created, before redirecting the user to the overview, your application can wait, polling the read model every once in a while (probably every hundred milliseconds or so) until the newly created blog post is listed. Only then will you actually redirect the user to their overview. To the user, it appears as if creating the blog post takes longer than it actually does. Another possibility is to wait when you’re requesting the overview of blog posts.

If the event is still in the queue, waiting to be processed because there is a high workload, that might cause your application to wait for an unreasonably long time before the read model is updated.
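A rough sketch of the polling approach, in Python, with a timeout so that a backed-up queue cannot make the request hang forever. The function name, the list_post_ids() method and the LaggingReadModel class are all assumptions made for the example; your read model will look different.

```python
import time


def wait_until_listed(read_model, post_id, timeout=2.0, interval=0.1):
    """Poll until post_id shows up in the read model or the timeout expires.

    Returns True if the post became visible, False if we gave up waiting.
    """
    deadline = time.monotonic() + timeout
    while True:
        if post_id in read_model.list_post_ids():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


class LaggingReadModel:
    """Hypothetical read model that only shows the post after a few polls."""

    def __init__(self, post_id, polls_until_visible):
        self._post_id = post_id
        self._remaining = polls_until_visible

    def list_post_ids(self):
        if self._remaining > 0:
            self._remaining -= 1
            return []
        return [self._post_id]


if wait_until_listed(LaggingReadModel(42, 3), 42, timeout=1.0, interval=0.01):
    print("redirect to the overview")
else:
    print("fall back to a 'your post is being processed' message")
```

When the timeout expires you still have to pick one of the other strategies, such as notifying the user.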

Fake it

This is not always a possibility, but one way to make an application seem responsive is to ‘fake’ stuff. Of course I’m not talking about actually faking the creation of a blog post. After the blog post has successfully been created, we redirect the user to their blog post overview, supplying it with a parameter that indicates ‘the user just created a blog post with ID such-and-such.’

The overview page will then request a list of blog posts from the read model and check if that blog post is among the results. If it is, great: just do the regular thing and show the results. If it isn’t, get information about the blog post from the system and display it among the results as if it were a regular result. The user will be none the wiser.
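In code, the overview page's logic might look something like this Python sketch. The function name, the dictionary shape of a post and the load_post callback are invented for the example; where load_post gets its data from is a separate question.

```python
def overview_with_new_post(listed_posts, new_post_id, load_post):
    """Return the posts to display, adding the just-created post if it is missing.

    listed_posts: what the (possibly stale) read model returned
    new_post_id:  the 'user just created this post' parameter, or None
    load_post:    hypothetical callback that fetches a single post by ID
    """
    posts = list(listed_posts)
    if new_post_id is None:
        return posts
    if any(post["id"] == new_post_id for post in posts):
        return posts  # the read model has already caught up
    return [load_post(new_post_id)] + posts  # fake it: show the new post on top


stale = [{"id": 1, "title": "Older post"}]
shown = overview_with_new_post(stale, 2, lambda post_id: {"id": post_id, "title": "Fresh post"})
print([post["title"] for post in shown])
```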

Actually, they might be. If the user navigates away and reaches the overview page some other way, chances are they won’t see their newly minted post any more, because the overview page doesn’t receive the ‘newly created post ID’ parameter any more.

You might ask: how does the overview page get information about the new blog post? Doesn’t it need to wait for a read model to become consistent? Why, yes it does. A way to solve this is to create a separate queue for a special read model.

Every event broadcast is put on the ‘regular queue’ and this special queue. The read model could store its information in-memory and contain only the most basic information about pretty much all blog posts ever created. Because it’s in-memory, the queue would never be very long, so the time to become consistent would be virtually zero.

All the posts, though. Hmm, that might still be a lot of information. How about only the most recent x blog posts? Then we don’t even need the passing around of parameters and the view would remain consistent even if you navigated away! You could even tweak how many posts it keeps in memory based on how busy the application is and how long it takes for read models to become consistent! There is a slightly better way, though.
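For what it’s worth, the ‘most recent x posts’ read model is only a few lines in Python if you lean on a bounded deque. The class and method names are assumptions for illustration.

```python
from collections import deque


class RecentPostsReadModel:
    """Hypothetical in-memory read model that keeps only the newest posts."""

    def __init__(self, capacity):
        # A deque with maxlen silently drops items from the far end when full.
        self._posts = deque(maxlen=capacity)

    def handle_post_created(self, post_id, title):
        # Newest first; the oldest entry falls off the right-hand side.
        self._posts.appendleft((post_id, title))

    def recent(self):
        return list(self._posts)


recent_posts = RecentPostsReadModel(capacity=2)
recent_posts.handle_post_created(1, "First")
recent_posts.handle_post_created(2, "Second")
recent_posts.handle_post_created(3, "Third")
print(recent_posts.recent())  # only the two newest posts remain
```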

Discriminate against your read models

If you have the ability to create and broadcast to multiple queues, why not maintain the normal read models, but separate them into ‘high speed’ (or ‘high priority’) and ‘low speed’ (or, conversely, ‘low priority’)? You could make sure that all read models that need to update in near real-time feed off the high-speed queue, while all the other read models feed off the low-speed queue.
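A minimal sketch of the two-queue idea in Python. Everything here (TwoSpeedBus, register, pump) is a made-up name, and the queues are pumped by hand to keep the example deterministic; in a real system each queue would have its own worker.

```python
from collections import deque


class TwoSpeedBus:
    """Hypothetical bus with a 'high' and a 'low' speed queue."""

    def __init__(self):
        self.queues = {"high": deque(), "low": deque()}
        self.handlers = {"high": [], "low": []}

    def register(self, speed, handler):
        self.handlers[speed].append(handler)

    def publish(self, event):
        # Every event goes onto both queues.
        for pending in self.queues.values():
            pending.append(event)

    def pump(self, speed, max_events=None):
        """Deliver queued events for one speed; returns how many were processed."""
        pending = self.queues[speed]
        processed = 0
        while pending and (max_events is None or processed < max_events):
            event = pending.popleft()
            for handler in self.handlers[speed]:
                handler(event)
            processed += 1
        return processed


fast_view, slow_report = [], []
two_speed = TwoSpeedBus()
two_speed.register("high", fast_view.append)
two_speed.register("low", slow_report.append)
for post_id in (1, 2, 3):
    two_speed.publish(post_id)

two_speed.pump("high")               # the near real-time view catches up fully
two_speed.pump("low", max_events=1)  # the slow report is allowed to lag
print(fast_view, slow_report)
```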

If you have a slow database, you might introduce a cache. The high-speed read models update into and read from the cache. The low-speed versions of those read models write into and read from the database. The cache-backed read models can use the database-backed read models as a fall-back, so that when your application has just started up, you don’t need to replay all the events in the universe to get a working application.
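The fall-back arrangement could be sketched like this in Python. Both classes are hypothetical, and a plain dict stands in for the slow database.

```python
class DatabasePostReadModel:
    """Stand-in for the slow, database-backed read model (a dict plays the database)."""

    def __init__(self):
        self._rows = {}

    def handle_post_created(self, post_id, title):
        self._rows[post_id] = title

    def get_title(self, post_id):
        return self._rows.get(post_id)


class CachedPostReadModel:
    """Hypothetical high-speed read model that falls back to the database-backed one."""

    def __init__(self, database_model):
        self._cache = {}
        self._database_model = database_model

    def handle_post_created(self, post_id, title):
        self._cache[post_id] = title

    def get_title(self, post_id):
        if post_id in self._cache:
            return self._cache[post_id]
        # Cache miss (for example right after start-up): ask the slow model.
        title = self._database_model.get_title(post_id)
        if title is not None:
            self._cache[post_id] = title
        return title


database = DatabasePostReadModel()
database.handle_post_created(1, "Old post")   # processed before the restart
cached = CachedPostReadModel(database)        # fresh, empty cache after start-up
cached.handle_post_created(2, "New post")     # high-speed queue already delivered this

print(cached.get_title(1))    # served through the fall-back
print(cached.get_title(2))    # served from the cache
print(database.get_title(2))  # the low-speed model has not caught up yet
```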

Fully parallel processing

Having a low-speed and a high-speed queue solves the issue of having slow read models interfering with near real-time read models. However, it’s still possible for badly behaving read models to interfere with one another.

A couple of great minds have written a manifesto, the Reactive Manifesto, which deals with how systems should be designed to make them “more robust, more resilient, more flexible and better positioned to meet modern demands.” One of the characteristics of a reactive system is as follows:

Failures are contained within each component, isolating components from each other and thereby ensuring that parts of the system can fail and recover without compromising the system as a whole.

What that means for our read models is that each read model should be as isolated and self-contained as it can be. A way to achieve this is to give each read model its own queue of events. A failure or delay in one read model has very little chance of interfering with other read models. One read model might not be consistent because it is failing, but at least the others are likely to be.
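Here is a small Python sketch of that isolation, assuming each read model owns its inbox and records its own failures. The names are invented, and processing is done by hand rather than on separate threads to keep the example short.

```python
from collections import deque


class IsolatedReadModel:
    """Hypothetical read model wrapper with its own queue and its own failures."""

    def __init__(self, name, handler):
        self.name = name
        self.handler = handler
        self.inbox = deque()
        self.failed = []  # (event, error) pairs, kept for later inspection or retry

    def process_pending(self):
        while self.inbox:
            event = self.inbox.popleft()
            try:
                self.handler(event)
            except Exception as error:
                # The failure stays inside this read model.
                self.failed.append((event, error))


def broadcast(read_models, event):
    # Each read model gets its own copy of the event on its own queue.
    for read_model in read_models:
        read_model.inbox.append(event)


def broken_handler(event):
    raise RuntimeError("reporting database is down")


titles = []
healthy = IsolatedReadModel("overview", titles.append)
broken = IsolatedReadModel("reporting", broken_handler)

for title in ("First", "Second"):
    broadcast([healthy, broken], title)

broken.process_pending()   # fails on every event...
healthy.process_pending()  # ...but the overview is unaffected
print(titles, len(broken.failed))
```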

It also decouples the read models on a temporal level. Each read model can process events as fast as possible, so the fastest, in-memory read models are almost immediately up-to-date, while the slow read models that update reporting databases are allowed to lag behind and catch up when there is less load.

By isolating each read model, it is also possible to scale out read models independently. An in-memory read model that merely copies data from events to a state representation probably has little to gain from being scaled out. A read model that does a lot of heavy processing probably has a lot to gain.

Beware of the queue

A note about using queues, though. A friend and former colleague of mine would probably have my head on a pike if I didn’t mention the danger of using queues. Even though memory and disk space are ‘virtually limitless’, in practice they have very real limits. When messages are being put onto the queue faster than they are pulled off and processed, you will have a problem. Eventually, the queue will be full (because there is no more memory or disk space) and you can’t put new messages onto the queue. What should your application do in that case?

You could discard messages. This is known as load-shedding. For events, it means that, at the very least, some information is missing in some parts of your system. At the very worst, a critical piece of information for a critical business process is missing and your system shits itself and breaks down.

You could also block the write side until there is room on the queue. This is known as back-pressure. At least the system will remain consistent (eventually), but users end up waiting for a while whenever they do something, possibly getting a time-out.
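Both strategies are easy to demonstrate with a bounded queue from Python’s standard library. The two function names are mine; the queue.Queue calls are the real API.

```python
import queue


def publish_with_load_shedding(events, event):
    """Try to enqueue; if the queue is full, drop the event and report it."""
    try:
        events.put_nowait(event)
        return True
    except queue.Full:
        return False  # the event is lost


def publish_with_back_pressure(events, event, timeout):
    """Block the writer until there is room, giving up after `timeout` seconds."""
    try:
        events.put(event, timeout=timeout)
        return True
    except queue.Full:
        return False  # the writer (and so the user) ran into a time-out


events = queue.Queue(maxsize=2)  # deliberately tiny
shed_results = [publish_with_load_shedding(events, n) for n in (1, 2, 3)]
print(shed_results)  # the third event did not fit

blocked = publish_with_back_pressure(events, 3, timeout=0.01)
events.get()  # a consumer finally takes one event off the queue
accepted = publish_with_back_pressure(events, 3, timeout=0.01)
print(blocked, accepted)
```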

A much better write-up of this phenomenon is at ferd.ca.

Actor Model

Independent queues (or mailboxes, if you will), isolation, fault tolerance... Those familiar with the pattern (and who have made it this far) have probably been shouting it for a while now: this is the Actor Model!

I won’t go into too much detail about the Actor Model, because it is a huge and complex topic. The Actor Model is a pattern that lets you create highly concurrent and fault-tolerant systems. An Actor Model system is composed of ‘actors’ that are driven by messages and can, in response to a message, only do a limited number of things:

  • Create more actors
  • Send messages to other actors
  • Modify their own state
  • Modify how they will respond to future messages

An actor only knows how to reach other actors by their address; they don’t have a direct reference to them. The reason for this is location transparency; an actor doesn’t have to know whether another actor lives inside the same process, another process or maybe another machine altogether.
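To give a feel for it, here is a toy, single-threaded actor system in Python. It is nowhere near what a real framework like Akka.NET does (no supervision, no concurrency, no remoting), and every name in it is invented. It only illustrates mailboxes, addresses, private state and changing behaviour.

```python
from collections import deque


class ActorSystem:
    """Toy actor system: actors are reached by address only, via their mailbox."""

    def __init__(self):
        self._actors = {}     # address -> {"behavior": callable, "state": anything}
        self._mailboxes = {}  # address -> deque of pending messages

    def spawn(self, address, behavior, state=None):
        self._actors[address] = {"behavior": behavior, "state": state}
        self._mailboxes[address] = deque()
        return address

    def send(self, address, message):
        self._mailboxes[address].append(message)

    def run(self):
        # Keep delivering one message per actor per round until all mailboxes are empty.
        progressed = True
        while progressed:
            progressed = False
            for address in list(self._mailboxes):
                mailbox = self._mailboxes[address]
                if mailbox:
                    actor = self._actors[address]
                    actor["behavior"](self, actor, mailbox.popleft())
                    progressed = True


def counting(system, me, message):
    if message == "report":
        system.send("printer", me["state"])  # send a message to another actor
    elif message == "freeze":
        me["behavior"] = frozen              # change how future messages are handled
    else:
        me["state"] += message               # modify own state


def frozen(system, me, message):
    if message == "report":
        system.send("printer", me["state"])
    # every other message is ignored from now on


received = []


def printer(system, me, message):
    received.append(message)


system = ActorSystem()
system.spawn("printer", printer)
system.spawn("counter", counting, state=0)
for message in (1, 2, "freeze", 5, "report"):
    system.send("counter", message)
system.run()
print(received)
```

The counter adds 1 and 2, freezes, ignores the 5 and reports its state to the printer, which it only knows by the address "printer".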

The Actor Model is an insanely powerful way to model your system and achieve resilience and concurrency. It also calls for a very different way of thinking about and designing your system; even more so than Event Sourcing.

If you want to know more about the Actor Model, I suggest you take a look at Petabridge’s Akka.NET Bootcamp. Akka.NET is an Actor Model framework for .NET and it’s a port of the Akka framework for Java.

Responding to Events

Eventual Consistency towards the user is one thing, but you can also encounter Eventual Consistency internally. Let’s say you have an event handler that needs to respond to an event X occurring by storing and publishing a different kind of event. To create this new event, you need data from a read model. That read model depends on the data from event X to become fully consistent. The event handler receives the event, but the read model you need to use is not yet up-to-date. What to do?

When you have a fully parallel event bus, where each event handler (which is what a read model is at its core) has its own queue, the solution is pretty simple. When you receive a message and the read model you depend on doesn’t yet have all the data you need, put the message back at the end of the queue. Actor Model frameworks such as Akka offer a similar technique called stashing. Depending on how busy your system is and how quickly the read model you depend on becomes consistent, you might re-encounter the event several times before you are able to successfully process it. On the other hand, you might not need to make any changes to the read model you depend on and only very few changes to the event handler.
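A bare-bones Python sketch of that ‘put it back on the queue’ loop. The function and parameter names are assumptions, and the max_attempts guard is my own addition so that an event whose dependency never becomes ready cannot spin forever.

```python
from collections import deque


def process_with_requeue(inbox, is_ready, handle, max_attempts=100):
    """Handle events; events whose dependency is not ready go to the back of the queue."""
    attempts = 0
    while inbox and attempts < max_attempts:
        event = inbox.popleft()
        attempts += 1
        if is_ready(event):
            handle(event)
        else:
            inbox.append(event)  # try again later, after the events behind it


# Simulate a read model that needs two more checks before it knows about "X".
checks_until_ready = {"X": 2}


def is_ready(event):
    remaining = checks_until_ready.get(event, 0)
    if remaining > 0:
        checks_until_ready[event] = remaining - 1
        return False
    return True


handled = []
inbox = deque(["X", "Y"])
process_with_requeue(inbox, is_ready, handled.append)
print(handled)
```

Note that "Y" ends up being handled before "X". Re-queuing changes the processing order, so this only works if the handler does not depend on strict ordering between those events.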

In the case of a (partially) sequential event bus, the problem is much more tedious to solve. You might be able to change the order in which event handlers are invoked, but this is very brittle. You could surround the code that controls the order with large exclamatory comments like /**** DON'T CHANGE THIS CODE OR IT WILL BREAK TEMPORAL COUPLING IN EVENT HANDLERS... YOU PROBABLY WOULDN'T UNDERSTAND ANYWAY ****/. This does not guarantee that someone won’t try and do it, though.

If you’ve implemented low-speed and high-speed queues, you might ensure that the event handler and the read model are on different queues and wait inside the event handler until the read model is consistent, but you will also block everything else on that queue.

You could introduce a new queue specifically for this event handler, but if it blocks until the read model becomes consistent, you might have a pretty low throughput. If the frequency of the event is pretty low, that might be acceptable. When you are able to ‘stash’ events, the throughput will improve. It would, however, require a special-case code path, which you have to maintain and test separately.

If your event bus is fully sequential, you might get away with sending an intermediate ‘respond to event X’ message. That way, the read model will definitely have processed event X and will be consistent. However, as we saw, there are a lot of drawbacks to using a fully sequential event bus.
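A sketch of the intermediate-message trick on a fully sequential bus, in Python. The bus, the tuple-shaped messages and the handler names are all invented for the example. The point is that the follow-up message lands behind event X on the single queue, so every handler, including the read model, has seen X by the time it is processed.

```python
from collections import deque


class SequentialBus:
    """Hypothetical fully sequential bus: one queue, handlers run in a fixed order."""

    def __init__(self):
        self._queue = deque()
        self._handlers = []

    def register(self, handler):
        self._handlers.append(handler)

    def publish(self, message):
        self._queue.append(message)

    def run(self):
        while self._queue:
            message = self._queue.popleft()
            for handler in self._handlers:
                handler(self, message)


titles = {}     # the read model's state
responses = []  # what the reacting handler produced


def reactor(bus, message):
    kind, post_id = message[0], message[1]
    if kind == "PostCreated":
        # Too early to read the read model; schedule a follow-up instead.
        bus.publish(("RespondToPostCreated", post_id))
    elif kind == "RespondToPostCreated":
        responses.append(titles[post_id])  # the read model is consistent by now


def read_model(bus, message):
    if message[0] == "PostCreated":
        titles[message[1]] = message[2]


sequential_bus = SequentialBus()
sequential_bus.register(reactor)     # deliberately registered BEFORE the read model
sequential_bus.register(read_model)
sequential_bus.publish(("PostCreated", 7, "Hello"))
sequential_bus.run()
print(responses)
```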

Conclusion

Eventual Consistency is a difficult problem to deal with. Solving it usually requires thinking carefully about how your read models interact with one another and whether you need a read model to update in near real-time or if it’s fine for it to spend some time updating the underlying data store.

If parallel processing makes sense in your case, it’s probably a good idea for each read model to be fully independent by introducing a separate queue for each read model.