Complex Event Processing (CEP)
1. Introduction
Complex Event Processing (CEP) is a rule-based service that combines data from multiple streams and processes it in real time, using user-defined rules to match conditions and produce events or alarms. It consumes the data emitted by the Message Routing service according to routing rules, processes it, and writes the final output to a sink. CEP can process historical data as well as current data. Event streams are collected in user-defined ‘windows’, and conditions or aggregations are evaluated on these windows of stream data.
The following is a detailed flow diagram of the CEP service:
1.1 Intended Audience
This document is intended for anyone who wants an overview of the TCUP Complex Event Processing Service. After reading it, the user will understand the capabilities of the TCUP CEP Service in the IoT platform.
2. Key Concepts
To use the Complex Event Processing Service, a user needs to understand its basic building blocks.
2.1 Input
Input is the entry point from which a CEP rule takes the data it processes; it provides the stream of data that the rule works on. The input is generally the destination of the Message Routing service. Each CEP rule internally reads from its input and processes the data according to the user-defined rule.
2.2 Output
Output is the entity to which the results of a CEP rule are published for the consumer.
2.3 Streams
A stream is an unbounded sequence of sensor data, i.e. it is not known when the data will stop arriving.
2.4 Window
To perform operations on unbounded streams of data, a window is needed; CEP operations are performed on the data within a window. Windows are defined by the following properties:
Window Type - only time-based windows are currently supported.
Eviction Policy - based on the data eviction policy, a window can be either sliding or tumbling.
Window Size - the length of the window, for example thirty seconds.
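The difference between the two eviction policies can be illustrated with a minimal sketch in plain Python. It assumes event timestamps in seconds, a 30-second window size and, for the sliding case, a slide interval of 10 seconds. The slide interval is an assumption made here for illustration and is not one of the window properties documented above. The function names are hypothetical and are not part of the TCUP API.

```python
from collections import defaultdict


def tumbling_windows(events, size):
    """Tumbling (non-overlapping) time windows.

    Each (timestamp, value) event falls into exactly one window
    [start, start + size). Returns {window_start: [values]}.
    """
    windows = defaultdict(list)
    for ts, value in events:
        start = (ts // size) * size
        windows[start].append(value)
    return dict(sorted(windows.items()))


def sliding_windows(events, size, slide):
    """Sliding (overlapping) time windows.

    Windows start every `slide` seconds and span `size` seconds, so when
    slide < size an event belongs to several windows.
    Returns {window_start: [values]}.
    """
    windows = defaultdict(list)
    for ts, value in events:
        # Latest window start at or before ts; walk back while the window still covers ts.
        start = (ts // slide) * slide
        while start > ts - size:
            windows[start].append(value)
            start -= slide
    return dict(sorted(windows.items()))


if __name__ == "__main__":
    # (timestamp in seconds, temperature)
    events = [(60, 20.0), (72, 21.0), (85, 22.5), (91, 23.0), (107, 22.0)]
    print(tumbling_windows(events, size=30))
    # {60: [20.0, 21.0, 22.5], 90: [23.0, 22.0]}
    print(sliding_windows(events, size=30, slide=10)[70])
    # [21.0, 22.5, 23.0]
```

With a tumbling window every reading is counted once, whereas with a sliding window the reading at t=85 contributes to the windows starting at 60, 70 and 80.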
2.5 Condition
A condition is written through a simple SQL-like interface in the rule, using simple SQL queries to perform join, aggregation or filtering operations. It has three parts:
selectClause - a standard SQL select query, along with some supported Spark SQL-specific select expressions built with the following keywords:
sum, avg, max, min, count - aggregation functions
lag, lead, rank - non-aggregation functions (immediately previous value, immediately next value, rank in descending order)
Example: s1.temp - lag(1).s1.temp as tempdiff
whereClause - a condition clause that uses aliases already defined in “selectClause”
Example: tempdiff <= 0
joinClause - a join condition between two or more streams, based on a common field and the difference in event time between the streams, along with an optional filtering condition.
Example: (s1.feature = s2.feature) AND abs(s1.starttime - s2.starttime) <= 60 AND (s1.temp > 40) AND (s2.humidity > 90)
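To make the three clause examples concrete, the following plain-Python sketch evaluates them on small in-memory lists. It assumes that `lag(1)` refers to the immediately previous event of the same stream in time order, that `starttime` values share one unit, and that records are dictionaries. The record layout and function names are hypothetical, and the sketch does not reflect how TCUP executes rules internally.

```python
def select_tempdiff(s1):
    """selectClause: s1.temp - lag(1).s1.temp as tempdiff

    Assumes lag(1) is the immediately previous event of the stream in time
    order. The first event has no previous value, so tempdiff is None
    (analogous to the NULL that SQL's LAG returns).
    """
    rows, prev = [], None
    for event in sorted(s1, key=lambda e: e["starttime"]):
        tempdiff = None if prev is None else event["temp"] - prev["temp"]
        rows.append({**event, "tempdiff": tempdiff})
        prev = event
    return rows


def where_tempdiff(rows):
    """whereClause: tempdiff <= 0 (uses the alias defined in selectClause)."""
    return [r for r in rows if r["tempdiff"] is not None and r["tempdiff"] <= 0]


def join_streams(s1, s2, max_gap=60):
    """joinClause: (s1.feature = s2.feature)
    AND abs(s1.starttime - s2.starttime) <= 60
    AND (s1.temp > 40) AND (s2.humidity > 90)
    Implemented as a nested-loop join over two small in-memory lists.
    """
    return [
        {"feature": a["feature"], "temp": a["temp"], "humidity": b["humidity"],
         "s1_starttime": a["starttime"], "s2_starttime": b["starttime"]}
        for a in s1
        for b in s2
        if a["feature"] == b["feature"]
        and abs(a["starttime"] - b["starttime"]) <= max_gap
        and a["temp"] > 40
        and b["humidity"] > 90
    ]


if __name__ == "__main__":
    s1 = [{"feature": "room1", "starttime": 0, "temp": 41.0},
          {"feature": "room1", "starttime": 30, "temp": 43.0},
          {"feature": "room1", "starttime": 60, "temp": 42.5}]
    s2 = [{"feature": "room1", "starttime": 100, "humidity": 95},
          {"feature": "room2", "starttime": 30, "humidity": 97},
          {"feature": "room1", "starttime": 20, "humidity": 85}]
    print(where_tempdiff(select_tempdiff(s1)))  # only the event at starttime 60
    print(join_streams(s1, s2))                 # one joined row for room1
```

The whereClause can only filter on `tempdiff` after the selectClause has produced it, which is why the where step here consumes the select step's rows.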
3. Functional Capabilities
The Complex Event Processing Service provides the following functional capabilities:
It can accept data from multiple sources in real time and perform the following:
Join and correlate that data, selecting fields from the joined stream within a window
Aggregate multiple streams, either within a window or without a window (cumulatively)
Perform SQL operations on a single stream or on a stream joined from multiple streams
It processes the continuous flow of data within windows in real time.
Stream data can be grouped by sensor, by the object being observed, or by both.
Aggregation functions such as sum, average, maximum, minimum and count are supported on a window of a stream or without a window (cumulatively).
Rule conditions or aggregation functions can be evaluated on a window of stream data either periodically or immediately once the data arrives.
A rule can be started with or without old data.
Out-of-order data can also be processed.
Windows can be based on event time or processing time.
Output can be triggered at a specified interval, set by the value of the “triggerInterval” parameter.
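Grouping, windowed versus cumulative aggregation, and event-time windowing can be sketched together in plain Python. The sketch assumes that each event carries its own event timestamp in seconds, so windows are assigned from that timestamp rather than from arrival order; this is why a late, out-of-order event still lands in the correct window. It does not model watermarks or the “triggerInterval” schedule, and all field and function names are hypothetical.

```python
from collections import defaultdict


def summarize(values):
    """The aggregation functions listed above: sum, avg, max, min, count."""
    return {"sum": sum(values), "avg": sum(values) / len(values),
            "max": max(values), "min": min(values), "count": len(values)}


def windowed_aggregates(events, size):
    """Group by (feature, event-time tumbling window) and aggregate.

    The window is derived from each event's own 'eventtime', so an event
    that arrives late still lands in the window it belongs to.
    """
    groups = defaultdict(list)
    for e in events:
        window_start = (e["eventtime"] // size) * size
        groups[(e["feature"], window_start)].append(e["value"])
    return {key: summarize(vals) for key, vals in sorted(groups.items())}


def cumulative_aggregates(events):
    """Group by feature only and aggregate over all events seen (no window)."""
    groups = defaultdict(list)
    for e in events:
        groups[e["feature"]].append(e["value"])
    return {feature: summarize(vals) for feature, vals in sorted(groups.items())}


if __name__ == "__main__":
    # Listed in arrival order; the third event is out of order (eventtime 20 after 65).
    events = [
        {"feature": "meter1", "eventtime": 5, "value": 2.0},
        {"feature": "meter1", "eventtime": 65, "value": 4.0},
        {"feature": "meter1", "eventtime": 20, "value": 3.0},
        {"feature": "meter2", "eventtime": 10, "value": 1.0},
    ]
    print(windowed_aggregates(events, size=60)[("meter1", 0)])
    # {'sum': 5.0, 'avg': 2.5, 'max': 3.0, 'min': 2.0, 'count': 2}
    print(cumulative_aggregates(events)["meter1"]["count"])  # 3
```

Windowing on processing time instead would key each event by its arrival time, in which case the late meter1 reading could be counted in a later window than the one it was measured in.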
4. Purpose/Usage
Many applications can be built using this service. Some of the common usage patterns are as follows:
To detect water leakage and generate an alert when the home is locked - The multiple stream joining template can be used to detect conditions such as ‘water leakage’ and generate an alert. The condition can be defined as the ‘humidity’ reported by the bathroom sensor being greater than 70% for 30 minutes while the home security status is ‘locked’. In this scenario two data streams are generated, one from the humidity sensor and the other from the home security sensor.
To identify and generate alerts for power failure - An absence pattern can be detected to generate alerts such as ‘power failure’. The condition can be defined as energy consumption being ‘not reported’ or ‘zero’ for fifteen minutes. A smart energy meter publishes the consumption data.
To detect energy wastage and generate alerts - Multiple rules can be chained to detect the conditions for an energy wastage alert. The condition can be defined as the home security status being ‘locked’ and the energy consumption being 10% greater than the average level for non-occupancy. In this scenario the average level for non-occupancy is calculated by an aggregation rule, and its output is fed to the joining rule.
To detect the speed of a bike corresponding to the maximum lateral acceleration - Two sensors (lateral acceleration and speed) capture bike data independently, at different sampling rates, and send it to the data stream. The goal is to find the speed corresponding to the maximum lateral acceleration. First, an MR rule creates a dataset of speed sensor observations sorted by time. Then a CEP rule finds the time at which the lateral acceleration is maximum. Finally, another MR rule looks up the speed sensor dataset to find the nearest timestamp and its speed value.
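The three steps of the bike example can be sketched in plain Python: keep the speed observations sorted by timestamp (the role of the first MR rule), find the time of maximum lateral acceleration (the CEP rule), then binary-search the sorted speed timestamps for the nearest one (the second MR rule). Using Python's `bisect` module for the nearest-timestamp lookup is an illustrative choice, not necessarily how the MR lookup works, and the data layout and function name are hypothetical.

```python
import bisect


def speed_at_max_lateral_acceleration(accel, speed):
    """Return (accel_time, speed_time, speed_value) for the speed sample
    nearest in time to the maximum lateral acceleration.

    accel, speed: non-empty lists of (timestamp, value) tuples, sampled
    independently and at different rates.
    """
    # Step 1 (first MR rule): speed observations sorted by timestamp.
    speed_sorted = sorted(speed)
    times = [t for t, _ in speed_sorted]

    # Step 2 (CEP rule): timestamp at which lateral acceleration is maximum.
    t_max, _ = max(accel, key=lambda sample: sample[1])

    # Step 3 (second MR rule): binary search for the insertion point, then
    # compare the neighbours on either side; ties go to the earlier sample.
    i = bisect.bisect_left(times, t_max)
    candidates = [j for j in (i - 1, i) if 0 <= j < len(times)]
    nearest = min(candidates, key=lambda j: abs(times[j] - t_max))
    return t_max, times[nearest], speed_sorted[nearest][1]


if __name__ == "__main__":
    # Timestamps in milliseconds; the two sensors sample at different rates.
    accel = [(0, 0.2), (500, 0.9), (1000, 1.4), (1500, 0.7)]
    speed = [(1300, 31.0), (0, 28.0), (800, 30.0), (1700, 32.0)]
    print(speed_at_max_lateral_acceleration(accel, speed))  # (1000, 800, 30.0)
```

Sorting once makes each nearest-timestamp lookup logarithmic in the number of speed samples, which matters when the lookup is repeated for many acceleration peaks.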
5. Examples
Consider an elderly patient monitoring system that provides remote healthcare services for senior citizens. Elderly people are monitored continuously by motion sensors placed in their homes. If the motion sensors detect no movement for a certain period of time, this may be a cause for concern, and a notification should be sent to the caregiver network.
The sensors are connected to a gateway over a short-range communication network such as Bluetooth. The gateway is connected to TCUP via a wireless internet connection and runs an embedded Linux operating system.
Data acquisition software runs on the gateway. This software uses the Sensor Observation Service to push data to the TCUP cloud, where the Message Routing service picks up the data and performs the required filtering. The Complex Event Processing service then receives the data simply by subscribing to the destination of the Message Routing service.
The application is modelled in TCUP as follows:
The patient is the Feature of Interest, i.e. the object being observed. The patient ID is mapped to the Feature ID.
The sensors used are passive infrared sensors for motion detection and accelerometers fixed to the bed to detect movement of the patient.
The different axes of the patient's movement are the observed properties of the sensor.
If the proximity-based passive infrared sensor does not post any data to CEP and the accelerometer detects no movement for the last 30 minutes, CEP detects this condition and generates an alert so that the caregiver network can take appropriate action.
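The absence condition in this example can be sketched as a check over the last 30 minutes of events, evaluated at a given time `now`. The sketch assumes timestamps in seconds and accelerometer readings in g. It treats any passive infrared (PIR) event as motion and approximates accelerometer movement as the reading's magnitude deviating from 1 g (gravity at rest) by more than a threshold. That heuristic, the threshold value, the field layout and the function name are all hypothetical. How and when TCUP evaluates absence conditions is not modelled here.

```python
import math

WINDOW_SECONDS = 30 * 60      # "the last 30 minutes" from the example
MOVEMENT_THRESHOLD_G = 0.05   # hypothetical: deviation from 1 g treated as movement


def should_alert(pir_events, accel_events, now):
    """Return True when, in the window (now - 30 min, now], the PIR sensor
    posted no data AND the accelerometer detected no movement.

    pir_events:   list of timestamps (seconds) at which the PIR sensor posted data
    accel_events: list of (timestamp, x, y, z) readings in g
    """
    window_start = now - WINDOW_SECONDS
    pir_in_window = any(window_start < t <= now for t in pir_events)
    # At rest an accelerometer reads about 1 g (gravity), so movement is
    # approximated as the magnitude deviating from 1 g by more than the threshold.
    moved = any(
        abs(math.sqrt(x * x + y * y + z * z) - 1.0) > MOVEMENT_THRESHOLD_G
        for t, x, y, z in accel_events
        if window_start < t <= now
    )
    return not pir_in_window and not moved


if __name__ == "__main__":
    now = 10_000
    still = [(9_000, 0.0, 0.0, 1.0), (9_500, 0.01, 0.0, 1.0)]
    # PIR data at 7_000 is older than 30 minutes, and the bed readings show no movement.
    print(should_alert([7_000], still, now))  # True
```

Both conditions must hold for an alert, mirroring the “and also” in the rule above; a single recent PIR event or one accelerometer reading showing movement suppresses it.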
6. Reference Documents
Please refer to the following documents for more details about this service:
User Guide
API Guide