100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > 长连接 短连接 心跳机制与断线重连

长连接 短连接 心跳机制与断线重连

时间:2018-09-20 11:22:02

相关推荐

长连接  短连接 心跳机制与断线重连

在不同场景下要考虑长连接还是短连接,那么我们要先了解他。

短连接

概念

client与server通过三次握手建立连接,client发送请求消息,server返回响应,一次连接就完成了。

这时候双方任意都可以发起close操作,不过一般都是client先发起close操作。上述可知,短连接一般只会在 client/server间传递一次请求操作。

短连接的优缺点

管理起来比较简单,存在的连接都是有用的连接,不需要额外的控制手段。

使用场景

通常浏览器访问服务器的时候就是短连接,也就是服务接口。

对于服务端来说,长连接会耗费服务端的资源,而且用户用浏览器访问服务端相对而言不是很频繁的

如果有几十万,上百万的连接,服务端的压力会非常大,甚至会崩溃。

所以对于并发量大,请求频率低的,建议使用短连接。

长连接

什么是长连接

client向server发起连接,server接受client连接,双方建立连接。

Client与server完成一次读写之后,它们之间的连接并不会主动关闭,后续的读写操作会继续使用这个连接。

长连接的生命周期

正常情况下,一条TCP长连接建立后,只要双不提出关闭请求并且不出现异常情况,这条连接是一直存在的.操作系统不会自动去关闭它,甚至经过物理网络拓扑的改变之后仍然可以使用。

所以一条连接保持几天、几个月、几年或者更长时间都有可能,只要不出现异常情况或由用户(应用层)主动关闭。客户端和服务单可一直使用该连接进行数据通信。

长连接的优点

长连接可以省去较多的TCP建立和关闭的操作,减少网络阻塞的影响,

当发生错误时,可以在不关闭连接的情况下进行提示,

减少CPU及内存的使用,因为不需要经常的建立及关闭连接。

长连接的缺点

连接数过多时,影响服务端的性能和并发数量。

使用场景

数据库的连接就是采用TCP长连接.

RPC,远程服务调用,在服务器,一个服务进程频繁调用另一个服务进程,可使用长连接,减少连接花费的时间。

最近在维护物联网设备上报数据到服务端,采用的netty长连接,所以记录一下短连接和长连接的区别。

总结

1.对于长连接和短连接的使用是需要根据应用场景来判断的。

2.长连接并不是万能的,也是需要维护的。

长连接的实现

心跳机制

应用层协议大多都有HeartBeat机制,通常是客户端每隔一小段时间向服务器发送一个数据包,通知服务器自己仍然在线。

并传输一些可能必要的数据。使用心跳包的典型协议是IM,比如QQ/MSN/飞信等协议。

在TCP的机制里面,本身是存在有心跳包的机制的,也就是TCP的选项:SO_KEEPALIVE。

系统默认是设置的2小时的心跳频率。但是它检查不到机器断电、网线拔出、防火墙这些断线。

而且逻辑层处理断线可能也不是那么好处理。一般,如果只是用于保活还是可以的。

为什么需要心跳机制?

因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等,

会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的.

心跳机制即可解决此类问题。

TCP协议的KeepAlive机制

默认KeepAlive状态是不打开的。

需要将setsockopt将SOL_SOCKET.SO_KEEPALIVE设置为1才是打开KeepAlive状态,

并且可以设置三个参数:

tcp_keepalive_time ,tcp_keepalive_probes , tcp_keepalive_intvl

分别表示:连接闲置多久开始发keepalive的ack包、发几个ack包不回复才当对方已断线、两个ack包之间的间隔。

很多网络设备,尤其是NAT路由器,由于其硬件的限制(例如内存、CPU处理能力),无法保持其上的所有连接,因此在必要的时候,会在连接池中选择一些不活跃的连接踢掉。

典型做法是LRU,把最久没有数据的连接给T掉。

通过使用TCP的KeepAlive机制(修改那个time参数),可以让连接每隔一小段时间就产生一些ack包,以降低被踢掉的风险,当然,这样的代价是额外的网络和CPU负担。

如何实现心跳机制?

两种方式实现心跳机制:

使用 TCP 协议层面的 keepalive 机制.

在应用层上实现自定义的心跳机制.

虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:

它不是 TCP 的标准协议, 并且是默认关闭的.

TCP keepalive 机制依赖于操作系统的实现, 默认的 keepalive 心跳时间是两个小时, 并且对 keepalive 的修改需要系统调用(或者修改系统配置), 灵活性不够.

TCP keepalive 与 TCP 协议绑定, 因此如果需要更换为 UDP 协议时, keepalive 机制就失效了.

使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量,

这里主要介绍应用层方面实现心跳机制,使用netty实现心跳和断线重连。

netty实现心跳机制

netty对心跳机制提供了机制,实现的关键是IdleStateHandler先来看一下他的构造函数

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) {this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);}

实例化一个 IdleStateHandler 需要提供三个参数:

readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.

writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.

allIdleTimeSeconds, 读和写都超时. 即当在指定的时间间隔内没有读并且写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

netty心跳流程

1. 客户端成功连接服务端。

2.在客户端中的ChannelPipeline中加入IdleStateHandler,设置写事件触发事件为5s.

3.客户端超过5s未写数据,触发写事件,向服务端发送心跳包,

4.同样,服务端要对心跳包做出响应,其实给客户端最好的回复就是“不回复”,减轻服务端的压力

5.超过三次,1过0s服务端都会收到来自客户端的心跳信息,服务端可以认为客户端挂了,可以close链路。

6.客户端恢复正常,发现链路已断,重新连接服务端。

代码实现

服务端handler:

package com.heartbreak.server;import ty.channel.ChannelHandlerContext;import ty.channel.SimpleChannelInboundHandler;import ty.handler.timeout.IdleState;import ty.handler.timeout.IdleStateEvent;import java.util.Random;/*** @author janti* @date /6/10 12:21*/public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {// 失败计数器:未收到client端发送的ping请求private int unRecPingTimes = 0;// 定义服务端没有收到心跳消息的最大次数private static final int MAX_UN_REC_PING_TIMES = 3;private Random random = new Random(System.currentTimeMillis());@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {if (msg!=null && msg.equals("Heartbeat")){System.out.println("客户端"+ctx.channel().remoteAddress()+"--心跳信息--");}else {System.out.println("客户端----请求消息----:"+msg);String resp = "商品的价格是:"+random.nextInt(1000);ctx.writeAndFlush(resp);}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state()==IdleState.READER_IDLE){System.out.println("===服务端===(READER_IDLE 读超时)");// 失败计数器次数大于等于3次的时候,关闭链接,等待client重连if (unRecPingTimes >= MAX_UN_REC_PING_TIMES) {System.out.println("===服务端===(读超时,关闭chanel)");// 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连ctx.close();} else {// 失败计数器加1unRecPingTimes++;}}else {super.userEventTriggered(ctx,evt);}}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);System.out.println("一个客户端已连接");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);System.out.println("一个客户端已断开连接");}}

服务端server:

package com.heartbreak.server;import ty.bootstrap.ServerBootstrap;import ty.channel.*;import ty.channel.nio.NioEventLoopGroup;import ty.channel.socket.SocketChannel;import ty.channel.socket.nio.NioServerSocketChannel;import ty.handler.codec.string.StringDecoder;import ty.handler.codec.string.StringEncoder;import ty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** @author tangj* @date /6/10 10:46*/public class HeartBeatServer {private static int port = 9817;public HeartBeatServer(int port) {this.port = port;}ServerBootstrap bootstrap = null;ChannelFuture f;// 检测chanel是否接受过心跳数据时间间隔(单位秒)private static final int READ_WAIT_SECONDS = 10;public static void main(String args[]) {HeartBeatServer heartBeatServer = new HeartBeatServer(port);heartBeatServer.startServer();}public void startServer() {EventLoopGroup bossgroup = new NioEventLoopGroup();EventLoopGroup workergroup = new NioEventLoopGroup();try {bootstrap = new ServerBootstrap();bootstrap.group(bossgroup, workergroup).channel(NioServerSocketChannel.class).childHandler(new HeartBeatServerInitializer());// 服务器绑定端口监听f = bootstrap.bind(port).sync();System.out.println("server start ,port: "+port);// 监听服务器关闭监听,此方法会阻塞f.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {bossgroup.shutdownGracefully();workergroup.shutdownGracefully();}}private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 监听读操作,读超时时间为5秒,超过5秒关闭channel;pipeline.addLast("ping", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0, TimeUnit.SECONDS));pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast("handler", new HeartbeatServerHandler());}}}

客户端handler

package com.heartbreak.client;import ty.buffer.ByteBuf;import ty.buffer.Unpooled;import ty.channel.ChannelHandlerContext;import ty.channel.EventLoop;import ty.channel.SimpleChannelInboundHandler;import ty.handler.timeout.IdleState;import ty.handler.timeout.IdleStateEvent;import ty.util.CharsetUtil;import ty.util.ReferenceCountUtil;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.TimeUnit;/*** @author tangj* @date /6/11 22:55*/public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{private HeartBeatClient client;private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",CharsetUtil.UTF_8));public HeartBeatClientHandler(HeartBeatClient client) {this.client = client;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("收到服务端回复:"+msg);if (msg.equals("Heartbeat")) {ctx.write("has read message from server");ctx.flush();}ReferenceCountUtil.release(msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.WRITER_IDLE) {ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());}} else {super.userEventTriggered(ctx, evt);}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);System.err.println("客户端与服务端断开连接,断开的时间为:"+format.format(new Date()));// 定时线程 断线重连final EventLoop eventLoop = ctx.channel().eventLoop();eventLoop.schedule(new Runnable() {@Overridepublic void run() {client.doConncet();}}, 10, TimeUnit.SECONDS);}}

客户端启动:

package com.heartbreak.client;import ty.bootstrap.Bootstrap;import ty.buffer.ByteBuf;import ty.channel.*;import ty.channel.nio.NioEventLoopGroup;import ty.channel.socket.SocketChannel;import ty.channel.socket.nio.NioSocketChannel;import ty.handler.codec.string.StringDecoder;import ty.handler.codec.string.StringEncoder;import ty.handler.timeout.IdleStateHandler;import java.io.BufferedReader;import java.io.InputStreamReader;import java.util.Random;import java.util.concurrent.TimeUnit;/*** @author tangj* @date /6/10 16:18*/public class HeartBeatClient {private Random random = new Random();public Channel channel;public Bootstrap bootstrap;protected String host = "127.0.0.1";protected int port = 9817;public static void main(String args[]) throws Exception {HeartBeatClient client = new HeartBeatClient();client.run();client.sendData();}public void run() throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new SimpleClientInitializer(HeartBeatClient.this));doConncet();} catch (Exception e) {e.printStackTrace();}}/*** 发送数据* @throws Exception*/public void sendData() throws Exception {BufferedReader in = new BufferedReader(new InputStreamReader(System.in));while (true){String cmd = in.readLine();switch (cmd){case "close" :channel.close();break;default:channel.writeAndFlush(in.readLine());break;}}}/*** 连接服务端*/public void doConncet() {if (channel != null && channel.isActive()) {return;}ChannelFuture channelFuture = bootstrap.connect(host, port);channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture futureListener) throws Exception {if (channelFuture.isSuccess()) {channel = futureListener.channel();System.out.println("connect server successfully");} else {System.out.println("Failed to connect to server, try connect after 10s");futureListener.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {doConncet();}}, 10, TimeUnit.SECONDS);}}});}private class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {private HeartBeatClient client;public SimpleClientInitializer(HeartBeatClient client) {this.client = client;}@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new IdleStateHandler(0, 5, 0));pipeline.addLast("encoder", new StringEncoder());pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("handler", new HeartBeatClientHandler(client));}}}

运行结果:

1.客户端长时间未发送心跳包,服务端关闭连接

server start ,port: 9817一个客户端已连接===服务端===(READER_IDLE 读超时)===服务端===(READER_IDLE 读超时)===服务端===(READER_IDLE 读超时)===服务端===(READER_IDLE 读超时)===服务端===(读超时,关闭chanel)一个客户端已断开连接

2.客户端发送心跳包,服务端和客户端保持心跳信息

一个客户端已连接客户端/127.0.0.1:55436--心跳信息--客户端/127.0.0.1:55436--心跳信息--客户端/127.0.0.1:55436--心跳信息--客户端/127.0.0.1:55436--心跳信息--

3.服务单宕机,断开连接,客户端进行重连

客户端与服务端断开连接,断开的时间为:-06-12 23:47:12Failed to connect to server, try connect after 10sFailed to connect to server, try connect after 10sFailed to connect to server, try connect after 10sconnect server successfully

代码实例:资源包

最终指定基于Zookeeper + Kafka + Netty + Redis实现一个支持高并发高可用分布式集群部署的服务端,长连接客户端设备,消息发送到kafka供下游数据消费分析。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。