Execute and View Result

Event Selections

Events are selected by an MR (Message Routing) rule. The rule below selects all events whose sensorID is ‘tempsensor’.

  • MR rule JSON

{
  "Status": "Running",
  "RuleID": 422,
  "RuleName": "MR Rule cep",
  "RuleDescription": "MR Rule for cep",
  "parallelismFactor": 1,
  "input": {
    "type": "",
    "DirectExchange": "Direct.Exchange",
    "RoutingKey": "e325f4fe-a0e3-4252-82af-4c24deb8c0cf",
    "MQName": ""
  },
  "params": [
    {
      "name": "feature",
      "value": ""
    },
    {
      "name": "sensorID",
      "value": "tempsensor"
    },
    {
      "name": "time",
      "value": {
        "starttime": "",
        "endtime": ""
      }
    },
    {
      "name": "expression",
      "value": {
        "if": "",
        "then": "",
        "else": ""
      }
    },
    {
      "name": "ObservedProperties",
      "value": []
    }
  ],
  "output": {
    "TopicExchange": "cepdemo",
    "Topic": "android1"
  }
}

Note

The user must start the MR rule and publish the observation data to MR.
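The selection step can be pictured with a minimal Python sketch. It is not the MR implementation: the function name select_events is hypothetical, and the sketch assumes that empty parameters are ignored and that each event carries a field named after the parameter (here sensorID).

```python
def select_events(events, params):
    """Keep the events that match every non-empty string parameter.

    Assumption: parameters with empty or structured values (time,
    expression, ObservedProperties) place no constraint on the event.
    """
    wanted = {
        p["name"]: p["value"]
        for p in params
        if isinstance(p["value"], str) and p["value"]
    }
    return [e for e in events if all(e.get(k) == v for k, v in wanted.items())]


# Parameters copied from the MR rule JSON above (abridged).
params = [
    {"name": "feature", "value": ""},
    {"name": "sensorID", "value": "tempsensor"},
    {"name": "time", "value": {"starttime": "", "endtime": ""}},
    {"name": "ObservedProperties", "value": []},
]

# Hypothetical events used only for illustration.
events = [
    {"sensorID": "tempsensor", "temp": 46},
    {"sensorID": "humiditysensor", "humidity": 94},
]

selected = select_events(events, params)
print(selected)
```

Only the event from tempsensor passes the selection in this sketch, because sensorID is the single non-empty parameter.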

Publish Observation Data

Input data can be posted to CEP through any of the following three channels:

  • SOS : The Sensor Observation Service registers sensors and posts observation JSON to MR, which then posts the data to CEP.

  • DM : The Device Management Service registers devices and posts data directly to MR, which then posts the data to CEP.

  • SendRequest API : An additional API available in MR for posting data to the MR input. Refer to the Publish Observation Data section of Message Routing Service -> Execute and View Result for details on using the send request API.

Note

When data is posted through SOS and MR, the SOS-compliant observation JSON is transformed to flat JSON by an Action Rule adapter, and the resulting flat JSON is published to CEP (CEP supports flat JSON only).

Operations on Events

In the example below, the averages of the temp and humidity observed properties are calculated over a 1-minute window, and a filter is then applied to the aggregated values using the logical AND operator. If the condition is satisfied, the calculated averages are sent to the output.

The create rule JSON is as follows:

{
     "ruleName": "AggregationSample1",
     "selectClause": "avg(s1.temp) as avgtemp,avg(s2.humidity) as avghumidity",
     "joinClause": "",
     "groupByField": "feature",
     "whereClause": "avgtemp > 40 AND avghumidity > 90",
     "triggerInterval": "10",
     "windowDuration": "60",
     "input": [{
                     "alias": "s1",
                     "topic": "test1",
                     "properties": [
                             "temp"
                     ]
             },
             {
                     "alias": "s2",
                     "topic": "test2",
                     "properties": [
                             "humidity"
                     ]
             }
     ],
     "output": "outputaggre1"
}
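The logic of this rule can be sketched in a few lines of Python. This is an illustration only, not the CEP engine: the function name aggregate_window is hypothetical, the events passed in are assumed to be those that already fall inside one 60-second window, and the two streams are assumed to be combined per feature value.

```python
from collections import defaultdict


def aggregate_window(s1_events, s2_events, temp_limit=40, humidity_limit=90):
    """Average temp (s1) and humidity (s2) per feature, then filter with AND."""
    temps = defaultdict(list)
    humidities = defaultdict(list)
    for event in s1_events:
        temps[event["feature"]].append(event["temp"])
    for event in s2_events:
        humidities[event["feature"]].append(event["humidity"])

    results = []
    # Assumption: only features seen on both streams produce a result.
    for feature in sorted(temps.keys() & humidities.keys()):
        avgtemp = sum(temps[feature]) / len(temps[feature])
        avghumidity = sum(humidities[feature]) / len(humidities[feature])
        # Mirrors the whereClause: avgtemp > 40 AND avghumidity > 90
        if avgtemp > temp_limit and avghumidity > humidity_limit:
            results.append(
                {"feature": feature, "avgtemp": avgtemp, "avghumidity": avghumidity}
            )
    return results


# Hypothetical window contents used only for illustration.
s1 = [
    {"feature": "testRoom", "temp": 46},
    {"feature": "testRoom", "temp": 50},
    {"feature": "lab", "temp": 20},
]
s2 = [
    {"feature": "testRoom", "humidity": 94},
    {"feature": "lab", "humidity": 95},
]

print(aggregate_window(s1, s2))
```

In this sketch only testRoom reaches the output: lab has a high humidity average, but its temperature average fails the first half of the AND condition.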

Note

The MR and CEP rules are chained: the output topic of the MR rule is the input topic of the CEP rule. CEP operates on the events selected by the MR rule.
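A quick way to check this chaining before starting the rules is to compare the two rule documents. The helper below is a hypothetical sketch, not part of MR or CEP; it relies only on the output.Topic key of the MR rule JSON and the input[].topic keys of the CEP rule JSON, and the topic values in the usage example are placeholders.

```python
def mr_feeds_cep(mr_rule, cep_rule):
    """Return True if the MR rule's output topic is one of the CEP input topics."""
    mr_topic = mr_rule["output"]["Topic"]
    return any(source["topic"] == mr_topic for source in cep_rule["input"])


# Placeholder rule fragments used only for illustration.
mr_rule = {"output": {"TopicExchange": "cepdemo", "Topic": "test1"}}
cep_rule = {
    "input": [
        {"alias": "s1", "topic": "test1", "properties": ["temp"]},
        {"alias": "s2", "topic": "test2", "properties": ["humidity"]},
    ]
}

print(mr_feeds_cep(mr_rule, cep_rule))
```

If the helper returns False, the CEP rule would have no events from that MR rule to operate on.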

Actions

In this case, the action is a notification on a topic. The output is published to the topic specified in the CEP rule. Users can view the result with the SSE Viewer. Refer to the SSE User’s Guide for details.

Note

On the SSE Viewer page, the topic name must match the output topic specified in the CEP rule, and the rule must be in running status.

The following input JSON should be used to publish the data for the above aggregation CEP Rule.

  • Input Flat JSON for stream1

{
     "feature": "testRoom",
     "sensor": "tempsensor",
     "temp": 46,
     "starttime": "24-Aug-2020 15:51:20.000 UTC"
}

  • Input Flat JSON for stream2

{
     "feature": "testRoom",
     "sensor": "humiditysensor",
     "humidity": 94,
     "starttime": "24-Aug-2020 15:51:20.000 UTC"
}
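When such events are generated by a script, the flat JSON can be assembled as in the following sketch. The helper name make_event is hypothetical; the field names and the starttime layout are taken from the samples above, and the timestamp format string is an assumption inferred from those samples (it also assumes an English locale for the month abbreviation).

```python
import json
from datetime import datetime, timezone


def make_event(feature, sensor, prop, value, when):
    """Build one flat JSON event with a starttime in the format shown above."""
    ts = when.astimezone(timezone.utc)
    # Day-Month-Year, time, three-digit milliseconds, then the literal "UTC".
    stamp = ts.strftime("%d-%b-%Y %H:%M:%S") + f".{ts.microsecond // 1000:03d} UTC"
    return json.dumps(
        {"feature": feature, "sensor": sensor, prop: value, "starttime": stamp}
    )


when = datetime(2020, 8, 24, 15, 51, 20, tzinfo=timezone.utc)
payload = make_event("testRoom", "tempsensor", "temp", 46, when)
print(payload)
```

The resulting string can then be posted through any of the channels listed under Publish Observation Data.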