100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > IOTOS物联中台开发驱动支持NB-IoT气体感应报警器设备

IOTOS物联中台开发驱动支持NB-IoT气体感应报警器设备

时间:2020-08-06 11:21:50

相关推荐

IOTOS物联中台开发驱动支持NB-IoT气体感应报警器设备

本文章为原创,转载请注明出处!

登录平台:IOTOS®爱投斯物联中台

账号:iotos_test 密码:iotos123

代码地址:IOTOSDK-Python: IOTOS Python版本SDK,自带原生接口和采集引擎 ()

目录

前言

驱动目的

适用范围

使用示例

驱动代码

驱动解析

前言

窄带物联网(Narrow Band Internet of Things, NB-IoT)成为万物互联网络的一个重要分支。NB-IoT构建于蜂窝网络,只消耗大约180kHz的带宽,可直接部署于GSM网络、UMTS网络或LTE网络,以降低部署成本、实现平滑升级。NB-IoT的设备可以连接至三大运营商的IoT平台,方便了设备的对接

驱动目的

将连接至电信平台的NB气体感应报警器的数据拿到并上云展示

适用范围

品牌为JT-TCN,型号为5010的气体感应报警器 ,能将设备绑定在电信平台

使用示例

进入爱投斯中台,账号为iotos_test,密码为iotos123,在【创建模板】->【我的模板】,创建模板,填写相关信息,配置需要的参数,参数application为电信平台分配的唯一应用标识(在应用管理栏) ,参数key为电信平台分配的唯一应用标识(在应用管理栏),productId为电信平台的产品id ,deviceId为电信平台的设备id ,MasterKey在该设备所属产品的概况中可以查看 进入爱投斯中台,账号为iotos_test,密码为iotos123,创建网关 填好网关名称后点击确认 创建设备示例点击【我的设备】 -> 【通信网关】 -> 【设备实例】->【创建设备】 填写【设备名称】、选择刚刚创建的【模板驱动】和【通信网关】。 创建数据点,点击【系统设备】 -> 【通信网关】 -> 【设备实例】 -> 【数据点】,并在【设备实例】下拉列表选择刚刚创建的设备实例 点击右边的创建数据点,填写名称 并在高级配置中配置数据点的相关标志,“参考信号接收功率 ”,“燃气检测状态 ”,这两个为接收数据的两种数据类型的第一个点,配置"private"属性用于驱动识别,分别配置为show_data,gas_event,例如“参考信号接收功率”这个点的配置: 其他数据数据点中的属性需要填写"point"和"index",第一种类型的数据点中的point填写"参考信号接收功率"该数据的oid,第二类数据点中的point填写"燃气检测状态"该数据点的oid,第一类数据点中燃气AD值、IMEI、小区位置信息、物理小区标识、继电器状态、设备状态、信号与干扰加噪声比、无线信号覆盖等级、数据上报时间。index分别填写9、8、7、......、1。第二类数据点中报警时间。index填写1。例如,”报警时间“: 在【系统设备】 -> 【通信网关】中找到刚才创建的网关,点击【高级】 开启云网关,密码为账号密码 点击 【系统设备】 -> 【通信网关】 -> 【设备实例】->【数据点】,选择刚才创建的设备实例 即可查看数据已经上报成功

驱动代码

#coding=utf-8import syssys.path.append("..")from driver import *import timeimport datetimefrom urllib import urlencodeimport urllib2import base64import hmacimport jsonfrom hashlib import sha1reload(sys)sys.setdefaultencoding('utf8')#签名算法def signature(key, application, timestamp, param, body):code = "application:" + application + "\n" + "timestamp:" + timestamp + "\n"for v in param:code += str(v[0]) + ":" + str(v[1]) + "\n"if (body is not None) and (body.strip()) :code += body + '\n'return base64.b64encode(hash_hmac(key, code, sha1))#数据加密def hash_hmac(key, code, sha1):hmac_code = hmac.new(key.encode(), code.encode(), sha1)print("hmac_code=" + str(hmac_code.hexdigest()))return hmac_code.digest()#时间戳误差调整def getTimeOffset(url):request = urllib2.Request(url)start = int(time.time() * 1000)response = urllib2.urlopen(request)end = int(time.time() * 1000)if response is not None:return int(int(response.headers['x-ag-timestamp']) - (end + start) / 2);else:return 0baseUrl = 'https://ag-'timeUrl = 'https://ag-/echo'offset = getTimeOffset(timeUrl)#发送http请求函数def sendSDKRequest(path, head, param, body, version, application, MasterKey, key, method=None, isNeedSort=True,isNeedGetTimeOffset=False):paramList = []for key_value in param:paramList.append([key_value, param[key_value]])print("paramList=" + str(paramList))if (MasterKey is not None) and (MasterKey.strip()):paramList.append(['MasterKey', MasterKey])if isNeedSort:paramList = sorted(paramList)headers = {}if (MasterKey is not None) and (MasterKey.strip()):headers['MasterKey'] = MasterKeyheaders['application'] = applicationheaders['Date'] = str(datetime.datetime.now())headers['version'] = versiontemp = dict(param.items())if (MasterKey is not None) and (MasterKey.strip()):temp['MasterKey'] = MasterKeyurl_params = urlencode(temp)url = baseUrl + pathif (url_params is not None) and (url_params.strip()):url = url + '?' + url_paramsprint("url=" + str(url))global offsetif isNeedGetTimeOffset:offset = getTimeOffset(timeUrl)timestamp = str(int(time.time() * 1000) + offset)headers['timestamp'] = timestampsign = signature(key, application, timestamp, paramList, body)headers['signature'] = signheaders.update(head)print("headers : %s" % (str(headers)))if (body is not None) and (body.strip()):request = urllib2.Request(url=url, headers=headers, data=body.encode('utf-8'))else:request = urllib2.Request(url=url, headers=headers)if (method is not None):request.get_method = lambda: methodresponse = urllib2.urlopen(request)if ('response' in vars()):print("response.code: %d" % (response.code))return responseelse:return None#分页查询设备历史数据def getDeviceStatusHisInPage(appKey, appSecret, body):path = '/aep_device_status/getDeviceStatusHisInPage'head = {}param = {}version = '0928013337'application = appKeykey = appSecretresponse = sendSDKRequest(path, head, param, body, version, application, None, key, 'POST')if response is not None:return response.read()return None#查询事件上报def QueryDeviceEventList(appKey, appSecret, MasterKey, body):path = '/aep_device_event/device/events'head = {}param = {}version = '0327064751'application = appKeykey = appSecretresponse = sendSDKRequest(path, head, param, body, version, application, MasterKey, key, 'POST')if response is not None:return response.read()return None#数据下发函数def CreateCommand(appKey, appSecret, MasterKey, body):path = '/aep_device_command/command'head = {}param = {}version = '0712225145'application = appKeykey = appSecretresponse = sendSDKRequest(path, head, param, body, version, application, MasterKey, key, 'POST')if response is not None:return response.read()return Noneclass Project(IOTOSDriverI):def InitComm(self,attrs):self.setPauseCollect(False)self.setCollectingOneCircle(False)self.online(True)try:#获取中台配置的参数(必不可少)self.Nbapplication = self.sysAttrs['config']['param']['application'] # APPKEYself.Nbkey = self.sysAttrs['config']['param']['key'] # APPScretself.NbMasterKey=self.sysAttrs['config']['param']['MasterKey'] #MasterKey# self.NbMasterKey = "5fef44837aa54f42a7a49b73a4d95a15"self.NbproductId = self.sysAttrs['config']['param']['productId']self.NbdeviceId = self.sysAttrs['config']['param']['deviceId']except Exception,e:self.debug(u'获取参数失败!'+e.message)def Collecting(self, dataId):try:cfgtmp = self.data2attrs[dataId]['config']# 过滤掉非采集点if cfgtmp["param"] == "":return ()# 过滤采集点if 'disabled' in cfgtmp and cfgtmp['disabled'] == True:return ()else:self.debug(self.name(dataId))#请求需要用的参数timearry = (datetime.datetime.now() + datetime.timedelta(days=-29)).timetuple() # 当前时间减去29天后转化为timetuple的格式用于转换成timestamp格式begin_timestamp = str(int(time.mktime(timearry) * 1000) + offset)end_timestamp = str(int(time.time() * 1000) + offset)page_size = "100"page_timestamp = ""# 上传设备上传数据if 'private' in cfgtmp['param'] and cfgtmp['param']['private']=='show_data':body_HisInPage = '{"productId":"' + self.NbproductId + '","deviceId":"' + self.NbdeviceId + '","begin_timestamp":"' + begin_timestamp + '","end_timestamp":"' + end_timestamp + '","page_size":' + page_size + ',"page_timestamp":"' + page_timestamp + '"}'#http请求,拿去设备上报的数据res = getDeviceStatusHisInPage(self.Nbapplication, self.Nbkey, body_HisInPage)data = json.loads(res)['deviceStatusList']self.debug(data)data_dic = {}flat = 0for i in data:if 'sample_ad_value' in i and flat != 1:data_dic.update(i)flat += 1elif flat == 1:breakself.debug(data_dic)data_list = []for key, value in data_dic.items():# 拿到时间戳数据时转化为时间if key == 'timestamp':value = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(value) / 1000))if key=='device_status':status={0:"正常",1:"燃气泄漏报警",2:"传感器故障",3:"自检",4:"失效"}value=status[value]if type(value) == unicode:# unicode转为str类型value = value.encode('utf-8')data_list.append(value)print (data_list)return tuple(data_list)# 上传事件上报的数据if 'private' in cfgtmp['param'] and cfgtmp['param']['private']=='gas_event':#查询事件上报的数据body_event = '{"productId":"' + self.NbproductId + '","deviceId":"' + self.NbdeviceId + '","startTime":"' + begin_timestamp + '","endTime":"' + end_timestamp + '","pageSize":' + page_size + ',"page_timestamp":"' + page_timestamp + '"}'event_res = QueryDeviceEventList(self.Nbapplication, self.Nbkey, self.NbMasterKey, body_event)self.debug(event_res)event_data = json.loads(event_res)['result']['list']# self.debug(event_data)event_list = []for i in event_data:self.debug(i['eventContent'])if 'gas_sensor_state' in i['eventContent']:for key, value in eval(i['eventContent']).items():if key=="gas_sensor_state":status={0:"正常",1:"低浓度报警",2:"高浓度"}value=status[value]event_list.append(value)event_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i['createTime']) / 1000))event_list.append(event_time)self.debug(event_list)breakreturn tuple(event_list)return ()except Exception,e:self.debug(u'数据上传失败'+e.message)def Event_setData(self, dataId, value):return json.dumps({'code': 0, 'msg': '', 'data': ''})

驱动解析

编写环境为python2,首先先导入需要的依赖包

#coding=utf-8import syssys.path.append("..")from driver import *import timeimport datetimefrom urllib import urlencodeimport urllib2import base64import hmacimport jsonfrom hashlib import sha1reload(sys)sys.setdefaultencoding('utf8')

定义api请求的签名算法和发送http请求的参数

#签名算法def signature(key, application, timestamp, param, body):code = "application:" + application + "\n" + "timestamp:" + timestamp + "\n"for v in param:code += str(v[0]) + ":" + str(v[1]) + "\n"if (body is not None) and (body.strip()) :code += body + '\n'return base64.b64encode(hash_hmac(key, code, sha1))#数据加密def hash_hmac(key, code, sha1):hmac_code = hmac.new(key.encode(), code.encode(), sha1)print("hmac_code=" + str(hmac_code.hexdigest()))return hmac_code.digest()#时间戳误差调整def getTimeOffset(url):request = urllib2.Request(url)start = int(time.time() * 1000)response = urllib2.urlopen(request)end = int(time.time() * 1000)if response is not None:return int(int(response.headers['x-ag-timestamp']) - (end + start) / 2);else:return 0baseUrl = 'https://ag-'timeUrl = 'https://ag-/echo'offset = getTimeOffset(timeUrl)#发送http请求函数def sendSDKRequest(path, head, param, body, version, application, MasterKey, key, method=None, isNeedSort=True,isNeedGetTimeOffset=False):paramList = []for key_value in param:paramList.append([key_value, param[key_value]])print("paramList=" + str(paramList))if (MasterKey is not None) and (MasterKey.strip()):paramList.append(['MasterKey', MasterKey])if isNeedSort:paramList = sorted(paramList)headers = {}if (MasterKey is not None) and (MasterKey.strip()):headers['MasterKey'] = MasterKeyheaders['application'] = applicationheaders['Date'] = str(datetime.datetime.now())headers['version'] = versiontemp = dict(param.items())if (MasterKey is not None) and (MasterKey.strip()):temp['MasterKey'] = MasterKeyurl_params = urlencode(temp)url = baseUrl + pathif (url_params is not None) and (url_params.strip()):url = url + '?' + url_paramsprint("url=" + str(url))global offsetif isNeedGetTimeOffset:offset = getTimeOffset(timeUrl)timestamp = str(int(time.time() * 1000) + offset)headers['timestamp'] = timestampsign = signature(key, application, timestamp, paramList, body)headers['signature'] = signheaders.update(head)print("headers : %s" % (str(headers)))if (body is not None) and (body.strip()):request = urllib2.Request(url=url, headers=headers, data=body.encode('utf-8'))else:request = urllib2.Request(url=url, headers=headers)if (method is not None):request.get_method = lambda: methodresponse = urllib2.urlopen(request)if ('response' in vars()):print("response.code: %d" % (response.code))return responseelse:return None

定义查询设备上传上来的数据以及上报的报警事件

#分页查询设备历史数据def getDeviceStatusHisInPage(appKey, appSecret, body):path = '/aep_device_status/getDeviceStatusHisInPage'head = {}param = {}version = '0928013337'application = appKeykey = appSecretresponse = sendSDKRequest(path, head, param, body, version, application, None, key, 'POST')if response is not None:return response.read()return None#查询事件上报def QueryDeviceEventList(appKey, appSecret, MasterKey, body):path = '/aep_device_event/device/events'head = {}param = {}version = '0327064751'application = appKeykey = appSecretresponse = sendSDKRequest(path, head, param, body, version, application, MasterKey, key, 'POST')if response is not None:return response.read()return None

继承IOTOSDriverI这个类,对驱动进行初始化,获取中台设备实例中的参数,用于发送http请求使用

class Project(IOTOSDriverI):def InitComm(self,attrs):self.setPauseCollect(False)self.setCollectingOneCircle(False)self.online(True)try:#获取中台配置的参数(必不可少)self.Nbapplication = self.sysAttrs['config']['param']['application'] # APPKEYself.Nbkey = self.sysAttrs['config']['param']['key'] # APPScretself.NbMasterKey=self.sysAttrs['config']['param']['MasterKey'] #MasterKey# self.NbMasterKey = "5fef44837aa54f42a7a49b73a4d95a15"self.NbproductId = self.sysAttrs['config']['param']['productId']self.NbdeviceId = self.sysAttrs['config']['param']['deviceId']except Exception,e:self.debug(u'获取参数失败!'+e.message)

采集函数,利用数据点的参数先排除非采集点,再利用设置的私有属性,当采集的数据点到含有私有属性的数据点时,进行相关变量的定义,并且查询设备上报的数据或者上报的报警信息,将其进行处理后以标准的格式输出,进行批量上传

def Collecting(self, dataId):try:cfgtmp = self.data2attrs[dataId]['config']# 过滤掉非采集点if cfgtmp["param"] == "":return ()# 过滤采集点if 'disabled' in cfgtmp and cfgtmp['disabled'] == True:return ()else:self.debug(self.name(dataId))#请求需要用的参数timearry = (datetime.datetime.now() + datetime.timedelta(days=-29)).timetuple() # 当前时间减去29天后转化为timetuple的格式用于转换成timestamp格式begin_timestamp = str(int(time.mktime(timearry) * 1000) + offset)end_timestamp = str(int(time.time() * 1000) + offset)page_size = "100"page_timestamp = ""# 上传设备上传数据if 'private' in cfgtmp['param'] and cfgtmp['param']['private']=='show_data':body_HisInPage = '{"productId":"' + self.NbproductId + '","deviceId":"' + self.NbdeviceId + '","begin_timestamp":"' + begin_timestamp + '","end_timestamp":"' + end_timestamp + '","page_size":' + page_size + ',"page_timestamp":"' + page_timestamp + '"}'#http请求,拿去设备上报的数据res = getDeviceStatusHisInPage(self.Nbapplication, self.Nbkey, body_HisInPage)data = json.loads(res)['deviceStatusList']self.debug(data)data_dic = {}flat = 0for i in data:if 'sample_ad_value' in i and flat != 1:data_dic.update(i)flat += 1elif flat == 1:breakself.debug(data_dic)data_list = []for key, value in data_dic.items():# 拿到时间戳数据时转化为时间if key == 'timestamp':value = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(value) / 1000))if key=='device_status':status={0:"正常",1:"燃气泄漏报警",2:"传感器故障",3:"自检",4:"失效"}value=status[value]if type(value) == unicode:# unicode转为str类型value = value.encode('utf-8')data_list.append(value)print (data_list)return tuple(data_list)# 上传事件上报的数据if 'private' in cfgtmp['param'] and cfgtmp['param']['private']=='gas_event':#查询事件上报的数据body_event = '{"productId":"' + self.NbproductId + '","deviceId":"' + self.NbdeviceId + '","startTime":"' + begin_timestamp + '","endTime":"' + end_timestamp + '","pageSize":' + page_size + ',"page_timestamp":"' + page_timestamp + '"}'event_res = QueryDeviceEventList(self.Nbapplication, self.Nbkey, self.NbMasterKey, body_event)self.debug(event_res)event_data = json.loads(event_res)['result']['list']# self.debug(event_data)event_list = []for i in event_data:self.debug(i['eventContent'])if 'gas_sensor_state' in i['eventContent']:for key, value in eval(i['eventContent']).items():if key=="gas_sensor_state":status={0:"正常",1:"低浓度报警",2:"高浓度"}value=status[value]event_list.append(value)event_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i['createTime']) / 1000))event_list.append(event_time)self.debug(event_list)breakreturn tuple(event_list)return ()except Exception,e:self.debug(u'数据上传失败'+e.message)def Event_setData(self, dataId, value):return json.dumps({'code': 0, 'msg': '', 'data': ''})

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。