Appendix 1

Refer to this appendix when you need to implement a new custom action type in the Action service.

Introduction

Action services are provided as plug-ins to the ActionModule on TCUP. An action is triggered once an event has been detected and processed in TCUP by the MR and CEP service modules. ActionModule is built on an extensible application architecture, so new actions can be added without changing the module's own code: ActionModule uses the Java Reflection API to dynamically load the classes in which developers implement their actions. This appendix briefly describes the steps to follow to implement an Action Service in ActionModule.
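The dynamic loading described above can be illustrated with a minimal sketch. This is not the ActionModule source: the class names ReflectionLoadSketch and EchoAction are hypothetical, and the only details taken from this appendix are the method names Set_Variable(String) and Action(byte[]).

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

// Hypothetical custom action, used only to demonstrate reflective loading.
class EchoAction {
    private String params;

    public String Set_Variable(String param_list_json) {
        this.params = param_list_json;   // a real action would parse this JSON
        return "Rule started successfully";
    }

    public String Action(byte[] message) {
        return "Success:" + new String(message, StandardCharsets.UTF_8);
    }
}

// Sketch of a host that loads an action class by name (an assumption about
// how such a host could work, not the actual ActionModule implementation).
final class ReflectionLoadSketch {
    static String[] run(String className, String paramJson, byte[] message) {
        try {
            Class<?> actionClass = Class.forName(className);
            Object action = actionClass.getDeclaredConstructor().newInstance();
            Method setVariable = actionClass.getMethod("Set_Variable", String.class);
            Method doAction = actionClass.getMethod("Action", byte[].class);
            // Set_Variable is called once at rule startup, Action once per message.
            String startStatus = (String) setVariable.invoke(action, paramJson);
            String actionStatus = (String) doAction.invoke(action, (Object) message);
            return new String[] { startStatus, actionStatus };
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not load action " + className, e);
        }
    }

    public static void main(String[] args) {
        String[] result = run(EchoAction.class.getName(), "{\"parameter\":[]}",
                "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(result[0]);
        System.out.println(result[1]);
    }
}
```

Because the host looks the methods up by name and parameter type, a custom action class must declare both methods as public with exactly these signatures.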

Steps to Create Custom Action Type

Developers of an Action service need to create a Java class with two methods:

  • Set_Variable(String param_list_json)

  • Action(byte[] message)

The first method, “Set_Variable(String param_list_json)”, sets the values of the parameters that the action needs. Each parameter is declared in the parameter array of the action rule created by the developer, using the following fields:

  • name

  • value

  • type

  • dataType

No other fields are allowed in the parameter array. For example, an action that inserts data into an RDBMS needs the connection URL, user name, password, database name, and so on. These details are specified in the rule as follows:

{
    "RuleName": "TestRule7",
    "RuleDescription": "checking",
    "noOfInstance": 1,
    "input": {
        "type": "topic",
        "exchange": "testTopicExchange1",
        "topic": "testTopic.*",
        "MQName": ""
    },
    "actions": [
        {
            "type": "RDBMSAction",
            "parameter": [
                {
                    "name": "Sensor",
                    "value": "observations:sensor",
                    "type": "columnName",
                    "dataType": "String"
                },
                {
                    "name": "Illumination",
                    "value": "observations:record:output:Illumination",
                    "type": "columnName",
                    "dataType": "integer"
                },
                {
                    "name": "dbName",
                    "value": "ActionModuleTEST"
                },
                {
                    "name": "username",
                    "value": "postgres"
                },
                {
                    "name": "password",
                    "value": "postgres"
                },
                {
                    "name": "tableName",
                    "value": "actionmod333"
                },
                {
                    "name": "connectorURL",
                    "value": "192.168.161.52:5432"
                },
                {
                    "name": "connectionTimeout",
                    "value": "5000"
                }
            ]
        }
    ]
}
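The restriction on parameter fields can be checked before a rule is submitted. The sketch below is a hypothetical helper, not part of ActionModule: it assumes the parameter entries have already been parsed into maps, and it treats "name" and "value" as mandatory because every example entry carries them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical pre-check for parameter entries; not part of ActionModule.
final class ParameterEntryCheck {
    // The four field names permitted in a parameter entry, per this appendix.
    private static final Set<String> ALLOWED_FIELDS =
            new HashSet<>(Arrays.asList("name", "value", "type", "dataType"));

    // Returns one message per problem; an empty list means the entries look valid.
    static List<String> findProblems(List<Map<String, String>> parameters) {
        List<String> problems = new ArrayList<>();
        for (int i = 0; i < parameters.size(); i++) {
            Map<String, String> entry = parameters.get(i);
            for (String field : entry.keySet()) {
                if (!ALLOWED_FIELDS.contains(field)) {
                    problems.add("parameter[" + i + "]: unexpected field '" + field + "'");
                }
            }
            // Assumption: "name" and "value" are required in every entry.
            if (!entry.containsKey("name") || !entry.containsKey("value")) {
                problems.add("parameter[" + i + "]: missing 'name' or 'value'");
            }
        }
        return problems;
    }
}
```

An entry such as {"name": "x", "value": "y", "unit": "lux"} would be reported because "unit" is not one of the four permitted fields.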

Defining Set_Variable Method

The Set_Variable method will be invoked once at the time of rule startup. ActionModule will pass the parameter list from the rule JSON in string format as a parameter of the method. The parameter list from the RDBMS rule JSON will be as follows:

{
             "parameter": [
                     {
                             "name": "Sensor",
                             "value": "observations:sensor",
                             "type": "columnName",
                             "dataType": "String"
                     },
                     {
                             "name": "Illumination",
                             "value": "observations:record:output:Illumination",
                             "type": "columnName",
                             "dataType": "integer"
                     },
                     {
                             "name": "dbName",
                             "value": "ActionModuleTEST"
                     },
                     {
                             "name": "username",
                             "value": "postgres"
                     },
                     {
                             "name": "password",
                             "value": "postgres"
                     },
                     {
                             "name": "tableName",
                             "value": "actionmod333"
                     },
                     {
                             "name": "connectorURL",
                             "value": "192.168.161.52:5432"
                     },
                     {
                             "name": "connectionTimeout",
                             "value": "5000"
                     }
                     ]
     }

Developers are responsible for parsing this JSON and assigning each parameter value to a class variable (for example, public String connectorURL;) for later use, as follows:

public String Set_Variable(String param_list_json)
{
    try
    {
        JSONParser parser = new JSONParser();
        JSONObject paramObject = (JSONObject) parser.parse(param_list_json);
        JSONArray params = (JSONArray) paramObject.get("parameter");
        for (Object param : params)
        {
            JSONObject paramval = (JSONObject) param;
            String param_name = paramval.get("name").toString().trim();
            String param_val = paramval.get("value").toString().trim();
            // "type" is present only on column-mapping entries (dbName, username,
            // etc. have none), so guard against null before calling toString().
            Object type_obj = paramval.get("type");
            String param_type = (type_obj == null) ? "" : type_obj.toString().trim();

            if (param_name.equals("connectorURL"))
            {
                connectorURL = param_val;
            }
            else if (param_name.equals("connectionTimeout"))
            {
                connectionTimeout = param_val;
            }
            else if (param_name.equals("dbName"))
            {
                dbName = param_val;
            }
            else if (param_name.equals("username"))
            {
                username = param_val;
            }
            else if (param_name.equals("password"))
            {
                password = param_val;
            }
            else if (param_name.equals("tableName"))
            {
                tableName = param_val;
            }
            else if (param_type.equals("columnName"))
            {
                // The last segment of the path (for example "Illumination" in
                // "observations:record:output:Illumination") names the output field.
                String data_type = paramval.get("dataType").toString().trim();
                String[] results = param_val.split(":");
                String colname = param_name;
                String colval = results[results.length - 1];
                String coltype = data_type;
                logger.debug("colname::" + colname);
                logger.debug("colval::" + param_val);
                colval_colname.put(colval, colname);
                colname_type.put(colname, coltype);
                column_name.add(colname);
                column_value.add(param_val);
                column_type.add(coltype);
            }
        }
    }
    catch (Exception e)
    {
        // Do not swallow the failure silently. The status string that ActionModule
        // expects on failure is not specified here, so this text is only illustrative.
        logger.error("Failed to read rule parameters", e);
        return "Rule startup failed: " + e.getMessage();
    }

    return "Rule started successfully";
}

Defining Action Method

The second method, “Action(byte[] message)”, performs the action. It is invoked every time the rule receives an observation JSON from a Topic, MQ, or Direct Exchange. Its parameter is the observation JSON as a byte array. Developers are responsible for converting the byte array to a string and then parsing the observation JSON to perform the action, as follows:

public String Action(byte[] message) throws Exception   // parser.parse throws a checked exception
{
    // Decode the observation JSON. StandardCharsets.UTF_8 avoids the checked
    // UnsupportedEncodingException raised by new String(message, "UTF-8").
    String json_str = new String(message, StandardCharsets.UTF_8);
    JSONParser parser = new JSONParser();
    JSONObject jsonObject = (JSONObject) parser.parse(json_str);

    // Working collections for the action step below
    ArrayList<String> collist = new ArrayList<String>();
    ArrayList<String> valuelist = new ArrayList<String>();
    HashMap<String, String> outputmap = new HashMap<String, String>();

    // Collect every output name/value pair of the first observation
    JSONArray observations = (JSONArray) jsonObject.get("observations");
    JSONObject observation = (JSONObject) observations.get(0);
    JSONArray records = (JSONArray) observation.get("record");
    for (Object objt : records)
    {
        JSONObject record = (JSONObject) objt;
        JSONArray output = (JSONArray) record.get("output");
        for (Object out : output)
        {
            JSONObject outputval = (JSONObject) out;
            String outputname = outputval.get("name").toString();
            String outputvalue = outputval.get("value").toString();
            logger.debug("outputname::" + outputname);
            logger.debug("outputvalue::" + outputvalue);
            outputmap.put(outputname, outputvalue);
        }
    }

    // Do the necessary steps for your action (example: insert data into a table for RDBMS)

    return "Success";       // status message
}
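For the RDBMS example, the step left open in the Action method could build a parameterized INSERT statement from the column mapping collected in Set_Variable and the output values collected in Action. The following is a minimal sketch under that assumption; the class and method names are hypothetical, and the actual RDBMSAction implementation may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for the RDBMS example; not the shipped RDBMSAction code.
final class InsertStatementSketch {
    // Builds "INSERT INTO t (c1, c2) VALUES (?, ?)" for use with a PreparedStatement.
    // Table and column names come from the rule definition, so they must be trusted.
    static String buildInsertSql(String tableName, List<String> columns) {
        StringJoiner names = new StringJoiner(", ");
        StringJoiner marks = new StringJoiner(", ");
        for (String column : columns) {
            names.add(column);
            marks.add("?");
        }
        return "INSERT INTO " + tableName + " (" + names + ") VALUES (" + marks + ")";
    }

    // Returns the values to bind, in column order; a missing output yields null.
    static List<String> valuesInColumnOrder(Map<String, String> columnToOutputName,
                                            Map<String, String> outputmap) {
        List<String> values = new ArrayList<>();
        for (String outputName : columnToOutputName.values()) {
            values.add(outputmap.get(outputName));
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, String> columnToOutputName = new LinkedHashMap<>();
        columnToOutputName.put("Illumination", "Illumination");
        Map<String, String> outputmap = new LinkedHashMap<>();
        outputmap.put("Illumination", "120");
        System.out.println(buildInsertSql("actionmod333", Arrays.asList("Illumination")));
        System.out.println(valuesInColumnOrder(columnToOutputName, outputmap));
    }
}
```

Binding the values through placeholders rather than concatenating them into the SQL text keeps observation data from being interpreted as SQL.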

Note

  • If the schedule feature is used to process data in batches, the method signature is as follows:

    public String Action(List<byte[]> message)

  • Developers are responsible for converting the list of byte arrays into a list of observation JSONs to perform the custom action.
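The conversion for the batch signature can be sketched as follows. The class name BatchDecodeSketch is hypothetical, and UTF-8 is assumed because the single-message example decodes its payload as UTF-8.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: decodes a batch of messages into observation JSON strings.
final class BatchDecodeSketch {
    static List<String> toJsonStrings(List<byte[]> messages) {
        List<String> jsonStrings = new ArrayList<>(messages.size());
        for (byte[] message : messages) {
            // Assumption: each payload is UTF-8 encoded JSON text.
            jsonStrings.add(new String(message, StandardCharsets.UTF_8));
        }
        return jsonStrings;
    }
}
```

Each returned string can then be parsed in the same way as the single observation JSON in the Action(byte[] message) example.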

  • Developers should not use stdout statements in a custom action to write messages to the log file of the Action service. Any stdout statements used for local debugging should be removed before the runnable jar of the custom action is created. Use the logging framework to write to the log file, as shown below:

public static final org.slf4j.Logger logger = LoggerFactory.getLogger(ClassName.class);
    logger.info("Write your message to log file");

Acknowledgement based Action Types

The aim is to provide a data processing guarantee in the Action service so that each message is processed at least once. This relies on the consumer acknowledgement architecture. Guaranteed processing is a design approach that every module or component involved must follow; it is a joint responsibility of the producer, the RabbitMQ broker, and the consumers. Here the Action rule acts as the consumer. The RabbitMQ broker needs to be clustered, and the publisher needs to set the persistent flag when publishing a message so that the message is backed by disk and is not lost in the event of a failure. The Action rule should use a durable queue as its input to achieve guaranteed processing. For an exchange or topic input, the exchange should be bound to a durable queue, and that same queue should be used in the rule.

With guaranteed processing, messages consumed from the broker are marked as “unacknowledged”. Once the consumer (the action rule) has processed them, they are acknowledged and removed from the RabbitMQ broker. If no acknowledgement is sent, they remain in the “unacknowledged” state and are redelivered to the rule once the rule is restarted.

Messages are acknowledged whether processing succeeds or fails. When an exception occurs, the message is still acknowledged unless the action type returns an exception message with “NO_ACK” appended to it. It is the responsibility of the custom action developer to append “NO_ACK” to the exception message in scenarios where the message must not be acknowledged. Sample code is as follows:

catch (CustomException e)
{
    logger.info("e.getErrorCode()::" + e.getErrorCode());

    // Server down or page not found: append NO_ACK so the message is not acknowledged
    if (e.getErrorCode().contains("404")
            || e.getErrorCode().contains("504")
            || e.getErrorCode().contains("UnknownHostException")
            || e.getErrorCode().contains("SocketTimeoutException"))
        throw (new CustomException(e.getMessage() + " NO_ACK"));
    else
        throw (new CustomException(e.getMessage()));
}
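The NO_ACK convention can be summarized in a small self-contained sketch. The class AckMarkerSketch and its methods are hypothetical. The error conditions mirror the sample above, while the shouldAcknowledge check is only an assumption about how a consumer could interpret the marker, not the Action service's actual logic.

```java
// Hypothetical illustration of the NO_ACK convention; not Action service code.
final class AckMarkerSketch {
    static final String NO_ACK = "NO_ACK";

    // Same conditions as the sample: server down or page not found.
    static boolean shouldSkipAck(String errorCode) {
        return errorCode != null
                && (errorCode.contains("404")
                    || errorCode.contains("504")
                    || errorCode.contains("UnknownHostException")
                    || errorCode.contains("SocketTimeoutException"));
    }

    // Developer side: append the marker when the message must not be acknowledged.
    static String exceptionMessage(String message, String errorCode) {
        return shouldSkipAck(errorCode) ? message + " " + NO_ACK : message;
    }

    // Consumer side (assumed): acknowledge unless the marker is present.
    static boolean shouldAcknowledge(String exceptionMessage) {
        return exceptionMessage == null || !exceptionMessage.contains(NO_ACK);
    }
}
```

With this convention a message that failed because the target server was unreachable stays unacknowledged and can be redelivered, while a message that failed for any other reason is acknowledged and removed from the broker.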

Consumer Prefetch

Consumer prefetch limits the number of unacknowledged messages sent to the consumer (action type). It provides flow control from the RabbitMQ broker to the consumer, which is the action rule in this case.

  • Steps to make an action type acknowledgement based and to set its prefetch count:

In the application.conf file of the Action service, set the following parameters:

    message.guarantee.level=1
    gurantee.actiontypes="SOS"

Here message.guarantee.level is 1 for at-least-once processing and 0 for at-most-once processing. With gurantee.actiontypes set to "SOS", only rules based on the SOS action type work in the acknowledgement-based architecture.

Multiple action types can be made acknowledgement based by listing the type names separated by commas, as shown below:

    gurantee.actiontypes="SOS,TMSAction"

With this setting, rules based on the SOS and TMSAction action types work in the acknowledgement-based architecture.

To set the prefetch count, specify the action type name and the count as shown below:

    prefetch_count.SOS=50
    prefetch_count.TMSAction=50

With the above configuration, the prefetch count is set to 50 for the respective action types.

Note

  • A prefetch count of 0 means unlimited. If the prefetch count is not set, it defaults to 0, which means RabbitMQ sends all messages to the action rule as soon as it receives them.
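How these settings combine can be sketched with java.util.Properties. This is a simplification for illustration: the Action service may read application.conf with a different configuration library, the class PrefetchConfigSketch is hypothetical, and treating an unset message.guarantee.level as 0 is an assumption.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical reader for the settings described above; not Action service code.
final class PrefetchConfigSketch {
    static Properties parse(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new IllegalStateException("could not read configuration text", e);
        }
        return props;
    }

    // Properties keeps surrounding double quotes, so strip them explicitly.
    private static String unquote(String value) {
        String v = value.trim();
        if (v.length() >= 2 && v.startsWith("\"") && v.endsWith("\"")) {
            return v.substring(1, v.length() - 1);
        }
        return v;
    }

    // An action type is acknowledgement based when the level is 1 and it is listed.
    static boolean isAckBased(Properties props, String actionType) {
        String level = unquote(props.getProperty("message.guarantee.level", "0"));
        if (!"1".equals(level)) {
            return false;
        }
        String types = unquote(props.getProperty("gurantee.actiontypes", ""));
        for (String type : types.split(",")) {
            if (type.trim().equals(actionType)) {
                return true;
            }
        }
        return false;
    }

    // Returns 0 (unlimited) when no prefetch count is configured for the type.
    static int prefetchCount(Properties props, String actionType) {
        String raw = props.getProperty("prefetch_count." + actionType);
        return raw == null ? 0 : Integer.parseInt(unquote(raw));
    }
}
```

With the configuration shown earlier, SOS and TMSAction would both be acknowledgement based, and an action type without a prefetch_count entry would fall back to 0, that is, unlimited delivery.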

Kafka based Action Service

From TCUP 12 onward, the Action service can use Apache Kafka as the message broker.

Note

  • In a Kafka-based service, if the input type is topic, "exchange" acts as the Kafka topic and "topic" acts as the Kafka consumer group. If "topic" is not given, the default consumer group action_group_<ruleid> is assigned.

  • If the input type is MQ, "MQName" acts as the Kafka topic and the default Kafka consumer group (action_group_<ruleid>) is assigned.
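The mapping from the rule's input block to Kafka settings can be expressed as a short sketch. The class KafkaInputMappingSketch is hypothetical, and the literal input type strings "topic" and "MQ" are assumptions based on the wording of this section.

```java
// Hypothetical illustration of the input-to-Kafka mapping; not Action service code.
final class KafkaInputMappingSketch {
    static String defaultGroup(String ruleId) {
        return "action_group_" + ruleId;
    }

    // Returns { kafkaTopic, consumerGroup } for the given rule input fields.
    static String[] resolve(String inputType, String exchange, String topic,
                            String mqName, String ruleId) {
        if ("topic".equalsIgnoreCase(inputType)) {
            // "exchange" is the Kafka topic; "topic" is the consumer group if given.
            boolean hasGroup = topic != null && !topic.trim().isEmpty();
            return new String[] { exchange, hasGroup ? topic : defaultGroup(ruleId) };
        }
        if ("MQ".equalsIgnoreCase(inputType)) {
            // "MQName" is the Kafka topic; the consumer group is always the default.
            return new String[] { mqName, defaultGroup(ruleId) };
        }
        throw new IllegalArgumentException(
                "Kafka mapping is not described for input type: " + inputType);
    }
}
```

For example, a topic input with exchange "testTopicExchange1" and an empty "topic" in a rule with id 42 would resolve to the Kafka topic testTopicExchange1 and the consumer group action_group_42.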