天天看點

spring cloud bus介紹與源碼分析簡介運用場景源碼分析總結

spring cloud bus介紹與源碼分析

  • 簡介
  • 運用場景
  • 源碼分析
  • 總結

簡介

根據官方文檔,spring cloud bus為分布式的輕量級消息代理服務,可以用來狀态改變的時候進行廣播,比如配置值發生變換,目前支援AMQP協定的代理服務。也就是說目前spring cloud bus隻支援rabbitmq 以及kafkamq,kafka普遍用于大資料傳輸方面。

運用場景

目前所推薦的spring cloud bus的運用場景大多都是與spring cloud config 使用實作子服務動态重新整理擷取server端的配置檔案變動。實作簡單隻需要簡單配置,不需要代碼實作。

spring cloud bus介紹與源碼分析簡介運用場景源碼分析總結

和spring cloud config 組合的動态重新整理擷取最新配置的流程圖,讀者若想擷取關于spring cloud config 的運用和原理可以參考筆者的這篇文章,spring cloud config的運用與原理

源碼分析

關于spring cloud bus的源碼挺簡潔的,隻用到了mq的部分特性,其核心類是BusAutoConfiguration類,此類完成了bus的自動配置。

@Configuration
@ConditionalOnBusEnabled
@EnableBinding(SpringCloudBusClient.class)
@EnableConfigurationProperties(BusProperties.class)
@AutoConfigureBefore(BindingServiceConfiguration.class)
// so stream bindings work properly
@AutoConfigureAfter(LifecycleMvcEndpointAutoConfiguration.class)
// so actuator endpoints have needed dependencies
public class BusAutoConfiguration implements ApplicationEventPublisherAware {

           

可以看出,引入了SpringCloudBusClient,BusProperties,BindingServiceConfiguration,LifecycleMvcEndpointAutoConfiguration這幾個類。接下來看這幾個類的作用

public interface SpringCloudBusClient {

	/**
	 * Name of the input channel for Spring Cloud Bus.
	 */
	String INPUT = "springCloudBusInput";

	/**
	 * Name of the output channel for Spring Cloud Bus.
	 */
	String OUTPUT = "springCloudBusOutput";

	@Output(SpringCloudBusClient.OUTPUT)
	MessageChannel springCloudBusOutput();

	@Input(SpringCloudBusClient.INPUT)
	SubscribableChannel springCloudBusInput();

           

SpringCloudBusClient這個類顯而易見的得知是建立了兩個channel在mq上。

關于BusProperties,從類名就得知是bus的屬性類。

spring cloud bus介紹與源碼分析簡介運用場景源碼分析總結

BindingServiceConfiguration在spring boot 加載bus自動配置前的做的操作

private static Map<String, BinderConfiguration> getBinderConfigurations(BinderTypeRegistry binderTypeRegistry, BindingServiceProperties bindingServiceProperties) {
        Map<String, BinderConfiguration> binderConfigurations = new HashMap();
        Map<String, BinderProperties> declaredBinders = bindingServiceProperties.getBinders();
        boolean defaultCandidatesExist = false;

        for(Iterator binderPropertiesIterator = declaredBinders.entrySet().iterator(); !defaultCandidatesExist && binderPropertiesIterator.hasNext(); defaultCandidatesExist = ((BinderProperties)((Entry)binderPropertiesIterator.next()).getValue()).isDefaultCandidate()) {
        }

        List<String> existingBinderConfigurations = new ArrayList();
        Iterator var7 = declaredBinders.entrySet().iterator();

        Entry binderEntry;
        while(var7.hasNext()) {
            binderEntry = (Entry)var7.next();
            BinderProperties binderProperties = (BinderProperties)binderEntry.getValue();
            if (binderTypeRegistry.get((String)binderEntry.getKey()) != null) {
                binderConfigurations.put(binderEntry.getKey(), new BinderConfiguration((String)binderEntry.getKey(), binderProperties.getEnvironment(), binderProperties.isInheritEnvironment(), binderProperties.isDefaultCandidate()));
                existingBinderConfigurations.add(binderEntry.getKey());
            } else {
                Assert.hasText(binderProperties.getType(), "No 'type' property present for custom binder " + (String)binderEntry.getKey());
                binderConfigurations.put(binderEntry.getKey(), new BinderConfiguration(binderProperties.getType(), binderProperties.getEnvironment(), binderProperties.isInheritEnvironment(), binderProperties.isDefaultCandidate()));
                existingBinderConfigurations.add(binderEntry.getKey());
            }
        }

        var7 = binderConfigurations.entrySet().iterator();

        while(var7.hasNext()) {
            binderEntry = (Entry)var7.next();
            if (((BinderConfiguration)binderEntry.getValue()).isDefaultCandidate()) {
                defaultCandidatesExist = true;
            }
        }

        if (!defaultCandidatesExist) {
            var7 = binderTypeRegistry.getAll().entrySet().iterator();

            while(var7.hasNext()) {
                binderEntry = (Entry)var7.next();
                if (!existingBinderConfigurations.contains(binderEntry.getKey())) {
                    binderConfigurations.put(binderEntry.getKey(), new BinderConfiguration((String)binderEntry.getKey(), new HashMap(), true, true));
                }
            }
        }

        return binderConfigurations;
    }

           

這個方法擷取所有binder的實體資訊存儲在一個map。LifecycleMvcEndpointAutoConfiguration配置完成的後web方面配置。是以spring cloud bus

啟動時執行的流程是,建立channel->加載對應的配置->掃描所有的serverice binder的配置資訊儲存到map->對web的配置。

關于post請求的觸發,spring cloud bus 啟動了許多的listener對各個post的請求的監聽。

spring cloud bus介紹與源碼分析簡介運用場景源碼分析總結

比如發送一個bus/refresh 的post請求

@Endpoint(id = "bus-refresh") // TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {

	public RefreshBusEndpoint(ApplicationEventPublisher context, String id) {
		super(context, id);
	}

	@WriteOperation
	public void busRefreshWithDestination(@Selector String destination) { // TODO:
																			// document
																			// destination
		publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
	}

	@WriteOperation
	public void busRefresh() {
		publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null));
	}

}
           

會觸發一個RefreshRemoteApplicationEvent事件,根據執行個體的id,以及輸入的目标範圍去重新整理對應的service。

總結

spring cloud bus目前主要用于和spring cloud config的組合使用,并未完全用到mq的所有特性,在spring cloud stream中對于mq的使用以及擴充更加完善。

繼續閱讀