实现一个自己的RPC框架4.0

前言


此版本的RPC采用NIO来实现。NIO最大的特点有这几个:第一,NIO是面向缓冲区的,和普通面向流的IO不同。面向流的IO方式每次都是从流中读取一个或多个字节,没有缓存在任何的地方。而面向缓冲区则可以在缓冲区中对我缓冲的数据进行操作,增加了处理的灵活性。第二,NIO是非阻塞的,这就意味着一个线程能管理多个输入输出通道,解决了之前我们使用BIO时候阻塞的问题。第三,为了一个线程能管理多个通道,引入了Selector,多个通道都能注册在同一个选择器上,因此Selector也能被称为多路复用器。在使用的时候,Selector会不断地轮询注册在其上面的通道,如果通道发生读或者写的事件,这个通道就会处于就绪的状态,从而被Selector轮询出来,再进行后面的操作。

一、Java NIO中的组件

​ 基于以上三点,简单总结一下从Java 1.4之后提出的Java NIO中的三个比较重要的主键:BufferChannelSelector

Buffer

​ Buffer中最常用的缓冲区是ByteBuffer,一个ByteBuffer提供了一组用于操作byte数组的功能,其实缓冲区的最底层实现就是数组的方式,所以缓冲区中有四个数据操作的核心概念,分别是:mark(标记位)、position(当前位置)、limit(限定位置)、capacity(容量)。常用的几个函数是

创建一个缓冲区,并设置缓冲区大小

1
ByteBuffer byteBuffer= ByteBuffer.allocate(1024);

clear方法:将position坐标设置为0,limit设置为capacity,取消标记。也就是恢复到缓冲区刚刚初始化的状态,其实缓冲区中的数据并没有丢失,只不过在下次有数据进来的时候会被覆盖掉。

1
byteBuffer.clear();

remaining方法:判断当前位置和limit之前的元素数。

1
byteBuffer.remaining();

filp方法:将limit设置为当前position的坐标,将position设置为0,取消标记。也就是说从写的状态转化为读的状态,这个在使用的时候要尤其小心。

1
byteBuffer.filp();

wrap方法:把字节数组包装成缓冲区ByteBuffer实例

1
2
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ByteBuffer.wrap(byteArrayOutputStream.toByteArray());

put方法:put方法可以带多种参数,比较常见的例子如下。

1
2
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
writeBuffer.put(byteArrayOutputStream.toByteArray());

具体可以参考这两篇博客:

https://www.jianshu.com/p/451cc865d413

https://www.jianshu.com/p/c04780771a02

Channel

​ channel是一个通道,网络数据通过channel读取和写入,通道和之前的流的不同之处就是通道是双向(全双工)的,而流是单向的,通道可以用于读、写或者两者同时进行。

​ 其中重点介绍ServerSocketChannelSocketChannel两种通道。前者是用于监听客户端连接,后者是用于与服务器建立连接的。最后服务器在接受到客户的连接之后,再生成一个SocketChannel通道与客户端通信。两者是可以互相转化的。常用的几个函数是。

建立起服务端的通道:

1
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

建立起客户端的通道:

1
SocketChannel sc = SocketChannel.open();

当服务端与客户端建立连接之后,服务端生成一个通道与客户端进行通信。

1
SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();

将通道设置为非阻塞模式:

1
serverSocketChannel.configureBlocking(false);

读取客户端请求到缓冲区:

1
sc.read(byteBuffer)

Selector

​ 选择器Selector也可称为多路复用器。Selector会不断地轮询注册在上面的通道,如果某个通道发生读写事件,这个Channel就会处于就绪状态,从而被Selector轮询出来。我的理解是之所以NIO称为是非阻塞的,其原因就在于这个Selector的轮询线程是直接 读/写 到已经准备好的结果,而不需要和之前的BIO一样,在线程中做read或者write的操作,这样就不会造成阻塞。通常一个线程就能管理一个多路复用器,而其上面有成千上万个channel,这样性能就大大提升。常用的几个函数是。

启动一个多路复用器

1
Selector selector = Selector.open();

将一个通道注册在多路复用器上

1
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

在多路复用器上轮询已经就绪的key,通过这个key我们在后面可以找到与服务器通讯的客户端是哪一台,并在之后建立起与这个客户端之间的通道。

1
2
3
4
5
6
7
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()){
SelectionKey key = it.next();
it.remove();
/**通过key找到客户端,与其进行读写等后续操作*/
}

二、实现

下面主要介绍客户端和服务端的主要实现

服务端

NIOService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
while (true){
selector.select(1000);// 阻塞在这里,可以设置休眠事件为1s,每隔一秒钟被唤醒一次,不管有没有读写事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()){
SelectionKey key = it.next();
it.remove();
// 处理客户端发送过来的请求
if (key.isValid() && key.isAcceptable()){
//多路复用器监听到有新的客户端接入,处理新的请求接入,
// 并完成TCP三次握手,建立起物理链路。
SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
sc.configureBlocking(false); //设置为非阻塞
sc.register(selector, SelectionKey.OP_READ); //注册到多路复用器上,监听客户端的读操作
System.out.println("与服务器建立链接...");
}
if(key.isValid() && key.isReadable()){
System.out.println("检测到客户端的读操作...");
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer byteBuffer= ByteBuffer.allocate(1024);
byteBuffer.clear();
while (sc.read(byteBuffer) > 0){
System.out.println("读取客户端请求...");
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteBuffer.array());
ObjectInputStream input = new ObjectInputStream(byteArrayInputStream);

String methodName = (String) input.readObject();
System.out.println("methodName:" + methodName);
Class<?>[] parameterTypes = (Class<?>[])input.readObject();
System.out.println("ParameterTypes:" + Arrays.toString(parameterTypes));
Object[] args = (Object [])input.readObject();
System.out.println(Arrays.toString(args));

Method method = service.getClass().getMethod(methodName, parameterTypes);
result = method.invoke(service, args);
sc.register(selector, SelectionKey.OP_WRITE);
}
}
if (key.isWritable()){
System.out.println("检测到写操作到客户端...");
ByteBuffer byteBuffer= ByteBuffer.allocate(1024);
byteBuffer.clear();
SocketChannel sc = (SocketChannel) key.channel();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.writeObject(result);
byteBuffer.put(byteArrayOutputStream.toByteArray());
byteBuffer.flip(); //将读转化为写模式
System.out.println("返回客户端调用结果: "+ result);
sc.write(byteBuffer);
// 关闭资源
sc.close();
}
}
}

客户端

nioInvocationHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
SocketChannel sc = SocketChannel.open();
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
// 判断是否连接成功,若成功发送请求消息并读应答
if (sc.connect(new InetSocketAddress(host, port))){
sc.configureBlocking(false);
System.out.println("成功建立连接...");
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.writeObject(method.getName());
System.out.println("methodName:" + method.getName());
objectOutputStream.writeObject(method.getParameterTypes());
System.out.println("parameterTypes:" + Arrays.toString(method.getParameterTypes()));
objectOutputStream.writeObject(args);
System.out.println("args:" + Arrays.toString(args));
writeBuffer.put(byteArrayOutputStream.toByteArray());
writeBuffer.flip();
sc.write(writeBuffer);
}else {
System.out.println("建立连接失败...");
}
// 读取返回结果
writeBuffer.clear();
int count = 0;
Object result = null;
while ((count = sc.read(writeBuffer))!=-1){
if (count > 0){
System.out.println("读取到服务器端返回结果...");
writeBuffer.flip();
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(writeBuffer.array());
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
result = objectInputStream.readObject();
System.out.println(result.toString());
}
}
sc.close();
return result;
}

三、总结

​ 在使用NIO的时候遇到了很多问题,其中一个问题就是客户端无法与服务端产生连接,也就是在连接的时候sc.connect(new InetSocketAddress(host, port));一直抛出连接异常,最后发现是由于提前对通道设置了阻塞,也就是sc.configureBlocking(false);这行代码放在了连接之前,导致客户端一直无法与服务端产生连接。另外,NIO确实比较难用,编码时要考虑的地方很多,特别是对Buffer的一系列操作,容易产生错误,并且不容易解决写半包/粘包的问题,最大的问题是JDK NIO中的BUG,会导致Selector的空轮询,从而使CPU占用100%。

​ 所以基于以上,考虑用Netty来改进我们的RPC框架。