spring cloud bus介紹與源碼分析
- 簡介
- 運用場景
- 源碼分析
- 總結
簡介
根據官方文檔,spring cloud bus為分布式的輕量級消息代理服務,可以用來狀态改變的時候進行廣播,比如配置值發生變換,目前支援AMQP協定的代理服務。也就是說目前spring cloud bus隻支援rabbitmq 以及kafkamq,kafka普遍用于大資料傳輸方面。
運用場景
目前所推薦的spring cloud bus的運用場景大多都是與spring cloud config 使用實作子服務動态重新整理擷取server端的配置檔案變動。實作簡單隻需要簡單配置,不需要代碼實作。
和spring cloud config 組合的動态重新整理擷取最新配置的流程圖,讀者若想擷取關于spring cloud config 的運用和原理可以參考筆者的這篇文章,spring cloud config的運用與原理
源碼分析
關于spring cloud bus的源碼挺簡潔的,隻用到了mq的部分特性,其核心類是BusAutoConfiguration類,此類完成了bus的自動配置。
@Configuration
@ConditionalOnBusEnabled
@EnableBinding(SpringCloudBusClient.class)
@EnableConfigurationProperties(BusProperties.class)
@AutoConfigureBefore(BindingServiceConfiguration.class)
// so stream bindings work properly
@AutoConfigureAfter(LifecycleMvcEndpointAutoConfiguration.class)
// so actuator endpoints have needed dependencies
public class BusAutoConfiguration implements ApplicationEventPublisherAware {
可以看出,引入了SpringCloudBusClient,BusProperties,BindingServiceConfiguration,LifecycleMvcEndpointAutoConfiguration這幾個類。接下來看這幾個類的作用
public interface SpringCloudBusClient {
/**
* Name of the input channel for Spring Cloud Bus.
*/
String INPUT = "springCloudBusInput";
/**
* Name of the output channel for Spring Cloud Bus.
*/
String OUTPUT = "springCloudBusOutput";
@Output(SpringCloudBusClient.OUTPUT)
MessageChannel springCloudBusOutput();
@Input(SpringCloudBusClient.INPUT)
SubscribableChannel springCloudBusInput();
SpringCloudBusClient這個類顯而易見的得知是建立了兩個channel在mq上。
關于BusProperties,從類名就得知是bus的屬性類。
BindingServiceConfiguration在spring boot 加載bus自動配置前的做的操作
private static Map<String, BinderConfiguration> getBinderConfigurations(BinderTypeRegistry binderTypeRegistry, BindingServiceProperties bindingServiceProperties) {
Map<String, BinderConfiguration> binderConfigurations = new HashMap();
Map<String, BinderProperties> declaredBinders = bindingServiceProperties.getBinders();
boolean defaultCandidatesExist = false;
for(Iterator binderPropertiesIterator = declaredBinders.entrySet().iterator(); !defaultCandidatesExist && binderPropertiesIterator.hasNext(); defaultCandidatesExist = ((BinderProperties)((Entry)binderPropertiesIterator.next()).getValue()).isDefaultCandidate()) {
}
List<String> existingBinderConfigurations = new ArrayList();
Iterator var7 = declaredBinders.entrySet().iterator();
Entry binderEntry;
while(var7.hasNext()) {
binderEntry = (Entry)var7.next();
BinderProperties binderProperties = (BinderProperties)binderEntry.getValue();
if (binderTypeRegistry.get((String)binderEntry.getKey()) != null) {
binderConfigurations.put(binderEntry.getKey(), new BinderConfiguration((String)binderEntry.getKey(), binderProperties.getEnvironment(), binderProperties.isInheritEnvironment(), binderProperties.isDefaultCandidate()));
existingBinderConfigurations.add(binderEntry.getKey());
} else {
Assert.hasText(binderProperties.getType(), "No 'type' property present for custom binder " + (String)binderEntry.getKey());
binderConfigurations.put(binderEntry.getKey(), new BinderConfiguration(binderProperties.getType(), binderProperties.getEnvironment(), binderProperties.isInheritEnvironment(), binderProperties.isDefaultCandidate()));
existingBinderConfigurations.add(binderEntry.getKey());
}
}
var7 = binderConfigurations.entrySet().iterator();
while(var7.hasNext()) {
binderEntry = (Entry)var7.next();
if (((BinderConfiguration)binderEntry.getValue()).isDefaultCandidate()) {
defaultCandidatesExist = true;
}
}
if (!defaultCandidatesExist) {
var7 = binderTypeRegistry.getAll().entrySet().iterator();
while(var7.hasNext()) {
binderEntry = (Entry)var7.next();
if (!existingBinderConfigurations.contains(binderEntry.getKey())) {
binderConfigurations.put(binderEntry.getKey(), new BinderConfiguration((String)binderEntry.getKey(), new HashMap(), true, true));
}
}
}
return binderConfigurations;
}
這個方法擷取所有binder的實體資訊存儲在一個map。LifecycleMvcEndpointAutoConfiguration配置完成的後web方面配置。是以spring cloud bus
啟動時執行的流程是,建立channel->加載對應的配置->掃描所有的serverice binder的配置資訊儲存到map->對web的配置。
關于post請求的觸發,spring cloud bus 啟動了許多的listener對各個post的請求的監聽。
比如發送一個bus/refresh 的post請求
@Endpoint(id = "bus-refresh") // TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {
public RefreshBusEndpoint(ApplicationEventPublisher context, String id) {
super(context, id);
}
@WriteOperation
public void busRefreshWithDestination(@Selector String destination) { // TODO:
// document
// destination
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
}
@WriteOperation
public void busRefresh() {
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null));
}
}
會觸發一個RefreshRemoteApplicationEvent事件,根據執行個體的id,以及輸入的目标範圍去重新整理對應的service。
總結
spring cloud bus目前主要用于和spring cloud config的組合使用,并未完全用到mq的所有特性,在spring cloud stream中對于mq的使用以及擴充更加完善。