天天看点

Soul网关源码探秘《十二》 - 数据同步(后台接收篇)

前文探索了网关中服务自动检查探活的机制。同时也了解到当在后台调整选择器时,后台会通过 websocket 的方式将数据同步到网关。今天就接着探究后台是如何接收后台同步过来的配置的。

准备工作

启动后台(soul-admin)和网关(soul-bootstrap)两个项目。

源码探秘

网关接收配置

同前文一样,在类

UpstreamCacheManager

sumbit

方法打上断点,然后在后台项目中编辑一条选择器规则。成功后,立即进入了断点位置。

Soul网关源码探秘《十二》 - 数据同步(后台接收篇)

从调用栈可以看到,同步消息时到网关时,先到了

SoulWebsocketClient

onMessage

方法。在这个方法上打上断点,然后重新在后台修改选择器规则后。进入这个方法的逻辑。

public void onMessage(final String result) {
        handleResult(result);
    }
    
	private void handleResult(final String result) {
        // 将接收到的字符串配置信息转换为 WebsocketData 对象
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        // 获取数据类型,鉴权数据、插件数据、规则数据、选择器数据、元数据
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        // 获取事件类型
        String eventType = websocketData.getEventType();
        // 将选择器的配置信息转换为 JSON
        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        // 交给 WebsocketDataHandler 去处理数据
        websocketDataHandler.executor(groupEnum, json, eventType);
    }
           

可以看到,

onMessage

这个方法是网关接收到选择器修改数据的入口。接收参数为String类型的选择器配置数据。初步处理数据后交给

WebsocketDataHandler

去处理。

// 缓存了各种类型数据各自的 DataHandler
	private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

	public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
 	  // 通过数据类型找到对应的 DataHandler,然后转交给 SelectorDataHandler 进行后续处理    
      ENUM_MAP.get(type).handle(json, eventType);
    }
           

WebsocketDataHandler

干的事情是识别了需要处理该数据的对应类型

SelectorDataHandler

然后交给它做后续处理。

SelectorDataHandler

中没有实现

handle

这个方法,实际调用的是抽象类

AbstractDataHandler

中的实现。

public void handle(final String json, final String eventType) {
        // 将 JSON 转换为对应的数据类型列表
        List<T> dataList = convert(json);
        if (CollectionUtils.isNotEmpty(dataList)) {
            // 获取事件类型
            DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
            switch (eventTypeEnum) {
                case REFRESH:
                case MYSELF:
                    doRefresh(dataList);
                    break;
                case UPDATE:
                case CREATE:
 				  // 调用实现类里的逻辑                 
                  doUpdate(dataList);
                    break;
                case DELETE:
                    doDelete(dataList);
                    break;
                default:
                    break;
            }
        }
    }
           

在抽象类

AbstractDataHandler

中将JSON数据转换为对应类型的列表,然后根据事件类型调用实现类中不同的逻辑。REFRESH、MYSELF 调用的是

doRefresh

刷新的操作;UPDATE、CREATE 调用的是

doUpdate

更新的操作;还有 DELETE 调用的是

doDelete

删除的操作。

protected void doUpdate(final List<SelectorData> dataList) {
 // 数据列表循环交给 CommonPluginDataSubscriber 通用的插件数据订阅者进行处理   
    dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);
    }
           

CommonPluginDataSubscriber

中查看具体处理逻辑。

public void onSelectorSubscribe(final SelectorData selectorData) {
        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
}
    
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
   // ...
   } else if (data instanceof SelectorData) {
                SelectorData selectorData = (SelectorData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
					// BaseDataCache 里将选择器信息缓存起来
					BaseDataCache.getInstance().cacheSelectData(selectorData);
					// 转交给实现类 DividePluginDataHandler 进行后续处理             
             		Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    BaseDataCache.getInstance().removeSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
                }
  // ...
}
           

CommonPluginDataSubscriber

中的

subscribeDataHandler

方法中对插件、选择器、规则的

UPDATE、DELETE

进行了处理,在

BaseDataCache

中缓存了相应信息,并把最后的处理逻辑转发给各自实现类。

public void handlerSelector(final SelectorData selectorData) {
// 终于来到了文章开头说的 submit 方法,在这里把选择器配置信息交给 UpstreamCacheManager 进行处理
UpstreamCacheManager.getInstance().submit(selectorData);
    }
           

总结

网关在接收到后台发来的同步配置信息后,依次在

SoulWebsocketClient

WebsocketDataHandler

中对数据进行处理后转向后面的处理逻辑。这一块应该是对应使用 websocket 同步才会使用到的模块。

而不管使用什么方式去同步配置,后续的处理流程都是通用的。

AbstractDataHandler -> SelectorDataHandler -> CommonPluginDataSubscriber -> DividePluginDataHandler
           

至此,网关接收到后台传来的同步数据之后是如何处理的就明朗了。那么在后台修改完数据后,是经过了哪些流程传到网关这一侧的呢?后续再进行探索。