实现一个自己的RPC框架2.0

前言


在不同服务部署在不同的机器上的前提下,服务器之间存在大量的网络通信,这时候就需要写大量的网络通信的代码,十分不方便。如果能够让服务器调用远程方法就像是调用本地方法一样,将会大大减少工作量,这种方法的实现其实就是RPC。

大致的实现应该如下图一样。

RPC框架

其中客户端发送过去的是方法的方法名、参数类型以及具体参数,这时候应该要想到用JDK动态代理来实现客户端的这一部分。因为当客户端传输的方法有很多的时候,如果不用JDK动态代理的话,就需要手动处理每一个方法的发送,解析出方法的方法名、参数类型、具体参数,这显然是很麻烦的。所以当我们用JDK动态代理的时候,用一个类就能实现所有方法的处理,客户端在调用某个方法的时候其实调用的是一个增强的方法,这个增强方法中实现了发送Socket请求之前对方法的解析,发送RPC请求以及服务器返回值之后的响应。

下面我具体介绍客户端和服务端的实现。

客户端

定义服务的接口类HelloService

1
2
3
public interface HelloService {
String hello(String name);
}

用JDK动态代理实现向服务端发送RPC请求和解析服务端的响应,需要实现InvocationHandler接口,并重写invoke方法,这边重写的invoke就是增强的hello方法,能与服务端进行交互。myInvocationHandler类其中的主要部分是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 与服务器端创建连接
System.out.println("创建与服务器端的的连接...");
Socket socket = new Socket(host, port);
// 客户端向服务端发送请求
System.out.println("向客户端发送请求...");
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
// 方法名
//output.writeUTF(method.getName());
output.writeObject(method.getName());
System.out.println("methodName:" + method.getName());
// 参数类型列表
output.writeObject(method.getParameterTypes());
System.out.println("parameterTypes:" + Arrays.toString(method.getParameterTypes()));
// 参数值列表
output.writeObject(args);
System.out.println("args:" + Arrays.toString(args));

/**客户端读取服务端的返回*/
System.out.println("\n客户端读取服务端的返回...");
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
Object returnResult = input.readObject();
System.out.println(returnResult);

// 关闭资源
socket.close();
input.close();
output.close();

return returnResult;
}

模拟一个消费者发送请求。

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
// 重写 InvocationHandler
myInvocationHandler handler = new myInvocationHandler("127.0.0.1", 1234);
// JDK动态代理生成HelloService的代理类proxy,相当于是对目标方法的增强类,这边用它来发送RPC请求和接受服务器端返回值
HelloService proxy = (HelloService) Proxy.newProxyInstance(HelloService.class.getClassLoader(),
new Class<?>[] {HelloService.class},
handler);
proxy.hello("World");
}

服务端

在服务端同样也要有接口类HelloService,与客户端的相同。另外需要有HelloService的实现类HelloServiceImpl,提供服务器端需要暴露的接口,客户端的远程调用也就是调用该类对应的方法。

1
2
3
public String hello(String name) {
return "你好" + name;
}

在程序中需要暴露相应的服务端口,这边将HelloServiceImpl类和端口号进行初始化。

1
2
3
4
public static void main(String[] args) throws IOException {
HelloService helloService = new HelloServiceImpl();
ExportService.exportHelloService(helloService, 1234);
}

其中ExportService.exportHelloService(helloService, 1234);方法的实现是这样子的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public static void exportHelloService(final Object service, int port) throws IOException {
// 建立Socket服务端请求
System.out.println("建立Socket请求, port = " + port);
ServerSocket serverSocket = new ServerSocket(port);
while (true){
//监听是否有来自客户端的socket请求
final Socket socket = serverSocket.accept();
new Thread(new Runnable() {
@Override
public void run() {
try {
/**解析请求*/
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
//String methodName = input.readUTF();
String methodName = (String) input.readObject();
System.out.println("methodName:" + methodName);
Class<?>[] parameterTypes = (Class<?>[])input.readObject();
System.out.println("ParameterTypes:" + Arrays.toString(parameterTypes));
Object[] args = (Object [])input.readObject();
System.out.println(Arrays.toString(args));

/**处理请求,输出结果*/
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
// 反射调用,处理请求
Method method = service.getClass().getMethod(methodName, parameterTypes);
Object result = method.invoke(service, args);
System.out.println("服务器端处理完并返回响应:" + result);
output.writeObject(result);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}).start();
}
}

客户端的主要逻辑是监听来自客户端的请求。当有请求发起的时候,服务器解析来自客户端的请求并且用反射的方法处理请求。主要方法是下面的两行

1
2
Method method = service.getClass().getMethod(methodName, parameterTypes);
Object result = method.invoke(service, args);

其中method方法其实就是我们客户端请求的HelloServiceImpl的hello方法,这边是用反射来获得的,注意两个参数methodNameparameterTypes。第一个参数是方法名,第二个参数是参数类型,通过这两个参数就能确定method是类中的哪个方法。最后invoke方法相当于传入相应的参数并调用该方法。

运行截图

服务端监听端口

服务端监听端口

客户端向端口发送请求并获得返回

客户端向端口发送请求并获得返回

服务端接受请求并处理返回

服务端接受请求并处理返回

到此为止,一个简单的RPC框架就实现了,虽然能简单实现但是有很多方面的不足,后面考虑以下的优化。

由于Socket是采用BIO的通信,在发送请求之后需要等待Socket服务端的处理,并返回响应的结果。基于BIO的通信就会发生阻塞,即在accept到一个请求的时候就要进行处理,此时当有其他的请求过来的时候就会被阻塞。服务端和客户端的线程可以看成是1:1的关系,当有一台客户端发送请求,服务端就要创建一个新的线程来处理它,试想,当同时有大量的客户端请求的时候,服务器并不能满足这种高并发接入的场景。还有Java原生的序列化机制占内存太多,运行效率也不高。可以考虑从下面几种方法改进。

  • 可以采用线程池或者消息队列的方式来实现一种伪异步,将客户端发送过来的请求“管理起来”,由于线程池和消息队列是有界的,所以不会产生内存溢出或者线程数量不够用的问题;
  • 可以使用NIO或直接使用Netty替代BIO实现;
  • 使用开源的序列化机制,如Hadoop Avro、Google protobuf、protostuff等,后面可能会用protostuff来实现序列化与反序列化;
  • 服务注册可以使用Zookeeper进行管理,能够让应用更加稳定。