Event Feeds: Simple and reliable messaging infrastructure

Event feeds are a simple mechanism to enable applications to react to things happening in other applications, reliably, consistently, and quickly.

How do you make an application take action when something happens in another application? How do you make sure this happens reliably and in a timely fashion? In a world of service-oriented architectures and microservices, this is a hugely underestimated and overlooked problem, in my experience.

In this article, I’ll explain a mechanism that I’ve been working with for a while now, which I think is the simplest way to reliably make applications react to one another, and which is also easy to maintain.

First I’ll describe the mechanisms typically used to make applications react to one another, and illustrate where they exhibit problems. To make my examples a bit more relatable, including the ramifications when the system fails, let’s describe a fictional application: part of an e-commerce back-office. This will be an incredible over-simplification of how things work in the real world, but it does the job.

There are three applications:

  • Ordering; this takes orders, validates them, and when they have been paid for, releases them to be fulfilled.
  • Payments; this accepts and records payments from customers.
  • Logistics; this makes sure products get picked, packed, and put in a box with a shipping label on it.

Obviously there is some information exchange happening between these applications. Ordering will need to know when a payment has been made, so it can mark orders as ‘paid for’. Logistics will need to know when an order has been paid for, so it can direct workers to pick and pack the products.

The payment transaction

We’ll focus on the ‘payment’ transaction. After a payment has been recorded in Payments, Ordering needs to be notified so it can evaluate whether the amount due for the order has been fully paid, after which it can ‘release’ the order to Logistics.

So how do we get this notification across?

RPC

The easy approach is to use an RPC, or Remote Procedure Call. There are many ways of doing this; you could use a TCP socket directly, or something a little more abstract, like SOAP or gRPC, or even something as simple as an HTTP request.

This is very easy to do; after a payment has been made and has been registered, Payments simply makes a request directly to Ordering, and that is that. It is done synchronously, before returning to the caller, which is likely a customer in a browser, or indirectly so, with the front-end making a call to Payments.
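
In code, this could be as simple as an HTTP POST from Payments to Ordering. Here’s a minimal sketch; the endpoint URL and the payload shape are made-up assumptions, not a real API:

using System.Net.Http;
using System.Net.Http.Json;

// Hypothetical synchronous notification from Payments to Ordering.
var client = new HttpClient();
var response = await client.PostAsJsonAsync(
    "https://ordering.myshop.eu/payment-notifications",
    new { OrderId = 2398729, Amount = 224.5m, Currency = "EUR" });
response.EnsureSuccessStatusCode(); // throws if Ordering is down or errors out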

So what’s wrong with this? There are several problems. One of them is a high degree of coupling. Payments needs to know that Ordering is interested in this notification, and it also needs to know how it likes to receive this notification—the protocol—and how it can reach the other application. This is the kind of coupling that can slow down development; when the team maintaining Ordering wants to change some of the details, they will need to align with the team maintaining Payments, and they might need a multiple-step deployment to ensure nothing breaks. This is at odds with what is typically seen as the value of having microservices: being able to independently deploy changes.

Another problem is the need for availability. When Payments cannot reach Ordering, or an error occurs while handling the notification, what do we do then? Do we drop the notification? Do we wait and try again? We can retry, but only for some time; remember, the customer is sitting there, waiting. After a while, you have to give up. How do you handle that scenario?

If you want to notify multiple applications using RPC, pretty much the only option is to make multiple calls, which increases the coupling even further. Aside from that, how do you deal with failure? If you need to notify three separate applications, and notifying one of them fails, what do you do?

Message queues

One solution that alleviates these problems to some extent is to use a message queue. Message queues are difficult to get right, so they are usually provided in the form of an external service. This could be in the form of dedicated software, like RabbitMQ, a service from your cloud provider, like AWS SQS or Azure Queue Storage, or even part of your database system, like SQL Server’s Service Broker or Oracle Advanced Queuing.

Message queues have a well-known, fixed API, so the only things teams will need to align on are the format of the message and the location of the queue. Queues are usually implemented in a way that makes them highly available, and because they are usually considered ‘infrastructure’, developers rarely have to worry about keeping these services alive.

As a bonus, most queue brokers support the ‘publish/subscribe’ or ‘fan-out’ pattern, whereby a single notification is sent to multiple consumers. The publishing application only knows that it is sending a notification to a queue or a ‘topic’, nothing more. The consumers are responsible for subscribing to topics based on what they are interested in.
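
As a sketch of what fan-out publishing might look like with the RabbitMQ .NET client (the host, exchange name, and payload are made up for illustration):

using System.Text;
using RabbitMQ.Client;

// A fan-out exchange copies each message to every bound queue, so
// Payments doesn't know, or care, who the consumers are.
var factory = new ConnectionFactory { HostName = "rabbitmq.myshop.eu" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.ExchangeDeclare("payments.notifications", ExchangeType.Fanout, durable: true);

var body = Encoding.UTF8.GetBytes("{\"OrderId\":2398729,\"Amount\":224.5}");
channel.BasicPublish(exchange: "payments.notifications", routingKey: "",
    basicProperties: null, body: body);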

Note that the problems aren’t solved completely. They are simply less severe. A new problem is that both applications are now forced to use this service. This kind of coupling can make it difficult to make changes in your hosting infrastructure.

In essence, using this kind of specialized infrastructure between applications is like having multiple applications sharing the same database. There are all sorts of problems with that, like making it much more difficult to make changes, creating a single point of failure, limiting scalability, and making managing the infrastructure more complicated.

Depending on the specific queue broker you use, you might need to consider what happens if the queue broker itself experiences an outage while messages are ‘in flight’.

Atomicity and timing

Both RPC and message queues[1] suffer from a different kind of problem: that of atomicity and timing. Consider this pretty typical, zoomed-out view of what a ‘transaction’ looks like.

var transaction = BeginTransaction();
UpdateDatabase();       // all database work is covered by the transaction...
transaction.Commit();   // ...and becomes permanent here

// here be dragons

SendNotifications();    // ...while this is not protected by anything

Note the line that says // here be dragons. The work in the database has been performed in a nice transaction, so either all of it has been performed—or, if something goes belly-up, nothing. Sending the notification, however, is not protected by anything. If an error occurs during or before sending the notification, you’re screwed; your database transaction has already been committed. Database operations can usually be reverted, but that is not always easy or desirable.

What if your application dies at that point in time? A fatal error happens, one of the cleaners unplugs the server, or the virtual machine dies. Your transaction has already been committed, but nobody knows about it. Applied to our example, this means the payment has been registered, but Ordering never hears about it, and therefore never releases the order to be fulfilled. That customer is not going to be a happy camper.

Well, we just do the notification inside the transaction, right? Unfortunately, that doesn’t magically make the notification transactional.

var transaction = BeginTransaction();
UpdateDatabase();
SendNotifications();    // other applications hear 'this has happened'...

// here be dragons

transaction.Commit();   // ...but it only becomes true here

The dragons are still there, but they have moved: they’re now after the notification instead of before it. Why are there still dragons? Consider what happens if your application dies at that point. You’ve already sent a notification to other applications that something has happened, but the database transaction will be rolled back. In reality, nothing has happened, so you’re effectively lying to other applications.

I also mentioned that timing was a problem, and it surfaces in this order of operations. You’re telling other applications ‘this has happened’, but you haven’t committed that information to the database yet. What if the application you’ve just notified very quickly turns around and asks you for more information? Especially if the notification is synchronous, that might even be before the notification completes.

Payments might be telling Ordering ‘a payment has been made’, upon which Ordering asks ‘oh really, so how much has been paid in total?’. Because the original transaction has not yet been committed, the answer might be ‘nothing’. And yet again, we have effectively lost information and gained an unhappy customer.

Storing notifications

Because of these atomicity and timing problems, I consider sending notifications on the spot, without any back-ups, to be ‘unreliable’; there is a very real possibility a notification is lost because of a temporary disruption, like a network glitch or a VM dying.

We can make sending notifications ‘reliable’ again by making it asynchronous. Instead of immediately sending notifications, we store them somewhere, to be picked up by a background process. This is called the Outbox Pattern. If we store the notifications in a database, we can make it part of the database transaction, so either the intended changes happen and the notification is stored to be sent, or neither happens.
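
A minimal sketch of the producing side, assuming a hypothetical Outbox table and Microsoft.Data.SqlClient (the table and column names are made up, not a prescribed schema):

using Microsoft.Data.SqlClient;

// Record the payment and the outgoing notification in a single transaction.
var connectionString = "...your connection string...";
using var connection = new SqlConnection(connectionString);
connection.Open();
using var transaction = connection.BeginTransaction();

using (var registerPayment = new SqlCommand(
    "INSERT INTO Payments (OrderId, Amount) VALUES (@orderId, @amount)",
    connection, transaction))
{
    registerPayment.Parameters.AddWithValue("@orderId", 2398729);
    registerPayment.Parameters.AddWithValue("@amount", 224.5m);
    registerPayment.ExecuteNonQuery();
}

using (var storeNotification = new SqlCommand(
    "INSERT INTO Outbox (Payload, Sent) VALUES (@payload, 0)",
    connection, transaction))
{
    storeNotification.Parameters.AddWithValue("@payload",
        "{\"OrderId\":2398729,\"Amount\":224.5}");
    storeNotification.ExecuteNonQuery();
}

// Either both rows are committed, or neither is.
transaction.Commit();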

This approach has the added benefit of being resilient to temporary outages. The background process picks up the next pending notification, sends it, and then marks it as ‘done’, which can either be deleting it, or setting a flag. Rinse and repeat. If, for some reason, the notification cannot be sent, because the receiving party is not reachable or not responding, the background process backs off and tries again.
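
The background process itself could look something like this sketch (imagine it running inside a background service with a cancellationToken; GetNextPendingNotification, SendToDestination, and MarkAsSent are assumed helpers around the outbox storage and the actual delivery mechanism):

// Pick up pending notifications, send them, mark them as done,
// and back off when delivery fails.
while (!cancellationToken.IsCancellationRequested)
{
    var pending = GetNextPendingNotification(); // e.g. the oldest unsent row

    if (pending is null)
    {
        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        continue;
    }

    try
    {
        await SendToDestination(pending);   // RPC call, queue publish, ...
        MarkAsSent(pending);                // delete the row, or set a flag
    }
    catch (Exception)
    {
        // The receiver is unreachable or failing; back off and retry later.
        // The notification stays in the outbox, so nothing is lost.
        await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
    }
}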

Again, this does not completely solve the atomicity problem; it changes it. Instead of not sending notifications, you might be sending them more than once. You’re still dealing with operations in two different systems—your database and another application, whether that’s an actual application or a piece of infrastructure like a queue. This can, and should, be solved by using notifications that are idempotent, which I’ll describe in a bit.

On idempotent messages

A brief interlude about what idempotent means. It derives from the Latin idem (‘the same’) and potens (‘having power’), so ‘having the same power’. It means that an operation, when performed more than once, will have the same outcome as the first time. Fine examples of this are arithmetic operations; 2 + 2 will always equal 4, no matter how often you compute it. The polar opposite would be getting a random number; that is intended to produce a different value each time.

How does this apply to messages? The go-to example to illustrate idempotent and non-idempotent messages is when a quantity of something has changed, like the quantity you have on stock of a particular product.

Consider the following two messages, describing that the stock quantity of a product has increased.

class ProductStockQuantityIncreased
{
  public string Sku { get; }
  public int QuantityAdded { get; } // only the delta; applying it twice doubles the effect
}

… versus…

class ProductStockQuantityIncreased
{
  public string Sku { get; }
  public int QuantityAdded { get; }
  public int StockQuantity { get; } // the absolute quantity after the change
}

In the first example, if you start with a quantity of zero, and you have a single message saying 5 have been added, which you receive twice, it looks like you have a total quantity of 10.

On the other hand, in the second example, you have a message saying 5 have been added and the new quantity is 5. If you receive that twice, you’ll still know the correct total quantity.
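
In handler terms, the difference looks something like this sketch (the stockLevels dictionary stands in for the consumer’s own storage, and both handlers assume the second, extended message shape):

using System.Collections.Generic;

var stockLevels = new Dictionary<string, int>();

// Non-idempotent: applies the delta, so a duplicate message counts twice.
void HandleDelta(ProductStockQuantityIncreased message) =>
    stockLevels[message.Sku] =
        stockLevels.GetValueOrDefault(message.Sku) + message.QuantityAdded;

// Idempotent: overwrites with the absolute quantity, so a duplicate is harmless.
void HandleAbsolute(ProductStockQuantityIncreased message) =>
    stockLevels[message.Sku] = message.StockQuantity;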

If you really want to avoid processing a message twice, you could attach a unique ID to each message and keep track of which messages you have already received. This comes with its own special set of problems, though, especially since RPC and most of the queueing mechanisms I talked about allow messages to arrive out of order; e.g. message A, then message B, then a retry of message A. The mechanism through which you receive the message also doesn’t use the same transaction as where you keep track of which messages you’ve received[1], so you have another atomicity problem, but this time on the receiving end.

Making messages idempotent doesn’t solve any of the coupling problems attached to the RPC or message queue solution, though.

Pulling notifications (or events)

Let’s make a small change. We still store notifications, instead of sending them on the spot, but instead of pushing them to other applications, we have those applications pull the notifications from us. The notifications describe what happened, so they’re less like notifications, and more like events, which is the terminology I’ll use from now on. We create an endpoint that provides the caller with a list of events to process, and it is then up to that application to keep track of which ones have already been processed.

To be clear, this means Payments exposes an endpoint with a list of events to other applications. Ordering regularly polls this endpoint to retrieve any new events. In other words, Payments produces a list of events, which Ordering consumes.

How does this solve the problems we’ve seen?

Our first problem, coupling, is still there, but it has been inverted; instead of Payments being dependent on Ordering, it’s now Ordering that is dependent on Payments. Depending on how you implement the list of events, the coupling can also be very minimal; down to a URL and knowledge of the structure and format of events. What’s more, if you have more than one application that needs to be notified, this doesn’t increase the coupling in any single application[2].

The need for availability has also pretty much vanished. When Ordering can’t reach Payments to retrieve the list of events, that isn’t really a problem. Sure, orders will not be released to be fulfilled, but no notifications are lost. As soon as connectivity is restored, Ordering will pick up where it left off, and business will continue. If Ordering cannot process an event for whatever reason—maybe it cannot reach its own database—it just gives up and retries after waiting a while. The event is still there, no matter how many times you retry. This also means the process of handling events is self-recovering.

We don’t need any specific infrastructure for this solution; it involves applications communicating with other applications. The only ‘infrastructure’ we need is HTTP, and that is mature, ubiquitous, and flexible enough that it shouldn’t cause you any headaches when you make changes to your hosting infrastructure (like moving to a different cloud provider).

Because Payments stores events in the same database transaction it uses to do the actual work, the atomicity problem is solved. And because an event only appears in the list provided by the endpoint after the database transaction has been committed, the timing problem has also been dealt with.

An additional advantage of the fact that events remain available is that you can re-process events. Let’s say Ordering maintains a private cache of how much has been paid in total for each order, by accumulating data from the events it pulls from Payments. Unfortunately, instead of adding amounts, it subtracts them. Oops. No problem; just fix the issue in code and have the application re-process all events. It can also be a tremendous advantage when implementing new functionality that uses data from the events you previously had no need for.

Kafka?

Some of you might be thinking ‘this is very similar to a Kafka broker,’ and you’d be right, up to a certain point. Apache Kafka is a ‘distributed event streaming platform’ that uses a lot of the same ideas (an append-only list of events, clients pulling data and keeping track of where they are), but in a distributed fashion. A lot of the smarts in Kafka lies in how it manages clusters of servers and makes sure the members of a cluster agree with one another.

Kafka is, like a message queue, a stand-alone piece of infrastructure, which means it has the same ‘problems’: no atomicity[3], and it forces both producer and consumer to use the same infrastructure. It is very well suited if you have to distribute a lot of data, or if there are a lot of consumers. For smaller-scale problems, it feels like shooting at a mosquito with a cannon.

Protocol

Now that we’ve established on a high level what the mechanism will be, let’s talk about the actual implementation of this list of events. We could, of course, just roll our own protocol. How hard could it be? It’s just a list of events. Of course, this list of events will grow larger and larger, so we’ll probably need some pagination. How does a consumer know what the URL for a given page is? What about the format of the list itself? If we don’t establish one, each team will invent their own little format, which means a consuming application will need a new implementation for each event feed they consume.

Instead, let’s pick a standardized protocol and format. We’ll have less thinking to do, and it’s very likely a client library already exists for your platform of choice. We need something that is well-suited for large collections of items and for figuring out ‘what has changed’. It should also support some mechanism indicating the different kinds of events that are stored.

Enter Atom

It turns out there’s a format that suits these requirements very nicely. It’s called Atom, and it’s most frequently used as a syndication mechanism for websites that have frequently changing content, like news sites. It supports pagination and different kinds of content (through media types). It’s XML-based and looks a little something like this:

<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
  <id>urn:publicid:PaymentHandlingEvents</id>
  <updated>2020-01-09T18:02:13.242Z</updated>
  <link rel="self" href="https://payments.myshop.eu/events/mostRecent" />
  <link rel="prev-archive" href="https://payments.myshop.eu/events/2020-01-08T0000" />
  <entry>
    <id>urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a</id>
    <updated>2020-01-09T18:02:13.242Z</updated>
    <content type="application/vnd.myshop.payments.paid+json">
eyJQYXltZW50VHJhbnNhY3Rpb25JZCI6Mzk4MDg3MjM0Nzk4OTIsIkFtb3VudCI6MjI0LjUsIkN1cnJlbmN5IjoiRVVSIiwiUmVmZXJlbmNlIjoiMjM5ODcyOSJ9
<!-- {"PaymentTransactionId":39808723479892,"Amount":224.5,"Currency":"EUR","Reference":"2398729"} -->
    </content>
  </entry>
  <!-- more entries go here -->
</feed>

There’s a bit of ceremony in the first few lines of the XML, which are not all that interesting—yet—but let’s look at the entry tag. When using Atom as an event feed, each event will be represented by an entry element.

The entry element has an id element, containing an IRI, which in this case is a URN containing a UUID. It doesn’t have to be a UUID, per se, but it should be something that uniquely identifies the event, because this is what a consumer will use to keep track of what they’ve already seen.

The content element is where the actual payload of the event is. In order for consumers to be able to tell which kind of event this is—as there could be multiple kinds of events in a single feed—we use the type attribute and give the event a proper media type. Because the standard says any content whose media type does not start with text/ should be Base64-encoded, that’s what you see here. The giant wall of text starting with eyJQY is the Base64-encoded version of the JSON in the comment below it.
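
For reference, producing and reading that content element is plain Base64 handling; a small sketch:

using System;
using System.Text;

// Encode the event payload for the <content> element...
string json = "{\"PaymentTransactionId\":39808723479892,\"Amount\":224.5,"
    + "\"Currency\":\"EUR\",\"Reference\":\"2398729\"}";
string encoded = Convert.ToBase64String(Encoding.UTF8.GetBytes(json));

// ...and decode it again on the consuming side.
string decoded = Encoding.UTF8.GetString(Convert.FromBase64String(encoded));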

This example only contains a single event, but if there were more, they would be listed in reverse chronological order. That is, it starts with the most recent event, and ends with the oldest event.

Pagination and navigation

As I mentioned earlier, we need support for pagination, because this list of events will likely become huge. So how does pagination work in a feed like this? Given that this is the entry page of the feed, how does a consumer know how to get to the next page?

‘Next page’ is a confusing name, by the way, because what ‘next’ means depends on which way you’re navigating. Given our context, you always start at the entry page, with the most recent events in reverse chronological order. If you want something that’s older than the last event on the entry page, you logically want the item on the ‘next’ page.

Back to our original question: traditionally, the answer would be that you establish a ‘contract’ of sorts; if the URL of the entry page is something like /events/mostRecent, the next page could be at /events/1, followed by /events/2, and so on.

This works well, but because the consumer now needs to know about this contract, the familiar spectre of coupling rears its ugly head. Besides that, what if ‘page 1’ isn’t the next page from the entry page, but instead the page with the events from the beginning of time? What if pagination isn’t based on the number of events stored, but on date and time? The document doesn’t contain any data from which these kinds of URLs can be generated.

Instead, the feed uses hypermedia to link to other pages, much like a normal web page would. When you google something, you don’t manually change the URL in the address bar to go to the next page; you click the ‘Next’ link at the bottom.

The document contains the following element:

<link rel="prev-archive" href="https://payments.myshop.eu/events/2020-01-08T0000" />

The rel attribute indicates what kind of relationship this link has to this document (in this case, the ‘previous page of archived data’), and the href attribute is simply an opaque URL that the consumer should follow to get to the next page. This way, the consumer only needs to know that the next page is indicated by a link element with a rel attribute equal to prev-archive, and the producer of the feed is free to use whatever pagination strategy they like.

But why is prev-archive an appropriate relationship? Why not prev? Or next, for that matter? From the point of view of the standard for Feed Paging and Navigation, RFC 5005, ‘previous’ is actually the term used to describe ‘older’ pages. The difference between prev and prev-archive is that prev-archive indicates that all the pages of the feed together form a complete set of entries, and that:

Archive documents are feed documents that contain less recent entries in the feed. The set of entries contained in an archive document published at a particular URI SHOULD NOT change over time. Likewise, the URI for a particular archive document SHOULD NOT change over time.

In other words, prev-archive describes the stability of the pages that are being linked to. Because ‘archive’ pages never change, this means you can have them be cached essentially forever. This becomes interesting when feeds are being used at scale.
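
Serving an archive page, you could therefore mark the response as cacheable essentially forever. A tiny sketch using ASP.NET Core’s HttpContext:

// Archive pages never change, so clients and intermediaries may cache
// them aggressively; only the entry page needs to stay fresh.
context.Response.Headers["Cache-Control"] = "public, max-age=31536000, immutable";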

Client processing

So how do you process this feed of events as a client application?

When you’re working with a queue, you dequeue an item from the queue, you process it, and after you acknowledge it’s been processed successfully, it’s gone. In other words, processing items changes the queue; the queue is stateful. This is not the case with the event feed; the events stay there after you process them. How do you know which event to process next? It’s like reading a book; you start with the first page, and after a while you put it down. You remember where you left off by using a bookmark.

Of course, that is where the analogy ends, because a bookmark is usually something physical that you insert in between pages of a book, but that is not something you can do with this event feed. Instead, think of it as writing down on a piece of paper which page you read last. A more accurate analogy would be that each paragraph in the book has a unique number, and you write down on a piece of paper at which paragraph you left off.

You always start with the entry page—because that’s the only page whose URL you know. Then follow these steps:

  1. Try to find the entry whose <id> element matches your bookmark.
  2. If you find it, then everything (including entire pages) that comes before it is new (remember, events appear in the feed in reverse chronological order).
  3. If you don’t find it, navigate to the ‘next’ page, by finding a <link> element with the relationship prev-archive, read the page indicated by its href, and start from step 1.
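
Here’s a rough sketch of that search using XDocument (FetchFeedPage is an assumed helper, e.g. HttpClient plus XDocument.Parse; a production version needs more error handling):

using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

// Walks the feed backwards from the entry page until it finds the bookmark,
// then returns everything newer than the bookmark, oldest event first.
static List<XElement> GetNewEventsSince(string? bookmark, string entryPageUrl)
{
    XNamespace atom = "http://www.w3.org/2005/Atom";
    var newEntries = new List<XElement>();
    string? url = entryPageUrl;

    while (url is not null)
    {
        XDocument page = FetchFeedPage(url);

        foreach (var entry in page.Root!.Elements(atom + "entry"))
        {
            if ((string?)entry.Element(atom + "id") == bookmark)
            {
                newEntries.Reverse(); // the feed is reverse chronological
                return newEntries;
            }

            newEntries.Add(entry);
        }

        // The bookmark isn't on this page; follow the prev-archive link, if any.
        url = page.Root.Elements(atom + "link")
            .Where(l => (string?)l.Attribute("rel") == "prev-archive")
            .Select(l => (string?)l.Attribute("href"))
            .FirstOrDefault();
    }

    // The bookmark wasn't found anywhere: everything in the feed is new.
    newEntries.Reverse();
    return newEntries;
}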

Once you have found out what the new events are, you process them as follows:

try
{
  // the bookmark is the ID of the last event we successfully processed
  string? bookmark = GetBookmark();
  var events = GetNewEventsSince(bookmark);

  foreach (var @event in events)
  {
    ProcessEvent(@event);
    UpdateBookmark(@event.Id);
  }
}
catch (Exception ex)
{
  // log the exception, but don't rethrow; the next interval will retry
}

In this high-level code, @event.Id would contain the value of the <id> element. Obviously GetNewEventsSince contains code to navigate and parse an Atom feed and some logic to correctly deserialize events from their media types, and ProcessEvent probably uses a DI container to invoke an event handler, but otherwise, just invoke this code on an interval—and you’re done.

Why the big try/catch around the entire block of code? As I mentioned, this code will be invoked on an interval, likely a timer, and we don’t want an unhandled exception to stop that timer.

If you make sure that the code invoked by ProcessEvent uses the same transaction as UpdateBookmark, you can achieve something pretty nice: in-order, exactly-once delivery. This means that events are processed in exactly the same order as they were stored, and you will never process an event more than once.
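
A sketch of what that pairing could look like, assuming the handler’s changes and the bookmark live in the same database, and assuming overloads of ProcessEvent and UpdateBookmark that accept the connection and transaction:

using Microsoft.Data.SqlClient;

foreach (var @event in events)
{
    // connectionString as in the earlier outbox sketch
    using var connection = new SqlConnection(connectionString);
    connection.Open();
    using var transaction = connection.BeginTransaction();

    ProcessEvent(@event, connection, transaction);      // the handler's writes
    UpdateBookmark(@event.Id, connection, transaction); // move the bookmark

    // The event's effects and the bookmark move together, or not at all.
    transaction.Commit();
}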

Of course, that is kind of a white lie, because if processing an event throws an exception, the bookmark will not be updated, and at the next interval the same event will be processed again. The event is, in effect, only delivered successfully once, assuming it is eventually processed successfully.

There are, as always, a lot of gotchas and subtleties around how to make this work efficiently and correctly, but I will touch on those in a separate post (probably several, actually). I will touch on one which is probably brewing in the back of your mind: latency.

Latency

In the very first paragraph I mentioned that one of the benefits was responding ‘in a timely manner.’ In its current form, this is limited by the interval at which you check for new events. What if your requirements for latency are very strict, like having to respond within half a second? Obviously you can set the interval between checks to something very low, say 250 milliseconds, but this means the application hosting the event feed gets 4 requests per second from just one consuming application. That doesn’t scale very well.

There is a nicer approach, which scales a lot better, and which can get you latency in the order of tens of milliseconds. It involves using a backchannel to signal to clients that a new event is available. Clients then act upon that signal by checking for new events and processing them as usual. Think of it as a secondary trigger to check for new events, in addition to the timer firing on an interval.

It’s important that this backchannel only carries a notification that a new event is available, and preferably its unique ID, but not the event itself. The point of this backchannel is to be an optimization, so it is allowed to be relatively unreliable. If you miss a signal, you will still pick up the event, in the right order, either when the next signal arrives or when the timer fires.

This backchannel should be a push mechanism, so that clients can react as quickly as possible. It should also be client-initiated, by which I mean the client is the one initiating the request for real-time updates.

One of the options that meets this requirement is a fan-out (or ‘publish/subscribe’) queue. As I’ve said before, I don’t like to use this kind of specific (and stateful) infrastructure to communicate across application boundaries. Another option is to use a WebSocket. This lets a client connect to a well-defined URL to establish a bidirectional communication channel. SignalR is a solid option in the .NET ecosystem, for both publishing and receiving the notifications.
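
A sketch of that backchannel with SignalR (the hub, method name, and URL are made up for illustration; CheckForNewEvents would run the usual feed-processing code, and producer and consumer would of course live in different applications):

using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Client;

// Consumer side: treat the signal as an extra trigger to poll the feed.
var hubConnection = new HubConnectionBuilder()
    .WithUrl("https://payments.myshop.eu/events/hub")
    .WithAutomaticReconnect()
    .Build();

hubConnection.On<string>("newEvent", _ => CheckForNewEvents());

await hubConnection.StartAsync();

// Producer side: a hub that consumers connect to. After committing an event,
// Payments broadcasts only the new event's ID, never the event itself:
//
//     await hubContext.Clients.All.SendAsync("newEvent", eventId);
public class EventFeedHub : Hub { }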

Example

Let’s look at an example of the payment transaction as a high-level exchange between the systems involved, in this case Ordering and Payments. I’m using numbers as unique identifiers for the events, for clarity, while you would probably use UUIDs in a real application.

Situation
Payments has a single event in its feed; a payment for order number 1. The unique ID of this event is 1. To make this sample more illustrative, it paginates based on the number of events, and its page size is 1, meaning each event gets its own page.

Ordering’s bookmark for the Payments event feed is event 1.

The transaction

  1. Ordering fetches the entry page of the Payments event feed. The feed still only contains the one event, so there are no new events.
  2. The customer makes a payment for order number 2, through Payments. Payments registers the payment and appends an event to its feed, with unique ID 2.
  3. Ordering begins its next check of the Payments event feed. Its entry page only contains event 2. Since it doesn’t contain the bookmark event (1), Ordering requests the next page of events.
  4. Ordering finds its bookmark event on the next page, meaning everything that came after it (chronologically) is new. In this case that is only event 2.
  5. Ordering processes the event and marks order number 2 as ‘paid for’. It appends an event to its own event feed, to be picked up by other interested parties, like Logistics.
  6. Ordering updates its bookmark for the Payments event feed to event 2.

Sample application

All of this so far has been a bit theoretical and high-level, so I will leave you with a small sample application that not only demonstrates a very simplistic event feed, but also shows off the notification backchannel. It should be a good basis for you to start playing around with event feeds yourself.

heemskerkerik/EventFeedDemo

In future posts, I will discuss error handling, reliably storing events, and scaling out, amongst others.


  1. Except for when the queue is a database queue in the same database your application does its regular work in.

  2. This is also the case for ‘publish/subscribe’ queues, though.

  3. Kafka does have ‘idempotent delivery’, but that doesn’t help you if your application dies before delivering the message.