Pure Aggregation with Filtering

Below are sample rules with pure aggregation and filtering capabilities.

Aggregation of Multiple Streams within a Window, with Filtering

Here the averages of the temp and humidity observed properties are calculated over a 1-minute window, and the aggregated values are then filtered using the logical AND operator. If the condition is satisfied, the calculated average values are sent to the output.

The following is a sample create-rule JSON:

{
    "ruleName": "AggregationSample1",
    "selectClause": "avg(s1.temp) as avgtemp,avg(s2.humidity) as avghumidity",
    "joinClause": "",
    "groupByField": "feature",
    "whereClause": "avgtemp > 40 AND avghumidity > 90",
    "triggerInterval": "10",
    "windowDuration": "60",
    "event": {
        "name": "t",
        "format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
    },
    "input": [
        {
            "type": "stream",
            "alias": "s1",
            "topic": "test1",
            "properties": [
                "temp",
                "t"
            ]
        },
        {
            "type": "stream",
            "alias": "s2",
            "topic": "test2",
            "properties": [
                "humidity",
                "t"
            ]
        }
    ],
    "output": {
        "type": "topic",
        "name": "outputaggre1"
    }
}

Note

The rule above is an example of a pure aggregation rule. The field by which the aggregation functions group their input is defined in the “groupByField” parameter.
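The aggregate-then-filter logic of this rule can be illustrated with a minimal Python sketch. It is not the CEP engine's implementation: the function name `aggregate_and_filter` and the sample values are hypothetical, and the sketch assumes the values for one group and one 60-second window have already been collected from both streams.

```python
from statistics import mean


def aggregate_and_filter(temps, humidities, temp_limit=40, humidity_limit=90):
    """Average each stream's values for one window and one group, then filter.

    Returns the averages when both exceed their limits, otherwise None.
    """
    if not temps or not humidities:
        return None  # nothing to aggregate for this window
    avgtemp = mean(temps)
    avghumidity = mean(humidities)
    # Mirrors the whereClause: avgtemp > 40 AND avghumidity > 90
    if avgtemp > temp_limit and avghumidity > humidity_limit:
        return {"avgtemp": avgtemp, "avghumidity": avghumidity}
    return None


# Made-up values seen for one "feature" during one window.
print(aggregate_and_filter([41, 43, 45], [91, 95]))  # both averages pass
print(aggregate_and_filter([41, 43, 45], [80, 85]))  # humidity average fails
```

Because the filter is applied to the aggregated values, a single reading below a limit does not suppress the output as long as the window averages still satisfy both conditions.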

Calculate Moving Average of a Field

Here we calculate the moving (running) average of the temp observed property.

The following is a sample create-rule JSON:

{
    "ruleName": "MovingAvgRule",
    "selectClause": "avg(s1.temp) as runningavgtemp",
    "whereClause": "",
    "joinClause": "",
    "groupByField": "sensor",
    "triggerInterval": "1",
    "windowDuration": "",
    "event": {
        "name": "t",
        "format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
    },
    "input": [
        {
            "alias": "s1",
            "topic": "test1",
            "properties": [
                "temp",
                "t"
            ]
        }
    ],
    "output": {
        "type": "topic",
        "name": "outputaggre2"
    }
}

Note

The rule above is an example of running (cumulative) aggregation. If “windowDuration” is omitted or left blank, the aggregation runs over all data without a window.
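To make the difference from a windowed aggregation concrete, here is a minimal Python sketch of a running average kept per group. The class name `RunningAverage` and the sample events are hypothetical; the sketch only illustrates the idea that state accumulates across all events and is never reset, and it does not describe how the engine stores that state.

```python
from collections import defaultdict


class RunningAverage:
    """Cumulative average per group, with no window: state is never reset."""

    def __init__(self):
        self._sum = defaultdict(float)
        self._count = defaultdict(int)

    def update(self, group, value):
        """Add one value for a group and return that group's new average."""
        self._sum[group] += value
        self._count[group] += 1
        return self._sum[group] / self._count[group]


avg = RunningAverage()
# Made-up (sensor, temp) events; each sensor keeps its own running average.
for sensor, temp in [("a", 10), ("a", 20), ("b", 50), ("a", 30)]:
    print(sensor, avg.update(sensor, temp))
```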

Filtering on Running Count of a Field

Here we calculate the running count of the temp observed property, filter on that count, and send the result to the output if the condition is satisfied.

The following is a sample create-rule JSON:

{
    "ruleName": "RunningCountRule",
    "selectClause": "count(s1.temp) as runningcounttemp",
    "whereClause": "runningcounttemp > 2",
    "joinClause": "",
    "groupByField": "sensor",
    "triggerInterval": "1",
    "windowDuration": "",
    "event": {
        "name": "t",
        "format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
    },
    "input": [
        {
            "alias": "s1",
            "topic": "test1",
            "properties": [
                "temp",
                "t"
            ]
        }
    ],
    "output": {
        "type": "topic",
        "name": "outputaggre2"
    }
}

Note

The rule above is an example of running (cumulative) aggregation. If “windowDuration” is omitted or left blank, the aggregation runs over all data without a window.
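The effect of filtering on a running count can be sketched in a few lines of Python. The function `running_count_filter` and the sample events are hypothetical, and the sketch evaluates the condition on every event, whereas the engine evaluates it at each trigger interval.

```python
from collections import Counter


def running_count_filter(events, threshold=2):
    """Yield (group, count) each time a group's running count exceeds threshold."""
    counts = Counter()
    for group, _value in events:
        counts[group] += 1
        if counts[group] > threshold:  # mirrors: runningcounttemp > 2
            yield group, counts[group]


# Made-up (sensor, temp) events.
events = [("a", 21), ("a", 22), ("b", 30), ("a", 23), ("a", 24)]
print(list(running_count_filter(events)))
```

Nothing is emitted for a sensor until its third reading arrives, and because the count is never reset, every later reading from that sensor also passes the filter.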


Calculate Sum of a Field over a 60-Second Window with a 30-Second Overlap

Here we calculate the sum of the temp observed property over a 60-second window that overlaps the next window by 30 seconds, filter on that sum, and send the result to the output if the condition is satisfied.

The following is a sample create-rule JSON:

{
    "ruleName": "AggregationSample3",
    "selectClause": "sum(s1.temp) as sumtemp",
    "whereClause": "sumtemp > 40",
    "joinClause": "",
    "groupByField": "sensor",
    "triggerInterval": "1",
    "windowDuration": "60",
    "slidingDuration": "30",
    "event": {
        "name": "t",
        "format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
    },
    "input": [
        {
            "alias": "s1",
            "topic": "test1",
            "properties": [
                "temp",
                "t"
            ]
        }
    ],
    "output": {
        "type": "topic",
        "name": "outputaggre3"
    }
}
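With a “windowDuration” of 60 and a “slidingDuration” of 30, each event falls into two overlapping windows. The following Python sketch shows one way to picture this. The helper names `windows_for` and `sliding_sums` are hypothetical, timestamps are plain seconds, and the sketch assumes window start times are aligned to multiples of the slide, which may differ from the engine's actual alignment.

```python
from collections import defaultdict


def windows_for(event_ts, window=60, slide=30):
    """Return the [start, end) windows, in seconds, that contain event_ts."""
    start = (event_ts // slide) * slide  # latest window start at or before the event
    found = []
    while start > event_ts - window:  # window still covers the event
        found.append((start, start + window))
        start -= slide
    return sorted(found)


def sliding_sums(events, window=60, slide=30):
    """Sum the values of (timestamp, value) events per sliding window."""
    sums = defaultdict(float)
    for ts, value in events:
        for w in windows_for(ts, window, slide):
            sums[w] += value
    return dict(sums)


# Made-up (timestamp, temp) events for a single sensor.
sums = sliding_sums([(10, 25), (40, 30), (70, 5)])
# Mirrors the whereClause: sumtemp > 40
passing = {w: s for w, s in sums.items() if s > 40}
print(windows_for(75))
print(passing)
```

In this sample only the window covering seconds 0 to 60 contains both of the larger readings, so it is the only one whose sum exceeds 40.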

Posting output streaming data to Delta Lake

Here the output streaming data is inserted into Delta Lake, using the table name, mode, overwriteFlag and partitionBy parameters.

The following is a sample create-rule JSON:

{
    "ruleName": "AggregationDemo2",
    "selectClause": "count(s1.temp) as runningcounttemp",
    "whereClause": "runningcounttemp > 2",
    "groupByField": "sensor",
    "triggerInterval": "1",
    "windowDuration": "",
    "event": {
        "name": "t",
        "format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
    },
    "input": [
        {
            "type": "stream",
            "alias": "s1",
            "topic": "test1",
            "properties": [
                "temp",
                "t"
            ]
        }
    ],
    "output": {
        "type": "delta",
        "name": "aggrdelta182221",
        "details": {
            "mode": "Overwrite",
            "overwriteFlag": "true",
            "partitionBy": "dayofweek"
        }
    }
}

Note

Spark version 2.4.x does not support the Delta Lake functionality; Spark version 3.x.x is required.

Rule with maxLateArrivalDelay

Here “maxLateArrivalDelay” is the maximum delay that CEP allows when processing out-of-order data. In other words, it is the threshold by which data may lag behind the start time of the oldest window.

For example, if two events are published with event times “Mon Jan 24 14:28:35 IST 2022” and “Mon Jan 24 14:30:35 IST 2022” and “maxLateArrivalDelay” is 2 minutes, then the minimum event time of out-of-order data that will still be processed is (14:28:00 - 2 min) = 14:26:00. All data with an event time before 14:26:00 is discarded.

(14:28:00 is the start time of the oldest window; the other window starts at 14:30:00.)
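The arithmetic in this example can be reproduced with a short Python sketch. The function names `window_start` and `late_data_cutoff` are hypothetical, and the sketch assumes 60-second windows aligned to whole minutes, as in the example; it follows the description given here rather than the engine's internal watermark logic.

```python
from datetime import datetime, timedelta, timezone

IST = timezone(timedelta(hours=5, minutes=30))


def window_start(event_time, window_seconds=60):
    """Floor an event time to the start of its window."""
    epoch = int(event_time.timestamp())
    floored = epoch - epoch % window_seconds
    return datetime.fromtimestamp(floored, tz=event_time.tzinfo)


def late_data_cutoff(event_times, max_late_arrival_delay=120, window_seconds=60):
    """Oldest window start minus the allowed delay (both in seconds)."""
    oldest = min(window_start(t, window_seconds) for t in event_times)
    return oldest - timedelta(seconds=max_late_arrival_delay)


# The two event times from the example.
events = [
    datetime(2022, 1, 24, 14, 28, 35, tzinfo=IST),
    datetime(2022, 1, 24, 14, 30, 35, tzinfo=IST),
]
cutoff = late_data_cutoff(events)
print(cutoff.strftime("%H:%M:%S"))  # 14:26:00

# An out-of-order event is kept only if it is not older than the cutoff.
late_event = datetime(2022, 1, 24, 14, 25, 50, tzinfo=IST)
print(late_event >= cutoff)  # False, so this event would be discarded
```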

The following is a sample create-rule JSON:

{
    "ruleName": "AggregationWithWaterMark1",
    "selectClause": "count(s1.temp) as counttemp",
    "groupByField": "sensor",
    "triggerInterval": "30",
    "windowDuration": "60",
    "slidingDuration": "30",
    "maxLateArrivalDelay": "120",
    "event": {
        "name": "t",
        "format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
    },
    "input": [
        {
            "type": "stream",
            "alias": "s1",
            "topic": "test1",
            "properties": [
                "temp",
                "t"
            ]
        }
    ],
    "output": {
        "type": "topic",
        "name": "outputaggre3"
    }
}