100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > autobahn-python的使用——sendMessage()和断线自动重连

autobahn-python的使用——sendMessage()和断线自动重连

时间:2019-01-15 19:26:56

相关推荐

autobahn-python的使用——sendMessage()和断线自动重连

客户端代码:

# -*-coding:utf8-*-
import threading
import time
from autobahn.asyncio.websocket import WebSocketClientProtocol
from autobahn.asyncio.websocket import WebSocketClientFactory
import asyncio


class BaseSipClientProtocol(WebSocketClientProtocol):
    """WebSocket client protocol with a ping-driven keepalive watchdog.

    The time of the last ping received from the server is recorded; a timer
    re-armed every ``KEEPALIVE_INTERVAL`` seconds drops the connection when
    no ping has been seen for more than twice that interval. When the
    connection is lost the event loop is stopped so that the reconnect loop
    in the ``__main__`` section regains control.

    Subclasses customise behaviour through the ``onConnectToServer``,
    ``onDisConnectFromServer`` and ``onMsgReceived`` hooks.
    """

    # Seconds between two keepalive checks.
    KEEPALIVE_INTERVAL = 5

    # Handle returned by ``loop.call_later``; stays None until ``onOpen``
    # arms the first check, so ``connection_lost`` can run safely earlier.
    keepalive_fut = None

    # ``time.time()`` of the last ping (or of ``onOpen``).
    last_ping_time = None

    def check_keepalive(self):
        """Drop the connection if pings stopped, otherwise re-arm the timer."""
        last_interval = time.time() - self.last_ping_time
        if last_interval > 2 * self.KEEPALIVE_INTERVAL:
            # No ping for more than two intervals: drop the connection.
            self.dropConnection(abort=True)
        else:
            # Still alive: schedule the next check.
            self.schedule_keepalive()

    def schedule_keepalive(self):
        """Arm the next keepalive check and keep its handle for cancelling."""
        loop = asyncio.get_event_loop()
        self.keepalive_fut = loop.call_later(self.KEEPALIVE_INTERVAL,
                                             self.check_keepalive)

    def onConnect(self, response):
        """Register this connection on the factory and fire the connect hook."""
        print("Server connected: {0}".format(response.peer))
        # Save the connection-to-server handle on the factory.
        self.factory.saveConnectionToServer(self)
        self.onConnectToServer(response.peer)

    def onOpen(self):
        """Start scheduling the keepalive check."""
        self.last_ping_time = time.time()
        self.schedule_keepalive()

    def onPing(self, payload):
        """Record the ping time and answer with a pong."""
        self.last_ping_time = time.time()
        self.sendPong(payload)
        print('Ping == ', payload)

    def connection_lost(self, exc):
        """Cancel the keepalive timer, run base cleanup, then stop the loop.

        :param exc: exception passed by the transport, or None.
        """
        # The timer only exists once onOpen has run; the link may drop
        # before the WebSocket handshake completes.
        if self.keepalive_fut is not None:
            self.keepalive_fut.cancel()
            self.keepalive_fut = None
        # Delegate to the base class so the library's own close handling is
        # not bypassed.
        # NOTE(review): the base implementation is expected to drive
        # onClose() -- confirm against the installed autobahn version.
        super().connection_lost(exc)
        # Return control to the reconnect loop in the __main__ section.
        asyncio.get_event_loop().stop()

    def onMessage(self, payload, isBinary):
        """Forward an incoming message, tagged with the peer, to the hook."""
        self.onMsgReceived(self.peer, payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Unregister from the factory and fire the disconnect hook."""
        print("WebSocket connection closed: {0}".format(reason))
        # Remove the connection-to-server handle from the factory.
        self.factory.delConnectionToServer()
        self.onDisConnectFromServer(wasClean, code, reason)

    def onConnectToServer(self, peer):
        """Hook: called when the client has connected to the server."""
        pass

    def onDisConnectFromServer(self, wasClean, code, reason):
        """Hook: called when the client has disconnected from the server."""
        pass

    def onMsgReceived(self, peer, data, isBinary):
        """Hook: called for every received message; prints it by default."""
        print('received {0} from {1}'.format(data, peer))


class BaseSipClientFactory(WebSocketClientFactory):
    """Client factory that keeps a handle to the current server connection."""

    # Protocol instance of the live connection, or None when disconnected.
    _connectionToServer = None

    def saveConnectionToServer(self, connectedHandle):
        """Remember ``connectedHandle`` as the current server connection."""
        self._connectionToServer = connectedHandle

    def delConnectionToServer(self):
        """Forget the current server connection."""
        self._connectionToServer = None

    def getConnectionToServer(self):
        """Return the current server connection handle, or None."""
        return self._connectionToServer

    def sendMsg(self, data):
        """Send ``data`` to the server over the saved connection.

        ``bytes`` are sent as a binary message; anything else is encoded
        with UTF-8 via ``data.encode`` and sent as a text message.

        :raises Exception: if no connection to the server is saved.
        """
        if self._connectionToServer is not None:
            if isinstance(data, bytes):
                self._connectionToServer.sendMessage(data, True)
            else:
                self._connectionToServer.sendMessage(data.encode('utf-8'))
        else:
            raise Exception('与server的连接不存在')


if __name__ == '__main__':
    factory = BaseSipClientFactory(u"ws://192.168.88.3:9009")
    factory.protocol = BaseSipClientProtocol
    loop = asyncio.get_event_loop()
    try:
        # Reconnect forever: each pass makes one connection attempt and then
        # blocks in run_forever() until connection_lost() stops the loop.
        while True:
            fut = loop.create_connection(factory, '192.168.88.3', 9009)
            try:
                loop.run_until_complete(asyncio.wait_for(fut, 5))
                loop.run_forever()
            except asyncio.TimeoutError:
                # The attempt already waited 5 seconds; retry immediately.
                print('TimeoutError')
                continue
            except OSError as err:
                print('OSError == ' + str(err))
            # A little pause before trying again.
            loop.run_until_complete(asyncio.sleep(5))
    except KeyboardInterrupt:
        pass
    finally:
        # Previously unreachable after the infinite loop; now the loop is
        # always closed on exit.
        loop.close()

服务端代码:

# -*-coding:utf8-*-
from autobahn.asyncio.websocket import WebSocketServerFactory
from autobahn.asyncio.websocket import WebSocketServerProtocol


class BaseSipServerProtocol(WebSocketServerProtocol):
    """WebSocket server protocol that registers each client on its factory.

    Subclasses customise behaviour through the ``onClientConnected``,
    ``onClientLostConnected`` and ``onMsgReceived`` hooks.
    """

    def onConnect(self, request):
        """Register the connecting client on the factory and fire the hook."""
        print("Client connecting: {0}".format(request.peer))
        # Save the connection in the factory's registry, keyed by peer.
        self.factory.addConnection(request.peer, self)
        self.onClientConnected(request.peer)

    def onOpen(self):
        """Log that the WebSocket connection is open."""
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        """Forward an incoming message, tagged with the peer, to the hook."""
        self.onMsgReceived(self.peer, payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Unregister this connection from the factory and fire the hook."""
        print("WebSocket connection closed: {0}".format(reason))
        # Remove the connection from the factory's registry.
        self.factory.removeConnection(self)
        self.onClientLostConnected(wasClean, code, reason)

    def onClientConnected(self, peer):
        """Hook: called when a client connects."""
        pass

    def onClientLostConnected(self, wasClean, code, reason):
        """Hook: called when a client disconnects."""
        pass

    def onMsgReceived(self, peer, data, isBinary):
        """Hook: called for every received message; prints it by default."""
        print('received {0} from {1}'.format(data, peer))


class BaseSipServerFactory(WebSocketServerFactory):
    """Server factory that keeps a peer -> protocol registry of clients."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base factory and create the registry."""
        super().__init__(*args, **kwargs)
        # Per-instance registry: a class-level dict would be shared by every
        # factory object in the process.
        self._connectionSets = {}

    def addConnection(self, peer, connectedHandle):
        """Register ``connectedHandle`` under ``peer``.

        An existing entry for the same peer is replaced, so a stale handle
        can never shadow a newer connection.
        """
        self._connectionSets[peer] = connectedHandle

    def removeConnection(self, connectedHandle):
        """Remove the first registry entry whose value is ``connectedHandle``.

        Does nothing when the handle is not registered.
        """
        for peer, handle in self._connectionSets.items():
            if handle is connectedHandle:
                # Deleting is safe here because iteration stops right after.
                del self._connectionSets[peer]
                break

    def getConnectionByPeer(self, peer):
        """Return the protocol registered for ``peer``, or None."""
        return self._connectionSets.get(peer)

    def getConnections(self):
        """Return the live peer -> protocol dict (not a copy)."""
        return self._connectionSets

    def sendMsg(self, peer, data):
        """Send ``data`` to the client registered under ``peer``.

        ``bytes`` are sent as a binary message; anything else is encoded
        with UTF-8 via ``data.encode`` and sent as a text message.

        :raises Exception: if no connection is registered for ``peer``.
        """
        connectedHandle = self.getConnectionByPeer(peer)
        if connectedHandle is not None:
            if isinstance(data, bytes):
                connectedHandle.sendMessage(data, True)
            else:
                connectedHandle.sendMessage(data.encode('utf-8'))
        else:
            raise Exception('peer的连接不存在')

启动Server(以下代码依赖上面服务端代码中定义的 BaseSipServerProtocol 和 BaseSipServerFactory,需放在同一文件中或先行导入):

import asyncio
import threading


class CommunicationTool:
    """Small helper that configures and starts a WebSocket server.

    Typical call order: ``setFlag`` -> ``setServerProtocol`` /
    ``setServerFactory`` -> ``createServer`` -> ``startListen``.
    """

    # Values accepted by setFlag().
    Server = 1
    Client = 0

    def __init__(self):
        self.isAutoReconnect = False
        # Initialise every attribute the validation code reads, so a missing
        # setup step raises the intended error rather than AttributeError.
        self.flag = None
        self.addr = None
        self.port = None
        self.serverProtocol = None
        self.serverFactory = None

    def setFlag(self, flag):
        """Select the role: ``CommunicationTool.Server`` or ``.Client``."""
        self.flag = flag

    def createServer(self, addr, port):
        """Store the listen address and port for a server-type tool.

        :param addr: listen address, must be ``str``.
        :param port: listen port, must be ``int``.
        :raises Exception: if the flag is unset, the tool is a client, or
            the argument types are wrong.
        """
        # Check the configured role first.
        if self._isServer():
            if isinstance(addr, str) and isinstance(port, int):
                self.addr = addr
                self.port = port
            else:
                raise Exception('createServer的参数类型有误')
        else:
            raise Exception('不支持client类型执行此方法')

    def startListen(self):
        """Create the listening server and run the event loop in a thread.

        :raises Exception: if the protocol, the factory or the address/port
            has not been set, or the tool is not a server.
        """
        # Make sure every required setting is present.
        if self._isServer():
            if self.serverProtocol is None:
                raise Exception('未设置protocol')
            if self.serverFactory is None:
                raise Exception('未设置factory')
            if self.addr is None or self.port is None:
                raise Exception('未设置createServer')
        else:
            raise Exception('未设置flag或不支持此方法')
        factory = self.serverFactory
        factory.protocol = self.serverProtocol
        if self.isAutoReconnect:
            # Have the server ping its clients periodically.
            factory.setProtocolOptions(autoPingInterval=5, autoPingTimeout=2)
        loop = asyncio.get_event_loop()
        coro = loop.create_server(factory, self.addr, self.port)
        server = loop.run_until_complete(coro)
        # Serve from a worker thread so the caller is not blocked.
        serverThread = threading.Thread(target=self._run, args=(loop, server),
                                        name='serverThread')
        serverThread.start()

    def _run(self, loop, server):
        """Run ``loop`` until it stops, then close the server and the loop."""
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            server.close()
            loop.close()

    def setAutoReconnect(self, isAutoReconnect):
        """Enable or disable the auto-ping option applied by ``startListen``.

        :raises Exception: if ``isAutoReconnect`` is not a ``bool``.
        """
        if isinstance(isAutoReconnect, bool):
            self.isAutoReconnect = isAutoReconnect
        else:
            raise Exception('参数类型错误')

    def setServerProtocol(self, protocol):
        """Set the protocol class that ``startListen`` assigns to the factory.

        :raises Exception: if the flag is unset or the tool is a client.
        """
        if self._isServer():
            self.serverProtocol = protocol
        else:
            raise Exception('Client类型不能调用此方法')

    def setServerFactory(self, factory):
        """Set the factory instance used by ``startListen``.

        :raises Exception: if the flag is unset or the tool is a client.
        """
        if self._isServer():
            self.serverFactory = factory
        else:
            raise Exception('Client类型不能调用此方法')

    def _isServer(self):
        """Return True for a server, False for a client.

        :raises Exception: if the flag is neither ``Server`` nor ``Client``.
        """
        if self.flag == self.Server:
            return True
        if self.flag == self.Client:
            return False
        else:
            raise Exception('未设置flag')


if __name__ == '__main__':
    cTool = CommunicationTool()
    cTool.setFlag(CommunicationTool.Server)  # select the server role
    cTool.setServerProtocol(BaseSipServerProtocol)
    # The factory takes the WebSocket URL.
    factory = BaseSipServerFactory(u"ws://127.0.0.1:9009")
    cTool.setServerFactory(factory)
    factory.setProtocolOptions(autoPingInterval=5)
    cTool.setAutoReconnect(True)  # turn on the auto-ping option
    cTool.createServer('0.0.0.0', 9009)  # set listen address and port
    cTool.startListen()  # start listening

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。