Pure Aggregation with Filtering¶
Below are sample rules with pure aggregation and filtering capabilities.
Aggregation of Multiple Streams within a Window, with Filtering¶
Here, the averages of the temp and humidity observed properties are calculated over a 1-minute window, and the aggregated values are then filtered using the logical AND operator. If the condition is satisfied, the calculated averages are sent to the output.
The following is a sample create-rule JSON:
{
    "ruleName": "AggregationSample1",
    "selectClause": "avg(s1.temp) as avgtemp,avg(s2.humidity) as avghumidity",
    "joinClause": "",
    "groupByField": "feature",
    "whereClause": "avgtemp > 40 AND avghumidity > 90",
    "triggerInterval": "10",
    "windowDuration": "60",
    "event": {
        "name": "t",
        "format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
    },
    "input": [{
            "type": "stream",
            "alias": "s1",
            "topic": "test1",
            "properties": [
                "temp",
                "t"
            ]
        },
        {
            "type": "stream",
            "alias": "s2",
            "topic": "test2",
            "properties": [
                "humidity",
                "t"
            ]
        }
    ],
    "output": {
        "type": "topic",
        "name": "outputaggre1"
    }
}
Note
The above rule is an example of a pure aggregation rule. The field by which the aggregation functions are grouped is specified in the “groupByField” parameter.
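To make the semantics of this rule concrete, the following Python sketch mimics what the rule describes: averaging two inputs per group within fixed 60-second windows and then applying the AND filter. It is only an illustration, not the CEP engine's implementation. The function names, the tuple layout of the events, and the assumption that both streams are combined by group field and window start are all hypothetical.

```python
from collections import defaultdict


def windowed_averages(temp_events, humidity_events, window_seconds=60):
    """Average temp and humidity per (feature, window_start) bucket.

    Each event is a hypothetical tuple: (feature, epoch_seconds, value).
    """
    # For every bucket keep [sum, count] per observed property.
    buckets = defaultdict(lambda: {"temp": [0.0, 0], "humidity": [0.0, 0]})
    for name, events in (("temp", temp_events), ("humidity", humidity_events)):
        for feature, ts, value in events:
            window_start = ts - ts % window_seconds  # floor to window boundary
            acc = buckets[(feature, window_start)][name]
            acc[0] += value
            acc[1] += 1

    averages = {}
    for key, acc in buckets.items():
        t_sum, t_count = acc["temp"]
        h_sum, h_count = acc["humidity"]
        if t_count and h_count:  # assumption: both streams must contribute
            averages[key] = (t_sum / t_count, h_sum / h_count)
    return averages


def apply_where(averages, temp_min=40, humidity_min=90):
    """Mirror of: avgtemp > 40 AND avghumidity > 90."""
    return {
        key: (avg_temp, avg_hum)
        for key, (avg_temp, avg_hum) in averages.items()
        if avg_temp > temp_min and avg_hum > humidity_min
    }


temp = [("f1", 0, 50), ("f1", 30, 40), ("f2", 10, 30)]
humidity = [("f1", 5, 95), ("f1", 50, 93), ("f2", 20, 99)]
matches = apply_where(windowed_averages(temp, humidity))
print(matches)
```

Only the group whose averages satisfy both conditions survives the filter, which corresponds to the rule sending output only when the whereClause holds.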
Calculate Moving average of a field¶
Here, the moving average of the temp observed property is calculated.
The following is a sample create-rule JSON:
{
    "ruleName": "MovingAvgRule",
    "selectClause": "avg(s1.temp) as runningavgtemp",
    "whereClause": "",
    "joinClause": "",
    "groupByField": "sensor",
    "triggerInterval": "1",
    "windowDuration": "",
    "event": {
        "name": "t",
        "format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
    },
    "input": [{
        "alias": "s1",
        "topic": "test1",
        "properties": [
            "temp",
            "t"
        ]
    }],
    "output": {
        "type": "topic",
        "name": "outputaggre2"
    }
}
Note
The above rule is an example of running (cumulative) aggregation. If “windowDuration” is omitted or left blank, the aggregation runs cumulatively, without a window.
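As a rough illustration of what a running (cumulative) average per group means, the Python sketch below keeps a total and a count for each sensor and reports the average after every event. The function name and event layout are hypothetical. The sketch also simplifies timing: it emits a value per event, whereas the rule above emits results according to its triggerInterval.

```python
from collections import defaultdict


def running_averages(events):
    """Return (sensor, running_avg) after each (sensor, temp) event."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    results = []
    for sensor, temp in events:
        totals[sensor] += temp
        counts[sensor] += 1
        # No window: the average covers every event seen so far for this sensor.
        results.append((sensor, totals[sensor] / counts[sensor]))
    return results


readings = [("a", 10), ("a", 20), ("b", 5), ("a", 30)]
print(running_averages(readings))
```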
Filtering on Running count of a field¶
Here, the running count of the temp observed property is calculated, a filter is applied to the running count, and output is produced if the condition is satisfied.
The following is a sample create-rule JSON:
{
    "ruleName": "RunningCountRule",
    "selectClause": "count(s1.temp) as runningcounttemp",
    "whereClause": "runningcounttemp > 2",
    "joinClause": "",
    "groupByField": "sensor",
    "triggerInterval": "1",
    "windowDuration": "",
    "event": {
        "name": "t",
        "format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
    },
    "input": [
        {
            "alias": "s1",
            "topic": "test1",
            "properties": [
                "temp",
                "t"
            ]
        }
    ],
    "output": {
        "type": "topic",
        "name": "outputaggre2"
    }
}
Note
The above rule is an example of running (cumulative) aggregation. If “windowDuration” is omitted or left blank, the aggregation runs cumulatively, without a window.
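The effect of filtering on a running count can be sketched in Python as follows. This is a simplified model under stated assumptions, not the engine's behavior: the function name is hypothetical, events are reduced to their sensor id, and a result is produced per event rather than per trigger interval.

```python
from collections import Counter


def filtered_running_counts(sensors, threshold=2):
    """Emit (sensor, count) whenever the running count exceeds the threshold."""
    counts = Counter()
    emitted = []
    for sensor in sensors:
        counts[sensor] += 1
        # Mirror of: runningcounttemp > 2
        if counts[sensor] > threshold:
            emitted.append((sensor, counts[sensor]))
    return emitted


stream = ["a", "b", "a", "a", "a", "b"]
print(filtered_running_counts(stream))
```

Nothing is emitted for a sensor until its third reading arrives, because the count must be strictly greater than 2.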
Calculate the sum of a field over a 60-second window with a 30-second overlap¶
Here, the sum of the temp observed property is calculated over a 60-second window that slides every 30 seconds, so consecutive windows overlap by 30 seconds. A filter is applied to the sum, and output is produced if the condition is satisfied.
The following is a sample create-rule JSON:
{
    "ruleName": "AggregationSample3",
    "selectClause": "sum(s1.temp) as sumtemp",
    "whereClause": "sumtemp > 40",
    "joinClause": "",
    "groupByField": "sensor",
    "triggerInterval": "1",
    "windowDuration": "60",
    "slidingDuration": "30",
    "event": {
        "name": "t",
        "format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
    },
    "input": [
        {
            "alias": "s1",
            "topic": "test1",
            "properties": [
                "temp",
                "t"
            ]
        }
    ],
    "output": {
        "type": "topic",
        "name": "outputaggre3"
    }
}
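With a 60-second window that slides every 30 seconds, each event belongs to two windows. The Python sketch below illustrates that assignment and the filtered sum. It is a minimal model assuming half-open windows [start, start + window) aligned to multiples of the slide; the function names and event tuples are hypothetical, and the engine's actual alignment may differ.

```python
from collections import defaultdict


def window_starts(ts, window=60, slide=30):
    """Return the start of every window [start, start + window) containing ts."""
    starts = []
    start = ts - ts % slide  # latest window that has already opened
    while start > ts - window:
        starts.append(start)
        start -= slide
    return sorted(starts)


def sliding_sums(events, window=60, slide=30, threshold=40):
    """Sum temp per (sensor, window_start), keeping sums above the threshold."""
    sums = defaultdict(float)
    for sensor, ts, temp in events:
        for start in window_starts(ts, window, slide):
            sums[(sensor, start)] += temp
    # Mirror of: sumtemp > 40
    return {key: total for key, total in sums.items() if total > threshold}


events = [("a", 10, 25), ("a", 40, 20), ("a", 70, 10)]
print(window_starts(75))
print(sliding_sums(events))
```

In this sample, only the window starting at second 0 contains both the 25 and the 20 readings, so it is the only one whose sum exceeds 40.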
Posting output streaming data to Delta Lake¶
Here, the output streaming data is inserted into Delta Lake, with the table name, mode, overwriteFlag, and partitionBy parameters given in the output section.
The following is a sample create-rule JSON:
{
    "ruleName": "AggregationDemo2",
    "selectClause": "count(s1.temp) as runningcounttemp",
    "whereClause": "runningcounttemp > 2",
    "groupByField": "sensor",
    "triggerInterval": "1",
    "windowDuration": "",
    "event": {
        "name": "t",
        "format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
    },
    "input": [
        {
            "type": "stream",
            "alias": "s1",
            "topic": "test1",
            "properties": [
                "temp",
                "t"
            ]
        }
    ],
    "output": {
        "type": "delta",
        "name": "aggrdelta182221",
        "details": {
            "mode": "Overwrite",
            "overwriteFlag": "true",
            "partitionBy": "dayofweek"
        }
    }
}
Note
Spark 2.4.x does not support Delta Lake functionality; Spark 3.x.x is required.
Rule with maxLateArrivalDelay¶
Here, “maxLateArrivalDelay” is the maximum delay for which CEP will still process out-of-order data. In other words, it is the threshold by which data may lag behind the start time of the oldest window.
For example, if two events are published with event times “Mon Jan 24 14:28:35 IST 2022” and “Mon Jan 24 14:30:35 IST 2022” and “maxLateArrivalDelay” is 2 minutes, then the minimum event time of out-of-order data that will be processed is (14:28:00 - 2 min) = 14:26:00, and all data with an event time before 14:26:00 will be discarded.
(14:28:00 is the start time of the oldest window; the other window starts at 14:30:00.)
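The cutoff arithmetic in the example above can be reproduced with a short Python sketch. It only restates the calculation as described here (oldest window start minus the allowed delay) and is not taken from the CEP implementation. The function names are hypothetical, and windows are assumed to be aligned to whole multiples of the window length.

```python
from datetime import datetime, timedelta, timezone

IST = timezone(timedelta(hours=5, minutes=30))


def late_arrival_cutoff(event_times, window_seconds=60, max_late_arrival_delay=120):
    """Oldest window start minus the allowed delay, per the description above."""
    oldest = min(event_times)
    epoch = int(oldest.timestamp())
    # Floor the oldest event time to the start of its window.
    window_start = datetime.fromtimestamp(epoch - epoch % window_seconds, tz=oldest.tzinfo)
    return window_start - timedelta(seconds=max_late_arrival_delay)


def is_processed(event_time, cutoff):
    """Events with an event time before the cutoff are discarded."""
    return event_time >= cutoff


published = [
    datetime(2022, 1, 24, 14, 28, 35, tzinfo=IST),
    datetime(2022, 1, 24, 14, 30, 35, tzinfo=IST),
]
cutoff = late_arrival_cutoff(published)
print(cutoff.strftime("%H:%M:%S"))
```

An out-of-order event stamped 14:26:10 would still be processed under this model, while one stamped 14:25:59 would be dropped.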
The following is a sample create-rule JSON:
{
    "ruleName": "AggregationWithWaterMark1",
    "selectClause": "count(s1.temp) as counttemp",
    "groupByField": "sensor",
    "triggerInterval": "30",
    "windowDuration": "60",
    "slidingDuration": "30",
    "maxLateArrivalDelay": "120",
    "event": {
        "name": "t",
        "format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
    },
    "input": [
        {
            "type": "stream",
            "alias": "s1",
            "topic": "test1",
            "properties": [
                "temp",
                "t"
            ]
        }
    ],
    "output": {
        "type": "topic",
        "name": "outputaggre3"
    }
}