由來
在Spring Cloud Gateway源碼解析-10-自定義Predicate實作黑名單中我們自定義了Predicate來實作黑名單,但發現每次更改黑名單規則都要重新開機項目來實作,是以需要将路由資訊存儲在外部資料源中,定時重新整理SCG記憶體中的路由資訊。
思路

在Spring Cloud Gateway源碼解析-03-RouteDefinitionLocator、RouteLocator解析中我們已經介紹過
RouteDefinitionRepository
,該接口在SCG中隻有一個實作
InMemoryRouteDefinitionRepository
,并且該接口繼承了
RouteDefinitionWriter
,
RouteDefinitionWriter
中定義了
save、delete
方法,通過方法名稱可以知道是用來儲存/添加/删除路由資訊。
- 是以我們可以實作
用來儲存從Redis中擷取到的RouteDefinitionRouteDefinitionRepository
,由于RedisRouteDefinitionRepository
繼承了RouteDefinitionRepository
,是以會被RouteDefinitionLocator
組合進去,進而被CompositeRouteDefinitionLocator
拿到對應的Redis中的RouteDefinition裝換成Route。CachingRouteLocator
- 有了地方存儲Redis中的定義的RouteDefinition,那是不是要有一個角色用來擷取Redis中的資料,并組裝成RouteDefinition儲存到
中,是以需要定義RedisRouteDefinitionRepository
用來從Redis中擷取到資料庫後生成RouteDefinition。可能我們的路由資訊以後會放到MySQL、MongoDB等,是以可以抽象出一個從Repository中擷取資料轉換為RouteDefinition的接口RedisRouteDefinitionRepositoryOperator
。RouteDefinitionRepositoryOperator
- 基于上邊這些,我們就實作了當SCG啟動時從Redis中擷取資料轉換為RouteDefinition,并儲存到
中,但是想要實作當修改了Redis中的路由資訊後同步SCG更新,還不夠,需要有一個類似Nacos的心跳機制,定時通知SCG去重新擷取一次Redis中的資料。是以可以模仿Nacos的心跳機制實作RedisRouteDefinitionRepository
發送心跳事件,觸發RedisRouteDefinitionWatch
重新擷取RouteDefinition來重新生成Route。CachingRouteLocator
實作
RouteDefinitionRepositoryOperator
/**
* 定義從不同資料源擷取RouteDefinition的抽象
* @author li.hongjian
* @email [email protected]
* @Date 2021/4/1
*/
public interface RouteDefinitionRepositoryOperator {
Flux<RouteDefinition> getRouteDefinitions();
}
RedisRouteDefinitionRepositoryOperator
/**
* Description:用來擷取Redis中的RouteDefinition 并儲存到{@link RedisRouteDefinitionRepository}
*
* @author li.hongjian
* @email [email protected]
* @Date 2021/4/1
*/
public class RedisRouteDefinitionRepositoryOperator implements RouteDefinitionRepositoryOperator {
private final String REDIS_ROUTE_ID_PREFIX = "route-*";
private StringRedisTemplate redisTemplate;
public RedisRouteDefinitionRepositoryOperator(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
//擷取指定字首的RedisKey。Redis的資料結構使用Hash,value的結構為predicates和filters,
//predicates資料結構JsonArray,可配置多個
// 由于PredicateDefinition的構造方法支援傳入類似Path=/api/hello這種格式的參數,并會自動封裝為name和args,是以我們取巧可以在Redis中存儲如下結構
// 如:["Path=/api/hello","BlackRemoteAddr=172.17.30.1/18,172.17.31.1/18"],表示PathRoutePredicateFactory和BlackRemoteAddrRoutePredicateFactory
//filters與predicates一樣
return Flux.fromStream(redisTemplate.keys(REDIS_ROUTE_ID_PREFIX).parallelStream().map(routeId -> {
RouteDefinition routeDefinition = new RouteDefinition();
//以RedisKey作為RouteID
routeDefinition.setId(routeId);
Map<Object, Object> entries = redisTemplate.opsForHash().entries(routeId);
String uri = (String) entries.get("uri");
try {
routeDefinition.setUri(new URI(uri));
} catch (URISyntaxException e) {
e.printStackTrace();
}
//初始化PredicateDefinition,并添加到RouteDefinition中
initPredicate(routeDefinition, entries);
//初始化FilterDefinition,并添加到RouteDefinition中
initFilter(routeDefinition, entries);
return routeDefinition;
}));
}
private void initPredicate(RouteDefinition routeDefinition, Map<Object, Object> entries) {
Object predicates = entries.get("predicates");
if (predicates == null) {
return;
}
JSONArray predicateArry = JSONArray.parseArray((String) predicates);
predicateArry.parallelStream().forEach(predicate -> {
//周遊predicates,建立RouteDefinition,并添加到RouteDefinition中
PredicateDefinition predicateDefinition = new PredicateDefinition((String) predicate);
routeDefinition.getPredicates().add(predicateDefinition);
});
}
private void initFilter(RouteDefinition routeDefinition, Map<Object, Object> entries) {
Object filters = entries.get("filters");
if (filters == null) {
return;
}
JSONArray predicateArry = JSONArray.parseArray((String) filters);
predicateArry.parallelStream().forEach(filter -> {
//周遊predicates,建立RouteDefinition,并添加到RouteDefinition中
FilterDefinition filterDefinition = new FilterDefinition((String) filter);
routeDefinition.getFilters().add(filterDefinition);
});
}
}
RedisRouteDefinitionRepository
/**
* Description:基于Redis作為RouteDefinition Repository
*
* @author li.hongjian
* @email [email protected]
* @Date 2021/4/1
*/
public class RedisRouteDefinitionRepository implements RouteDefinitionRepository{
private final Map<String, RouteDefinition> routes = synchronizedMap(
new LinkedHashMap<String, RouteDefinition>());
private RedisRouteDefinitionRepositoryOperator redidRouteDefinitionOperator;
/**
* 将RedisRouteDefinitionRepositoryOperator組裝進來
* @param redidRouteDefinitionOperator
*/
public RedisRouteDefinitionRepository(RedisRouteDefinitionRepositoryOperator redidRouteDefinitionOperator) {
this.redidRouteDefinitionOperator = redidRouteDefinitionOperator;
}
/**
* 在{@link CompositeRouteDefinitionLocator#getRouteDefinitions()}調用時 調用redidRouteDefinitionOperator去Redis中取資料
* @return
*/
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
redidRouteDefinitionOperator.getRouteDefinitions().flatMap(r -> save(Mono.just(r))).subscribe();
return Flux.fromIterable(routes.values());
}
@Override
public Mono<Void> save(Mono<RouteDefinition> route) {
return route.flatMap(r -> {
if (StringUtils.isEmpty(r.getId())) {
return Mono.error(new IllegalArgumentException("id may not be empty"));
}
routes.put(r.getId(), r);
return Mono.empty();
});
}
@Override
public Mono<Void> delete(Mono<String> routeId) {
return routeId.flatMap(id -> {
if (routes.containsKey(id)) {
routes.remove(id);
return Mono.empty();
}
return Mono.defer(() -> Mono.error(
new NotFoundException("RouteDefinition not found: " + routeId)));
});
}
}
RedisRouteDefinitionWatch
/**
* @author li.hongjian
* @email [email protected]
* @Date 2021/4/1
*/
public class RedisRouteDefinitionWatch implements ApplicationEventPublisherAware, SmartLifecycle {
private final TaskScheduler taskScheduler = getTaskScheduler();
private final AtomicLong redisWatchIndex = new AtomicLong(0);
private final AtomicBoolean running = new AtomicBoolean(false);
private ApplicationEventPublisher publisher;
private ScheduledFuture<?> watchFuture;
private static ThreadPoolTaskScheduler getTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setBeanName("Redis-Watch-Task-Scheduler");
taskScheduler.initialize();
return taskScheduler;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::redisServicesWatch, 30000); //啟動一個定時,30s執行一次
}
}
/**
* 這裡最好是自定義一個事件,因為如果使用了Nacos的話,會沖突,這樣的話需要修改SCG的源碼,監聽自定義的事件
* 我們就不這麼做了,感興趣的可以自行實作
*/
private void redisServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
this.publisher.publishEvent( //30s釋出一次事件,通知SCG重新拉取
new HeartbeatEvent(this, redisWatchIndex.getAndIncrement()));
}
@Override
public void stop() {
if (this.running.compareAndSet(true, false) && this.watchFuture != null) {
this.watchFuture.cancel(true);
}
}
@Override
public boolean isRunning() {
return false;
}
}
這樣就大功告成了,實作了基于Redis配置路由資訊并且可動态重新整理的功能。
使用
1、Redis中資料:
2、将RedisRouteDefinitionWatch、RedisRouteDefinitionRepository、RedisRouteDefinitionRepositoryOperator放到Spring容器中,比如@Bean注入
通過以上兩步,即可完成。代碼寫的比較簡陋。
大家可自行驗證下,親測有效。代碼倉庫位址