Introduction¶
Connectors let users interact with and explore the capabilities of the underlying TCUP RESTful services, such as the Sensor Observation Service, the Data Lake Service and the Message Routing Service. Connectors provide the simplest way for users to link their data with these ongoing services when shaping customized apps.
Pyconnector is a Python 3 library that provides access to TCUP data, devices and RESTful microservices. It acts as a one-stop interface for Python-based applications and for building tailored client applications, allowing users to ingest data into TCUP and to retrieve or query data from the TCUP data stores.
Purpose/Usage¶
TCUP pyconnector plays a pivotal role in building connections with TCUP services by wrapping their REST APIs. The connector can be installed in any Python environment (version 3.7 and above). Users import the pyconnector library to handle scheduled and ad-hoc executions for processing sensor data, and can leverage it for data analysis and for building their own workflows and apps. TCUP connectors are organized into these groups:
Service connectors : Provide access to built-in TCUP REST APIs; receive and respond to requests and publish the results.
Protocol oriented connectors : Manage on-premises data publishing; these connectors map different protocols between publisher and subscriber nodes, such as HTTP, Web Socket, AMQP-based requests and MQTT.
For example, to train a model, a large volume of historical data can be uploaded as files using the DLS service connector. An ongoing stream of data can be posted and viewed using any of the protocol oriented connectors. Finally, the required operations can be performed on the dataframes returned by the SOS service connector to meet custom requirements, as sketched below.
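A minimal sketch of this combined flow, reusing only calls that appear in the examples later in this section; the configuration path, file URI, sensor/feature names and payload values are illustrative:
>>> import pyconnector
>>> conn = pyconnector.connectors('/home/user/ConfigFile.conf')  # illustrative path
>>> # Historical data: fetch a training file from the Data Lake as a dataframe
>>> dls = conn.Datalake()
>>> history = dls.getFile(fileUri=["user1/dlsadmin/csv/SampleFlightData1/SampleFlightData1.csv"], responseType='dataframe')
>>> # Ongoing stream: publish a live observation through the message routing exchange
>>> srws = conn.srwsUtils()
>>> srws.SendRequestExchange(srws.data('ts1', {'obs1': 'test_1', 'obs2': 1}, 'tf1'))
>>> # Custom processing: query the stored observations back as a dataframe
>>> sos = conn.SensorObservation()
>>> df = sos.getObservation(feature="tf1", sensor="ts1", responseType='dataframe')
>>> df.describe()  # any pandas operation on the returned dataframe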
TCUP Sensor Observation Service : Using SOS, users can register sensors, manage sensor data, and perform searches and sophisticated queries, as listed below.
Get the list of sensors/features and observation properties as deployed in the TCUP cloud.
Provides access to a set of APIs that help applications, sensors and gateways store and retrieve sensor observation data.
Represents sensor metadata and sensor capability payloads from the TCUP data store in tabulated form.
Any comma-separated (CSV) observation data file with the predefined column name format can be uploaded.
Based on varying parameters and requirements, observation data can be searched, returned as dataframes and downloaded as a CSV file if records are found, as sketched below.
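A minimal sketch of the search-and-download step, assuming a configured conn object as shown in the examples later in this section; the sensor/feature names and output path are illustrative, and to_csv is standard pandas:
>>> sos = conn.SensorObservation()
>>> # Search observations for a sensor/feature pair and get a dataframe back
>>> df = sos.getObservation(feature="tf1", sensor="ts1", responseType='dataframe')
>>> # Save the result locally as a CSV file if records were found
>>> if not df.empty:
...     df.to_csv('observations.csv', index=False)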
Message Routing, Web Socket and MQTT : The message routing service controls whether incoming data is stored or rejected based on simple API calls. It uses queues and direct exchanges to build powerful applications. Main functions are as follows:
Allows users to communicate with multiple services in the platform through the underlying AMQP-based message routing and Web Socket protocols.
Gets all the observations from particular sensors and passes the observation values to a specific topic as per the existing rules.
Allows users to communicate with a queue in RabbitMQ, a message broker that can accept and forward messages, and to perform different operations on it.
Uses MQTT, a publish-subscribe based messaging protocol, to communicate with topic exchanges and message queues, as sketched below.
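A minimal sketch of publishing one observation through the routing exchange and over Web Socket, assuming a configured conn object; the sensor, feature and payload values are illustrative (the full examples appear later in this section):
>>> srws = conn.srwsUtils()
>>> # Build an SOS-compatible payload and publish it
>>> json = srws.data('ts1', {'obs1': 'test_1', 'obs2': 1}, 'tf1')
>>> print(srws.SendRequestExchange(json))  # AMQP-based message routing
>>> print(srws.WebSocketPublish(json))     # Web Socket publish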
TCUP Data Lake Service : Data Lake Service (DLS) is a set of REST endpoints to upload files to and download files from the TCUP Data Lake, as described below.
Allows users to upload heterogeneous data files such as program files, log files, binary files, analytics output files and device firmware packages to distributed file systems in the TCUP cloud.
Downloads files from the TCUP Data Lake and returns dataframes for the given files for data analysis.
Searches files based on metadata, shares files across various DLS users and links one file to another based on ad-hoc relationships, as sketched below.
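A minimal sketch of the download path, assuming a configured conn object; the file URI is illustrative (the full examples appear later in this section):
>>> dls = conn.Datalake()
>>> # List the existing ad-hoc relations between files
>>> print(dls.getRelation(responseType='table'))
>>> # Download a file and work on it as a dataframe
>>> print(dls.getFile(fileUri=["user1/dlsadmin/csv/SampleFlightData1/SampleFlightData1.csv"], responseType='dataframe'))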
TCUP Task Service : Task Service enables a user to run many batch jobs concurrently in the background. The service is also distributed over a cluster of machines, making it a more reliable and scalable platform. The Task Service API can be leveraged to create and manage the various tasks created by a user. It supports the following runtime environments: java, cpp, c, py, js and spark. Main functions are listed below:
Provides access to create a task; deploy it with an executable file, dependent files (in a .zip file), input files (in a .zip file) and runtime arguments; and start, pause, resume or stop jobs.
Gets the list of tasks and associated project details as deployed in TCUP.
Deploys a delayed task with an execution time in the future. For a recurrent task, deploys the task with a recurrence interval (daily, weekly, etc.) using a cron pattern and an optional recurrence limit argument (0 = infinite, or > 0).
Based on requirements, tasks can be rescheduled, copied to another task and linked to other external jobs, as sketched below.
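A minimal sketch of deploying and starting one batch job, assuming a configured conn object; the project, job and file names are illustrative (delayed and recurrent variants appear in the Working with Task connector section below):
>>> myTask = conn.Task(projectName='testProj', jobName='testJob')
>>> # Deploy the executable and start the job
>>> myTask.deployTask(filename='SampleFile.py', runTime='py', overwrite_deployed=True)
>>> myTask.startTask()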
TCUP Workflow Service : Workflow Service is used to schedule, orchestrate and monitor the execution of multiple tasks in sequence or in parallel. The service is primarily leveraged to implement data processing pipelines where the output of one task becomes the input of another, so a series of tasks can be created to do complex data processing. These interdependent tasks are finally combined into a single unit, a workflow, which executes them in sequence or in parallel to achieve the final goal. Main functions are listed below:
Provides access to create a workflow by setting the list of interdependent tasks to be executed, and creates a schedule.
From the schedule, gets the process definition and runs or aborts an instance of the process.
Based on requirements, workflows can be recovered, paused or resumed. The service can display the various tasks already executed as part of the process instance, along with the task(s) that are currently running, as sketched below.
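A minimal sketch of scheduling and monitoring a two-task workflow, assuming t1 and t2 are previously deployed Task objects (the full sequence appears in the Working with Workflow connector section below):
>>> wf = conn.Workflow(taskList=[(t1, t2)], processName='procTest1', processVersion='verTest1')
>>> wf_proc = wf.preInitiate()   # create the schedule / process definition
>>> wf_proc.initiate()           # run an instance of the process
>>> wf_proc.status()             # tasks executed so far and currently running
>>> wf_proc.getHistory()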
Prerequisite¶
To get started with TCUP connectors, users must have a TCUP subscription and a basic understanding of Python 3.8.
To build a set of custom apps, users need an API key for accessing the microservices. In a security context, JWT mode authentication requires an additional symmetric key.
For publishing records to direct exchanges, a valid routing key is required.
To use this library, the following third-party Python libraries must be installed. They can be downloaded using pip or conda:
user@user-ThinkCentre:~$ pip install <package name>
or,
user@user-ThinkCentre:~$ conda install <package name>
pandas : A Python package providing fast, flexible and expressive data structures designed to make working with structured and time series data both easy and intuitive.
prettytable : A simple Python library designed to make it quick and easy to represent tabular data in visually appealing ASCII tables.
sseclient-py : A Python client library for iterating over HTTP Server-Sent Event (SSE) streams.
paho-mqtt : Provides a client class which enables applications to connect to an MQTT broker to publish messages, and to subscribe to topics and receive published messages.
websocket-client : A WebSocket client for Python, providing the low-level APIs for WebSocket.
Download and install the following .whl file.
user@user-ThinkCentre:~$ pip install pyconnector-0.0.1-py3-none-any.whl
Working with connector examples¶
Import pyconnector and load the necessary configuration
>>> # import pyconnector library
>>> import pyconnector
>>> # Load the configuration details from a file or using explicit parameters
>>> conn = pyconnector.connectors('/home/user/Desktop/MyDocuments/PythonConnectors/ConfigFile.conf') # If configuration parameters are present in a file
>>> conn = pyconnector.connectors( api_key='4e506310aca4c9a1979e708d7c2ee07b4e3a4d8332af32250ec2fc1460d0e8bf', URL= 'https://in.tcupiot.com' ) # If user specifies configuration details
Working with Sensor observation connector
>>> # Load Service oriented connector for Sensor observation
>>> sos = conn.SensorObservation()
>>> # For example, to get the count of all sensors and features :
>>> print(sos.getCount())
>>> # To get the output as pretty table format:
>>> print(sos.getCount(responseType = 'table'))
>>> # To get the dataframe representation of observations present in any sensor-feature :
>>> print(sos.getObservation(feature="tf1",sensor="ts1",responseType='dataframe'))
Working with Datalake connector
>>> # Load Service oriented connector for Datalake
>>> dls = conn.Datalake()
>>> # For example, to get all the existing relations present in DLS :
>>> print(dls.getRelation())
>>> # To get the output as pretty table format:
>>> print(dls.getRelation(responseType = 'table'))
>>> # To get dataframe representation of any file:
>>> print(dls.getFile(fileUri =["user1/dlsadmin/csv/SampleFlightData1/SampleFlightData1.csv"],responseType='dataframe'))
Working with Message Routing and Web Socket connector
>>> # Load protocol oriented connector for Message routing and web socket
>>> srws = conn.srwsUtils()
>>> json = srws.data('ts1',{'obs1':'test_1','obs2':1},'tf1')
>>> print(srws.SendRequestExchange(json))
>>> print(srws.WebSocketPublish(json)) # Publish SOS supported json
>>> print(srws.webSocketSubTopic("ex1","test")) # Subscribe to topic 'test' on exchange 'ex1'
Working with MQTT connector
>>> # Load protocol oriented connector for MQTT
>>> mqtt = conn.mqttUtils()
>>> sub = conn.mqttSubscribe("in.tcupiot.com",8883,"user1@tcup:user1","user1","amq.topic","intcupiotcom.crt")
>>> def callback(json):
... import pprint
... pprint.pprint(json)
>>> sub.main(callback) # Subscribe to MQTT gateway
>>> json = mqtt.payload('s1',{'obs1':1,'obs2':'test_1'},'f1')
>>> pub = conn.mqttPublish("in.tcupiot.com",8883,"user1@tcup:user1","user1","amq.topic","intcupiotcom.crt")
>>> pub.main() # Publish SOS supported json in MQTT gateway
>>> pub.disconnect()
Working with Task connector
>>> # Load Service oriented connector for Task
>>> myTask = conn.Task(projectName='testProj',jobName='testJob')
>>> print (myTask.deployTask(filename='SampleFile.py', runTime = 'py', overwrite_deployed=True))
>>> myTask.startTask()
>>> dTask = conn.DelayedTask(delayedTime='15-07-2019 12:35:56.789 IST', timeZone = 'IST', projectId=myTask.projectId, jobName='dTask112')
>>> dTask.deployTask(filename='SampleFile.py',runTime = 'py', overwrite_deployed=True)
>>> dTask.rescheduleTask(delayedTime='16-07-2019 12:35:56.789 IST')
>>> rTask = conn.RecurrentTask(cronPatternType = 'fixed', hour='4', day='6', projectId=myTask.projectId, jobName='rTask112')
>>> rTask.deployTask(filename='SampleFile.py',runTime = 'py', overwrite_deployed=True)
>>> rTask.rescheduleTask(cronPatternType='fixed', hour='5', day='7')
>>> print(rTask.getJobByName(jobName = 'rTask112'))
>>> myTask.deleteTask()
>>> myTask.deleteProject()
Working with Workflow connector
>>> # Load Service oriented connector for Workflow
>>> t1 = conn.Task(projectId='P9909',jobId='T123')
>>> res = t1.deployTask(filename='code1.py', runTime = 'py', input_dir_path='data1.zip')
>>> t2 = conn.Task(projectId='P9909',jobId='T124')
>>> res = t2.deployTask(filename='code2.py', runTime = 'py')
>>> res = t2.externalFileLinking(externalProjectIds=[t1.projectId], externalTaskIds=[t1.jobId], fileNames = ['out.out'])
>>> wf = conn.Workflow(taskList=[(t1,t2)], processName='procTest1', processVersion='verTest1')
>>> wf_proc = wf.preInitiate()
>>> res = wf_proc.getMapping()
>>> def getJobId(res, jobId):
...     # Return the copied task id for the given base task id in the mapping
...     for task in res['taskIds']:
...         if task['baseTaskId'] == jobId:
...             return task['copiedTaskId']
>>> t3 = conn.Task(projectId=t2.projectId,jobId=getJobId(res,t2.jobId))
>>> res = t3.externalFileLinking(externalProjectIds=[t2.projectId], externalTaskIds=[getJobId(res,t1.jobId)], fileNames = ['out.out'])
>>> wf_proc.status()
>>> wf_proc.initiate()
>>> wf.selectRecoveryMode(True)
>>> wf_proc.recover()
>>> wf_proc.getHistory()