Stream processing systems are characterized by at least once, at most once and exactly once processing semantics. These are important characteristics that should be carefully considered from the point of view of consistency and durability of a stream processing application. However, if a stream processing product claims to guarantee exactly once processing semantics, you should carefully read the fine print.
The inconvenient truth is that a stream processing product cannot unilaterally guarantee exactly once processing semantics. The guarantee holds only under certain assumptions, or when the application and the stream processing framework collaborate in certain ways.
From a system architecture point of view a stream processing framework can only implement at least once or at most once processing semantics. Exactly once processing happens to be a side effect of at least once processing semantics.
For the sake of discussion, we could think of Storm or Spark Streaming as the streaming platform. Essentially there are two critical events in stream processing. First, the stream processing framework has to keep track of the last message that got processed by storing a pointer to it in some durable storage. Second, the application does some processing with a message and takes some action. Here are some example application actions:
- Write to database
- Make a web service call
- Send an email alert
Exactly once processing cannot be guaranteed because there is no orchestration between the framework saving the current message pointer and the application taking its action, so the two operations are not performed atomically. We are assuming there is no distributed transaction spanning the two operations.
Irrespective of the stream processing semantics, under ideal conditions with no failure, the system will always behave as if it were doing exactly once processing.
Next, we will consider different failure scenarios for at most once and at least once processing semantics and find the conditions under which we can get effectively exactly once processing.
I will use the term message pointer to refer to the last message processed by the system. Its interpretation will depend on the platform being used. For example, if the messages are being read from Kafka, it will be the message offset.
Exactly Once Processing
As mentioned earlier, this kind of semantics cannot be unilaterally guaranteed by the stream processing framework.
However, it’s not as bleak as it sounds. We can get exactly once processing as a side effect of at least once processing when one of the following conditions is met.
- When the application action has a specific characteristic
- When the stream processing framework and the application collaborate in a particular way
Unfortunately, if neither condition is met, then exactly once processing is not achievable.
At Most Once Processing
As the name suggests, a message will be handed over to the application at most once, i.e., no message will be replayed. This implies that some messages may never be processed by the application, depending upon when a failure happens.
With these message processing semantics, the stream processing framework saves the message pointer before handing over the message to the application. Let’s consider different failure scenarios and their impact on the system behavior.
If the failure happens after the message pointer is checkpointed, but before the application action has been performed, as below, we have a lost message.
|start||message pointer check pointed||failure||application action||end|
In the next failure scenario, the failure happens after the application action, as below. Since the failure happens after all the critical steps, the behavior is effectively the same as in a successful scenario.
|start||message pointer check pointed||application action||failure||end|
In this scenario, we have exactly once processing as a side effect. But we cannot design a system around the failure happening at a convenient point.
Based on our analysis, we can conclude that with at most once processing we cannot achieve exactly once processing semantics when a failure happens.
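The lost message scenario can be sketched with a small simulation. This is only an illustration, not the API of Storm or Spark Streaming: `run_at_most_once`, the `checkpoint` dictionary standing in for durable storage, and the `fail_at` parameter used to inject a failure are all made up for this example.

```python
class Crash(Exception):
    """Simulated process failure."""


def run_at_most_once(messages, checkpoint, action, fail_at=None):
    """Process messages, resuming after the checkpointed pointer.

    The pointer is saved BEFORE the application action runs. fail_at is the
    index of the message at which a failure is injected between the two steps.
    """
    start = checkpoint["pointer"] + 1
    for i in range(start, len(messages)):
        checkpoint["pointer"] = i      # 1. framework checkpoints the message pointer
        if i == fail_at:
            raise Crash(f"failed before acting on message {i}")
        action(messages[i])            # 2. application action


messages = ["m0", "m1", "m2"]
checkpoint = {"pointer": -1}           # hypothetical stand-in for durable storage
processed = []

try:
    run_at_most_once(messages, checkpoint, processed.append, fail_at=1)
except Crash:
    pass                               # the process restarts from the checkpoint

run_at_most_once(messages, checkpoint, processed.append)
print(processed)                       # ['m0', 'm2']
```

The pointer had already moved past m1 when the failure hit, so the restart resumes at m2 and m1 is never handed to the application.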
At Least Once Processing
With these processing semantics, a message may be handed over to the application more than once. An application will have to guard against that.
With these semantics, the stream processing framework hands over the message to the application first. Once the application processing is done, the message pointer is checkpointed by the stream processing framework.
In the first failure scenario, the failure happens before the application action has been performed, as below.
|start||failure||application action||message pointer check pointed||end|
There is no detrimental effect in this scenario, whether the application action is idempotent or not. The stream processor will replay the message, because it was not able to checkpoint the message pointer in the earlier attempt.
Next, we will consider the scenario where the failure happens after the application action, as below.
|start||application action||failure||message pointer check pointed||end|
The stream processor will replay the message in this case and the system behavior will depend upon whether the application action is idempotent or not.
An action is idempotent when executing it repeatedly has the same effect as executing it once. A database update that sets a column to a given value is idempotent. However, if the update increments a counter, it is not.
If the action is idempotent, then effectively we are doing exactly once processing. So we can design for such an exactly once processing system as long as the assumption of idempotency is valid. Realistically, this is the only kind of exactly once processing system we can design for.
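Here is a rough simulation of the replay, comparing an idempotent action with a non-idempotent one. As before, this is a sketch under assumptions: `run_at_least_once`, `replay_after_crash` and the two actions are hypothetical names, and in-memory dictionaries stand in for the database.

```python
class Crash(Exception):
    """Simulated process failure."""


def run_at_least_once(messages, checkpoint, action, fail_at=None):
    """The application action runs BEFORE the pointer is checkpointed."""
    start = checkpoint["pointer"] + 1
    for i in range(start, len(messages)):
        action(messages[i])            # 1. application action
        if i == fail_at:
            raise Crash(f"failed before checkpointing message {i}")
        checkpoint["pointer"] = i      # 2. framework checkpoints the message pointer


def replay_after_crash(action):
    """Fail after acting on message 1, then restart from the checkpoint."""
    messages = [("a", 10), ("b", 20), ("c", 30)]
    checkpoint = {"pointer": -1}
    try:
        run_at_least_once(messages, checkpoint, action, fail_at=1)
    except Crash:
        pass
    run_at_least_once(messages, checkpoint, action)


latest = {}                            # idempotent: overwrite the value for a key


def set_value(msg):
    key, value = msg
    latest[key] = value


totals = {}                            # not idempotent: add to a running total


def add_value(msg):
    key, value = msg
    totals[key] = totals.get(key, 0) + value


replay_after_crash(set_value)
replay_after_crash(add_value)
print(latest)                          # {'a': 10, 'b': 20, 'c': 30}
print(totals)                          # {'a': 10, 'b': 40, 'c': 30}
```

Message b is applied twice in both runs. The overwrite ends up in the same state as a single execution, while the running total for b is doubled.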
Other Ways of Getting Exactly Once Behavior
Are there other ways of getting exactly once processing behavior, barring distributed transaction?
Here is one approach: the application saves the message pointer along with its own database update within a transactional boundary. It will work under the following conditions.
- At least once processing semantics
- User action is a database update
- The message pointer is made available to the application by the streaming system
To prevent repeat execution of the same database update, when the application receives a message it has to check the message pointer against what’s stored in the database. If it already exists, the message will be ignored by the application.
Essentially we are performing a dedup operation. The application is taking steps to prevent duplicate processing of a message. In case the message pointer is not available to the application, it could save the actual message along with whatever database update the application is doing in the same transaction.
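A minimal sketch of this dedup approach, using SQLite only because it ships with Python. The table names, the `handle` function and the counter being updated are assumptions for illustration. The important part is that the check, the update and the saving of the message pointer all happen in one transaction.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE counter (name TEXT PRIMARY KEY, value INTEGER NOT NULL)")
conn.execute("CREATE TABLE processed (pointer INTEGER PRIMARY KEY)")
conn.execute("INSERT INTO counter VALUES ('events', 0)")
conn.commit()


def handle(conn, pointer, amount):
    """Apply a non-idempotent update at most once per message pointer."""
    with conn:  # one transaction: commits on success, rolls back on an exception
        seen = conn.execute(
            "SELECT 1 FROM processed WHERE pointer = ?", (pointer,)
        ).fetchone()
        if seen:
            return False               # replayed message, ignore it
        conn.execute(
            "UPDATE counter SET value = value + ? WHERE name = 'events'", (amount,)
        )
        conn.execute("INSERT INTO processed (pointer) VALUES (?)", (pointer,))
        return True


# The message with pointer 1 is delivered twice, as in an at least once replay.
results = [handle(conn, 0, 5), handle(conn, 1, 7), handle(conn, 1, 7)]
value = conn.execute("SELECT value FROM counter WHERE name = 'events'").fetchone()[0]
print(results, value)                  # [True, True, False] 12
```

If a failure happens anywhere inside the transaction, both the counter update and the pointer are rolled back together, so the replayed message is applied as if for the first time.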
De-duplication is one way of preventing multiple executions of an operation and achieving exactly once processing semantics. De-duplication, as we discussed, is achievable if the application action is a database update. Let’s consider some other action, e.g., a web service call.
Can we prevent multiple calls to a web service with the same data? You might consider keeping track of the web service call made by logging in a durable storage the fact that the web service got called. However, from a consistency point of view, the two actions of making the web service call and logging must be enclosed within a transaction, which is not possible. So going down this path proves to be futile.
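To see why the logging idea fails, here is a small sketch that tries both orderings with a failure injected between the two steps. The function names are hypothetical, a list stands in for the remote web service, and a set stands in for the durable log.

```python
class Crash(Exception):
    """Simulated process failure."""


def call_then_log(msg, calls, log, crash=False):
    calls.append(msg)                  # the web service call (a remote side effect)
    if crash:
        raise Crash()
    log.add(msg)                       # durable record that the call was made


def log_then_call(msg, calls, log, crash=False):
    log.add(msg)
    if crash:
        raise Crash()
    calls.append(msg)


def deliver_with_one_crash(step):
    """Deliver one message, fail between the two steps, then replay it."""
    calls, log = [], set()
    for crash in (True, False):        # first attempt fails, second is the replay
        if "m1" in log:
            continue                   # the log says the call was already made
        try:
            step("m1", calls, log, crash=crash)
        except Crash:
            pass
    return calls


print(deliver_with_one_crash(call_then_log))   # ['m1', 'm1']
print(deliver_with_one_crash(log_then_call))   # []
```

Calling first and logging second produces a duplicate call on replay. Logging first and calling second loses the call altogether. Without a transaction around both steps, neither ordering gives exactly once behavior.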
For database updates, a close examination will reveal whether the database operation is idempotent or not. For example, any operation involving counting, i.e., incrementing or decrementing a value, is not idempotent.
Consider a scenario in eCommerce where a customer earns points, redeemable in the future, based on the purchase amount of a transaction. The points earned are added to the points already earned by the customer and the total is updated in the database. Clearly this operation is not idempotent.
What about a web service? Is it idempotent? It depends on what the server endpoint of the web service does. If it performs a non-idempotent database operation, then it’s not idempotent. Sometimes we simply may not know the endpoint’s behavior and it remains ambiguous.
As for email, it depends on the nature of the email. If it’s a marketing email, it probably does not matter if the same email goes out twice, except that it could be annoying to the recipient. So we could declare the sending of such emails an idempotent operation.
Let’s contrast that with a scenario where an alert email goes out when some sensor reading is above or below some threshold. Although not catastrophic, it is undesirable to send multiple emails for the same alert. We could categorize sending such emails as a non-idempotent operation.
Exactly once processing is a desired and sought after artifact of a stream processing application. However, it’s only achievable under certain conditions. Here is an interesting article to read on this topic