3分钟彻底弄懂Sentinel集群限流探索( 三 )

serverTransportDs = new ApolloDataSource<>(namespace, ruleServerKey,defaultRuleValue, source -> {List groupList = JSON.parseObject(source, new TypeReference>() {});ServerTransportConfig serverTransportConfig = Optional.ofNullable(groupList).flatMap(this::extractServerTransportConfig).orElse(null);return serverTransportConfig;});ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());//ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig().setIdleSeconds(600).setPort(transPort));}private void registerClusterRuleSupplier() {ClusterFlowRuleManager.setPropertySupplier(namespace -> {ReadableDataSource> ds = new ApolloDataSource<>(this.namespace, ruleKey,defaultRuleValue, source -> JSON.parseObject(source, new TypeReference>() {}));return ds.getProperty();});ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {ReadableDataSource> ds = new ApolloDataSource<>(this.namespace, ruleKey,defaultRuleValue, source -> JSON.parseObject(source, new TypeReference>() {}));return ds.getProperty();});}/*** 服务端配置,设置server端口和IP,注释的配置是写死的方式,这个在服务端是不用配置的,只有客户端需要配置用来连接服务端*/private void initClientServerAssignProperty() {ReadableDataSource clientAssignDs = new ApolloDataSource<>(namespace, ruleServerKey,defaultRuleValue, source -> {List groupList = JSON.parseObject(source, new TypeReference>() {});ClusterClientAssignConfig clusterClientAssignConfig = Optional.ofNullable(groupList).flatMap(this::extractClientAssignment).orElse(null);return clusterClientAssignConfig;});ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());//ClusterClientAssignConfig serverConfig = new ClusterClientAssignConfig();//serverConfig.setServerHost("127.0.0.1");//serverConfig.setServerPort(transPort);//ConfigSupplierRegistry.setNamespaceSupplier(() -> "trade-center");//ClusterClientConfigManager.applyNewAssignConfig(serverConfig);}private Optional extractClientAssignment(List groupList) {ClusterGroupEntity tokenServer = groupList.stream().filter(x -> x.getState().equals(ClusterStateManager.CLUSTER_SERVER)).findFirst().get();Integer currentmachineState = Optional.ofNullable(groupList).map(s -> groupList.stream().filter(this::machineEqual).findFirst().get().getState()).orElse(ClusterStateManager.CLUSTER_NOT_STARTED);if (currentMachineState.equals(ClusterStateManager.CLUSTER_CLIENT)) {String ip = tokenServer.getIp();Integer port = tokenServer.getPort();return Optional.of(new ClusterClientAssignConfig(ip, port));}return Optional.empty();}/*** 初始化客户端和服务端状态,注释的也是写死的配置方式*/private void initStateProperty() {ReadableDataSource clusterModeDs = new ApolloDataSource<>(namespace, ruleServerKey,defaultRuleValue, source -> {List groupList = JSON.parseObject(source, new TypeReference>() {});Integer state = Optional.ofNullable(groupList).map(s -> groupList.stream().filter(this::machineEqual).findFirst().get().getState()).orElse(ClusterStateManager.CLUSTER_NOT_STARTED);return state;});ClusterStateManager.registerProperty(clusterModeDs.getProperty());//ClusterStateManager.applyState(ClusterStateManager.CLUSTER_SERVER);}private Optional extractServerTransportConfig(List groupList) {return groupList.stream().filter(x -> x.getMachineId().equalsIgnoreCase(getCurrentMachineId()) && x.getState().equals(ClusterStateManager.CLUSTER_SERVER)).findAny().map(e -> new ServerTransportConfig().setPort(e.getPort()).setIdleSeconds(600));}private boolean machineEqual(/*@Valid*/ ClusterGroupEntity group) {return getCurrentMachineId().equals(group.getMachineId());}private String getCurrentMachineId() {// 通过-Dcsp.sentinel.api.port=8719 配置,默认8719,随后递增return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getPort();}private static final String SEPARATOR = "@";}基础类,定义配置的基础信息 。
@Datapublic class ClusterGroupEntity {private String machineId;private String ip;private Integer port;private Integer state;}然后是Apollo中的限流规则的配置和server/client集群关系的配置 。
需要说明一下的就是flowId,这个是区分限流规则的全局唯一ID,必须要有,否则集群限流会有问题 。
thresholdType代表限流模式,默认是0,代表单机均摊,比如这里count限流QPS=20,有3台机器,那么集群限流阈值就是60,如果是1代表全局阈值,也就是count配置的值就是集群限流的上限 。
demo_sentinel=[{"resource": "test_res", //限流资源名"count": 20, //集群限流QPS"clusterMode": true, //true为集群限流模式"clusterConfig": {"flowId": 111, //这个必须得有,否则会有问题"thresholdType": 1 //限流模式,默认为0单机均摊,1是全局阈值}}]demo_cluster=[{"ip": "192.168.3.20","machineId": "192.168.3.20@8720","port": 9999, //server和client通信接口"state": 1 //指定为server},{"ip": "192.168.3.20","machineId": "192.168.3.20@8721","state": 0},{"ip": "192.168.3.20","machineId": "192.168.3.20@8722","state": 0}]


推荐阅读