Multiple stream joining/filtering/SQL on streaming data¶
Below are sample rules demonstrating capabilities such as joining multiple streams, filtering, and SQL on streaming data.
Joining two streams based on a common feature of interest within a 1-minute window¶
Joining is performed on two events or streams based on a common feature of interest and within a window of 1 minute.
Following is the sample create rule JSON:
{
"ruleName": "JoiningSample1",
"selectClause": "s1.temp as currtemp,s2.humidity as currhumidity,s1.feature as feature,s1.t as s1time,s2.t as s2time",
"whereClause": "currtemp > 40 AND currhumidity > 90",
"joinClause": "(s1.feature = s2.feature) AND abs(s1.t - s2.t) <= 60",
"groupByField": "feature",
"triggerInterval": "30",
"windowDuration": "",
"event": {
"name": "t",
"format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
},
"input": [{
"type": "stream",
"alias": "s1",
"topic": "test1",
"properties": [
"temp",
"t"
]
},
{
"type": "stream",
"alias": "s2",
"topic": "test2",
"properties": [
"humidity",
"t"
]
}
],
"output": {
"type": "topic",
"name": "cepopjoin1"
}
}
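For illustration, suppose events are published as flat JSON objects carrying the declared properties plus the feature field (hypothetical payloads; the actual event shape depends on your topic schema). An event on topic test1:
{"feature": "room1", "temp": 45, "t": "24-Jan-2022 14:28:10.000 IST"}
and an event on topic test2:
{"feature": "room1", "humidity": 95, "t": "24-Jan-2022 14:28:40.000 IST"}
share the feature "room1", their event times differ by 30 seconds (within the 60-second join window), and both pass the whereClause (currtemp > 40, currhumidity > 90), so a joined record like the following would be written to cepopjoin1:
{"currtemp": 45, "currhumidity": 95, "feature": "room1", "s1time": "24-Jan-2022 14:28:10.000 IST", "s2time": "24-Jan-2022 14:28:40.000 IST"}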
Sample rule with standard SQL functions (difference between the temp values in a room and the maximum temp in the same room)¶
Standard SQL functions are applied to streaming data. Here we calculate the difference between each temp value in a room and the maximum temp value in the same room.
Following is the sample create rule JSON for the given template:
{
"ruleName": "JoiningSample1",
"selectClause": "max(s1.temp) OVER (PARTITION BY s1.feature) - s1.temp as tempdiff,s1.temp as currrtemp,s1.feature as room",
"whereClause": "",
"joinClause": "",
"groupByField": "feature",
"triggerInterval": "30",
"windowDuration": "",
"input": [{
"type": "stream",
"alias": "s1",
"topic": "test1",
"properties": [
"temp"
]
}],
"output": {
"type": "topic",
"name": "outputjoin1"
}
}
Note
The above rule is an example of a rule with standard SQL functions: the user needs to supply the OVER and PARTITION BY clauses along with the aggregation function, just as in normal SQL (unlike pure aggregations).
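As a worked example, assume three events for feature "room1" with temp values 30, 35, and 32 arrive within one trigger batch (hypothetical payloads). max(temp) over the partition is 35, and a window function emits one row per input event, so the rule would write the following to outputjoin1:
{"tempdiff": 5, "currrtemp": 30, "room": "room1"}
{"tempdiff": 0, "currrtemp": 35, "room": "room1"}
{"tempdiff": 3, "currrtemp": 32, "room": "room1"}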
Sample rule with standard SQL functions (calculating the count of a field)¶
Standard SQL functions are applied to streaming data. Here we calculate the count of the observed property temp.
Following is the sample create rule JSON:
{
"ruleName": "JoiningSampleCount",
"selectClause": "count(*) OVER (PARTITION BY s1.feature) as counttemp,s1.feature as category",
"whereClause": "",
"joinClause": "",
"groupByField": "feature",
"triggerInterval": "10",
"windowDuration": "",
"input": [
{
"type": "stream",
"alias": "s1",
"topic": "test1",
"properties": [
"temp"
]
}
],
"output": {
"type": "topic",
"name": "outputjoin3"
}
}
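For example, if three events with feature "room1" arrive within one trigger interval (hypothetical payloads), count(*) over the partition is 3; since a window function emits one row per input event, each output row on outputjoin3 would look like:
{"counttemp": 3, "category": "room1"}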
Filtering rule with current and previous values of a field¶
Here we calculate the difference between the current and the immediately previous value of the temp observed property, then filter on that difference before emitting the output.
Following is the sample create rule JSON:
{
"ruleName": "CurrPrevRule",
"selectClause": "s1.temp - s1.temp.lag(1) as tempdiff,s1.temp as currtemp,s1.temp.lag(1) as prevtemp",
"whereClause": "tempdiff <= 0",
"joinClause": "",
"groupByField": "feature",
"triggerInterval": "10",
"windowDuration": "",
"event": {
"name": "t",
"format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
},
"input": [{
"type": "stream",
"alias": "s1",
"topic": "test1",
"properties": [
"temp",
"t"
]
}],
"output": {
"type": "topic",
"name": "outputjoin3"
}
}
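As a worked example, assume three consecutive events for the same feature carry temp values 42, 40, and 40 (hypothetical payloads). The first event has no previous value, so its tempdiff is null and it is dropped by the whereClause; the remaining rows pass the filter tempdiff <= 0:
{"tempdiff": -2, "currtemp": 40, "prevtemp": 42}
{"tempdiff": 0, "currtemp": 40, "prevtemp": 40}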
Filtering rule with current and next values of a field¶
Here we calculate the difference between the current and the immediately next value of the temp observed property, then filter on that difference before emitting the output.
Following is the sample create rule JSON:
{
"ruleName": "CurrNextRule",
"selectClause": "s1.temp - s1.temp.lead(1) as tempdiff,s1.temp as currtemp,s1.temp.lead(1) as nexttemp",
"whereClause": "tempdiff >= 0",
"joinClause": "",
"groupByField": "feature",
"triggerInterval": "10",
"windowDuration": "",
"event": {
"name": "t",
"format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
},
"input": [{
"type": "stream",
"alias": "s1",
"topic": "test1",
"properties": [
"temp",
"t"
]
}],
"output": {
"type": "topic",
"name": "outputjoin3"
}
}
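For the same hypothetical input sequence 42, 40, 40, the lead(1) variant compares each value with the next one; the last event has no next value, so its tempdiff is null and it is dropped, while the remaining rows pass the filter tempdiff >= 0:
{"tempdiff": 2, "currtemp": 42, "nexttemp": 40}
{"tempdiff": 0, "currtemp": 40, "nexttemp": 40}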
Filtering rule with rank of a field based on descending order of that field¶
Here we calculate the rank of the temp observed property based on descending temp values within a 10-second window, then apply filtering on the rank. The rule below finds the top two temp values.
Following is the sample create rule JSON:
{
"ruleName": "FindTopTwoRule",
"selectClause": "s1.temp.rank as ranktemp,s1.temp as currrtemp,s1.sensor as category",
"whereClause": "ranktemp <= 2",
"joinClause": "",
"groupByField": "sensor",
"triggerInterval": "10",
"windowDuration": ""
"input": [{
"type": "stream",
"alias": "s1",
"topic": "test1",
"properties": [
"temp"
]
}],
"output": {
"type": "topic",
"name": "outputjoin3"
}
}
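As a worked example, assume four events for sensor "s-1" with temp values 38, 45, 41, and 39 arrive within one trigger interval (hypothetical payloads). Ranked by descending temp, 45 gets rank 1 and 41 gets rank 2, so only the top two rows pass the filter ranktemp <= 2:
{"ranktemp": 1, "currrtemp": 45, "category": "s-1"}
{"ranktemp": 2, "currrtemp": 41, "category": "s-1"}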
Joining more than two streams within a window with multiple join conditions¶
Here we join more than two streams within a 60-second window using multiple join conditions.
Following is the sample create rule JSON:
{
"ruleName": "JoiningMultipleStreams",
"selectClause": "s1.temp as currtemp,s2.humidity as currhumidity,s3.pressure as currpressure,s1.feature as feature,s1.t as s1time,s2.t as s2time,s3.t as s3time",
"joinClause": "(s1.feature = s2.feature) AND abs(s1.t - s2.t) <= 30,(s2.feature = s3.feature) AND abs(s2.t - s3.t) <= 30",
"whereClause": "",
"groupByField": "feature",
"triggerInterval": "60",
"windowDuration": "",
"event": {
"name": "t",
"format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
},
"input": [{
"alias": "s1",
"topic": "test1",
"properties": [
"temp",
"t"
]
},
{
"alias": "s2",
"topic": "test2",
"properties": [
"humidity",
"t"
]
},
{
"alias": "s3",
"topic": "test3",
"properties": [
"pressure",
"t"
]
}
],
"output": {
"type": "topic",
"name": "outputjoin4"
}
}
Note
When joining more than two streams, the user needs to provide multiple comma-separated join conditions, as shown in the above rule.
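For illustration, if matching events for feature "room1" arrive on test1, test2, and test3 with event times 14:28:10, 14:28:30, and 14:28:50 (hypothetical payloads; each pairwise gap is within the 30-second limit), the joined record on outputjoin4 would look like:
{"currtemp": 45, "currhumidity": 95, "currpressure": 1013, "feature": "room1", "s1time": "24-Jan-2022 14:28:10.000 IST", "s2time": "24-Jan-2022 14:28:30.000 IST", "s3time": "24-Jan-2022 14:28:50.000 IST"}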
Joining streaming data with static tables¶
Here we join stream "s1" with static table "rule_meta1" on the common field "feature" and then apply filtering on the joined data.
Following is the sample create rule JSON:
{
"ruleName": "JoiningStreamStatic1",
"selectClause": "s1.temp as currtemp,rule_meta1.temp as templookup,s1.feature as currfeature,currtemp - templookup as tempdiff",
"whereClause": "tempdiff >= 0",
"joinClause": "(s1.feature = rule_meta1.feature)",
"groupByField": "feature",
"triggerInterval": "10",
"event": {
"name": "t",
"format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
},
"input": [
{
"type": "stream",
"alias": "s1",
"topic": "test1",
"properties": [
"temp",
"t"
]
},
{
"type": "database",
"alias": "rule_meta1",
"topic": "",
"properties": [
"temp",
"t"
],
"connectionDetails": {
"url": "jdbc:postgresql://postgres:5432/CEPSpark",
"user": "postgres",
"password": "postgres",
"table": "rule_meta1"
}
}
],
"output": {
"type": "topic",
"name": "outputjoin3"
}
}
Note
When joining a stream with a static table, the database credentials of the static table must be provided in the "connectionDetails" section.
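As a worked example, suppose rule_meta1 contains a row with feature "room1" and temp 25 (hypothetical table contents), and a stream event arrives with feature "room1" and temp 30. The join matches on feature, tempdiff = 30 - 25 = 5 passes the filter tempdiff >= 0, and the output record would be:
{"currtemp": 30, "templookup": 25, "currfeature": "room1", "tempdiff": 5}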
Executing user-defined functions (UDF) with streaming data¶
Here we call the user-defined function (UDF) "strLen". A UDF must be invoked using the convention "udf.{className}.{udfName(params)}". Before calling the UDF, it must be uploaded to CEP using the upload UDF API.
Following is the sample create rule JSON:
{
"ruleName": "UdfRule1",
"status": "Stopped",
"selectClause": "s1.sensor as currsensor,udf.SparkUdfFunctions.strLen(currsensor,1) as sensorlength",
"groupByField": "feature",
"triggerInterval": "1",
"event": {
"name": "t",
"format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
},
"input": [
{
"type": "stream",
"alias": "s1",
"topic": "test1",
"properties": [
"temp",
"t"
]
}
],
"output": {
"type": "topic",
"name": "outputjoin3"
}
}
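Assuming strLen returns the length of its string argument (the exact semantics, including the role of the second parameter, are defined by the uploaded UDF), an input event with sensor "sensor-7" might produce an output record like:
{"currsensor": "sensor-7", "sensorlength": 8}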
Note
As Spark version 2.4.x does not support Java 11, the UDF functions must be written in Java 8 and the JAR must be built with Java 8.
Posting output streaming data to Delta Lake¶
Here we insert the output streaming data into Delta Lake, specifying the table name, mode, overwriteFlag, and partitionBy parameters.
Following is the sample create rule JSON:
{
"ruleName": "Joiningdelta1",
"status": "Stopped",
"selectClause": "s1.temp as currtemp,s2.humidity as currhumidity,s1.feature as feature,s1.t as s1time,s2.t as s2time,s1.partitioncol as partcol",
"joinClause": "(s1.feature = s2.feature) AND abs(s1.t - s2.t) <= 60",
"groupByField": "feature",
"triggerInterval": "10",
"event": {
"name": "t",
"format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
},
"input": [
{
"type": "stream",
"alias": "s1",
"topic": "test1",
"properties": [
"temp",
"t"
]
},
{
"type": "stream",
"alias": "s2",
"topic": "test2",
"properties": [
"humidity",
"t"
]
}
],
"output": {
"type": "delta",
"name": "joindelta1",
"details": {
"mode": "Overwrite",
"overwriteFlag": "true",
"partitionBy": "dayofweek"
}
}
}
Note
Spark version 2.4.x does not support Delta Lake functionality; Spark version 3.x.x is required.
Rule with maxLateArrivalDelay¶
Here "maxLateArrivalDelay" is the maximum delay that CEP will allow when processing out-of-order data; it is the threshold time by which data can be delayed relative to the oldest window start time.
For example, if two events are published with event times "Mon Jan 24 14:28:35 IST 2022" and "Mon Jan 24 14:30:35 IST 2022" and "maxLateArrivalDelay" is 2 minutes, then the minimum event time of out-of-order data that will still be processed is (14:28:00 - 2 min) = 14:26:00, and any data with an event time before 14:26:00 will be discarded. (14:28:00 is the start time of the oldest window; the other window's start time is 14:30:00.)
Following is the sample create rule JSON:
{
"ruleName": "JoiningWithWaterMark1",
"selectClause": "s1.temp as currtemp,s2.humidity as currhumidity,s1.feature as feature,s1.t as s1time,s2.t as s2time",
"joinClause": "(s1.feature = s2.feature) AND abs(s1.t - s2.t) <= 60",
"groupByField": "feature",
"triggerInterval": "60",
"maxLateArrivalDelay": "120",
"event": {
"name": "t",
"format": "dd-MMM-yyyy HH:mm:ss.SSS zzz"
},
"input": [
{
"type": "stream",
"alias": "s1",
"topic": "test1",
"properties": [
"temp",
"t"
]
},
{
"type": "stream",
"alias": "s2",
"topic": "test2",
"properties": [
"humidity",
"t"
]
}
],
"output": {
"type": "topic",
"name": "watermarktopic"
}
}
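To make the watermark behaviour concrete, suppose the oldest open window starts at 14:28:00, so with the rule's maxLateArrivalDelay of 120 seconds the cutoff is 14:26:00 (hypothetical payloads). A late event such as
{"feature": "room1", "temp": 41, "t": "24-Jan-2022 14:26:30.000 IST"}
would still be processed, while an event with an event time before the cutoff, such as
{"feature": "room1", "temp": 41, "t": "24-Jan-2022 14:25:00.000 IST"}
would be discarded.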