
Soul Gateway Source Code Analysis - Data Synchronization - ZooKeeper Sync Configuration and Source Code

ZooKeeper synchronization

  • Configuration
  • Source code

Configuration

Update the yml configuration files of soul-admin and soul-bootstrap as follows:

# soul-admin
soul:
  sync:
    zookeeper:
      url: localhost:2181
      sessionTimeout: 5000
      connectionTimeout: 2000

# soul-bootstrap
soul:
  sync:
    zookeeper:
      url: localhost:2181
      sessionTimeout: 5000
      connectionTimeout: 2000

pom dependency (the gateway must be restarted after the dependency is added)

<dependency>
    <groupId>org.dromara</groupId>
    <artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
    <version>${project.version}</version>
</dependency>
           

If you see log lines related to zookeeper.request.timeout when the gateway starts, the configuration has taken effect!
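To make the three settings concrete, here is a minimal sketch of how keys under soul.sync.zookeeper could be read into a plain settings object. The class DemoZookeeperSettings and its from method are hypothetical names invented for this illustration; they are not the gateway's own configuration class, and the real starter may bind the values differently.

```java
import java.util.Properties;

// Hypothetical holder for the three settings under soul.sync.zookeeper.
// This is an illustration only, not the class the Soul starter actually uses.
class DemoZookeeperSettings {
    static final String PREFIX = "soul.sync.zookeeper.";

    final String url;
    final int sessionTimeout;
    final int connectionTimeout;

    DemoZookeeperSettings(final String url, final int sessionTimeout, final int connectionTimeout) {
        this.url = url;
        this.sessionTimeout = sessionTimeout;
        this.connectionTimeout = connectionTimeout;
    }

    // Reads the flattened form of the YAML keys, e.g. soul.sync.zookeeper.url.
    static DemoZookeeperSettings from(final Properties props) {
        return new DemoZookeeperSettings(
                required(props, "url"),
                Integer.parseInt(required(props, "sessionTimeout")),
                Integer.parseInt(required(props, "connectionTimeout")));
    }

    // Fails fast when a key is absent, so a typo in the yml shows up at startup.
    private static String required(final Properties props, final String name) {
        String value = props.getProperty(PREFIX + name);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("missing " + PREFIX + name);
        }
        return value;
    }
}
```

Failing fast on a missing key is a design choice of this sketch; it is one way to surface a mis-indented yml file early.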

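Source code

  1. First, locate the class that synchronizes the data: ZookeeperSyncDataService

    Its most important method is the constructor, which loads the relevant data at startup: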

public ZookeeperSyncDataService(final ZkClient zkClient, final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
    this.zkClient = zkClient;
    this.pluginDataSubscriber = pluginDataSubscriber;
    this.metaDataSubscribers = metaDataSubscribers;
    this.authDataSubscribers = authDataSubscribers;
    // Load the plugin-related data and watch it for changes.
    watcherData();
    // Load the app auth data and watch it for changes.
    watchAppAuth();
    // Load the metadata and watch it for changes.
    watchMetaData();
}
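The "load at startup, then keep listening" idea in this constructor can be sketched without a real ZooKeeper server. Everything below is a hypothetical stand-in: FakeNodeStore plays the role of the ZooKeeper client and LoadThenWatchService plays the role of the sync service. It only shows the pattern and does not reproduce what watcherData, watchAppAuth, or watchMetaData actually do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a ZooKeeper client: it stores node data
// and notifies the listeners registered on a path whenever that data is written.
class FakeNodeStore {
    private final Map<String, String> nodes = new HashMap<>();
    private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();

    String read(final String path) {
        return nodes.get(path);
    }

    void subscribe(final String path, final Consumer<String> listener) {
        listeners.computeIfAbsent(path, k -> new ArrayList<>()).add(listener);
    }

    void write(final String path, final String data) {
        nodes.put(path, data);
        listeners.getOrDefault(path, new ArrayList<>()).forEach(l -> l.accept(data));
    }
}

// Hypothetical sync service: for every path, the constructor first copies the
// current value into a local cache and then subscribes to later changes.
class LoadThenWatchService {
    private final Map<String, String> localCache = new HashMap<>();

    LoadThenWatchService(final FakeNodeStore store, final List<String> paths) {
        for (String path : paths) {
            String current = store.read(path);
            if (current != null) {
                localCache.put(path, current);
            }
            store.subscribe(path, data -> localCache.put(path, data));
        }
    }

    String cached(final String path) {
        return localCache.get(path);
    }
}
```

With this shape, a value written before the service is constructed is picked up by the initial load, and a value written afterwards arrives through the subscription.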
           
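  2. Next, look at the CommonPluginDataSubscriber class in soul-plugin-base. It contains many methods for synchronizing different kinds of data; here we take a brief look at subscribeDataHandler:
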
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        Optional.ofNullable(classData).ifPresent(data -> {
            // Plugin data synchronization
            if (data instanceof PluginData) {
                PluginData pluginData = (PluginData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
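                    // Add the data to the plugin cache in BaseDataCache.
                    BaseDataCache.getInstance().cachePluginData(pluginData);
                    // Look up the handler registered for this plugin and let it apply the update
                    // (the behavior depends on each plugin's PluginDataHandler implementation).
                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));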
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    BaseDataCache.getInstance().removePluginData(pluginData);
                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
                }
                // Selector data synchronization
            } else if (data instanceof SelectorData) {
                SelectorData selectorData = (SelectorData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    BaseDataCache.getInstance().cacheSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    BaseDataCache.getInstance().removeSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
                }
                // Rule data synchronization
            } else if (data instanceof RuleData) {
                RuleData ruleData = (RuleData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    BaseDataCache.getInstance().cacheRuleData(ruleData);
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    BaseDataCache.getInstance().removeRuleData(ruleData);
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
                }
            }
        });
    }
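The method above repeats one pattern three times: update or clear a cache, then forward the event to the handler registered under the plugin name. A reduced sketch of that pattern for selectors only might look like the following. DemoSelector, DemoHandler, DemoEventType, and DemoSubscriber are hypothetical simplifications, not the Soul classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical event types, mirroring only the UPDATE/DELETE cases used above.
enum DemoEventType { UPDATE, DELETE }

// Hypothetical, stripped-down selector record.
class DemoSelector {
    final String id;
    final String pluginName;

    DemoSelector(final String id, final String pluginName) {
        this.id = id;
        this.pluginName = pluginName;
    }
}

// Hypothetical per-plugin handler with only the selector callbacks.
interface DemoHandler {
    void handlerSelector(DemoSelector selector);

    void removeSelector(DemoSelector selector);
}

class DemoSubscriber {
    private final Map<String, DemoSelector> cache = new HashMap<>();
    private final Map<String, DemoHandler> handlerMap;

    DemoSubscriber(final Map<String, DemoHandler> handlerMap) {
        this.handlerMap = handlerMap;
    }

    // Step 1: keep the local cache in sync. Step 2: notify the plugin's handler, if any.
    void onSelector(final DemoSelector selector, final DemoEventType type) {
        if (selector == null) {
            return;
        }
        DemoHandler handler = handlerMap.get(selector.pluginName);
        if (type == DemoEventType.UPDATE) {
            cache.put(selector.id, selector);
            if (handler != null) {
                handler.handlerSelector(selector);
            }
        } else if (type == DemoEventType.DELETE) {
            cache.remove(selector.id);
            if (handler != null) {
                handler.removeSelector(selector);
            }
        }
    }

    boolean isCached(final String id) {
        return cache.containsKey(id);
    }
}
```

Note that the cache is updated even when no handler is registered for the plugin, which matches the order of the two statements in each branch of the original method.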
           

  3. PluginDataHandler

Now let's look at one of the classes that implement this interface, DividePluginDataHandler:

public class DividePluginDataHandler implements PluginDataHandler {
   
   @Override
   public void handlerSelector(final SelectorData selectorData) {
       UpstreamCacheManager.getInstance().submit(selectorData);
   }
   
   @Override
   public void removeSelector(final SelectorData selectorData) {
       UpstreamCacheManager.getInstance().removeByKey(selectorData.getId());
   }
   
   @Override
   public String pluginNamed() {
       return PluginEnum.DIVIDE.getName();
   }
}
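DividePluginDataHandler overrides only the selector callbacks and pluginNamed, yet the subscriber shown earlier also calls handlerPlugin, removePlugin, handlerRule, and removeRule on every handler. That suggests the interface supplies no-op implementations for the callbacks a plugin does not care about, presumably as default methods. The sketch below illustrates that idea with hypothetical names (DemoDataHandler, DemoDivideHandler); it is not the real interface definition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler interface: every callback is a no-op by default,
// so an implementation overrides only the events it is interested in.
interface DemoDataHandler {
    default void handlerPlugin(final String pluginName) {
    }

    default void handlerSelector(final String selectorId) {
    }

    default void handlerRule(final String ruleId) {
    }

    // The name under which the handler is registered in the handler map.
    String pluginNamed();
}

// Hypothetical implementation that reacts to selector changes only.
class DemoDivideHandler implements DemoDataHandler {
    final List<String> seenSelectors = new ArrayList<>();

    @Override
    public void handlerSelector(final String selectorId) {
        seenSelectors.add(selectorId);
    }

    @Override
    public String pluginNamed() {
        return "divide";
    }
}
```

Calling handlerRule on DemoDivideHandler is then harmless: it falls through to the empty default.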
           

Next, look at the UpstreamCacheManager class, whose submit method is called by handlerSelector:

public void submit(final SelectorData selectorData) {
       final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class);
       if (null != upstreamList && upstreamList.size() > 0) {
           UPSTREAM_MAP.put(selectorData.getId(), upstreamList);
           UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList);
       } else {
           UPSTREAM_MAP.remove(selectorData.getId());
           UPSTREAM_MAP_TEMP.remove(selectorData.getId());
       }
   }
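The behavior of submit can be tried out in isolation with a small stand-in. DemoUpstreamCache below is hypothetical: it keeps a single map instead of the two maps in the original, and it takes an already-parsed list of upstream addresses instead of parsing the selector's handle with Gson.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified upstream cache keyed by selector id.
class DemoUpstreamCache {
    private final Map<String, List<String>> upstreamMap = new ConcurrentHashMap<>();

    // A non-empty list replaces the cached entry; a null or empty list clears it,
    // which mirrors the if/else in the submit method shown above.
    void submit(final String selectorId, final List<String> upstreams) {
        if (upstreams != null && !upstreams.isEmpty()) {
            upstreamMap.put(selectorId, upstreams);
        } else {
            upstreamMap.remove(selectorId);
        }
    }

    void removeByKey(final String selectorId) {
        upstreamMap.remove(selectorId);
    }

    // Returns null when nothing is cached for the selector.
    List<String> find(final String selectorId) {
        return upstreamMap.get(selectorId);
    }
}
```

One consequence worth noticing is that submitting a selector with no upstreams behaves like a removal, so a stale upstream list never lingers for that selector.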
           

At this point, the whole flow has finished executing.
