如何实现Netty框架中服务器端的消息推送?

如何实现Netty框架中服务器端的消息推送?,第1张

netty框架是用在服务器端,客户端是嵌入式编程,通过自定义的tcp通信协议进行连接的,现在需求是这样的,服务器端只是用来和客户端进行通信,现在有第三方如微信端进行支付成功后在数据库里生成了一条数据,表示要往某个客户端发送指令,以下两种方式可供参考:

1、微信端生成通讯指令后调用TCP端的接口(负责通讯程序和数据库交互的),在接口程序中通过定义Socket连到通讯程序服务器端,根据通道编号去发送,但是这种会导致服务器端的tcp客户端连接变得更多。

2、直接在netty框架中定义了scheduleAtF。

当然也可借助第三方工具来完成推送。例如极光推送,极光推送具有以下功能:

1、多种消息类型

开发者可以轻松地通过极光发送各个移动平台的系统通知,还可以在控制台编辑多种富文本展示模板; 极光还提供自定义消息的透传,客户端接到消息内容后根据自己的逻辑自由处理。

2、用户和推送统计

完整的消息生命周期查询,并且可以形成“推送报表”与“用户统计报表”呈现给开发者,用来观察推送的效果和应用发展趋势。

3、短信补充

通过极光后台推送APP通知消息,对于一些重要又不能遗漏的信息可以调用极光短信的后台对未收到的客户端发送短信通知,保证消息的可靠性。

4、A/B 测试

合理的推送能够激活用户,提高用户粘性,使用A/B分组测试的科学方法,根据测试反馈的结果,帮助开发者选择最优化的推送方案。

5、极光推送安全包

为金融、新闻、政务及其他对推送安全要求极高的客户提供安全严谨、稳定可靠的信息推送解决方案

6、可定制的私有云

对于安全性要求更高,希望推送数据和系统存储在自己服务器的客户,及个性化需求需要定制开发的,性能更高要求的,或者想拥有自己推送平台的甚至要求源码授权二次开发的开发者,极光提供全功能的私有云解决方案。

深圳市和讯华谷信息技术有限公司(极光 Aurora Mobile,纳斯达克股票代码:JG)成立于2011年,是中国领先的开发者服务提供商,专注于为开发者提供稳定高效的消息推送、一键认证以及流量变现等服务,助力开发者的运营、增长与变现。同时,极光的行业应用已经拓展至市场洞察、金融风控与商业地理服务,助力各行各业优化决策、提升效率。

netty为什么快呢?这是因为netty底层使用了JAVA的NIO技术,并在其基础上进行了性能的优化,虽然netty不是单纯的JAVA nio,但是netty的底层还是基于的是nio技术。

nio是JDK1.4中引入的,用于区别于传统的IO,所以nio也可以称之为new io。

nio的三大核心是Selector,channel和Buffer,本文我们将会深入探究NIO和netty之间的关系。

在讲解netty中的NIO实现之前,我们先来回顾一下JDK中NIO的selector,channel是怎么工作的。对于NIO来说selector主要用来接受客户端的连接,所以一般用在server端。我们以一个NIO的服务器端和客户端聊天室为例来讲解NIO在JDK中是怎么使用的。

因为是一个简单的聊天室,我们选择Socket协议为基础的ServerSocketChannel,首先就是open这个Server channel:

然后向server channel中注册selector:

虽然是NIO,但是对于Selector来说,它的select方法是阻塞方法,只有找到匹配的channel之后才会返回,为了多次进行select操作,我们需要在一个while循环里面进行selector的select操作:

selector中会有一些SelectionKey,SelectionKey中有一些表示操作状态的OP Status,根据这个OP Status的不同,selectionKey可以有四种状态,分别是isReadable,isWritable,isConnectable和isAcceptable。

当SelectionKey处于isAcceptable状态的时候,表示ServerSocketChannel可以接受连接了,我们需要调用register方法将serverSocketChannel accept生成的socketChannel注册到selector中,以监听它的OP READ状态,后续可以从中读取数据:

当selectionKey处于isReadable状态的时候,表示可以从socketChannel中读取数据然后进行处理:

上面的serverResponse方法中,从selectionKey中拿到对应的SocketChannel,然后调用SocketChannel的read方法,将channel中的数据读取到byteBuffer中,要想回复消息到channel中,还是使用同一个socketChannel,然后调用write方法回写消息给client端,到这里一个简单的回写客户端消息的server端就完成了。

接下来就是对应的NIO客户端,在NIO客户端需要使用SocketChannel,首先建立和服务器的连接:

然后就可以使用这个channel来发送和接受消息了:

向channel中写入消息可以使用write方法,从channel中读取消息可以使用read方法。

这样一个NIO的客户端就完成了。

虽然以上是NIO的server和client的基本使用,但是基本上涵盖了NIO的所有要点。接下来我们来详细了解一下netty中NIO到底是怎么使用的。

以netty的ServerBootstrap为例,启动的时候需要指定它的group,先来看一下ServerBootstrap的group方法:

ServerBootstrap可以接受一个EventLoopGroup或者两个EventLoopGroup,EventLoopGroup被用来处理所有的event和IO,对于ServerBootstrap来说,可以有两个EventLoopGroup,对于Bootstrap来说只有一个EventLoopGroup。两个EventLoopGroup表示acceptor group和worker group。

EventLoopGroup只是一个接口,我们常用的一个实现就是NioEventLoopGroup,如下所示是一个常用的netty服务器端代码:

这里和NIO相关的有两个类,分别是NioEventLoopGroup和NioServerSocketChannel,事实上在他们的底层还有两个类似的类分别叫做NioEventLoop和NioSocketChannel,接下来我们分别讲解一些他们的底层实现和逻辑关系。

NioEventLoopGroup和DefaultEventLoopGroup一样都是继承自MultithreadEventLoopGroup:

他们的不同之处在于newChild方法的不同,newChild用来构建Group中的实际对象,NioEventLoopGroup来说,newChild返回的是一个NioEventLoop对象,先来看下NioEventLoopGroup的newChild方法:

这个newChild方法除了固定的executor参数之外,还可以根据NioEventLoopGroup的构造函数传入的参数来实现更多的功能。

这里参数中传入了SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler、taskQueueFactory和tailTaskQueueFactory这几个参数,其中后面的两个EventLoopTaskQueueFactory并不是必须的。

最后所有的参数都会传递给NioEventLoop的构造函数用来构造出一个新的NioEventLoop。

在详细讲解NioEventLoop之前,我们来研读一下传入的这几个参数类型的实际作用。

SelectorProvider是JDK中的类,它提供了一个静态的provider()方法可以从Property或者ServiceLoader中加载对应的SelectorProvider类并实例化。

另外还提供了openDatagramChannel、openPipe、openSelector、openServerSocketChannel和openSocketChannel等实用的NIO操作方法。

SelectStrategyFactory是一个接口,里面只定义了一个方法,用来返回SelectStrategy:

什么是SelectStrategy呢?

先看下SelectStrategy中定义了哪些Strategy:

SelectStrategy中定义了3个strategy,分别是SELECT、CONTINUE和BUSY_WAIT。

我们知道一般情况下,在NIO中select操作本身是一个阻塞操作,也就是block操作,这个操作对应的strategy是SELECT,也就是select block状态。

如果我们想跳过这个block,重新进入下一个event loop,那么对应的strategy就是CONTINUE。

BUSY_WAIT是一个特殊的strategy,是指IO 循环轮询新事件而不阻塞,这个strategy只有在epoll模式下才支持,NIO和Kqueue模式并不支持这个strategy。

RejectedExecutionHandler是netty自己的类,和 java.util.concurrent.RejectedExecutionHandler类似,但是是特别针对SingleThreadEventExecutor来说的。这个接口定义了一个rejected方法,用来表示因为SingleThreadEventExecutor容量限制导致的任务添加失败而被拒绝的情况:

EventLoopTaskQueueFactory是一个接口,用来创建存储提交给EventLoop的taskQueue:

这个Queue必须是线程安全的,并且继承自java.util.concurrent.BlockingQueue.

讲解完这几个参数,接下来我们就可以详细查看NioEventLoop的具体NIO实现了。

首先NioEventLoop和DefaultEventLoop一样,都是继承自SingleThreadEventLoop:

表示的是使用单一线程来执行任务的EventLoop。

首先作为一个NIO的实现,必须要有selector,在NioEventLoop中定义了两个selector,分别是selector和unwrappedSelector:

在NioEventLoop的构造函数中,他们是这样定义的:

首先调用openSelector方法,然后通过返回的SelectorTuple来获取对应的selector和unwrappedSelector。

这两个selector有什么区别呢?

在openSelector方法中,首先通过调用provider的openSelector方法返回一个Selector,这个Selector就是unwrappedSelector:

然后检查DISABLE_KEY_SET_OPTIMIZATION是否设置,如果没有设置那么unwrappedSelector和selector实际上是同一个Selector:

DISABLE_KEY_SET_OPTIMIZATION表示的是是否对select key set进行优化:

如果DISABLE_KEY_SET_OPTIMIZATION被设置为false,那么意味着我们需要对select key set进行优化,具体是怎么进行优化的呢?

先来看下最后的返回:

最后返回的SelectorTuple第二个参数就是selector,这里的selector是一个SelectedSelectionKeySetSelector对象。

SelectedSelectionKeySetSelector继承自selector,构造函数传入的第一个参数是一个delegate,所有的Selector中定义的方法都是通过调用

delegate来实现的,不同的是对于select方法来说,会首先调用selectedKeySet的reset方法,下面是以isOpen和select方法为例观察一下代码的实现:

selectedKeySet是一个SelectedSelectionKeySet对象,是一个set集合,用来存储SelectionKey,在openSelector()方法中,使用new来实例化这个对象:

netty实际是想用这个SelectedSelectionKeySet类来管理Selector中的selectedKeys,所以接下来netty用了一个高技巧性的对象替换操作。

首先判断系统中有没有sun.nio.ch.SelectorImpl的实现:

SelectorImpl中有两个Set字段:

这两个字段就是我们需要替换的对象。如果有SelectorImpl的话,首先使用Unsafe类,调用PlatformDependent中的objectFieldOffset方法拿到这两个字段相对于对象示例的偏移量,然后调用putObject将这两个字段替换成为前面初始化的selectedKeySet对象:

如果系统设置不支持Unsafe,那么就用反射再做一次:

还记得前面我们提到的selectStrategy吗?run方法通过调用selectStrategy.calculateStrategy返回了select的strategy,然后通过判断

strategy的值来进行对应的处理。

如果strategy是CONTINUE,这跳过这次循环,进入到下一个loop中。

BUSY_WAIT在NIO中是不支持的,如果是SELECT状态,那么会在curDeadlineNanos之后再次进行select操作:

如果strategy >0,表示有拿到了SelectedKeys,那么需要调用processSelectedKeys方法对SelectedKeys进行处理:

上面提到了NioEventLoop中有两个selector,还有一个selectedKeys属性,这个selectedKeys存储的就是Optimized SelectedKeys,如果这个值不为空,就调用processSelectedKeysOptimized方法,否则就调用processSelectedKeysPlain方法。

processSelectedKeysOptimized和processSelectedKeysPlain这两个方法差别不大,只是传入的要处理的selectedKeys不同。

处理的逻辑是首先拿到selectedKeys的key,然后调用它的attachment方法拿到attach的对象:

如果channel还没有建立连接,那么这个对象可能是一个NioTask,用来处理channelReady和channelUnregistered的事件。

如果channel已经建立好连接了,那么这个对象可能是一个AbstractNioChannel。

针对两种不同的对象,会去分别调用不同的processSelectedKey方法。

对第一种情况,会调用task的channelReady方法:

对第二种情况,会根据SelectionKey的readyOps()的各种状态调用ch.unsafe()中的各种方法,去进行read或者close等操作。

NioEventLoop虽然也是一个SingleThreadEventLoop,但是通过使用NIO技术,可以更好的利用现有资源实现更好的效率,这也就是为什么我们在项目中使用NioEventLoopGroup而不是DefaultEventLoopGroup的原因。

通过客户端是嵌入式编程,通过自定义的tcp通信协议进行连接的,这样就可以实现消息推送。现在市面上的消息推送软件有很多,但是相比较来说,深圳极光的就不错,功能也是有很多的,具体功能如下:

1、智能触达精准广告,智能送达

能够实现更高效的流量变现服务;极光强大的数据处理能力,能够为更高效的流量变现保驾护航。

2、VaaS视频广告,高效转化

提高高用户活跃时长,能够帮助开发者构建用户兴趣标签,一举三得;赋能APP以算法、视频内容和视频广告能力,提高用户活跃时长同时,也能实现流量变现。

3、互动广告深度互动,前置体验

多样性不断更新的互动工具,以及更原生的适配媒体特性的有趣活动场景;兼顾流量变现与用户体验,可实现流量主、广告主和用户的多方共赢。

极光助力开发者提高用户活跃时长、增加用户粘性,提升用户价值,为开发者提供优质的广告变现服务。


欢迎分享,转载请注明来源:夏雨云

原文地址:https://www.xiayuyun.com/zonghe/282471.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-04-21
下一篇2023-04-21

发表评论

登录后才能评论

评论列表(0条)

    保存