实现一个自己的RPC框架5.0

前言


距离上一个版本的RPC也有一两个月的时间了,中间也是在做论文的相关实验,自己也陆陆续续学习了一点Netty和其他的知识,今天记录和分享一下用Netty实现的RPC框架。在上一个博客中提到,使用NIO会有很多问题,比如Selector的空轮询、编码难度大等问题,用Netty能很好的解决以上问题,因为它把NIO很好的进行了封装,让使用者不必去考虑这么多的问题,这也是Netty的一个优势所在吧。

一、整体实现思路

首先说明一下,我这边的Netty用的是4.1的版本。RPC实现的思路和之前的大同小异。从客户端来说,仍然用到了动态代理,只不过这版中我对发送的请求做了封装,用了jdk原生的序列化方法,这块后面还可以再做优化,尝试用其他的序列化方法去做。服务端反序列化并解析请求参数,用反射去得到相应的函数调用,最终返回给客户端。

二、代码实现

客户端

封装请求MSG.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class MSG implements Serializable {
private static final long serialVersionUID = 1L;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] args;

public String getMethodName() {
return methodName;
}

public void setMethodName(String methodName) {
this.methodName = methodName;
}

public Class<?>[] getParameterTypes() {
return parameterTypes;
}

public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}

public Object[] getArgs() {
return args;
}

public void setArgs(Object[] args) {
this.args = args;
}

@Override
public String toString() {
return "MSG{" +
"methodName='" + methodName + '\'' +
", parameterTypes=" + Arrays.toString(parameterTypes) +
", args=" + Arrays.toString(args) +
'}';
}
}

配置客户端的相应函数

EchoClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class EchoClient {
private final String host;
private final int port;
private String name;
private Class<?>[] parameterTypes;
private Object[] args;

public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}

public EchoClient(String host, int port, String name, Class<?>[] parameterTypes, Object[] args) {
this.host = host;
this.port = port;
this.name = name;
this.parameterTypes = parameterTypes;
this.args = args;
}

public void run() throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecoder(1024*1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(
new EchoClientHandler(name, parameterTypes, args));
}
});

// Start the client.
ChannelFuture f = b.connect(host, port).sync();

// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}

这边我是将请求函数的函数名、参数类型和参数直接传到EchoClientHandler中进行处理。来看EchoClientHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private String name;
private Class<?>[] parameterTypes;
private Object[] args;

public EchoClientHandler(String name, Class<?>[] parameterTypes, Object[] args) {
this.name = name;
this.parameterTypes = parameterTypes;
this.args = args;
}

// 当客户端连接上服务端的时候触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws IOException {
MSG message = new MSG();
message.setMethodName(name);
message.setParameterTypes(parameterTypes);
message.setArgs(args);
System.out.println("客户端发送请求到服务端...");
ctx.write(message);
ctx.flush();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String msg1 = (String) msg;
System.out.println("客户端接收服务端响应,并输出返回信息...");
System.out.println(msg1);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

EchoClientHandler中最重要的是重写的channelActivechannelRead两个方法。其中第一个是在客户端与服务端连接上之后触发的相应,就可以将请求发送过去。在服务端处理请求后返回,用channelRead来接收服务端返回的数据。

服务端

服务端的配置和客户端的类似,

同样的,为了解析请求,要封装请求MSG.java,是和客户端的一致。注意,这边由于序列化的编码与解码,MSG.java要和客户端的包位置相同,否则会报错!

1
io.netty.handler.codec.DecoderException: java.io.InvalidClassException: failed to read class descriptor

NettyService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class NettyService {
private final int port;
private Object service;

public NettyService(int port, Object service) {
this.port = port;
this.service = service;
}

public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecoder(1024 *1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(
new EchoServerHandler(service));
}
});
// Start the server.
ChannelFuture f = b.bind(port).sync(); // (5)

// Wait until the server socket is closed.
f.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

同样的,来看看服务端的handle

EchoServerHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private Object service;

public EchoServerHandler(Object service) {
this.service = service;
}

//有数据读取的时候调用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Object result = dealMSG((MSG) msg);
System.out.println("服务端解析请求...");
ctx.writeAndFlush(result);
System.out.println("服务端返回数据给客户端...");
}

//本次读取完成调用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}

public Object dealMSG(MSG message) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
Object result = method.invoke(service, message.getArgs());
return result;
}
}

这边最重要的一个函数是channelRead,这里面需要实现解析客户端发送过来请求的逻辑,并将得到的结果发送回客户端。