同步操作将从 eric_1989/jnet 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
[TOC]
JDK7中提供了对网络IO的新的API也就是AIO。其异步特性相对于NIO来说使得编程更加容易,API也更容易理解。但要构筑一个完善的网络IO层仍然需要花费很多的心思和实践。为了简化基于AIO的Java网络IO编程,设计并构建了Jnet框架。
Jnet框架是Java AIO接口体系中一层薄封装,仅进一步降低其编程复杂性,不提供额外的抽象。
开源中国:https://gitee.com/eric_ds/jnet
Jnet中包含几个重要的模块,如下图所示:
简要介绍下几个模块的作用:
在Socket上的数据流动顺序为读完成器-->通道上下文-->写完成器,这是一个单向过程。在整体的数据处理过程中,会需要申请大量的数组或者ByteBuffer用于处理或准备数据。为了降低频繁新建ByteBuffer或者数组带来的GC开销,采用内存分配池的方案来提升性能。
在总体架构中的三个模块互相之间配合完成了数据的接收,处理,响应值写出等工作。下面会逐一介绍每一个模块的基本设计思路。而在这个之前,首先需要介绍下整个框架体系中的基础数据结构IoBuffer。
JDK原生自带的ByteBuffer并不能很好的满足框架对Buffer的需求。因此框架设计了一个扩展版本的IoBuffer。IoBuffer与JDK原生的ByteBuffer对比主要有两个不同的特性
IoBuffer.free()
完成释放,否则可能会出现内存泄漏的问题。读完成器接口是继承自JDK的java.nio.channels.CompletionHandler<V,A>
接口,只是简单的添加了一个无参的start
方法供外部进行首次启动。
读完成器对接口中的方法void completed(V result, A attachment)
的实现也很简单。从通道中读取完毕数据,将本次读取的字节数累加到本次读取使用的IoBuffer中。然后将该IoBuffer提供给通道上下文对象进行数据处理。
当出现数据处理异常,或者读取通道数据异常时,会触发JDK的failed(Throwable, A)
方法,此时首先将IoBuffer执行free方法进行释放。然后关闭SocketChannel
即可。
通道上下文与SocketChannel实例对应,提供了数据处理接口和数据写出接口。通道上下文内部存储着一个由数据处理器构成的有序链表。数据处理器是一个接口,定义如下
public interface DataProcessor<T>
{
/**
* 通道初始化时被调用
*
* @param channelContext
*/
void bind(ChannelContext channelContext);
/**
* 处理由上一个Invoker传递过来的数据
*
* @param data
* @param next
* @throws Throwable
*/
void process(T data, ProcessorInvoker next) throws Throwable;
}
由数据处理接口传入的IoBuffer参数,会调用数据处理器的有序链表进行处理。由于读处理器需要等待方法返回后才能继续在通道上监听数据,可知第一个数据处理器是工作在单线程环境下的。还有一点需要注意,读处理器传入的IoBuffer参数在方法返回后仍然继续服务于通道数据监听,因此数据处理器进行业务处理器时需要拷贝一份数据出来或者重新申请一个IoBuffer实例,而不能使用入参的IoBuffer实例。
通道上下文的写出接口实际是代理了写完成器的写出接口。
写完成器也继承自JDK的java.nio.channels.CompletionHandler<V,A>
接口,并且提供了写出接口。定义如下
/**
* 提供数据供写出<br/>
* 如果当前写完成器已经处于停止状态,则抛出非法状态异常
*
* @param buffer
* @throws IllegalStateException
*/
void offer(IoBuffer buffer) throws IllegalStateException;
该接口工作于多线程环境下。入参的IoBuffer不能另做他用,即使是在方法返回之后。当入参的IoBuffer最终写出完毕后,写完成器会负责调用其free方法进行释放。写完成器内部使用一个队列存储推入的待写出数据。当写完成器的Complete方法被IO线程调用时并且当前的数据已经写出完毕,则会从队列中取出下一个IoBuffer进行写出。
由于一个通道实例只绑定了一个写完成器,因此写完成器的Complete方法是工作于单线程环境,与工作于多线程环境的offer方法一起,实际上构成了一个多生产者单消费者情形。因此内部的存储队列可以采用MPSCqueue。
写完成器是有状态的。其状态机如下
初始情况下,写完成器处于空闲状态。当多线程调用offer方法时,其内部是通过状态的CAS操作,来确保最终只有一个线程可以在通道上执行写出操作。offer方法的流程如下
弃权操作是一个比较负责的二次确认子流程,流程如下
当通道上的数据写出完毕时,写完成器的Complete方法会被IO线程调用,该方法的具体内容主要分为三部分
有了三个基本模块后,构建服务端就很简单了。在通道AsynchronousServerSocketChannel
上执行方法accept(A, CompletionHandler<AsynchronousSocketChannel, ? super A>)
来获得建立起链接的客户端链接。得到SocketChannel
实例后,构建起三个模块实例,就可以在这个通道上提供服务了。这里的难点主要在于如何让用户可以更容易的切入构建的环节。由于数据处理器必须由使用者传入,其实现本身不要求线程安全,却也不承诺运行于单线程环境中,所以数据处理器的实例化动作与三大模块的实例化动作是有相关性的。
框架在这里应该提供一定的默认实现供用户选择,但用户仍然应该具备最大的自主权,必要时可以自己实现accept
方法中的CompleteHandler
实例来控制三大模块的初始化。
客户端设计与服务端设计涉及的细节基本相同,这里不再赘述。唯一一个不同点在于,客户端提供的写出能力中,需要关注是否提供断线后重连这一功能。该功能并不在接口要求中,而是不同的子类实现需要关注的问题。
JDK的AIO中,每一个监听端口对应的通道可以绑定一个线程池或者使用默认线程池。链接到该端口的通道会使用监听端口对应通道的线程池。这也就意味着,在一个监听端口上,链接,读取,写出,均只能使用一个线程池。考虑这样的场景:
读完成器不断读取数据并且向业务处理器发送,业务处理器处理后,将响应结果发送至写完成器等待写出,写完成器待发送队列为有限容量队列;当读完成器读取数据发送数据速度快于响应数据的发出,业务处理器因为无法推送数据到写完成器而不断重试,读完成器则等待业务处理器的方法返回,导致当前线程资源被占用。当同一时间这种情况大于线程池线程数量时,就会耗光所有的线程资源。由于没有线程资源供写完成器被调用,导致业务处理器一直无法完成推送数据到写完成器的待发送队列。系统出现了假死情况:程序在运作,没有死锁,CPU100%,无法对业务响应结果数据。
产生上面的问题,在于没有背压机制,读完成器无法在系统到达处理瓶颈时停止读取以减轻下游的处理压力。最终导致整个IO线程池饥饿进而假死。
增加背压机制需要整个链路的协同。具体如下
读完成器工作在单线程环境中,其自我中断并且保存现场很容易。实现上述协同的难点主要有:
对于问题1,有两种可能性:
对于问题2也有不同可能性:
方式1,写完成器本身不存储读完成器的信息,因此每次发送完成后都会执行恢复动作。这就要求在读完成器内部具备一个标志位以避免在不需要的时候执行恢复动作。考虑一种极端情况,压力反馈的传递速度较慢,此时写完成器已经完成了数据写出并执行恢复动作。而由于读完成器尚未收到压力反馈,未设置标志位,就会导致写完成器错过此次的恢复机会。该错过可能会导致后续再也没有机会彼此唤醒。所以方式1实际上是不可行的。
方式2,如果一个通道存在多个线程执行重投递任务,会导致读完成器恢复动作的并发竞争。方式2和方式1一样,也会遇到重投递任务完成早于压力反馈的传递的情况,因此无法通过标志位的设定来避免并发竞争。这样一来,在背压线程池中不能存在一个通道的多个重投递任务。
综合考虑问题1和问题2的情况以及解决方式最终会得到如下的背压实现要求:
上面的实现要求并未明确何时创建背压任务。基于4个要求,明确背压任务的创建时机为当前线程为IO线程时,最接近触发背压反馈的业务处理器执行背压任务创建;业务处理器顺序A-->B,B触发了背压反馈,A创建背压任务。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。