You show up at work in the morning and open your email to find 100 alarm emails in your inbox for the same error from an application running on some server within a short time window of 1 minute. You are off to to bad start, struggling to find other emails. I was motivated by this unpleasant experience to come up with a solution to stop the deluge of the same alarm emails in a small time window.
When there is a burst of events it’s essentially a cluster on the temporal dimension. If we can identify the clusters from the real time stream of events, then we can send only one or few alarms per cluster, instead of of one alarm per event. If the cluster extends over an long period, we could send multiple alarms.
I have implemented the solution in Spark Streaming and it’s available in my OSS project ruscello in github. The core stream processing algorithms are implemented in plain Java in another OSS project of mine called hoidla. So the solution can easily be implemented in any other streaming platform e.g. Storm or Flink.
Event Burst is a Temporal Cluster
As I alluded to it earlier, a burst of events is a essentially a cluster in the temporal dimension. A cluster is characterized by the following two properties.
- Data points within a cluster are maximally cohesive, i.e. they are close to each other. For time series data, cohesiveness is defined by the gap between successive data points.
- Data points within a cluster are maximally separated from data points in other clusters. Cluster separation is not going to be of much concern for us.
Time gap between successive events is an important parameters. We will detect cluster using various statistics based on time gap. The various algorithms are listed below. All computation are done for a window with a specified time span.
|Number of occurrences||Number of occurrences above threshold|
|Average interval||Average interval below threshold|
|Maximum interval||Maximum interval below threshold|
The first two conditions are essentially same, expressed in different way, except for one difference. With average interval, you could also enable detection of clusters within a window, by appropriately specifying two additional configuration parameters. There is more explanation of this feature later in the post.
Sometimes it makes sense to join some conditions together conjunctively (AND) or disjunctively (OR). A score is returned based on the evaluation of the cluster conditions. For example, the second and third conditions and be used together conjunctively, which will constrain the mean and the variance of the time gap.
The third condition implicitly applies an upper bound on the variance of time gap, albeit in a crude way . It is better to actually use variance or standard deviation. I will make that enhancement when I get a chance.
When the conditions joined conjunctively, weights can be specified as an option for each condition and an weighted score can be calculated.
The decision to generate an alarm is based on the score. An alarm is always generated at the onset of an event cluster. How many additional alarms will be generated for an event cluster depends on 2 parameters, depending on whether the event stream is in a cluster mode or not.
Any event stream at any timeis in any of the two modes as below. It’s in cluster mode when the cluster conditions are met.
- Non cluster
In cluster mode, alarm generation is controlled by the window sliding step. The time bound window has a sliding step specified through the parameter eventCluster.window.timeStep . Computation is performed when the window is full i.e. every sliding step interval. For example, If we have have an widow spanning 20 sec with a sliding interval of 10 sec, cluster score computation will be performed every 10 sec.
In the non cluster mode, alarm generation is controlled through the parameter eventCluster.window.minEventTimeInterval. The time gap between successive alarms will not be less than the specified time interval.
Through these two parameters, alarm flooding can be controlled by reducing the number of alarms significantly.
We are considering events based on server logs of some application. It is assumed that the server logs are pre processed to generate records with the following content. Error code and time stamp are generally in logs. Logs may include host name also.
|Host ID||Host name or IP address for the server|
|Application ID||Unique ID for the application running on server|
|Error code||Error code from the server log|
|Time stamp||Time stamp corresponding to the error event|
The first 3 fields constitute the key in spark processing. For each unique combination of (host ID, application ID, error code) we want to process the event stream and detect event cluster.
The implementation is in the scala object EventCluster. It can take input stream from the following sources. Choice can be made through a configuration parameter.
I have used text socket source for my testing. The implementation relies on Spark Streaming state management and uses the mapWithState() function. For each unique (host ID, application ID, error code) there is a time bound window that is part of the Spark Streaming state. HDFS is used for state checkpointing.
A natural question that might come up is why window functions offered by Spark is not being used. They are just too simplistic and does not meet the requirements for this use case. I have used time bound window from hoidla, which is a java library for real time streaming application. It provides the following features.
- Various types of windows and associated processing algorithms
- Various approximate aggregation algorithms
Here is some output for alarm. This test was done with a stream of 500 events, consisting of multiple event clusters. This output fragment shows a cluster involving the host, application and error combination of (50ZQTJZM,JZ7CD5,60035). Alarms are generated about 5 sec apart, because window slide step is configured to be 5 sec.
(1U79V3L3,N639GQ,20089,1471670766492,true) (2KL6O08J,VZ5ZQ1,40088,1471670778598,true) (2KL6O08J,VZ5ZQ1,40010,1471670783746,true)
Here is some output fragment in non clustered ambient state of event generation. The alarms are further apart as determined by minimum alarm time interval parameter.
(F2693VBD,612S9T,20075,1471707963373,true) (F2693VBD,612S9T,20104,1471707976177,true) (50ZQTJZM,JZ7CD5,60003,1471707996255,true) (F2693VBD,612S9T,20048,1471708004148,true) (F2693VBD,612S9T,20080,1471708023964,true)
The number of alarms from this run is less than 50, which translates to a 90% reduction in the number of alarms. Without the alarm controlling logic, there would have been one alarm per event. The number of alarms can be reduced further by appropriately controlling the two configuration parameters I alluded to earlier.
It’s conceivable, that you want to control the alarm generation rate depending on the application and error code. One way to do that would be to make these configuration parameters application and error code dependent, instead of global.
Although I have used error code as part of the key, you could just use host ID and application ID as the key if you don’t care about the error codes.
Event Stream Generation
I have used a python script to generate events. It also serves as a socket server, serving the events to Spark Streaming. Here is some console output of the script.
connected with 127.0.0.1:64492 connected with 127.0.0.1:64495 connected with 127.0.0.1:64498 num of messages sent: 1 connected with 127.0.0.1:64501 starting cluster connected with 127.0.0.1:64508 num of messages sent: 4 connected with 127.0.0.1:64511 num of messages sent: 8 connected with 127.0.0.1:64520 num of messages sent: 7 connected with 127.0.0.1:64523 num of messages sent: 8 connected with 127.0.0.1:64526 num of messages sent: 8 connected with 127.0.0.1:64529 num of messages sent: 8 connected with 127.0.0.1:64532 num of messages sent: 7
It shows the number of messages sent to spark, when the Spark receiver calls. In this output we see an event cluster formation, as the number of messages sent increases sharply.
Generally clusters are detected only if they span across the window i.e. when the cluster length is greater than the window length. This forces the window length to be set to a value small enough to be smaller than than expected cluster length. Unfortunately, with a small window length, too many alarms may be generated.
Ideally, to alleviate this problem, we would like to set a large window length and we should be able to detect multiple clusters with the window. This feature can be enabled by setting the parameter findClusterWithin to true and setting the parameter minClusterSize appropriately. When enabling this feature, window processing strategy averageInterval should be used. First, the window processing function tries to detect a cluster across the window. If that fails it goes on to find clusters within the window.
For detecting cluster, hierarchical agglomerative clustering algorithm is used. Since time sequence data is ordered, this algorithm is easy to implement.
Other Use Cases
This solution can be used for many other similar use cases. For example, you may have an IoT application, where you want to detect outlier in the sensor data and trigger alarms, when outliers are detected.
You are up against the same problem and could adopt the same solution. If the outlier events consist of (sensor ID, timestamp). you could use the sensor ID as the key. A separate window will be created and managed for each sensor.
Realtime Streaming Library
Following blogs of mine provide more details on the capabilities of hoidla. I have implement various Storm and Spark Streaming based realtime use cases, leveraging stream processing library in hoidla.
- Real Time Detection of Outliers in Sensor Data using Spark Streaming
- Counting Unique Mobile App Users with HyperLogLog
- Tracking Web Site Bounce Rate in Real Time
Since it’s a plain java library, it can be used from any application written in JVM language, whether it’s running under Spark Streaming, Flink or Storm.
We have gone through a simple and intuitive solution for controlling alarm flooding using Spark Streaming. Here is the tutorial document to run this use case. Here is the configuration file used for this use case.