public RpcHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
RpcResponse response = new RpcResponse();
response.setRequestId(request.getRequestId());
try {
Object result = handle(request);
response.setResult(result);
} catch (Throwable t) {
response.setError(t);
}
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private Object handle(RpcRequest request) throws Throwable {
String className = request.getClassName();
Object serviceBean = handlerMap.get(className);
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
/*Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
return method.invoke(serviceBean, parameters);*/
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
return serviceFastMethod.invoke(serviceBean, parameters);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.error("server caught exception", cause);
ctx.close();
}
}
为了避免使用 Java 反射带来的性能问题,我们可以使用 CGLib 提供的反射 API,如上面用到的FastClass与FastMethod 。
第七步:配置客户端同样使用 Spring 配置文件来配置 RPC 客户端,spring.xml代码如下:
<beans ...>
<context:property-placeholder location="classpath:config.properties"/>
<!-- 配置服务发现组件 -->
<bean id="serviceDiscovery" class="com.xxx.rpc.registry.ServiceDiscovery">
<constructor-arg name="registryAddress" value=https://www.isolves.com/it/cxkf/kj/2019-10-24/"${registry.address}"/>
</bean>
<!-- 配置 RPC 代理 -->
<bean id="rpcProxy" class="com.xxx.rpc.client.RpcProxy">
<constructor-arg name="serviceDiscovery" ref="serviceDiscovery"/>
</bean>
</beans>
其中config.properties提供了具体的配置:
# ZooKeeper 服务器
registry.address=127.0.0.1:2181
第八步:实现服务发现同样使用 ZooKeeper 实现服务发现功能,见如下代码:
public class ServiceDiscovery {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
private CountDownLatch latch = new CountDownLatch(1);
private volatile List<String> dataList = new ArrayList<>();
private String registryAddress;
public ServiceDiscovery(String registryAddress) {
this.registryAddress = registryAddress;
ZooKeeper zk = connectServer();
if (zk != null) {
watchNode(zk);
}
}
public String discover() {
String data = https://www.isolves.com/it/cxkf/kj/2019-10-24/null;
int size = dataList.size();
if (size > 0) {
if (size == 1) {
data = https://www.isolves.com/it/cxkf/kj/2019-10-24/dataList.get(0);
LOGGER.debug("using only data: {}", data);
} else {
data = https://www.isolves.com/it/cxkf/kj/2019-10-24/dataList.get(ThreadLocalRandom.current().nextInt(size));
LOGGER.debug("using random data: {}", data);
}
}
return data;
}
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException | InterruptedException e) {
LOGGER.error("", e);
}
return zk;
}
private void watchNode(final ZooKeeper zk) {
try {
List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
watchNode(zk);
}
}
});
List<String> dataList = new ArrayList<>();
for (String node : nodeList) {
byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
dataList.add(new String(bytes));
}
LOGGER.debug("node data: {}", dataList);
this.dataList = dataList;
} catch (KeeperException | InterruptedException e) {
推荐阅读
- 分布式原理:一致性哈希算法简介
- 分布式基础之CAP
- 基于分布式服务器搭建NFS服务共享视频、文档目录
- 华为实验 ENSP模拟VXLAN EVPN分布式网关
- 蚂蚁金服推出分布式的图神经知识表示框架,性能和可扩展性俱佳
- gRPC 1.24.2 发布,谷歌开源的高性能 RPC 框架
- 什么是分布式系统,如何学习分布式系统
- 一文看懂MySQL中基于XA实现的分布式事务
- Java架构-还不了解Flink底层RPC使用的框架和原理?那就认真看完
- 图解各路分布式ID生成算法