The first configuration class to be read is ShenyuSpringMvcClientConfiguration, the HTTP registration configuration class of the ShenYu client. @Configuration marks it as a configuration class, and @ImportAutoConfiguration pulls in other configuration classes. It creates a SpringMvcClientEventListener, which is responsible for handling metadata and URI information.
/**
 * The ShenYu client HTTP registration configuration class.
 */
@Configuration
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {

    // create the SpringMvcClientEventListener, which handles metadata and URI information
    @Bean
    public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,
                                                                      final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()),
                shenyuClientRegisterRepository);
    }
}
@RestController@RequestMapping("/test")@ShenyuSpringMvcClient(path = "/test/**") // 表示整个接口注册public class HttpTestController { //......}
Register the current method only:
@RestController@RequestMapping("/order")@ShenyuSpringMvcClient(path = "/order")public class OrderController { /** * Save order dto. * * @param orderDTO the order dto * @return the order dto */ @PostMapping("/save") @ShenyuSpringMvcClient(path = "/save", desc = "Save order") // 注册当前方法 public OrderDTO save(@RequestBody final OrderDTO orderDTO) { orderDTO.setName("hello world save order"); return orderDTO; }}
public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {

    //......

    @Override
    public void run() {
        // get the data taken from the disruptor queue
        final T data = getData();
        // dispatch to the handler that matches the data type
        subscribers.get(data.getType()).executor(Lists.newArrayList(data));
    }
}
public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<Collection<DataTypeParent>> {

    // ...

    @Override
    public void run() {
        // get the data taken from the disruptor queue and keep only the valid entries
        Collection<DataTypeParent> results = getData()
                .stream()
                .filter(this::isValidData)
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(results)) {
            return;
        }
        // execute according to the data type
        selectExecutor(results).executor(results);
    }

    // select the subscriber according to the data type
    private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {
        final Optional<DataTypeParent> first = list.stream().findFirst();
        return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());
    }
}
@SPI
public interface LoadBalancer {

    /**
     * Select one upstream from the upstream list.
     *
     * @param upstreamList upstream list
     * @param ip ip
     * @return upstream
     */
    Upstream select(List<Upstream> upstreamList, String ip);
}
@Override
public Upstream doSelect(final List<Upstream> upstreamList, final String ip) {
    int length = upstreamList.size();
    // every upstream has the same weight?
    boolean sameWeight = true;
    // the weight of every upstream
    int[] weights = new int[length];
    int firstUpstreamWeight = getWeight(upstreamList.get(0));
    weights[0] = firstUpstreamWeight;
    // init the totalWeight
    int totalWeight = firstUpstreamWeight;
    int halfLengthTotalWeight = 0;
    for (int i = 1; i < length; i++) {
        int currentUpstreamWeight = getWeight(upstreamList.get(i));
        if (i <= (length + 1) / 2) {
            halfLengthTotalWeight = totalWeight;
        }
        weights[i] = currentUpstreamWeight;
        totalWeight += currentUpstreamWeight;
        if (sameWeight && currentUpstreamWeight != firstUpstreamWeight) {
            // not every upstream has the same weight
            sameWeight = false;
        }
    }
    if (totalWeight > 0 && !sameWeight) {
        return random(totalWeight, halfLengthTotalWeight, weights, upstreamList);
    }
    return random(upstreamList);
}

private Upstream random(final int totalWeight, final int halfLengthTotalWeight, final int[] weights, final List<Upstream> upstreamList) {
    // If the weights are not all the same and the total weight is greater than 0, pick randomly by total weight.
    int offset = RANDOM.nextInt(totalWeight);
    int index = 0;
    int end = weights.length;
    if (offset >= halfLengthTotalWeight) {
        index = (weights.length + 1) / 2;
        offset -= halfLengthTotalWeight;
    } else {
        end = (weights.length + 1) / 2;
    }
    // determine which segment the random value falls on
    for (; index < end; index++) {
        offset -= weights[index];
        if (offset < 0) {
            return upstreamList.get(index);
        }
    }
    return random(upstreamList);
}
The original definition of round-robin is to connect requests to each server in turn, in sequential, cyclic order. When a server fails (for example, it cannot be reached for one minute), it is removed from the candidate queue and no longer takes part in the following rounds until it recovers. What RoundRobinLoadBalancer implements is a weighted round-robin within the group (Weight Round Robin per-packet).
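To make the weighted idea concrete, below is a minimal, generic sketch of the smooth weighted round-robin technique; the class and field names are hypothetical and this is not ShenYu's RoundRobinLoadBalancer source, only an illustration of the selection rule under that assumption.

// Minimal sketch of smooth weighted round-robin (illustration only, hypothetical names).
import java.util.Arrays;
import java.util.List;

public class SmoothWeightedRoundRobinSketch {

    // one entry per server: its configured weight and a running "current" weight
    static class Node {
        final String server;
        final int weight;
        int current;

        Node(final String server, final int weight) {
            this.server = server;
            this.weight = weight;
        }
    }

    private final List<Node> nodes;

    public SmoothWeightedRoundRobinSketch(final List<Node> nodes) {
        this.nodes = nodes;
    }

    // each call: raise every current weight by its configured weight, pick the largest,
    // then lower the winner by the total weight, so heavier nodes win proportionally more often
    public String select() {
        int total = 0;
        Node best = null;
        for (Node node : nodes) {
            node.current += node.weight;
            total += node.weight;
            if (best == null || node.current > best.current) {
                best = node;
            }
        }
        best.current -= total;
        return best.server;
    }

    public static void main(String[] args) {
        SmoothWeightedRoundRobinSketch balancer = new SmoothWeightedRoundRobinSketch(Arrays.asList(
                new Node("A", 5), new Node("B", 1), new Node("C", 1)));
        // A is expected to be chosen 5 times out of every 7 calls
        for (int i = 0; i < 7; i++) {
            System.out.print(balancer.select() + " ");
        }
    }
}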
public abstract class AbstractMatchStrategy {

    public String buildRealData(final ConditionData condition, final ServerWebExchange exchange) {
        return ParameterDataFactory.builderData(condition.getParamType(), condition.getParamName(), exchange);
    }
}
/**
 * The websocket data handler, created through the factory pattern.
 * The type Websocket cache handler.
 */
public class WebsocketDataHandler {

    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

    /**
     * Instantiates a new Websocket data handler.
     * One handler is provided for each data type.
     *
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers the meta data subscribers
     * @param authDataSubscribers the auth data subscribers
     */
    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
        // plugin --> plugin data handler
        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
        // selector --> selector data handler
        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
        // rule --> rule data handler
        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
        // auth info --> auth data handler
        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
        // metadata --> metadata handler
        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
    }

    /**
     * Executor.
     *
     * @param type the type
     * @param json the json
     * @param eventType the event type
     */
    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        // find the data handler that matches the data type
        ENUM_MAP.get(type).handle(json, eventType);
    }
}
<!--shenyu data sync start use websocket-->
<dependency>
    <groupId>org.apache.shenyu</groupId>
    <artifactId>shenyu-spring-boot-starter-sync-data-websocket</artifactId>
    <version>${project.version}</version>
</dependency>
/**
 * The type Data sync configuration.
 * Data sync configuration class, wired through Spring Boot conditional assembly.
 */
@Configuration
public class DataSyncConfiguration {

    // other code omitted......

    /**
     * The type Nacos listener.
     */
    @Configuration
    @ConditionalOnProperty(prefix = "shenyu.sync.nacos", name = "url")
    @Import(NacosConfiguration.class)
    static class NacosListener {

        /**
         * Data changed listener data changed listener.
         *
         * @param configService the config service
         * @return the data changed listener
         */
        @Bean
        @ConditionalOnMissingBean(NacosDataChangedListener.class)
        public DataChangedListener nacosDataChangedListener(final ConfigService configService) {
            return new NacosDataChangedListener(configService);
        }

        /**
         * Nacos data init.
         *
         * @param configService the config service
         * @param syncDataService the sync data service
         * @return the nacos data init
         */
        @Bean
        @ConditionalOnMissingBean(NacosDataInit.class)
        public NacosDataInit nacosDataInit(final ConfigService configService, final SyncDataService syncDataService) {
            return new NacosDataInit(configService, syncDataService);
        }
    }

    // other code omitted......
}
public NacosSyncDataService(final ConfigService configService,
                            final PluginDataSubscriber pluginDataSubscriber,
                            final List<MetaDataSubscriber> metaDataSubscribers,
                            final List<AuthDataSubscriber> authDataSubscribers) {
    super(configService, pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
    start();
}
[ERROR] Failed to execute goal com.spotify:dockerfile-maven-plugin:1.4.6:build (tag-latest) on project shenyu-admin-dist: Could not build image: java.util.concurrent.ExecutionException: com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException: java.lang.UnsatisfiedLinkError: could not load FFI provider jnr.ffi.provider.jffi.Provider: ExceptionInInitializerError: Can't overwrite cause with java.lang.UnsatisfiedLinkError: java.lang.UnsatisfiedLinkError: /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: dlopen(/private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib, 1): no suitable image found. Did find:
[ERROR] /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: no matching architecture in universal wrapper
[ERROR] /private/var/folders/w2/j27f16yj7cvf_1cxbgqb89vh0000gn/T/jffi4972193792308935312.dylib: no matching architecture in universal wrapper
...
/**
 * The type Data sync configuration.
 * Data sync configuration class, wired through Spring Boot conditional assembly.
 */
@Configuration
public class DataSyncConfiguration {

    /**
     * The type Zookeeper listener.
     * Zookeeper data sync.
     */
    @Configuration
    @ConditionalOnProperty(prefix = "shenyu.sync.zookeeper", name = "url") // conditional property: loaded only when it is present
    @Import(ZookeeperConfiguration.class)
    static class ZookeeperListener {

        /**
         * Config event listener data changed listener.
         * Creates the Zookeeper data changed listener.
         *
         * @param zkClient the zk client
         * @return the data changed listener
         */
        @Bean
        @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
        public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
            return new ZookeeperDataChangedListener(zkClient);
        }

        /**
         * Zookeeper data init.
         * Creates the Zookeeper data initialization class.
         *
         * @param zkClient the zk client
         * @param syncDataService the sync data service
         * @return the zookeeper data init
         */
        @Bean
        @ConditionalOnMissingBean(ZookeeperDataInit.class)
        public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
            return new ZookeeperDataInit(zkClient, syncDataService);
        }
    }

    // other code omitted......
}
<!--shenyu data sync start use http-->
<dependency>
    <groupId>org.apache.shenyu</groupId>
    <artifactId>shenyu-spring-boot-starter-sync-data-http</artifactId>
    <version>${project.version}</version>
</dependency>
After the gateway receives the response, it only knows which Group's configuration has changed; it still has to request that Group's configuration data again. A question may arise here: why not write the changed data out directly? We discussed this in depth during development. Because the HTTP long-polling mechanism can only guarantee near real-time delivery, if the gateway does not process a push in time, or the administrator updates the configuration frequently, a configuration-change push could easily be missed. To be safe, we only notify the gateway that a certain Group's data has changed.
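A minimal sketch of this two-step flow follows; the helper method names are hypothetical and this is not ShenYu's actual HttpSyncDataService, only an outline of "long-poll for the changed group names, then fetch each group's full data".

// Sketch only: the long poll returns the names of changed groups, the gateway then
// fetches the full configuration of each changed group and refreshes its local cache.
public abstract class LongPollingSketch {

    public void watch() {
        while (true) {
            // step 1: long-poll shenyu-admin; the response only says WHICH groups changed
            for (String changedGroup : doLongPolling()) {
                // step 2: request that group's full configuration and refresh the local cache
                refreshLocalCache(changedGroup, fetchGroupConfig(changedGroup));
            }
        }
    }

    protected abstract java.util.List<String> doLongPolling();

    protected abstract String fetchGroupConfig(String group);

    protected abstract void refreshLocalCache(String group, String json);
}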
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {

    // ......

    // the map that caches the config data
    protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();

    /**
     * if md5 is not the same as the original, then update local cache.
     * Updates the data in the cache.
     *
     * @param group ConfigGroupEnum
     * @param <T> the type of class
     * @param data the new config data
     */
    protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
        // serialize the data
        String json = GsonUtils.getInstance().toJson(data);
        // carry the md5 value and the modification time
        ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
        // update the data of this group
        ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
        LOG.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
    }

    // ......
}
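The md5 stored in this cache is what lets the admin decide, for each polling request, whether a group has actually changed. A hedged sketch of that comparison, assuming ConfigDataCache exposes a getMd5() accessor (the real check in shenyu-admin also takes the modification time into account):

// Sketch: a group is considered changed when the md5 reported by the gateway
// differs from the md5 currently cached for that group.
boolean isGroupChanged(final ConfigGroupEnum group, final String clientMd5) {
    ConfigDataCache cached = CACHE.get(group.name());
    return cached == null || !cached.getMd5().equals(clientMd5);
}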
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Join {

    /**
     * It will be sorted according to the current serial number.
     *
     * @return int.
     */
    int order() default 0;
}
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Join {

    /**
     * It will be sorted according to the current serial number.
     *
     * @return int.
     */
    int order() default 0;

    /**
     * Whether the join class instance should be a singleton or not.
     *
     * @return true or false
     */
    boolean isSingleton() default true;
}
Optionally, an SPI implementation class can implement an initializer interface; after the implementation class is instantiated, the initializer's method is called back. For example:
public interface ExtensionInitializer {

    void init();
}

/**
 * demo
 */
@SPI
public interface JdbcSPI {

    String getClassName();
}

@Join
public class MysqlSPI implements JdbcSPI, ExtensionInitializer {

    @Override
    public void init() {
        // called back after the MysqlSPI instance is created
    }

    @Override
    public String getClassName() {
        return "mysql";
    }
}
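As a hedged usage sketch: assuming MysqlSPI is registered under the key mysql in the META-INF/shenyu SPI configuration file for JdbcSPI, and that the extension is looked up through ExtensionLoader#getJoin (details may differ between versions), loading it and triggering the init() callback could look like this:

// Load the "mysql" JdbcSPI implementation through the SPI loader; init() is expected
// to run right after the instance is created.
JdbcSPI jdbc = ExtensionLoader.getExtensionLoader(JdbcSPI.class).getJoin("mysql");
System.out.println(jdbc.getClassName()); // prints "mysql"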