Exactly Once Stream Processing Semantics? Not Exactly


Stream processing systems are characterized by their processing semantics: at least once, at most once, or exactly once. These characteristics should be considered carefully from the point of view of the consistency and durability of a stream processing application. However, if a stream processing product claims to guarantee exactly once processing semantics, you should read the fine print carefully.

The inconvenient truth is that a stream processing product cannot unilaterally guarantee exactly once processing semantics. The guarantee holds only under certain assumptions, or when the application and the stream processing framework collaborate in certain ways.

From a system architecture point of view, a stream processing framework can only implement at least once or at most once processing semantics. Exactly once processing, where it is achievable at all, is a side effect of at least once processing semantics.

Stream Processing

For the sake of discussion, we can think of Storm or Spark Streaming as the streaming platform. Essentially, there are two critical events in stream processing. One is the stream processing framework keeping track of the last message processed, by storing a pointer to it in durable storage. The other is the application processing the message and taking some action. Here are some example application actions:

  • Write to database
  • Make a web service call
  • Send an email alert

Exactly once processing cannot be guaranteed because there is no coordination between the framework saving the current message pointer and the application taking its action; the two operations are not performed atomically. We are assuming there is no distributed transaction spanning the two operations.

Irrespective of the processing semantics, under ideal conditions with no failures, the system always behaves as if it provided exactly once processing.
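
To make the two critical steps concrete, here is a minimal Python sketch. `CheckpointStore` and `run` are hypothetical names, not Storm or Spark Streaming APIs. The store is in memory and only stands in for durable storage, and the message pointer is simply an index into a list. The action and the checkpoint are separate calls, and nothing makes them atomic. With no failure, every message is acted on exactly once:

```python
from typing import Callable, List


class CheckpointStore:
    """Hypothetical stand-in for durable storage of the message pointer."""

    def __init__(self) -> None:
        self.pointer = -1  # index of the last processed message; -1 = none yet

    def save(self, pointer: int) -> None:
        self.pointer = pointer


def run(messages: List[str], store: CheckpointStore,
        action: Callable[[str], None]) -> None:
    """Resume after the saved pointer; for each message, act, then checkpoint.

    The two steps are independent calls: nothing makes them atomic.
    """
    for pointer in range(store.pointer + 1, len(messages)):
        action(messages[pointer])  # application action (DB write, web call, ...)
        store.save(pointer)        # framework checkpoints the message pointer


performed: List[str] = []
store = CheckpointStore()
run(["m0", "m1", "m2"], store, performed.append)
print(performed)      # ['m0', 'm1', 'm2']
print(store.pointer)  # 2
```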

Next, we will consider different failure scenarios for at most once and at least once processing semantics, and find the conditions under which we can effectively get exactly once processing.

I will use the term message pointer to refer to the last message processed by the system. Its interpretation depends on the platform being used. For example, if the messages are being read from Kafka, it will be the message offset.

Exactly Once Processing

As mentioned earlier, this kind of semantics cannot be unilaterally guaranteed by the stream processing framework.

However, it’s not as bleak as it sounds. We can get exactly once processing as a side effect of at least once processing when one of the following conditions is met:

  • When the application action is idempotent, i.e., repeating it has no undesirable effect
  • When the stream processing framework and the application collaborate in a particular way, so that the application can detect and skip duplicate messages

Unfortunately, if neither condition is met, exactly once processing is not achievable.

At Most Once Processing

As the name suggests, a message will be handed over to the application at most once, i.e., no message will be replayed. This implies that some messages may never be processed by the application, depending on when a failure happens.

With this processing semantics, the stream processing framework saves the message pointer before handing over the message to the application. Let’s consider different failure scenarios and their impact on the system behavior.

If the failure happens after the message pointer is checkpointed but before the application action has been performed, as below, we have a lost message.

start → message pointer checkpointed → failure → application action → end
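
A minimal simulation of this scenario, assuming the hypothetical `run_at_most_once` loop below, with a dictionary standing in for the durable checkpoint and a `Crash` exception standing in for a process failure. The pointer is saved before the action, so after a crash between the two steps the restarted processor resumes past the message, and it is never acted on:

```python
from typing import Callable, Dict, List, Optional


class Crash(Exception):
    """Simulated process failure."""


def run_at_most_once(messages: List[str], state: Dict[str, int],
                     action: Callable[[str], None],
                     crash_before_action: Optional[int] = None) -> None:
    """At most once: checkpoint the pointer first, then perform the action.

    state["pointer"] plays the role of durable storage and survives a Crash.
    """
    for pointer in range(state["pointer"] + 1, len(messages)):
        state["pointer"] = pointer  # 1. message pointer checkpointed
        if pointer == crash_before_action:
            raise Crash(f"failure before acting on {messages[pointer]}")
        action(messages[pointer])   # 2. application action


messages = ["m0", "m1", "m2"]
state = {"pointer": -1}
performed: List[str] = []
try:
    run_at_most_once(messages, state, performed.append, crash_before_action=1)
except Crash:
    pass  # the process dies; the checkpoint already says m1 was handled
run_at_most_once(messages, state, performed.append)  # restart
print(performed)  # ['m0', 'm2']  (m1 is lost)
```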

 

In the next scenario, the failure happens after the application action, as below. Since the failure happens after both critical steps, the behavior is effectively the same as in a successful run.

start → message pointer checkpointed → application action → failure → end

In this scenario, we get exactly once processing as a side effect. But we cannot design a system that counts on failures happening only at such convenient points.

Based on our analysis, we can conclude that with at most once processing, we cannot achieve exactly once processing semantics when failures happen.

At Least Once Processing

With this processing semantics, a message may be handed over to the application more than once. The application will have to guard against that.

With this semantics, the stream processing framework hands over the message to the application first. Once the application processing is done, the message pointer is checkpointed by the stream processing framework.

In the first failure scenario, the failure happens before the application action has been performed, as below.

start → failure → application action → message pointer checkpointed → end

There is no detrimental effect in this scenario, whether or not the application action is idempotent. The stream processor will replay the message, because it was not able to checkpoint the message pointer in the earlier attempt.

Next, we will consider the scenario where the failure happens after the application action, as below.

start → application action → failure → message pointer checkpointed → end

The stream processor will replay the message in this case and the system behavior will depend upon whether the application action is idempotent or not.
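
A minimal simulation of this scenario, assuming the hypothetical `run_at_least_once` loop below, with a dictionary standing in for the durable checkpoint and a `Crash` exception standing in for a process failure; neither is a framework API. The action runs before the checkpoint, so a crash between the two steps makes the restarted processor deliver the message a second time:

```python
from typing import Callable, Dict, List, Optional


class Crash(Exception):
    """Simulated process failure."""


def run_at_least_once(messages: List[str], state: Dict[str, int],
                      action: Callable[[str], None],
                      crash_after_action: Optional[int] = None) -> None:
    """At least once: perform the action first, then checkpoint the pointer.

    state["pointer"] plays the role of durable storage and survives a Crash.
    """
    for pointer in range(state["pointer"] + 1, len(messages)):
        action(messages[pointer])   # 1. application action
        if pointer == crash_after_action:
            raise Crash(f"failure after acting on {messages[pointer]}")
        state["pointer"] = pointer  # 2. message pointer checkpointed


messages = ["m0", "m1", "m2"]
state = {"pointer": -1}
performed: List[str] = []
try:
    run_at_least_once(messages, state, performed.append, crash_after_action=1)
except Crash:
    pass  # the process dies; the checkpoint still points at m0
run_at_least_once(messages, state, performed.append)  # restart replays m1
print(performed)  # ['m0', 'm1', 'm1', 'm2']
```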

An action is idempotent when repeated execution of the action has no undesirable effect beyond that of executing it once. A database update could be idempotent, for example one that sets a column to a fixed value. However, if the update involves a counter, it won’t be.
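
A minimal illustration of the difference, assuming a plain dictionary in place of the database; the function names are hypothetical. Running each update twice simulates a replayed message. Setting a value ends in the same state as a single run; incrementing a counter does not:

```python
from typing import Dict


def set_status(db: Dict[str, str], order_id: str, status: str) -> None:
    """Idempotent: running it twice leaves the same state as running it once."""
    db[order_id] = status


def increment_counter(db: Dict[str, int], key: str) -> None:
    """Not idempotent: every execution changes the state again."""
    db[key] = db.get(key, 0) + 1


statuses: Dict[str, str] = {}
counters: Dict[str, int] = {}
for _ in range(2):  # the second pass simulates a replayed message
    set_status(statuses, "order-42", "shipped")
    increment_counter(counters, "orders-shipped")

print(statuses)  # {'order-42': 'shipped'}
print(counters)  # {'orders-shipped': 2}  (a single delivery would give 1)
```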

If the action is idempotent, then we are effectively doing exactly once processing. So we can design such an exactly once processing system, as long as the assumption of idempotency is valid. Realistically, this is the only kind of exactly once processing system we can design for.

Other Ways of Getting Exactly Once Behavior

Are there other ways of getting exactly once processing behavior, short of a distributed transaction?

Here is one approach: the application saves the message pointer along with its own database update within a single transaction. It works under the following conditions:

  • At least once processing semantics
  • The application action is a database update
  • The message pointer is made available to the application by the streaming system

To prevent repeated execution of the same database update, when the application receives a message, it checks the message pointer against the one stored in the database. If the message has already been processed, the application ignores it.
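
Here is a sketch of this approach using Python's built-in `sqlite3` module. It assumes a single consumer per stream or partition and offsets that increase monotonically, as Kafka offsets do within a partition. The table names, the `handle` function and the loyalty points update are hypothetical, chosen to show that even a non-idempotent update (adding points) is applied only once when the offset is saved in the same transaction:

```python
import sqlite3


def init_db(conn: sqlite3.Connection) -> None:
    conn.execute("CREATE TABLE points (customer TEXT PRIMARY KEY, total INTEGER NOT NULL)")
    conn.execute("CREATE TABLE message_pointer "
                 "(stream TEXT PRIMARY KEY, last_offset INTEGER NOT NULL)")
    conn.commit()


def handle(conn: sqlite3.Connection, stream: str, offset: int,
           customer: str, points: int) -> bool:
    """Apply the update and save the offset in one transaction.

    Returns False, without touching the data, for an already applied offset.
    """
    with conn:  # commits if the block succeeds, rolls back if it raises
        row = conn.execute("SELECT last_offset FROM message_pointer WHERE stream = ?",
                           (stream,)).fetchone()
        if row is not None and offset <= row[0]:
            return False  # replayed message: already applied, ignore it
        # Non-idempotent update: add the earned points to the running total.
        conn.execute("INSERT OR IGNORE INTO points (customer, total) VALUES (?, 0)",
                     (customer,))
        conn.execute("UPDATE points SET total = total + ? WHERE customer = ?",
                     (points, customer))
        # Save the message pointer in the same transaction as the update.
        conn.execute("INSERT OR REPLACE INTO message_pointer (stream, last_offset) "
                     "VALUES (?, ?)", (stream, offset))
    return True


conn = sqlite3.connect(":memory:")
init_db(conn)
results = [handle(conn, "orders", 0, "alice", 10),
           handle(conn, "orders", 1, "alice", 5),
           handle(conn, "orders", 1, "alice", 5)]  # offset 1 replayed
total = conn.execute("SELECT total FROM points WHERE customer = ?",
                     ("alice",)).fetchone()[0]
print(results)  # [True, True, False]
print(total)    # 15
```

Because the update and the offset commit or roll back together, a crash at any point leaves either both or neither in the database.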

Essentially, we are performing de-duplication: the application takes steps to prevent duplicate processing of a message. If the message pointer is not available to the application, it could instead save the actual message along with its database update, in the same transaction.
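
When only the message itself is available, one possible variant is to store a hash of the message in a table with a primary key, inside the same transaction as the update. This is a sketch under assumptions: the table and function names are hypothetical, and it treats two byte-identical messages as duplicates, which is only correct if legitimate messages are never identical:

```python
import hashlib
import sqlite3


def init_db(conn: sqlite3.Connection) -> None:
    conn.execute("CREATE TABLE events (payload TEXT NOT NULL)")
    conn.execute("CREATE TABLE processed_message (digest TEXT PRIMARY KEY)")
    conn.commit()


def handle(conn: sqlite3.Connection, message: bytes) -> bool:
    """Record the message digest and apply the update in one transaction.

    A duplicate digest violates the primary key; the whole transaction is
    rolled back, so the update is not applied a second time.
    """
    digest = hashlib.sha256(message).hexdigest()
    try:
        with conn:
            conn.execute("INSERT INTO processed_message (digest) VALUES (?)", (digest,))
            conn.execute("INSERT INTO events (payload) VALUES (?)",
                         (message.decode("utf-8"),))
    except sqlite3.IntegrityError:
        return False  # already processed: ignore the replayed message
    return True


conn = sqlite3.connect(":memory:")
init_db(conn)
results = [handle(conn, b'{"order": 42}'),
           handle(conn, b'{"order": 42}'),  # replayed message
           handle(conn, b'{"order": 43}')]
count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(results)  # [True, False, True]
print(count)    # 2
```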

De-duplication

De-duplication is one way of preventing multiple executions of an operation and achieving exactly once processing semantics. As discussed, de-duplication is achievable if the application action is a database update. Let’s consider some other action, e.g., a web service call.

Can we prevent multiple calls to a web service with the same data? You might consider keeping track of the web service calls by logging to durable storage the fact that the web service got called. However, from a consistency point of view, making the web service call and logging it must be enclosed within a single transaction, which is not possible. So going down this path proves futile.
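
The futility can be seen in a small simulation, assuming a hypothetical `log` set in place of durable storage and a `calls` list in place of the remote web service. Whichever order the call and the log write are done in, a crash between them followed by a replay either repeats the call or loses it:

```python
from typing import Callable, List, Set


class Crash(Exception):
    """Simulated process failure."""


def call_then_log(msg_id: str, calls: List[str], log: Set[str],
                  crash: bool = False) -> None:
    if msg_id in log:
        return                 # logged as done: skip
    calls.append(msg_id)       # web service call (simulated)
    if crash:
        raise Crash()          # failure before the log write
    log.add(msg_id)            # durable record of the call


def log_then_call(msg_id: str, calls: List[str], log: Set[str],
                  crash: bool = False) -> None:
    if msg_id in log:
        return                 # logged as done: skip
    log.add(msg_id)            # durable record written first
    if crash:
        raise Crash()          # failure before the call
    calls.append(msg_id)       # web service call (simulated)


def deliver_with_replay(step: Callable[..., None]) -> List[str]:
    """Deliver one message, crash mid-step, then replay it (at least once)."""
    calls: List[str] = []
    log: Set[str] = set()
    try:
        step("m1", calls, log, crash=True)
    except Crash:
        pass
    step("m1", calls, log)     # replayed delivery
    return calls


print(deliver_with_replay(call_then_log))  # ['m1', 'm1']  duplicate call
print(deliver_with_replay(log_then_call))  # []            call never made
```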

Idempotent Operations

For database updates, a close examination will reveal whether the operation is idempotent. For example, any operation involving counting, i.e., incrementing or decrementing a value, is not idempotent.

Consider an eCommerce scenario in which a customer earns points, redeemable in the future, based on the purchase amount of a transaction. The points earned get added to the points the customer has already earned, and the total is updated in the database. Clearly, this operation is not idempotent.

What about a web service? Is it idempotent? It depends on what the web service endpoint on the server does. If it performs a non-idempotent database operation, then it’s not idempotent. Sometimes we simply may not know the endpoint behavior, and the question remains open.

As for email, it depends on the nature of the email. If it’s a marketing email, it probably does not matter if the same email goes out twice, except that it could annoy the recipient. So we could declare sending such emails an idempotent operation.

Let’s contrast that with a scenario where an alert email goes out when some sensor reading is above or below a threshold. Although not catastrophic, sending multiple emails for the same alert is undesirable. We could categorize sending such emails as a non-idempotent operation.

Summing Up

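Exactly once processing is a desirable and sought-after property of a stream processing application. However, it is achievable only under certain conditions: when the application action is idempotent, or when the application de-duplicates messages, for example by saving the message pointer along with its database update in the same transaction. Here is an interesting article to read on this topic.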

 

 

 


About Pranab

I am Pranab Ghosh, a software professional in the San Francisco Bay area. I manipulate bits and bytes for the good of living beings and the planet. I have worked with a myriad of technologies and platforms in various business domains for early stage startups, large corporations and anything in between. I am an active blogger and open source project owner. I am passionate about technology and green and sustainable living. My technical interest areas are Big Data, Distributed Processing, NOSQL databases, Machine Learning and Programming languages. I am fascinated by problems that don't have neat closed-form solutions.
This entry was posted in Big Data, Real Time Processing, Spark Streaming, Storm, stream processing.

