Tars-Java网络编程源码分析

2023-06-27 0 565

原副标题:Tars-Java网络程式设计源代码预测

译者:vivo 网络服务项目器项目组- Jin Kai

责任编辑从Java NIO网络程式设计的基本知识说到了Tars架构采用NIO展开网络程式设计的源代码预测。

一、Tars架构基本如是说

Tars是百度开放源码的全力支持多词汇的高效能RPC架构,源于百度外部2008年迄今一直采用的标准化应用领域架构TAF(Total Application Framework),目前全力支持C++、Java、PHP、Nodejs、Go词汇。

该架构为使用者提供更多了牵涉到合作开发、网络管理、和试验的整套软件系统,协助两个产品或是服务项目加速合作开发、布署、试验、上架。它集可扩充协定编码国际标准、高效能RPC通讯架构、英文名字路由器与发现、正式发布监视、笔记统计数据、SQLite等同于多功能,通过它能加速用微服务项目的方式构筑自己的平衡可信的洛佐韦,并实现完备有效率的服务项目环境治理。

非官方库房门牌号:

https://github.com/TarsCloud/Tars

vivo发送网络平台也广度采用了该架构,布署服务项目结点超过一百个,经过圣戈当斯区每星期二百多亿最新消息发送量的挑战。

Tars-java 新一代Fedora1.7.2和以后的版都采用Java NIO展开网络程式设计;责任编辑将分别详尽如是说java NIO的基本原理和Tars 采用NIO展开网络程式设计的技术细节。

二、Java NIO基本原理如是说

从1.4版开始,Java提供更多了一种捷伊IO作法:NIO (New IO 或 Non-blocking IO) 是两个能代替国际标准Java IO 的API,它是面向全国头文件而不是二进制流,它亦然堵塞的,全力支持IO数据通讯。

2.1 Channels (地下通道) and Buffers (头文件)

国际标准的IO如前所述二进制流展开操作方式的,而NIO是如前所述地下通道(Channel)和头文件(Buffer)展开操作方式。数据总是从地下通道读取到头文件中,或是从头文件写入到地下通道中,下图是两个完备流程。

Channel类型:

全力支持文件读写数据的FileChannel 能通过UDP读写网络中的数据的DatagramChannel 能通过TCP读写网络数据的SocketChannel 能监听新进来的TCP连接,对每两个新进来的连接都会创建两个SocketChannel的ServerSocketChannel 。

SocketChannel:

打开 SocketChannel: SocketChannel socketChannel = SocketChannel.open; 关闭 SocketChannel: socketChannel.close; 从Channel中读取的数据放到Buffer: int bytesRead = inChannel.read(buf); 将Buffer中的数据写到Channel: int bytesWritten = inChannel.write(buf);

ServerSocketChannel:

通过 ServerSocketChannel.accept 方法监听新进来的连接,当accept方法返回的时候,它返回两个包含新进来的连接的SocketChannel,因此accept方法会一直堵塞到有新连接到达。

通常不会仅仅只监听两个连接,在while循环中调用 accept方法. 如下面的例子:

代码1:

while( true){ SocketChannel socketChannel = serverSocketChannel.accept; //do something with socketChannel… }

ServerSocketChannel能设置成非堵塞模式。在非堵塞模式下,accept 方法会立刻返回,如果还没有新进来的连接,返回的将是null。因此,需要检查返回的SocketChannel是否是null。

代码2:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open; serverSocketChannel.socket.bind( newInetSocketAddress( 8888)); serverSocketChannel.configureBlocking(false); while( true){ SocketChannel socketChannel = serverSocketChannel.accept; if(socketChannel !=null){ //do something with socketChannel… } }

Buffer类型:

ByteBuffer CharBuffer DoubleBuffer FloatBuffer IntBuffer LongBuffer ShortBuffer

Buffer的分配:

ByteBuffer buf = ByteBuffer.allocate(2048);

Buffer的读写:

一般是以下四个步骤:

写入数据到Buffer,最大写入量是capacity,写模式下limit值即为capacity值,position即为写到的位置。 调用flip方法将Buffer从写模式切换到读模式,此时position移动到开始位置0,limit移动到position的位置。 从Buffer中读取数据,在读模式下能读取以后写入到buffer的所有数据,即为limit位置。 调用clear方法或是compact方法。clear方法将position设为0,limit被设置成capacity的值。compact方法将所有未读的数据拷贝到Buffer起始处,然后将position设到最后两个未读元素后面。

Tars-Java网络编程源码分析

mark 与 reset方法

通过调用Buffer.mark方法,能标记Buffer中的两个特定position,之后能通过调用Buffer.reset方法恢复到这个position。

duplicate

此方法返回承载先前二进制头文件内容的新二进制头文件。

remaining

limit 减去 position的值

2.2 Selector(选择器)

Java NIO引入了选择器的概念,选择器用于监听多个地下通道的事件。单个的线程能监听多个数据地下通道。要采用Selector,得向Selector注册Channel,然后调用它的select方法。这个方法会一直堵塞到某个注册的地下通道有事件就绪。一旦这个方法返回,线程就能处理这些事件。

Tars-Java网络编程源码分析

线程采用两个selector处理多个channel

代码3:

channel.configureBlocking(false); SelectionKey key = channel. register(selector,Selectionkey.OP_READ);

注意register方法的第二个

SelectionKey包含:

1) interest集合:selectionKey.interestOps 能监听四种不同类型的事件:OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ

2) ready集合:selectionKey.readyOps; ready 集合是地下通道已经准备就绪的操作方式的集合,提供更多4个方便的方法:

selectionKey.isAcceptable; selectionKey.isConnectable; selectionKey.isReadable; selectionKey.isWritable;

3) Channel:selectionKey.channel;

4) Selector:selectionKey.selector;

5) 可选的附加对象:

提示:

OP_ACCEPT和OP_CONNECT的区别:简单来说,客户端建立连接是connect,服务项目器准备接收连接是accept。两个典型的客户端服务项目器网络交互流程如下图

Tars-Java网络编程源码分析

selectedKeys

一旦调用了select方法,并且返回值表明有两个或更多个地下通道就绪了,然后能通过调用selector的selectedKeys方法,访问已选择键集(selected key set)中的就绪地下通道。

wakeUp

某个线程调用select方法后堵塞了,即使没有地下通道已经就绪,也有办法让其从select方法返回。只要让其它线程在堵塞线程调用select方法的对象上调用Selector.wakeup方法即可。堵塞在select方法上的线程会立马返回。如果有其它线程调用了wakeup方法,但当前没有线程堵塞在select方法上,下个调用select方法的线程会立即wake up。

close

用完Selector后调用其close方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。地下通道本身并不会关闭。

通过Selector选择地下通道:

int select 堵塞直到至少有两个地下通道在你注册的事件上就绪了 int select(long timeout) 增加最长堵塞毫秒数 int selectNow 不会堵塞,不管什么地下通道就绪都立刻返回

三、 Tars NIO网络程式设计

了解完 Java NIO的基本原理,我们来看看Tars是如何采用NIO展开网络程式设计的。

Tars-Java网络编程源码分析

Tars的网络模型是多reactor多线程模型。有一点特殊的是tars的reactor线程组里随机选两个线程处理网络事件,并且该线程同时也能处理读写。

核心类之间的关系如下:

Tars-Java网络编程源码分析

3.1 两个典型的Java NIO服务项目端合作开发流程

创建ServerSocketChannel,设置为非堵塞,并绑定端口 创建Selector对象 给ServerSocketChannel注册SelectionKey.OP_ACCEPT事件 启动两个线程循环,调用Selector的select方法来检查IO就绪事件,一旦有IO就绪事件,就通知使用者线程去处理IO事件 如果有Accept事件,就创建两个SocketChannel,并注册SelectionKey.OP_READ 如果有读事件,判断一下是否全包,如果全包,就交给后端线程处理 写事件比较特殊。isWriteable表示的是本机的写头文件是否可写。这个在绝大多少情况下都是为真的。在Netty中只有写半包的时候才需要注册写事件,如果一次写就完全把数据写入了头文件就不需要注册写事件。

3.2 Tars客户端发起请求到服务项目器的流程

Communicator.stringToProxy 根据servantName等配置信息创建通讯器。 ServantProxyFactory.getServantProxy 调用工厂方法创建servant代理。 ObjectProxyFactory.getObjectProxy 调用工厂方法创建obj代理。 TarsProtocolInvoker.create 创建协定调用者。 ServantProtocolInvoker.initClient(Url url) 根据servantProxyConfig中的配置信息找到servant的ip端口等展开初始化ServantClient。 ClientPoolManager.getSelectorManager 如果第一次调用selectorManager是空的就会去初始化selectorManager。 reactorSet = new Reactor[selectorPoolSize]; SelectorManager初始化构造类中的会根据selectorPoolSize(默认是2)的配置创建Reactor线程数组。线程名称的前缀是servant-proxy-加上CommunicatorId,CommunicatorId生成规则是由locator的门牌号生成的UUID。 启动reactor线程。

3.3 Tars服务项目端启动步骤

tars全力支持TCP和UDP两种协定,RPC场景下是采用TCP协定。 new SelectorManager 根据配置信息初始化selectorManager,线程池大小 processors > 8 ? 4 + (processors * 5 / 8) : processors + 1;线程名称前缀是server-tcp-reactor,然后启动reactor线程数组中的所有线程。 开启服务项目端监听的ServerSocketChannel,绑定服务项目端本地ip和监听的端口号,设置TCP连接请求队列的最大容量为1024;设置非堵塞模式。 选取reactor线程数组中第0个线程作为服务项目端监听连接OP_ACCEPT就绪事件的线程。

代码4:

publicvoidbind( AppService appService) throws IOException { // 此处略去非关键代码 if(endpoint.type. equals( “tcp”)) { // 1 this.selectorManager = newSelectorManager(Utils.getSelectorPoolSize, newServantProtocolFactory(codec), threadPool, processor, keepAlive,“server-tcp-reactor”, false); // 2 this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay); this.selectorManager.start; ServerSocketChannel serverChannel = ServerSocketChannel.open; serverChannel.socket.bind( newInetSocketAddress(endpoint.host, endpoint.port),1024); // 3 serverChannel.configureBlocking( false); selectorManager.getReactor( 0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT);// 4 } elseif(endpoint.type. equals( “udp”)) { this.selectorManager =newSelectorManager( 1, newServantProtocolFactory(codec), threadPool, processor, false, “server-udp-reactor”, true); this.selectorManager.start; // UDP开启的是DatagramChannel DatagramChannel serverChannel = DatagramChannel.open; DatagramSocket socket = serverChannel.socket; socket.bind( newInetSocketAddress(endpoint.host, endpoint.port)); serverChannel.configureBlocking( false); // UDP协定不需要建连,监听的是OP_READ就绪事件 this.selectorManager.getReactor( 0).registerChannel(serverChannel, SelectionKey.OP_READ); } }

3.4 Reactor线程启动流程

的channel。 更新Session中最近操作方式时间,Tars服务项目端启动时会调用 startSessionManager , 单线程每30s扫描一次session会话列表,会检查每个session的 lastUpdateOperationTime 与当前时间的时间差,如果超过60秒会将过期session对应的channel踢除。 分发IO事件展开处理。 处理unregister队列中剩余的channel,从当前reactor线程的数据通讯器selector中解除注册。

代码5:

publicvoidrun( ) { while(!Thread.interrupted) { selector.select; // 1 processRegister; // 2 Iterator<SelectionKey> iter = selector.selectedKeys.iterator; // 3 while(iter.hasNext) { SelectionKey key = iter.next; iter. remove; if(!key.isValid) continue; try{ if(key.attachment !=null&& key.attachment instanceof Session) { ((Session) key.attachment).updateLastOperationTime;//4 } dispatchEvent(key); // 5 } catch(Throwable ex) { disConnectWithException(key, ex); } } processUnRegister;// 6 } }

3.5 IO事件分发处理

每个reactor线程都有两个专门的Accepter类去处理各种IO事件。TCPAccepter能处理全部的四种事件(OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ)、UDPAccepter由于不需要建立连接所以只需要处理读和写两种事件。

1. 处理OP_ACCEPT

个TCP请求创建TCPSession,会话的状态是服务项目器已连接 会话注册到sessionManager中,Tars服务项目可配置最大连接数maxconns,如果超过就会关闭当前会话。 寻找下两个reactor线程展开数据通讯器与channel的绑定。

代码6:

publicvoidhandleAcceptEvent(SelectionKey key)throwsIOException { ServerSocketChannel server = (ServerSocketChannel) key.channel;// 1 SocketChannel channel = server.accept; channel.socket.setTcpNoDelay(selectorManager.isTcpNoDelay); channel.configureBlocking( false); Utils.setQosFlag(channel.socket); TCPSession session = newTCPSession(selectorManager); // 2 session.setChannel(channel); session.setStatus(SessionStatus.SERVER_CONNECTED); session.setKeepAlive(selectorManager.isKeepAlive); session.setTcpNoDelay(selectorManager.isTcpNoDelay); SessionManager.getSessionManager.registerSession(session);// 3 selectorManager.nextReactor.registerChannel(channel, SelectionKey.OP_READ, session);// 4 }

2. 处理OP_CONNECT

sion中的状态修改为客户端已连接

代码7:

publicvoidhandleConnectEvent(SelectionKey key)throwsIOException { SocketChannel client = (SocketChannel) key.channel;// 1 TCPSession session = (TCPSession) key.attachment; //2 if(session ==null) thrownewRuntimeException( “The session is null when connecting to …”); try{ // 3 client.finishConnect; key.interestOps(SelectionKey.OP_READ); session.setStatus(SessionStatus.CLIENT_CONNECTED); } finally{ session.finishConnect; } }

3.处理OP_WRITE、 处理OP_READ

调用session.read和session.doWrite 方法处理读写事件

代码8:

publicvoidhandleReadEvent(SelectionKey key)throwsIOException { TCPSession session = (TCPSession) key.attachment; if(session == null) thrownewRuntimeException( “The session is null when reading data…”); session.read; } publicvoidhandleWriteEvent(SelectionKey key)throwsIOException { TCPSession session = (TCPSession) key.attachment; if(session == null) thrownewRuntimeException( “The session is null when writing data…”); session.doWrite; }

3.6 seesion中网络读写的事件详尽处理过程

1. 读事件处理

申请2k的ByteBuffer空间,读取channel中的数据到readBuffer中。根据sessionStatus判断是客户端读响应还是服务项目器读请求,分别展开处理。

代码9:

protectedvoid read throws IOException { int ret = readChannel; if( this.status == SessionStatus.CLIENT_CONNECTED) { readResponse; } elseif( this.status == SessionStatus.SERVER_CONNECTED) { readRequest; } else{ thrownew IllegalStateException( “The current session status is invalid. [status:”+ this.status + “]”); } if(ret < 0) { close; return; } } privateint readChannel throws IOException { int readBytes = 0, ret = 0; ByteBuffer data= ByteBuffer.allocate( 1024* 2); // 1 if(readBuffer ==null) { readBuffer = IoBuffer.allocate(bufferSize); } // 2 while((ret = ((SocketChannel) channel).read(data)) > 0) { data.flip; // 3 readBytes += data.remaining; readBuffer.put( data.array, data.position, data.remaining); data.clear; } returnret < 0? ret : readBytes; }

① 客户端读响应

从当前readBuffer中的内容复制到两个捷伊临时buffer中,并且切换到读模式,采用TarsCodec类解析出buffer内的协定字段到response,WorkThread线程通知Ticket处理response。如果response为空,则重置tempBuffer到mark的位置,重新解析协定。

代码10:

publicvoidreadResponse( ) { Response response = null; IoBuffer tempBuffer = null; tempBuffer = readBuffer.duplicate.flip; while( true) { tempBuffer.mark; if(tempBuffer.remaining > 0) { response = selectorManager.getProtocolFactory.getDecoder.decodeResponse(tempBuffer,this); } else{ response = null; } if(response != null) { if(response.getTicketNumber == Ticket.DEFAULT_TICKET_NUMBER) response.setTicketNumber(response.getSession.hashCode); selectorManager.getThreadPool.execute( newWorkThread(response, selectorManager)); } else{ tempBuffer.reset; readBuffer = resetIoBuffer(tempBuffer); break; } } }

② 服务项目器读请求

任务放入线程池交给 WorkThread线程,最终交给Processor类出构筑请求的响应体,包括分布式上下文,然后经过FilterChain的处理,最终通过jdk提供更多的反射方法invoke服务项目端本地的方法然后返回response。如果线程池抛出拒绝异常,则返回SERVEROVERLOAD = -9,服务项目端过载保护。如果request为空,则重置tempBuffer到mark的位置,重新解析协定。

代码11:

publicvoidreadRequest( ) { Request request = null; IoBuffer tempBuffer = readBuffer.duplicate.flip; while( true) { tempBuffer.mark; if(tempBuffer.remaining > 0) { request = selectorManager.getProtocolFactory.getDecoder.decodeRequest(tempBuffer,this); } else{ request = null; } if(request != null) { try{ request.resetBornTime; selectorManager.getThreadPool.execute( newWorkThread(request, selectorManager)); } catch(RejectedExecutionException e) { selectorManager.getProcessor.overload(request, request.getIoSession); } catch(Exception ex) { ex.printStackTrace; } } else{ tempBuffer.reset; readBuffer = resetIoBuffer(tempBuffer); break; } } }

2. 写事件处理

同样也包括客户端写请求和服务项目端写响应两种,其实这两种都是往TCPSession中的LinkedBlockingQueue(有界队列最大8K)中插入ByteBuffer。LinkedBlockingQueue中的ByteBuffer最终会由TCPAcceptor中的handleWriteEvent监听写就绪事件并消费。

代码12:

protectedvoidwrite(IoBuffer buffer)throwsIOException { if(buffer == null) return; if(channel == null|| key == null) thrownewIOException(“Connection is closed”); if(! this.queue.offer(buffer.buf)) { thrownewIOException( “The session queue is full. [ queue size:”+ queue.size + ” ]”); } if(key != null) { key.interestOps(key.interestOps | SelectionKey.OP_WRITE); key.selector.wakeup; } }

四、总结

责任编辑主要如是说了Java NIO程式设计的基本知识 和 Tars-Java 1.7.2版的网络程式设计模块的源代码实现。

在最捷伊Tars-Java的master分支中我们能发现网络程式设计已经由NIO改成了Netty,虽然Netty更加成熟平衡,但是作为学习者了解NIO的基本原理也是掌握网络程式设计的必经之路。

更多关于Tars架构的如是说能访问:

https://tarscloud.org/

责任编辑预测源代码门牌号(v1.7.x分支):

https://github.com/TarsCloud/TarsJava

2023 源创会线下重启,基础软件技术面面谈。

🕜时间:2023 年 7 月 1 日

📍地点:广东省深圳市南山区高新南四道创维半导体设计大厦裙楼四楼·SKYWORK会议中心【国际会议中心】

【嘉宾预告】

演讲人:

常亮,CubeFS开放源码分布式文件存储项目maintainer

演讲主题:

AI背景下CubeFS开放源码分布式文件存储网络平台的演进与发展

演讲大纲:

AI大模型取得令人惊叹的表现,正在成为最热门的谈论话题,AI 的大模型构筑在算力和存储之上,AI 本身的数据特性和存取特性明显,海量的 AI 语料存储,小文件等存取需求,对存储系标准化直是一种挑战。为了适配更强大的算力和语料,传统的中心存储,渐渐不能满足 AI 的算力需求,从而云上云下结合是必经之路,CubeFS提供更多了整套针对 AI 大模型计算的存储调度服务项目,元数据缓存和数据缓存,以提高计算效率,极大改善混合云场景的延迟问题。

👇 立即参与

END

马斯克承认“家丑”,去年大裁员给自己挖了坑

这里有新一代开放源码资讯、软件更新、技术干货等内容

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务