100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > IOTOS物联中台JT808驱动开发实例

IOTOS物联中台JT808驱动开发实例

时间:2021-12-10 11:00:39

相关推荐

IOTOS物联中台JT808驱动开发实例

本文章为原创,转载请注明出处!

登录平台:IOTOS®爱投斯物联中台

账号:iotos_test 密码:iotos123

代码地址:IOTOSDK-Python: IOTOS Python版本SDK,自带原生接口和采集引擎 ()

目录

前言

驱动目的

适用范围

使用示例

驱动代码

驱动解析

前言

JT 808泛指JT/T808协议,是指交通部制定的运输行业通信标准,全称《交通运输行业标准-道路运输车辆卫星定位系统终端通信协议及数据格式》可至交通部下载协议

JT808规定了道路运输车辆卫星定位系统车载终端与监管/监控平台之间的通信协议与数据格式,包括协议基础、通信连接、消息处理、协议分类与要求及数据格式,适用于道路运输车辆卫星定位系统车载终端和监管/监控平台之间的通信

驱动目的

处理终端转发过来的数据并且上传至中台

适用范围

符合JT808协议规范的车辆终端

使用示例

进入爱投斯中台,账号为iotos_test,密码为iotos123,在【创建模板】->【我的模板】,创建模板,填写相关信息,配置需要的参数 创建网关 填好网关名称后点击确认 创建设备示例点击【系统设备】 -> 【通信网关】 -> 【设备实例】->【创建设备】 填写【设备名称】、选择刚刚创建的【模板驱动】和【通信网关】。 创建数据点(有点表可以之间导入忽略创建数据点这几步),点击【系统设备】 -> 【通信网关】 -> 【设备实例】 -> 【数据点】,并在【设备实例】下拉列表选择刚刚创建的设备实例 点击右边的创建数据点,填写名称 并在高级配置中配置数据点的相关标志,需要拿多少数据就可以配置多少数据,目前测试的数据:方向、高度、状态、报警、纬度、经度、速度、油耗、里程、消息接收时间、终端手机号(名称可以修改),又要要可以增加,需要在数据点里面的param中配置相应的"private"属性: ,例如:

各数据点的配置如下:

在【系统设备】 -> 【通信网关】中找到刚才创建的网关,点击【高级】 开启云网关,密码为账号密码 点击 【系统设备】 -> 【通信网关】 -> 【设备实例】->【数据点】,选择刚才创建的设备实例 即可查看数据已经上报成功

驱动代码

import syssys.path.append("..")from driver import *import timeimport threading#coding:utf-8"""JT808泛指JT/T808协议,是指交通部制定的运输行业通信标准,全称《交通运输行业标准 - 道路运输车辆卫星定位系统终端通信协议及数据格式》"""# JT808设备消息监听、处理import asyncoreimport reimport datetimeimport binasciiglobal data_last,jt808_portdata_last=''jt808_port=''def BCC_Check(data):data = data.replace(' ', '')n = int(len(data) / 2)xor = 0for r in range(n):xor ^= int(data[2 * r:2 * r + 2], 16)xor = hex(xor)if len(xor) != 4:xor = '0' + xor[2:]else:# 返回16进制字符串xor = xor[2:]return xordef loca_report(rule, data):loca_data = {}rule = rule + r'(?P<alarm_data>\w{8})(?P<status>\w{8})(?P<lat>\w{8})(?P<lng>\w{8})(?P<hight>\w{4})(?P<speed>\w{4})(?P<dirct>\w{4})(?P<dev_upload>\w{12})'# 获取剩余的为解析的attch_data = data[len(re.search(rule, data).group()):]data = re.search(rule, data).groupdict()alarm_data = data['alarm_data']# print(alarm_data)alarm_happen = ''for x in range(len(alarm_data)):alarm_happen += '{:04b}'.format(int(alarm_data[x], 16))# print("alarm_happen " + alarm_happen)# 解析报警信息all_alarm = []if alarm_happen[-1] == '1':all_alarm.append('sos_alarm') # 紧急报警if alarm_happen[-2] == '1':all_alarm.append('speed_alarm') # 超速if alarm_happen[-3] == '1':all_alarm.append('fatigue_drive') # 疲劳驾驶if alarm_happen[-6] == '1':all_alarm.append('antenna_alarm') # 天线故障if alarm_happen[-8] == '1':all_alarm.append('low_power') # 低电压报警if alarm_happen[-9] == '1':all_alarm.append('power_cut') # 断电报警if alarm_happen[-18] == '1':all_alarm.append('remove_alarm') # 拆除报警if alarm_happen[-29] == '1':all_alarm.append('illegal_move_alarm') # 非法位移报警# print(all_alarm)# 解析状态status = data['status']print("status " + status)status_happen = ''for x in range(len(status)):status_happen += '{:04b}'.format(int(status[x], 16))# print("status_happen " + status_happen)all_status = []if status_happen[-1] == '1':all_status.append('acc_on')else:all_status.append('acc_off')if status_happen[-2] == '1':all_status.append('track')else:all_status.append('un_track')if status_happen[-11] == '0':all_status.append('oil_normal')else:all_status.append('oil_cut')if status_happen[-12] == '0':all_status.append('ele_normal')else:all_status.append('ele_cut')# 经度纬度、高度、速度、方向的数据转换lat = int(data['lat'], 16) / pow(10, 6)lng = int(data['lng'], 16) / pow(10, 6)hight = str(int(data['hight'], 16))speed = str(int((int(data['speed'], 16) / 10)))dirct = str(int(data['dirct'], 16))dev_upload = data['dev_upload']dev_upload = '20%s-%s-%s %s:%s:%s' % (dev_upload[0:2], dev_upload[2:4], dev_upload[4:6], dev_upload[6:8], dev_upload[8:10], dev_upload[10:12])# print(data)if attch_data:# print('attch_data',attch_data)def handle_atth(d):attch_rule = r'(?P<msg_id>\w{2})(?P<msg_len>\w{2})'# print('d==>',d)msg_head = d[0:4]res = re.search(attch_rule, msg_head).groupdict()# print('msg_head==>',res)msg_id = res['msg_id'] # 附加消息idmsg_len = int(res['msg_len'], 16) * 2 # 取附加消息长度value = d[4:4 + msg_len] # 取附加消息# print('value==>',value)if msg_id == '01':value = int(value, 16) / 10loca_data['mileage'] = value # 里程elif msg_id == '02':value = int(value, 16) / 10loca_data['oil'] = value # 油耗elif msg_id == '03':value = int(value, 16) / 10loca_data['speed'] = value # 速度elif msg_id == '30':value = int(value, 16)loca_data['rssi'] = value # 通讯信号强度elif msg_id == '31':value = int(value, 16)loca_data['gnss_num'] = value # 定位卫星颗数else:returnhandle_len = 4 + msg_len # 取后续附加消息(厂家自定)next_data = d[handle_len:]if len(next_data) > 0:handle_atth(next_data) # 迭代取出handle_atth(attch_data)loca_data.update(lat=lat, lng=lng, alarm=str(all_alarm), status=str(all_status), hight=hight, speed=speed,dirct=dirct, dev_upload=dev_upload)return loca_datadef jt808_resopnes(responekind, mesg_head, mesg_result):response = '0'auth = '313233343536'mesg_id = mesg_head['mesg_id']phone_num = mesg_head['phone_num']mesg_num = mesg_head['mesg_num']if responekind == '8001':mesg_perp = '%04x' % int(len(mesg_num + mesg_id) / 2 + 1)response_body = responekind + mesg_perp + phone_num + '0000' + mesg_num + mesg_id + mesg_resultelif responekind == '8100':mesg_perp = '%04x' % int(len(mesg_num + auth) / 2 + 1)response_body = responekind + mesg_perp + phone_num + '0000' + mesg_num + mesg_result + authresponse_check = BCC_Check(response_body)response_body = response_body + response_checkresponse_body = response_body.replace('7d', '7d01')response = '7e' + response_body.replace('7e', '7d02') + '7e'response = binascii.a2b_hex(response)return responsedef jt808_analysis(data, serv_receive, device_id='0'):response = b'0'all_data = {}all_data['device_id'] = device_idall_data['save_kind'] = 'no'all_data['serv_receive'] = serv_receive# ascii转16进制字符 ??????????????data = binascii.hexlify(data).decode()# print(data) # 7e0200002119501001053900000000000000030209323406b471d000000000014019091609270701040000025d2d7e# data='7e010000210147852369000046000B04575331303030533130303030303031323334353637327ca4425039344a35ef7e'# 去掉头尾标识符,进行转译处理(转义还原"7d02"转换为"7e"后"7d01"转换为7d)data = data[2:-2].replace('7d02', '7e')data = data.replace('7d01', '7d')# print(data) # 0200002119501001053900000000000000030209323406b471d000000000014019091609270701040000025d2d# 校验if BCC_Check(data[0:-2]) == data[-2:]:print('check-ok!')else:print('chec—failed')all_data['response'] = responsereturn all_data# 去掉校验码,获取需要解析的数据字段data = data[0:-2]print(data) # 0200002119501001053900000000000000030209323406b471d000000000014019091609270701040000025d# 数据头处理,命令id,是否包含分包处理,电话号码,数据流水号rule = r'(?P<mesg_id>\w{4})(?P<mesg_explain>\w{4})(?P<phone_num>\w{12})(?P<mesg_num>\w{4})'head_len = len(re.search(rule, data).group())result = re.search(rule, data).groupdict()# print(result)mesg_id = result['mesg_id'] # 消息头(消息ID)print(mesg_id)phone_num = result['phone_num'] # 消息头(终端手机号)print(phone_num)mesg_num = result['mesg_num'] # 消息头(消息流水号)print(mesg_num)all_data['device_id'] = str(int(phone_num))response = jt808_resopnes('8001', result, '00')# 解析消息体属性mesg_explain = ''for x in range(len(result['mesg_explain'])):mesg_explain += '{:04b}'.format(int(result['mesg_explain'][x], 16))print(mesg_explain)mesg_split = mesg_explain[2]mesg_encryption = mesg_explain[5]# print(mesg_explain)if mesg_split == '1':rule = rule + r'(?P<mesg_total_num>\w{4})' + r'(?P<mesg_order_num>\w{2})'if mesg_encryption == '1':# 消息体RSA解密转码print('RSA_algorithm_encryption')if mesg_id == '0001': # 终端回应平台下发指令rule = rule + r'(?P<answer_num>\w{4})(?P<answer_id>\w{4})(?P<command_result>\w{2})'data = re.search(rule, data).groupdict()# print('0001:终端回应平台下发指令',data)# 设备登录elif mesg_id == '0100':# if len(data)-head_len==76:rule = rule + r'(?P<Provincial_ID>\w{4})(?P<City_County_ID>\w{4})(?P<Manufacturer_ID>\w{10})(?P<Terminal_model>\w{16})(?P<Terminal_ID>\w{14})(?P<car_num_color>\w{2})(?P<car_identification>\w{2})'data = re.search(rule, data).groupdict()all_data.update(data)response = jt808_resopnes('8100', result, '00')all_data['save_kind'] = 'yes'# 主要的位置上报信息elif mesg_id == '0200':print(data)print(rule)loca_data = loca_report(rule, data)print(loca_data)all_data.update(loca_data)all_data['save_kind'] = 'yes'elif mesg_id == '0201':data_head = data[0:head_len]data_body = data[head_len:]# 把消息体的前4位,应答流水号去掉,重新组成数据进行处理handle_data = data_head + data_body[4:]# 处理0200位置信息loca_data = loca_report(rule, data)all_data.update(loca_data)all_data['save_kind'] = 'yes'print('0201:终端查询位置信息回复')elif mesg_id == '0704':print('0704:多条数据上传')rule = rule + r'(?P<data_num>\w{4})(?P<data_type>\w{2})(?P<detail>\w*)'res = re.search(rule, data).groupdict()data_num = int(res['data_num'], 16)type_dict = {'00': '正常', '01': '补报'}data_type = type_dict[res['data_type']]detail = res['detail']multi_rule = rule = r'(?P<mesg_explain>\w{4})(?P<phone_num>\w{12})(?P<mesg_num>\w{4})'all_loca = []for x in range(data_num):data_len = int(detail[0:4], 16) * 2handle_len = 4 + data_lendata_body = detail[4:handle_len]data_res = loca_report('', data_body)detail = detail[handle_len:]lng = data_res.get('lng')if lng not in [0.0, '', '0', None]:data_res['device_id'] = all_data['device_id']data_res['serv_receive'] = serv_receiveall_loca.append(data_res)all_data['save_kind'] = 'yes'all_data['all_loca'] = all_locaall_data['response'] = response# print('all_data',all_data)return all_dataclass Jt808Handler(asyncore.dispatcher_with_send):res_data=''buffer_data={}# 预处理,丢弃不完整的数据包def pre_handle(self,datas):data_handled=[]print("datas"+str(datas))#判断是否为有效数据if datas.startswith(b'~') and datas.endswith(b'~'):datas=datas.split(b'~')datas.pop() #去掉列表最后一个元素print(datas)for data in datas:if data!=b'':data=b'~'+data+b'~'data_handled.append(data)# print(data_handled) #处理后的data_handle变成了列表形式# if data_handled[0]==b'~\x02\x00\x00"\x01A\x19P\x10\x01\x059\x00\x00\x00\x00\x00\x00\x00\x03\x02\t24\x06\xb4q\xd0\x00\x00\x00\x00\x01@\x19\t\x16\t\'\x07\x01\x04\x00\x00\x02]-~':# print(1111111111111111111111111111111111111111111111111)return data_handledelse:return 'buffer_data'# 存储功能def handle_save(sefl,data):pass# 数据解析,存储,回复def handle_read(self):# print(self.buffer_data)serv_receive=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')data = self.recv(8192)# print(data)data=self.pre_handle(data)if data!='buffer_data':# print(data)for d in data:print("d:"+str(d))dev_id=self.buffer_data.get(str(self.addr),'0')res=jt808_analysis(d,serv_receive,dev_id)print("res = "+str(res))if res.get('device_id')!=None:self.buffer_data[res['device_id']]=selfself.buffer_data[str(self.addr)]=res['device_id']else:res['device_id']=self.buffer_data[str(self.addr)]save=res.pop('save_kind')response=res.pop('response')if save=='yes':self.handle_save(res)if response!=b'0':self.send(response)# print("111111111111"+str(res))global data_lastdata_last=resdef writable(self):return self.res_data# 设备链接监听class Jt808Server(asyncore.dispatcher):def __init__(self, host, port):asyncore.dispatcher.__init__(self)self.create_socket()self.set_reuse_addr()self.bind((host, port))self.listen(5)def handle_accepted(self, sock, addr):print('Incoming connection from %s' % repr(addr))handler = Jt808Handler(sock)# print("handler = "+str(handler))#服务器监听线程class EchoServerThread(threading.Thread):def __init__(self):threading.Thread.__init__(self)def run(self):server = Jt808Server('', jt808_port)asyncore.loop()class JT808(IOTOSDriverI):def InitComm(self,attrs):global jt808_portjt808_port=self.sysAttrs['config']['param']['PORT']self.debug(jt808_port)self.setPauseCollect(False)self.setCollectingOneCircle = Trueself.online(True)EchoServerThread().start()def Collecting(self, dataId):if data_last=='':return ()self.debug(1111111111111111111111111)cfgtmp = self.data2attrs[dataId]['config']if cfgtmp["param"] == "":return ()# 过滤采集点if 'disabled' in cfgtmp and cfgtmp['disabled'] == True:return ()else:self.debug(self.name(dataId))# 上传数据if 'private' in cfgtmp['param']:for data_key,data_value in data_last.items():if data_key==cfgtmp['param']['private']:self.debug(3333333333333333333333333333333)self.debug(data_value)self.setValue(self.name(dataId),data_value)return ()

驱动解析

运行环境为python3,先导入驱动和数据解析所需的包

from driver import *import timeimport threadingimport asyncoreimport reimport datetimeimport binascii

定义数据解析的函数,包括校验检验函数、消息体数据解析函数、消息回应生成函数和消息包解析函数

global data_last,jt808_portdata_last=''jt808_port=''def BCC_Check(data):data = data.replace(' ', '')n = int(len(data) / 2)xor = 0for r in range(n):xor ^= int(data[2 * r:2 * r + 2], 16)xor = hex(xor)if len(xor) != 4:xor = '0' + xor[2:]else:# 返回16进制字符串xor = xor[2:]return xordef loca_report(rule, data):loca_data = {}rule = rule + r'(?P<alarm_data>\w{8})(?P<status>\w{8})(?P<lat>\w{8})(?P<lng>\w{8})(?P<hight>\w{4})(?P<speed>\w{4})(?P<dirct>\w{4})(?P<dev_upload>\w{12})'# 获取剩余的为解析的attch_data = data[len(re.search(rule, data).group()):]data = re.search(rule, data).groupdict()# print(data)# print('attch_data',attch_data) #01040000025dalarm_data = data['alarm_data']# print(alarm_data)alarm_happen = ''for x in range(len(alarm_data)):alarm_happen += '{:04b}'.format(int(alarm_data[x], 16))# print("alarm_happen " + alarm_happen)# 解析报警信息all_alarm = []if alarm_happen[-1] == '1':all_alarm.append('sos_alarm') # 紧急报警if alarm_happen[-2] == '1':all_alarm.append('speed_alarm') # 超速if alarm_happen[-3] == '1':all_alarm.append('fatigue_drive') # 疲劳驾驶if alarm_happen[-6] == '1':all_alarm.append('antenna_alarm') # 天线故障if alarm_happen[-8] == '1':all_alarm.append('low_power') # 低电压报警if alarm_happen[-9] == '1':all_alarm.append('power_cut') # 断电报警if alarm_happen[-18] == '1':all_alarm.append('remove_alarm') # 拆除报警if alarm_happen[-29] == '1':all_alarm.append('illegal_move_alarm') # 非法位移报警# print(all_alarm)# 解析状态status = data['status']print("status " + status)status_happen = ''for x in range(len(status)):status_happen += '{:04b}'.format(int(status[x], 16))# print("status_happen " + status_happen)all_status = []if status_happen[-1] == '1':all_status.append('acc_on')else:all_status.append('acc_off')if status_happen[-2] == '1':all_status.append('track')else:all_status.append('un_track')if status_happen[-11] == '0':all_status.append('oil_normal')else:all_status.append('oil_cut')if status_happen[-12] == '0':all_status.append('ele_normal')else:all_status.append('ele_cut')# 经度纬度、高度、速度、方向的数据转换lat = int(data['lat'], 16) / pow(10, 6)lng = int(data['lng'], 16) / pow(10, 6)hight = str(int(data['hight'], 16))speed = str(int((int(data['speed'], 16) / 10)))dirct = str(int(data['dirct'], 16))dev_upload = data['dev_upload']dev_upload = '20%s-%s-%s %s:%s:%s' % (dev_upload[0:2], dev_upload[2:4], dev_upload[4:6], dev_upload[6:8], dev_upload[8:10], dev_upload[10:12])# print(data)if attch_data:# print('attch_data',attch_data)def handle_atth(d):attch_rule = r'(?P<msg_id>\w{2})(?P<msg_len>\w{2})'# print('d==>',d)msg_head = d[0:4]res = re.search(attch_rule, msg_head).groupdict()# print('msg_head==>',res)msg_id = res['msg_id'] # 附加消息idmsg_len = int(res['msg_len'], 16) * 2 # 取附加消息长度value = d[4:4 + msg_len] # 取附加消息# print('value==>',value)if msg_id == '01':value = int(value, 16) / 10loca_data['mileage'] = value # 里程elif msg_id == '02':value = int(value, 16) / 10loca_data['oil'] = value # 油耗elif msg_id == '03':value = int(value, 16) / 10loca_data['speed'] = value # 速度elif msg_id == '30':value = int(value, 16)loca_data['rssi'] = value # 通讯信号强度elif msg_id == '31':value = int(value, 16)loca_data['gnss_num'] = value # 定位卫星颗数else:returnhandle_len = 4 + msg_len # 取后续附加消息(厂家自定)next_data = d[handle_len:]if len(next_data) > 0:handle_atth(next_data) # 迭代取出handle_atth(attch_data)loca_data.update(lat=lat, lng=lng, alarm=str(all_alarm), status=str(all_status), hight=hight, speed=speed,dirct=dirct, dev_upload=dev_upload)return loca_datadef jt808_resopnes(responekind, mesg_head, mesg_result):response = '0'auth = '313233343536'mesg_id = mesg_head['mesg_id']phone_num = mesg_head['phone_num']mesg_num = mesg_head['mesg_num']if responekind == '8001':mesg_perp = '%04x' % int(len(mesg_num + mesg_id) / 2 + 1)response_body = responekind + mesg_perp + phone_num + '0000' + mesg_num + mesg_id + mesg_resultelif responekind == '8100':mesg_perp = '%04x' % int(len(mesg_num + auth) / 2 + 1)response_body = responekind + mesg_perp + phone_num + '0000' + mesg_num + mesg_result + authresponse_check = BCC_Check(response_body)response_body = response_body + response_checkresponse_body = response_body.replace('7d', '7d01')response = '7e' + response_body.replace('7e', '7d02') + '7e'response = binascii.a2b_hex(response)return responsedef jt808_analysis(data, serv_receive, device_id='0'):# data=b'~\x02\x00\x00"\x01A\x19P\x10\x01\x059\x00\x00\x00\x00\x00\x00\x00\x03\x02\t24\x06\xb4q\xd0\x00\x00\x00\x00\x01@\x19\t\x16\t\'\x07\x01\x04\x00\x00\x02]-~'response = b'0'all_data = {}all_data['device_id'] = device_idall_data['save_kind'] = 'no'all_data['serv_receive'] = serv_receive# ascii转16进制字符 ??????????????data = binascii.hexlify(data).decode()# print(data) # 7e0200002119501001053900000000000000030209323406b471d000000000014019091609270701040000025d2d7e# data='7e010000210147852369000046000B04575331303030533130303030303031323334353637327ca4425039344a35ef7e'# 去掉头尾标识符,进行转译处理(转义还原"7d02"转换为"7e"后"7d01"转换为7d)data = data[2:-2].replace('7d02', '7e')data = data.replace('7d01', '7d')# print(data) # 0200002119501001053900000000000000030209323406b471d000000000014019091609270701040000025d2d# 校验if BCC_Check(data[0:-2]) == data[-2:]:print('check-ok!')else:print('chec—failed')all_data['response'] = responsereturn all_data# 去掉校验码,获取需要解析的数据字段data = data[0:-2]print(data) # 0200002119501001053900000000000000030209323406b471d000000000014019091609270701040000025d# 数据头处理,命令id,是否包含分包处理,电话号码,数据流水号rule = r'(?P<mesg_id>\w{4})(?P<mesg_explain>\w{4})(?P<phone_num>\w{12})(?P<mesg_num>\w{4})'head_len = len(re.search(rule, data).group())result = re.search(rule, data).groupdict()# print(result)mesg_id = result['mesg_id'] # 消息头(消息ID)print(mesg_id)phone_num = result['phone_num'] # 消息头(终端手机号)print(phone_num)mesg_num = result['mesg_num'] # 消息头(消息流水号)print(mesg_num)all_data['device_id'] = str(int(phone_num))response = jt808_resopnes('8001', result, '00')# 解析消息体属性mesg_explain = ''for x in range(len(result['mesg_explain'])):mesg_explain += '{:04b}'.format(int(result['mesg_explain'][x], 16))print(mesg_explain)mesg_split = mesg_explain[2]mesg_encryption = mesg_explain[5]# print(mesg_explain)if mesg_split == '1':rule = rule + r'(?P<mesg_total_num>\w{4})' + r'(?P<mesg_order_num>\w{2})'if mesg_encryption == '1':# 消息体RSA解密转码print('RSA_algorithm_encryption')if mesg_id == '0001': # 终端回应平台下发指令rule = rule + r'(?P<answer_num>\w{4})(?P<answer_id>\w{4})(?P<command_result>\w{2})'data = re.search(rule, data).groupdict()# print('0001:终端回应平台下发指令',data)# 设备登录elif mesg_id == '0100':# if len(data)-head_len==76:rule = rule + r'(?P<Provincial_ID>\w{4})(?P<City_County_ID>\w{4})(?P<Manufacturer_ID>\w{10})(?P<Terminal_model>\w{16})(?P<Terminal_ID>\w{14})(?P<car_num_color>\w{2})(?P<car_identification>\w{2})'data = re.search(rule, data).groupdict()all_data.update(data)response = jt808_resopnes('8100', result, '00')all_data['save_kind'] = 'yes'# 主要的位置上报信息elif mesg_id == '0200':print(data)print(rule)loca_data = loca_report(rule, data)print(loca_data)all_data.update(loca_data)all_data['save_kind'] = 'yes'elif mesg_id == '0201':data_head = data[0:head_len]data_body = data[head_len:]# 把消息体的前4位,应答流水号去掉,重新组成数据进行处理handle_data = data_head + data_body[4:]# 处理0200位置信息loca_data = loca_report(rule, data)all_data.update(loca_data)all_data['save_kind'] = 'yes'print('0201:终端查询位置信息回复')elif mesg_id == '0704':print('0704:多条数据上传')rule = rule + r'(?P<data_num>\w{4})(?P<data_type>\w{2})(?P<detail>\w*)'res = re.search(rule, data).groupdict()data_num = int(res['data_num'], 16)type_dict = {'00': '正常', '01': '补报'}data_type = type_dict[res['data_type']]detail = res['detail']multi_rule = rule = r'(?P<mesg_explain>\w{4})(?P<phone_num>\w{12})(?P<mesg_num>\w{4})'all_loca = []for x in range(data_num):data_len = int(detail[0:4], 16) * 2handle_len = 4 + data_lendata_body = detail[4:handle_len]data_res = loca_report('', data_body)detail = detail[handle_len:]lng = data_res.get('lng')if lng not in [0.0, '', '0', None]:data_res['device_id'] = all_data['device_id']data_res['serv_receive'] = serv_receiveall_loca.append(data_res)all_data['save_kind'] = 'yes'all_data['all_loca'] = all_locaall_data['response'] = response# print('all_data',all_data)return all_data

设置预处理函数和asyncore异步通信的服务端线程

class Jt808Handler(asyncore.dispatcher_with_send):res_data=''buffer_data={}# 预处理,丢弃不完整的数据包def pre_handle(self,datas):data_handled=[]print("datas"+str(datas))#判断是否为有效数据if datas.startswith(b'~') and datas.endswith(b'~'):datas=datas.split(b'~')datas.pop() #去掉列表最后一个元素print(datas)for data in datas:if data!=b'':data=b'~'+data+b'~'data_handled.append(data)# print(data_handled) #处理后的data_handle变成了列表形式# if data_handled[0]==b'~\x02\x00\x00"\x01A\x19P\x10\x01\x059\x00\x00\x00\x00\x00\x00\x00\x03\x02\t24\x06\xb4q\xd0\x00\x00\x00\x00\x01@\x19\t\x16\t\'\x07\x01\x04\x00\x00\x02]-~':# print(1111111111111111111111111111111111111111111111111)return data_handledelse:return 'buffer_data'# 存储功能def handle_save(sefl,data):pass# 数据解析,存储,回复def handle_read(self):# print(self.buffer_data)serv_receive=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')data = self.recv(8192)# print(data)data=self.pre_handle(data)if data!='buffer_data':# print(data)for d in data:print("d:"+str(d))dev_id=self.buffer_data.get(str(self.addr),'0')res=jt808_analysis(d,serv_receive,dev_id)print("res = "+str(res))if res.get('device_id')!=None:self.buffer_data[res['device_id']]=selfself.buffer_data[str(self.addr)]=res['device_id']else:res['device_id']=self.buffer_data[str(self.addr)]save=res.pop('save_kind')response=res.pop('response')if save=='yes':self.handle_save(res)if response!=b'0':self.send(response)# print("111111111111"+str(res))global data_lastdata_last=resdef writable(self):return self.res_data# 设备链接监听class Jt808Server(asyncore.dispatcher):def __init__(self, host, port):asyncore.dispatcher.__init__(self)self.create_socket()self.set_reuse_addr()self.bind((host, port))self.listen(5)def handle_accepted(self, sock, addr):print('Incoming connection from %s' % repr(addr))handler = Jt808Handler(sock)# print("handler = "+str(handler))#服务器监听线程class EchoServerThread(threading.Thread):def __init__(self):threading.Thread.__init__(self)def run(self):server = Jt808Server('', jt808_port)asyncore.loop()

对驱动进行初始化,获取中台配置的"PORT"参数,启动服务的线程

class JT808(IOTOSDriverI):def InitComm(self,attrs):global jt808_portjt808_port=self.sysAttrs['config']['param']['PORT']self.debug(jt808_port)self.setPauseCollect(False)self.setCollectingOneCircle = Trueself.online(True)EchoServerThread().start()

编写采集函数,先判断是否有数据转发过来没有就推出,有的话过滤掉非采集点,判断采集点并且根据每个数据点的私有属性“private”进行数据的上传

def Collecting(self, dataId):if data_last=='':return ()cfgtmp = self.data2attrs[dataId]['config']if cfgtmp["param"] == "":return ()# 过滤采集点if 'disabled' in cfgtmp and cfgtmp['disabled'] == True:return ()else:self.debug(self.name(dataId))# 获取用于数据下发的点if 'private' in cfgtmp['param']:for data_key,data_value in data_last.items():if data_key==cfgtmp['param']['private']:self.debug(3333333333333333333333333333333)self.debug(data_value)self.setValue(self.name(dataId),data_value)return ()

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。