天天看點

Spring Cloud Gateway源碼解析-11-擴充RouteDefinitionRepository實作基于Redis的動态路由由來思路實作使用

由來

在Spring Cloud Gateway源碼解析-10-自定義Predicate實作黑名單中我們自定義了Predicate來實作黑名單,但發現每次更改黑名單規則都要重新開機項目來實作,是以需要将路由資訊存儲在外部資料源中,定時重新整理SCG記憶體中的路由資訊。

思路

Spring Cloud Gateway源碼解析-11-擴充RouteDefinitionRepository實作基于Redis的動态路由由來思路實作使用

在Spring Cloud Gateway源碼解析-03-RouteDefinitionLocator、RouteLocator解析中我們已經介紹過

RouteDefinitionRepository

,該接口在SCG中隻有一個實作

InMemoryRouteDefinitionRepository

,并且該接口繼承了

RouteDefinitionWriter

RouteDefinitionWriter

中定義了

save、delete

方法,通過方法名稱可以知道是用來儲存/添加/删除路由資訊。

  1. 是以我們可以實作

    RouteDefinitionRepository

    用來儲存從Redis中擷取到的RouteDefinition

    RedisRouteDefinitionRepository

    ,由于

    RouteDefinitionRepository

    繼承了

    RouteDefinitionLocator

    ,是以會被

    CompositeRouteDefinitionLocator

    組合進去,進而被

    CachingRouteLocator

    拿到對應的Redis中的RouteDefinition裝換成Route。
  2. 有了地方存儲Redis中的定義的RouteDefinition,那是不是要有一個角色用來擷取Redis中的資料,并組裝成RouteDefinition儲存到

    RedisRouteDefinitionRepository

    中,是以需要定義

    RedisRouteDefinitionRepositoryOperator

    用來從Redis中擷取到資料庫後生成RouteDefinition。可能我們的路由資訊以後會放到MySQL、MongoDB等,是以可以抽象出一個從Repository中擷取資料轉換為RouteDefinition的接口

    RouteDefinitionRepositoryOperator

  3. 基于上邊這些,我們就實作了當SCG啟動時從Redis中擷取資料轉換為RouteDefinition,并儲存到

    RedisRouteDefinitionRepository

    中,但是想要實作當修改了Redis中的路由資訊後同步SCG更新,還不夠,需要有一個類似Nacos的心跳機制,定時通知SCG去重新擷取一次Redis中的資料。是以可以模仿Nacos的心跳機制實作

    RedisRouteDefinitionWatch

    發送心跳事件,觸發

    CachingRouteLocator

    重新擷取RouteDefinition來重新生成Route。

實作

RouteDefinitionRepositoryOperator

/**
 * 定義從不同資料源擷取RouteDefinition的抽象
 * @author li.hongjian
 * @email [email protected]
 * @Date 2021/4/1
 */
public interface RouteDefinitionRepositoryOperator {

	Flux<RouteDefinition> getRouteDefinitions();

}
           

RedisRouteDefinitionRepositoryOperator

/**
 * Description:用來擷取Redis中的RouteDefinition 并儲存到{@link RedisRouteDefinitionRepository}
 *
 * @author li.hongjian
 * @email [email protected]
 * @Date 2021/4/1
 */
public class RedisRouteDefinitionRepositoryOperator implements RouteDefinitionRepositoryOperator {

	private final String REDIS_ROUTE_ID_PREFIX = "route-*";

	private StringRedisTemplate redisTemplate;

	public RedisRouteDefinitionRepositoryOperator(StringRedisTemplate redisTemplate) {
		this.redisTemplate = redisTemplate;
	}


	@Override
	public Flux<RouteDefinition> getRouteDefinitions() {
		//擷取指定字首的RedisKey。Redis的資料結構使用Hash,value的結構為predicates和filters,
		//predicates資料結構JsonArray,可配置多個 
		//  由于PredicateDefinition的構造方法支援傳入類似Path=/api/hello這種格式的參數,并會自動封裝為name和args,是以我們取巧可以在Redis中存儲如下結構
		// 		如:["Path=/api/hello","BlackRemoteAddr=172.17.30.1/18,172.17.31.1/18"],表示PathRoutePredicateFactory和BlackRemoteAddrRoutePredicateFactory
		//filters與predicates一樣
		return Flux.fromStream(redisTemplate.keys(REDIS_ROUTE_ID_PREFIX).parallelStream().map(routeId -> {
			RouteDefinition routeDefinition = new RouteDefinition();
			//以RedisKey作為RouteID
			routeDefinition.setId(routeId);
			Map<Object, Object> entries = redisTemplate.opsForHash().entries(routeId);
			String uri = (String) entries.get("uri");
			try {
				routeDefinition.setUri(new URI(uri));
			} catch (URISyntaxException e) {
				e.printStackTrace();
			}
			//初始化PredicateDefinition,并添加到RouteDefinition中
			initPredicate(routeDefinition, entries);

			//初始化FilterDefinition,并添加到RouteDefinition中
			initFilter(routeDefinition, entries);
			return routeDefinition;
		}));
	}

	private void initPredicate(RouteDefinition routeDefinition, Map<Object, Object> entries) {
		Object predicates = entries.get("predicates");
		if (predicates == null) {
			return;
		}
		JSONArray predicateArry = JSONArray.parseArray((String) predicates);
		predicateArry.parallelStream().forEach(predicate -> {
			//周遊predicates,建立RouteDefinition,并添加到RouteDefinition中
			PredicateDefinition predicateDefinition = new PredicateDefinition((String) predicate);
			routeDefinition.getPredicates().add(predicateDefinition);
		});
	}

	private void initFilter(RouteDefinition routeDefinition, Map<Object, Object> entries) {
		Object filters = entries.get("filters");
		if (filters == null) {
			return;
		}
		JSONArray predicateArry = JSONArray.parseArray((String) filters);
		predicateArry.parallelStream().forEach(filter -> {
			//周遊predicates,建立RouteDefinition,并添加到RouteDefinition中
			FilterDefinition filterDefinition = new FilterDefinition((String) filter);
			routeDefinition.getFilters().add(filterDefinition);
		});
	}
}
           

RedisRouteDefinitionRepository

/**
 * Description:基于Redis作為RouteDefinition Repository
 *
 * @author li.hongjian
 * @email [email protected]
 * @Date 2021/4/1
 */
public class RedisRouteDefinitionRepository implements RouteDefinitionRepository{

	private final Map<String, RouteDefinition> routes = synchronizedMap(
			new LinkedHashMap<String, RouteDefinition>());

	private RedisRouteDefinitionRepositoryOperator redidRouteDefinitionOperator;

	/**
	 * 将RedisRouteDefinitionRepositoryOperator組裝進來
	 * @param redidRouteDefinitionOperator
	 */
	public RedisRouteDefinitionRepository(RedisRouteDefinitionRepositoryOperator redidRouteDefinitionOperator) {
		this.redidRouteDefinitionOperator = redidRouteDefinitionOperator;
	}

	/**
	 * 在{@link CompositeRouteDefinitionLocator#getRouteDefinitions()}調用時 調用redidRouteDefinitionOperator去Redis中取資料
	 * @return
	 */
	@Override
	public Flux<RouteDefinition> getRouteDefinitions() {
		redidRouteDefinitionOperator.getRouteDefinitions().flatMap(r -> save(Mono.just(r))).subscribe();
		return Flux.fromIterable(routes.values());
	}

	@Override
	public Mono<Void> save(Mono<RouteDefinition> route) {
		return route.flatMap(r -> {
			if (StringUtils.isEmpty(r.getId())) {
				return Mono.error(new IllegalArgumentException("id may not be empty"));
			}
			routes.put(r.getId(), r);
			return Mono.empty();
		});
	}

	@Override
	public Mono<Void> delete(Mono<String> routeId) {
		return routeId.flatMap(id -> {
			if (routes.containsKey(id)) {
				routes.remove(id);
				return Mono.empty();
			}
			return Mono.defer(() -> Mono.error(
					new NotFoundException("RouteDefinition not found: " + routeId)));
		});
	}
}
           

RedisRouteDefinitionWatch

/**
 * @author li.hongjian
 * @email [email protected]
 * @Date 2021/4/1
 */
public class RedisRouteDefinitionWatch implements ApplicationEventPublisherAware, SmartLifecycle {

	private final TaskScheduler taskScheduler = getTaskScheduler();

	private final AtomicLong redisWatchIndex = new AtomicLong(0);

	private final AtomicBoolean running = new AtomicBoolean(false);

	private ApplicationEventPublisher publisher;

	private ScheduledFuture<?> watchFuture;

	private static ThreadPoolTaskScheduler getTaskScheduler() {
		ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
		taskScheduler.setBeanName("Redis-Watch-Task-Scheduler");
		taskScheduler.initialize();
		return taskScheduler;
	}


	@Override
	public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
		this.publisher = publisher;
	}

	@Override
	public void start() {
		if (this.running.compareAndSet(false, true)) {
			this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
					this::redisServicesWatch, 30000); //啟動一個定時,30s執行一次
		}
	}

	/**
	 * 這裡最好是自定義一個事件,因為如果使用了Nacos的話,會沖突,這樣的話需要修改SCG的源碼,監聽自定義的事件
	 * 我們就不這麼做了,感興趣的可以自行實作
	 */
	private void redisServicesWatch() {
		// nacos doesn't support watch now , publish an event every 30 seconds.
		this.publisher.publishEvent( //30s釋出一次事件,通知SCG重新拉取
				new HeartbeatEvent(this, redisWatchIndex.getAndIncrement()));
	}

	@Override
	public void stop() {
		if (this.running.compareAndSet(true, false) && this.watchFuture != null) {
			this.watchFuture.cancel(true);
		}
	}

	@Override
	public boolean isRunning() {
		return false;
	}
}
           

這樣就大功告成了,實作了基于Redis配置路由資訊并且可動态重新整理的功能。

使用

1、Redis中資料:

Spring Cloud Gateway源碼解析-11-擴充RouteDefinitionRepository實作基于Redis的動态路由由來思路實作使用

2、将RedisRouteDefinitionWatch、RedisRouteDefinitionRepository、RedisRouteDefinitionRepositoryOperator放到Spring容器中,比如@Bean注入

通過以上兩步,即可完成。代碼寫的比較簡陋。

大家可自行驗證下,親測有效。代碼倉庫位址

繼續閱讀