CamelContext的啟動過程
- 1. Start Camel
-
- 1.1 Start Route Definitions
-
- 1.1.1 Create RouteContext Using From Definitions
-
- 1.1.1.1 Resolve Endpoint
- 1.1.1.2 Add Event Driven Processor
- 1.1.1.3 Commit Route Context
- 1.1.2 Start RouteService
- 1.2 Start Or Resume Route
-
- 1.2.1 Safely Start Route Services
-
- 1.2.1.1 Warm Up Route Service
- 1.2.1.2 Start Or Resume Route Consumers
- 參考
camel version:2.20.1
1. Start Camel
org.apache.camel.impl.DefaultCamelContext#doStartCamel
private void doStartCamel() throws Exception {
// start components
startServices(components.values());
// start the route definitions before the routes is started
startRouteDefinitions(routeDefinitions);
// invoke this logic to warmup the routes and if possible also start the routes
// check clash = true; start consumer = true; resume consume = false; addingRoutes = true
doStartOrResumeRoutes(routeServices, true, !doNotStartRoutesOnFirstStart, false, true);
// starting will continue in the start method
}
1.1 Start Route Definitions
周遊所有的RouteDefinition,執行startRoute。
一個RouteDefinition可以包含多個消費者(from()),每個消費者對應一個Route,每個Route對應一個RouteContext。
org.apache.camel.impl.DefaultCamelContext#startRoute(org.apache.camel.model.RouteDefinition)
public void startRoute(RouteDefinition route) throws Exception {
isStartingRoutes.set(true);
try {
// must ensure route is prepared, before we can start it
route.prepare(this);
List<Route> routes = new ArrayList<Route>();
List<RouteContext> routeContexts = route.addRoutes(this, routes);
RouteService routeService = new RouteService(this, route, routeContexts, routes);
startRouteService(routeService, true);
} finally {
// we are done staring routes
isStartingRoutes.remove();
}
}
1.1.1 Create RouteContext Using From Definitions
周遊所有的FromDefinition生成對應數量的RouteContext
org.apache.camel.model.RouteDefinition#addRoutes(org.apache.camel.model.ModelCamelContext, java.util.Collection<org.apache.camel.Route>)
public List<RouteContext> addRoutes(ModelCamelContext camelContext, Collection<Route> routes) throws Exception {
List<RouteContext> answer = new ArrayList<RouteContext>();
for (FromDefinition fromType : inputs) {
RouteContext routeContext;
try {
routeContext = addRoutes(camelContext, routes, fromType);
} catch (FailedToCreateRouteException e) {
} catch (Exception e) {
}
answer.add(routeContext);
}
return answer;
}
建立RouteContext的邏輯:
1. 解析endpoint;
2. 添加processor;
3. 送出routeContext;
org.apache.camel.model.RouteDefinition#addRoutes(org.apache.camel.CamelContext, java.util.Collection<org.apache.camel.Route>, org.apache.camel.model.FromDefinition)
protected RouteContext addRoutes(CamelContext camelContext, Collection<Route> routes, FromDefinition fromType) throws Exception {
RouteContext routeContext = new DefaultRouteContext(camelContext, this, fromType, routes);
routeContext.getEndpoint();
List<ProcessorDefinition<?>> list = new ArrayList<ProcessorDefinition<?>>(outputs);
for (ProcessorDefinition<?> output : list) {
try {
output.addRoutes(routeContext, routes);
} catch (Exception e) {
RouteDefinition route = routeContext.getRoute();
throw new FailedToCreateRouteException(route.getId(), route.toString(), output.toString(), e);
}
}
routeContext.commit();
return routeContext;
}
1.1.1.1 Resolve Endpoint
解析Endpoint。
org.apache.camel.impl.DefaultRouteContext#getEndpoint
public Endpoint getEndpoint() {
if (endpoint == null) {
endpoint = from.resolveEndpoint(this);
}
return endpoint;
}
org.apache.camel.impl.DefaultRouteContext#resolveEndpoint(java.lang.String, java.lang.String)
public Endpoint resolveEndpoint(String uri, String ref) {
Endpoint endpoint = null;
if (uri != null) {
endpoint = resolveEndpoint(uri);
}
if (ref != null) {
endpoint = lookup(ref, Endpoint.class);
try {
// need add the endpoint into service
getCamelContext().addService(endpoint);
} catch (Exception ex) {
throw new RuntimeCamelException(ex);
}
}
}
通過URI擷取Endpoint的最終邏輯在 org.apache.camel.CamelContext#getEndpoint(java.lang.String) 方法中實作。
1.1.1.2 Add Event Driven Processor
1. 根據processor類型建構Processor;
2. 将processor包裝成channel;
3. 将包裝後的channel添加到eventDrivenProcessors清單;
org.apache.camel.model.ProcessorDefinition#addRoutes
public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
Processor processor = makeProcessor(routeContext);
if (!routeContext.isRouteAdded()) {
boolean endpointInterceptor = false;
if (endpointInterceptor) {
} else {
routeContext.addEventDrivenProcessor(processor);
}
}
}
org.apache.camel.model.ProcessorDefinition#makeProcessorImpl
private Processor makeProcessorImpl(RouteContext routeContext) throws Exception {
Processor processor = null;
// at first use custom factory
if (routeContext.getCamelContext().getProcessorFactory() != null) {
processor = routeContext.getCamelContext().getProcessorFactory().createProcessor(routeContext, this);
}
// fallback to default implementation if factory did not create the processor
if (processor == null) {
processor = createProcessor(routeContext);
}
return wrapProcessor(routeContext, processor);
}
1.1.1.3 Commit Route Context
1. 組裝Route:
Event Driven Processor -> Pipeline -> CamelInternalProcessor -> EventDrivenConsumerRoute。
2. 添加到Route清單;
org.apache.camel.impl.DefaultRouteContext#commit
public void commit() {
// now lets turn all of the event driven consumer processors into a single route
if (!eventDrivenProcessors.isEmpty()) {
Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors);
CamelInternalProcessor internal = new CamelInternalProcessor(target);
// and create the route that wraps the UoW
Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal);
routes.add(edcr);
}
}
1.1.2 Start RouteService
建立并啟用RouteService。
CamelContext既包含啟動RouteService的方法也包含恢複RouteService的方法。
在CamelContext啟動過程中,shouldStartRoutes()的值為false,是以這裡并不啟動路由,而是在org.apache.camel.impl.DefaultCamelContext#doStartOrResumeRoutes方法執行時才啟動。
org.apache.camel.impl.DefaultCamelContext#startRouteService
protected synchronized void startRouteService(RouteService routeService, boolean addingRoutes) throws Exception {
// we may already be starting routes so remember this, so we can unset accordingly in finally block
boolean alreadyStartingRoutes = isStartingRoutes();
if (!alreadyStartingRoutes) {
isStartingRoutes.set(true);
}
try {
// 如果routeService處于挂起狀态,需要恢複routeService
if (routeService.getStatus().isSuspended()) {
resumeRouteService(routeService);
} else {
// 将routeService添加到 routeService Map
routeServices.put(routeService.getId(), routeService);
// CamelContext啟動後才能添加route
if (shouldStartRoutes()) {
// 以安全的方式啟動routeService
// route将按正确的順序啟動、進行沖突檢查、并且以正确的順序關閉
// forceAutoStart = true; check clash = true; start consumer = true; resume consume = false
safelyStartRouteServices(true, true, true, false, addingRoutes, routeService);
// start route services if it was configured to auto startup and we are not adding routes
boolean autoStartup = routeService.getRouteDefinition().isAutoStartup(this) && this.isAutoStartup();
if (!addingRoutes || autoStartup) {
// start the route since auto start is enabled or we are starting a route (not adding new routes)
routeService.start();
}
}
}
} finally {
if (!alreadyStartingRoutes) {
isStartingRoutes.remove();
}
}
}
1.2 Start Or Resume Route
org.apache.camel.impl.DefaultCamelContext#doStartOrResumeRoutes
protected void doStartOrResumeRoutes(Map<String, RouteService> routeServices, boolean checkClash,
boolean startConsumer, boolean resumeConsumer, boolean addingRoutes) throws Exception {
isStartingRoutes.set(true);
try {
// filter out already started routes
Map<String, RouteService> filtered = new LinkedHashMap<String, RouteService>();
for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) {
boolean startable = false;
// judgement
if (startable) {
filtered.put(entry.getKey(), entry.getValue());
}
}
// the context is in last phase of staring, so lets start the routes
safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values());
} finally {
isStartingRoutes.remove();
}
}
1.2.1 Safely Start Route Services
1. 為每個路由設定啟動順序
2. 優先啟動processor及其子服務
3. 然後啟動consumer
4. 清空consumer清單
參數介紹
checkClash 是否檢查啟動順序沖突
startConsumer 是否啟動路由consumer, 可以在不啟動consumer的情況下對路由進行預熱
resumeConsumer 是否恢複路由consumer
addingRoutes 是否新添加路由
routeServices 路由清單
org.apache.camel.impl.DefaultCamelContext#safelyStartRouteServices(boolean, boolean, boolean, boolean, java.util.Collection<org.apache.camel.impl.RouteService>)
protected synchronized void safelyStartRouteServices(boolean checkClash, boolean startConsumer, boolean resumeConsumer,
boolean addingRoutes, Collection<RouteService> routeServices) throws Exception {
Map<Integer, DefaultRouteStartupOrder> inputs = new TreeMap<Integer, DefaultRouteStartupOrder>();
// 為每個路由設定啟動順序
for (RouteService routeService : routeServices) {
DefaultRouteStartupOrder order = doPrepareRouteToBeStarted(routeService);
inputs.put(order.getStartupOrder(), order);
}
// 優先啟動processor及其子服務
doWarmUpRoutes(inputs, startConsumer);
// 然後啟動consumer
if (startConsumer) {
if (resumeConsumer) {
// 恢複consumer
doResumeRouteConsumers(inputs, addingRoutes);
} else {
// 啟動consumer
// 檢查是否存在多個consumer消費一個endpoint的情形,這是不允許的。
doStartRouteConsumers(inputs, addingRoutes);
}
}
// 清空consumer清單
inputs.clear();
}
1.2.1.1 Warm Up Route Service
warm up的主要作用是啟動RouteService對應路由的endpoint和子服務。
優先啟動所有的processor和子服務,然後再啟用consumer,增強了Camel的健壯性。當consumer啟動後便可以及時的消費消息。
org.apache.camel.impl.RouteService#doWarmUp
protected synchronized void doWarmUp() throws Exception {
// 啟動endpoint
if (endpointDone.compareAndSet(false, true)) {
// endpoint隻能啟動一次,可以被其他路由重用,endpoint啟用後隻有在Camel停止時才被關閉
for (Route route : routes) {
// 確定endpoint在route services(比如consumer)之前啟動
ServiceHelper.startService(route.getEndpoint());
}
}
if (warmUpDone.compareAndSet(false, true)) {
for (Route route : routes) {
try (MDCHelper mdcHelper = new MDCHelper(route.getId())) {
// 清空服務清單
route.warmUp();
// 擷取服務清單
List<Service> services = route.getServices();
// 建構服務清單
route.onStartingServices(services);
// 擷取需要啟動的所有服務(包含所有子服務、遞歸擷取)
Set<Service> list = new LinkedHashSet<Service>();
for (Service service : services) {
list.addAll(ServiceHelper.getChildServices(service));
}
// 将Consumer和子服務分離,優先啟動子服務
List<Service> childServices = new ArrayList<Service>();
for (Service service : list) {
// inject the route
if (service instanceof RouteAware) {
((RouteAware) service).setRoute(route);
}
if (service instanceof Consumer) {
inputs.put(route, (Consumer) service);
} else {
childServices.add(service);
}
}
startChildService(route, childServices);
// 觸發route添加事件
EventHelper.notifyRouteAdded(camelContext, route);
}
}
// ensure lifecycle strategy is invoked which among others enlist the route in JMX
for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
strategy.onRoutesAdd(routes);
}
// add routes to camel context
camelContext.addRouteCollection(routes);
// add the routes to the inflight registry so they are pre-installed
for (Route route : routes) {
camelContext.getInflightRepository().addRoute(route.getId());
}
}
}
org.apache.camel.impl.EventDrivenConsumerRoute#addServices
protected void addServices(List<Service> services) throws Exception {
// 建立并添加consumer
Endpoint endpoint = getEndpoint();
consumer = endpoint.createConsumer(processor);
if (consumer != null) {
services.add(consumer);
if (consumer instanceof RouteAware) {
((RouteAware) consumer).setRoute(this);
}
}
// 如果processor是Service的執行個體,添加processor
Processor processor = getProcessor();
if (processor instanceof Service) {
services.add((Service)processor);
}
}
1.2.1.2 Start Or Resume Route Consumers
org.apache.camel.impl.DefaultCamelContext#doStartOrResumeRouteConsumers
private void doStartOrResumeRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean resumeOnly, boolean addingRoute) throws Exception {
List<Endpoint> routeInputs = new ArrayList<Endpoint>();
for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
Integer order = entry.getKey();
Route route = entry.getValue().getRoute();
RouteService routeService = entry.getValue().getRouteService();
// start the service
for (Consumer consumer : routeService.getInputs().values()) {
Endpoint endpoint = consumer.getEndpoint();
if (resumeOnly && route.supportsSuspension()) {
// if we are resuming and the route can be resumed
ServiceHelper.resumeService(consumer);
} else {
// when starting we should invoke the lifecycle strategies
for (LifecycleStrategy strategy : lifecycleStrategies) {
strategy.onServiceAdd(this, consumer, route);
}
startService(consumer);
}
}
routeInputs.add(endpoint);
}
if (resumeOnly) {
routeService.resume();
} else {
routeService.start(false);
}
}
}
參考
Camel路由啟動過程