轻量级分布式 RPC 框架( 三 )


String host = array[0];
int port = Integer.parseInt(array[1]);
ChannelFuture future = bootstrap.bind(host, port).sync();
LOGGER.debug("server started on port {}", port);
if (serviceRegistry != null) {
serviceRegistry.register(serverAddress); // 注册服务地址
}
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
以上代码中,有两个重要的 POJO 需要描述一下,它们分别是RpcRequest与RpcResponse 。
使用RpcRequest封装 RPC 请求,代码如下:
public class RpcRequest {
private String requestId;
private String className;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;
// getter/setter...
}
使用RpcResponse封装 RPC 响应,代码如下:
public class RpcResponse {
private String requestId;
private Throwable error;
private Object result;
// getter/setter...
}
使用RpcDecoder提供 RPC 解码,只需扩展 Netty 的ByteToMessageDecoder抽象类的decode方法即可,代码如下:
public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> genericClass;
public RpcDecoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (dataLength < 0) {
ctx.close();
}
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = https://www.isolves.com/it/cxkf/kj/2019-10-24/new byte[dataLength];
in.readBytes(data);
Object obj = SerializationUtil.deserialize(data, genericClass);
out.add(obj);
}
}
使用RpcEncoder提供 RPC 编码,只需扩展 Netty 的MessageToByteEncoder抽象类的encode方法即可,代码如下:
public class RpcEncoder extends MessageToByteEncoder {
private Class<?> genericClass;
public RpcEncoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
if (genericClass.isInstance(in)) {
byte[] data = https://www.isolves.com/it/cxkf/kj/2019-10-24/SerializationUtil.serialize(in);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}
编写一个SerializationUtil工具类,使用Protostuff实现序列化:
public class SerializationUtil {
private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();
private static Objenesis objenesis = new ObjenesisStd(true);
private SerializationUtil() {
}
@SuppressWarnings("unchecked")
private static <T> Schema<T> getSchema(Class<T> cls) {
Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
if (schema == null) {
schema = RuntimeSchema.createFrom(cls);
if (schema != null) {
cachedSchema.put(cls, schema);
}
}
return schema;
}
@SuppressWarnings("unchecked")
public static <T> byte[] serialize(T obj) {
Class<T> cls = (Class<T>) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(cls);
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
public static <T> T deserialize(byte[] data, Class<T> cls) {
try {
T message = (T) objenesis.newInstance(cls);
Schema<T> schema = getSchema(cls);
ProtostuffIOUtil.mergeFrom(data, message, schema);
return message;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
以上了使用 Objenesis 来实例化对象,它是比 Java 反射更加强大 。
注意:如需要替换其它序列化框架,只需修改SerializationUtil即可 。当然,更好的实现方式是提供配置项来决定使用哪种序列化方式 。
使用RpcHandler中处理 RPC 请求,只需扩展 Netty 的SimpleChannelInboundHandler抽象类即可,代码如下:
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcHandler.class);
private final Map<String, Object> handlerMap;


推荐阅读