
CamelContext Startup Process

  • 1. Start Camel
    • 1.1 Start Route Definitions
      • 1.1.1 Create RouteContext Using From Definitions
        • 1.1.1.1 Resolve Endpoint
        • 1.1.1.2 Add Event Driven Processor
        • 1.1.1.3 Commit Route Context
      • 1.1.2 Start RouteService
    • 1.2 Start Or Resume Route
      • 1.2.1 Safely Start Route Services
        • 1.2.1.1 Warm Up Route Service
        • 1.2.1.2 Start Or Resume Route Consumers
  • References

Camel version: 2.20.1

1. Start Camel

org.apache.camel.impl.DefaultCamelContext#doStartCamel

private void doStartCamel() throws Exception {
    // start components
    startServices(components.values());
    // start the route definitions before the routes are started
    startRouteDefinitions(routeDefinitions);
    // invoke this logic to warmup the routes and if possible also start the routes
    // check clash = true; start consumer = true; resume consumer = false; addingRoutes = true
    doStartOrResumeRoutes(routeServices, true, !doNotStartRoutesOnFirstStart, false, true);
    // starting will continue in the start method
}
           

1.1 Start Route Definitions

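Iterate over all RouteDefinitions and call startRoute on each one.

A RouteDefinition can contain multiple consumers (from()). Each consumer corresponds to one Route, and each Route corresponds to one RouteContext.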

org.apache.camel.impl.DefaultCamelContext#startRoute(org.apache.camel.model.RouteDefinition)

public void startRoute(RouteDefinition route) throws Exception {
    isStartingRoutes.set(true);
    try {
        // must ensure route is prepared, before we can start it
        route.prepare(this);

        List<Route> routes = new ArrayList<Route>();
        List<RouteContext> routeContexts = route.addRoutes(this, routes);
        RouteService routeService = new RouteService(this, route, routeContexts, routes);
        startRouteService(routeService, true);
    } finally {
        // we are done starting routes
        isStartingRoutes.remove();
    }
}
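
The one-Route-per-from() relationship described above can be pictured with a minimal plain-Java sketch. It does not use Camel's classes: `RouteDefSketch`, `RouteSketch`, and `expand` are hypothetical names invented for illustration, and the sketch only models how one definition with several inputs fans out into several routes that share the same outputs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, not Camel classes: a definition with several inputs
// expands into one route per input, and every route gets the same outputs.
final class RouteDefSketch {
    static final class RouteSketch {
        final String input;
        final List<String> outputs;

        RouteSketch(String input, List<String> outputs) {
            this.input = input;
            this.outputs = outputs;
        }
    }

    private final List<String> inputs = new ArrayList<>();
    private final List<String> outputs = new ArrayList<>();

    RouteDefSketch from(String uri) {
        inputs.add(uri);
        return this;
    }

    RouteDefSketch to(String uri) {
        outputs.add(uri);
        return this;
    }

    // One route per input, mirroring the loop over the from definitions.
    List<RouteSketch> expand() {
        List<RouteSketch> routes = new ArrayList<>();
        for (String input : inputs) {
            routes.add(new RouteSketch(input, new ArrayList<>(outputs)));
        }
        return routes;
    }

    public static void main(String[] args) {
        List<RouteSketch> routes = new RouteDefSketch()
                .from("direct:a").from("direct:b").to("log:out").expand();
        for (RouteSketch route : routes) {
            System.out.println(route.input + " -> " + route.outputs);
        }
    }
}
```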
           

1.1.1 Create RouteContext Using From Definitions

Iterate over all FromDefinitions and create one RouteContext for each.

org.apache.camel.model.RouteDefinition#addRoutes(org.apache.camel.model.ModelCamelContext, java.util.Collection<org.apache.camel.Route>)

public List<RouteContext> addRoutes(ModelCamelContext camelContext, Collection<Route> routes) throws Exception {
    List<RouteContext> answer = new ArrayList<RouteContext>();
    for (FromDefinition fromType : inputs) {
        RouteContext routeContext;
        try {
            routeContext = addRoutes(camelContext, routes, fromType);
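        } catch (FailedToCreateRouteException e) {
            // already carries the route details, so rethrow as-is
            throw e;
        } catch (Exception e) {
            // wrap in an exception that identifies which route failed
            throw new FailedToCreateRouteException(getId(), toString(), e);
        }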
        answer.add(routeContext);
    }
    return answer;
}
           

The logic for creating a RouteContext:

1. Resolve the endpoint;

2. Add the processors;

3. Commit the routeContext;

org.apache.camel.model.RouteDefinition#addRoutes(org.apache.camel.CamelContext, java.util.Collection<org.apache.camel.Route>, org.apache.camel.model.FromDefinition)

protected RouteContext addRoutes(CamelContext camelContext, Collection<Route> routes, FromDefinition fromType) throws Exception {
   RouteContext routeContext = new DefaultRouteContext(camelContext, this, fromType, routes); 
   routeContext.getEndpoint();

   List<ProcessorDefinition<?>> list = new ArrayList<ProcessorDefinition<?>>(outputs);
   for (ProcessorDefinition<?> output : list) {
       try {
           output.addRoutes(routeContext, routes);
       } catch (Exception e) {
           RouteDefinition route = routeContext.getRoute();
           throw new FailedToCreateRouteException(route.getId(), route.toString(), output.toString(), e);
       }
   }

   routeContext.commit();
   return routeContext;
}
           

1.1.1.1 Resolve Endpoint

Resolve the Endpoint.

org.apache.camel.impl.DefaultRouteContext#getEndpoint

public Endpoint getEndpoint() {
    if (endpoint == null) {
        endpoint = from.resolveEndpoint(this);
    }
    return endpoint;
}
           

org.apache.camel.impl.DefaultRouteContext#resolveEndpoint(java.lang.String, java.lang.String)

public Endpoint resolveEndpoint(String uri, String ref) {
    Endpoint endpoint = null;
    if (uri != null) {
        endpoint = resolveEndpoint(uri);
    }
    if (ref != null) {
        endpoint = lookup(ref, Endpoint.class);
        try {
            // need add the endpoint into service
            getCamelContext().addService(endpoint);
        } catch (Exception ex) {
            throw new RuntimeCamelException(ex);
        }
    }
    return endpoint;
}
           

The final logic for obtaining an Endpoint from a URI is implemented in org.apache.camel.CamelContext#getEndpoint(java.lang.String).
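
The two lookup paths in resolveEndpoint (by URI, or by a reference into the registry) can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `EndpointResolverSketch`, `bind`, and `resolve` are hypothetical names, endpoints are modelled as plain strings, and the per-URI cache is an assumption of the sketch rather than something shown in the excerpt above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not Camel code: an endpoint is found either by URI
// or by a reference name previously bound in a registry.
final class EndpointResolverSketch {
    private final Map<String, String> registry = new HashMap<>();       // ref -> endpoint
    private final Map<String, String> endpointsByUri = new HashMap<>(); // uri -> endpoint (assumed cache)

    void bind(String ref, String endpoint) {
        registry.put(ref, endpoint);
    }

    String resolve(String uri, String ref) {
        String endpoint = null;
        if (uri != null) {
            // create the endpoint on first use, then reuse it for the same URI
            endpoint = endpointsByUri.computeIfAbsent(uri, u -> "endpoint[" + u + "]");
        }
        if (ref != null) {
            // as in the excerpt, a ref lookup replaces whatever the URI produced
            endpoint = registry.get(ref);
        }
        if (endpoint == null) {
            throw new IllegalArgumentException("Either 'uri' or 'ref' must resolve to an endpoint");
        }
        return endpoint;
    }

    public static void main(String[] args) {
        EndpointResolverSketch resolver = new EndpointResolverSketch();
        resolver.bind("orders", "endpoint[jms:orders]");
        System.out.println(resolver.resolve("direct:a", null));
        System.out.println(resolver.resolve(null, "orders"));
    }
}
```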

1.1.1.2 Add Event Driven Processor

1. Build the Processor according to the processor type;

2. Wrap the processor in a channel;

3. Add the wrapped channel to the eventDrivenProcessors list;

org.apache.camel.model.ProcessorDefinition#addRoutes

public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
    Processor processor = makeProcessor(routeContext);
    if (!routeContext.isRouteAdded()) {
        boolean endpointInterceptor = false;
        if (endpointInterceptor) {
        } else {
            routeContext.addEventDrivenProcessor(processor);
        }
    }
}
           

org.apache.camel.model.ProcessorDefinition#makeProcessorImpl

private Processor makeProcessorImpl(RouteContext routeContext) throws Exception {
    Processor processor = null;
    // at first use custom factory
    if (routeContext.getCamelContext().getProcessorFactory() != null) {
        processor = routeContext.getCamelContext().getProcessorFactory().createProcessor(routeContext, this);
    }
    // fallback to default implementation if factory did not create the processor
    if (processor == null) {
        processor = createProcessor(routeContext);
    }
    return wrapProcessor(routeContext, processor);
}
           

1.1.1.3 Commit Route Context

1. Assemble the Route:

Event Driven Processor -> Pipeline -> CamelInternalProcessor -> EventDrivenConsumerRoute.

2. Add it to the Route list;

org.apache.camel.impl.DefaultRouteContext#commit

public void commit() {
    // now lets turn all of the event driven consumer processors into a single route
    if (!eventDrivenProcessors.isEmpty()) {
        Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors);
        CamelInternalProcessor internal = new CamelInternalProcessor(target);
        // and create the route that wraps the UoW
        Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal);
        routes.add(edcr);
    }
}
           

1.1.2 Start RouteService

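Create and start the RouteService.

CamelContext contains both a method for starting a RouteService and a method for resuming one.

During CamelContext startup, shouldStartRoutes() returns false, so the routes are not started here. They are started later, when org.apache.camel.impl.DefaultCamelContext#doStartOrResumeRoutes runs.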

org.apache.camel.impl.DefaultCamelContext#startRouteService

protected synchronized void startRouteService(RouteService routeService, boolean addingRoutes) throws Exception {
    // we may already be starting routes so remember this, so we can unset accordingly in finally block
    boolean alreadyStartingRoutes = isStartingRoutes();
    if (!alreadyStartingRoutes) {
        isStartingRoutes.set(true);
    }

    try {
        // if the routeService is suspended, it needs to be resumed
        if (routeService.getStatus().isSuspended()) {
            resumeRouteService(routeService);
        } else {
            // register the routeService in the routeServices map
            routeServices.put(routeService.getId(), routeService);
            // routes are started here only once the CamelContext itself has started
            if (shouldStartRoutes()) {
                // start the routeService safely:
                // routes are started in the correct order, checked for clashes, and shut down in the correct order
                // forceAutoStart = true; check clash = true; start consumer = true; resume consumer = false
                safelyStartRouteServices(true, true, true, false, addingRoutes, routeService);
                // start route services if it was configured to auto startup and we are not adding routes
                boolean autoStartup = routeService.getRouteDefinition().isAutoStartup(this) && this.isAutoStartup();
                if (!addingRoutes || autoStartup) {
                    // start the route since auto start is enabled or we are starting a route (not adding new routes)
                    routeService.start();
                }
            }
        }
    } finally {
        if (!alreadyStartingRoutes) {
            isStartingRoutes.remove();
        }
    }
}
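
The deferred start described above can be modelled in a few lines of plain Java. The sketch is an assumption-laden simplification: `DeferredStartSketch` and its methods are hypothetical, and a single boolean flag stands in for whatever shouldStartRoutes() actually checks. It only shows the behaviour the text describes: routes added while the context is still starting are registered but not started until the final phase, whereas routes added afterwards start immediately.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model, not Camel code: register first, start later.
final class DeferredStartSketch {
    private final Map<String, Boolean> routeServices = new LinkedHashMap<>(); // id -> started?
    private boolean contextStarted; // stands in for shouldStartRoutes()

    void addRoute(String id) {
        // always register the route service
        routeServices.put(id, false);
        // start it right away only if the context is already up
        if (contextStarted) {
            routeServices.put(id, true);
        }
    }

    void startContext() {
        // last phase of startup: start everything that was only registered so far
        for (Map.Entry<String, Boolean> entry : routeServices.entrySet()) {
            entry.setValue(true);
        }
        contextStarted = true;
    }

    boolean isStarted(String id) {
        return Boolean.TRUE.equals(routeServices.get(id));
    }

    public static void main(String[] args) {
        DeferredStartSketch context = new DeferredStartSketch();
        context.addRoute("early");
        System.out.println("before start: " + context.isStarted("early"));
        context.startContext();
        context.addRoute("late");
        System.out.println("after start: " + context.isStarted("early") + ", " + context.isStarted("late"));
    }
}
```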
           

1.2 Start Or Resume Route

org.apache.camel.impl.DefaultCamelContext#doStartOrResumeRoutes

protected void doStartOrResumeRoutes(Map<String, RouteService> routeServices, boolean checkClash,
                                         boolean startConsumer, boolean resumeConsumer, boolean addingRoutes) throws Exception {
    isStartingRoutes.set(true);
    try {
        // filter out already started routes
        Map<String, RouteService> filtered = new LinkedHashMap<String, RouteService>();
        for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) {
            boolean startable = false;
            // decide whether the route is startable (the check is elided in this excerpt)
            if (startable) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }

        // the context is in the last phase of starting, so let's start the routes
        safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values());
    } finally {
        isStartingRoutes.remove();
    }
}
           

1.2.1 Safely Start Route Services

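1. Assign a startup order to each route

2. Start the processors and their child services first

3. Then start the consumers

4. Clear the inputs map (the prepared startup entries)

Parameters

checkClash: whether to check for startup-order clashes

startConsumer: whether to start the route consumers; a route can be warmed up without starting its consumers

resumeConsumer: whether to resume the route consumers

addingRoutes: whether new routes are being added

routeServices: the route services to start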

org.apache.camel.impl.DefaultCamelContext#safelyStartRouteServices(boolean, boolean, boolean, boolean, java.util.Collection<org.apache.camel.impl.RouteService>)

protected synchronized void safelyStartRouteServices(boolean checkClash, boolean startConsumer, boolean resumeConsumer,
                                                         boolean addingRoutes, Collection<RouteService> routeServices) throws Exception {
    Map<Integer, DefaultRouteStartupOrder> inputs = new TreeMap<Integer, DefaultRouteStartupOrder>();

    // assign a startup order to each route
    for (RouteService routeService : routeServices) {
        DefaultRouteStartupOrder order = doPrepareRouteToBeStarted(routeService);
        inputs.put(order.getStartupOrder(), order);
    }

    // warm up first: start the processors and their child services
    doWarmUpRoutes(inputs, startConsumer);

    // then start the consumers
    if (startConsumer) {
        if (resumeConsumer) {
            // resume the consumers
            doResumeRouteConsumers(inputs, addingRoutes);
        } else {
            // start the consumers
            // this also checks whether several consumers consume from the same endpoint, which is not allowed
            doStartRouteConsumers(inputs, addingRoutes);
        }
    }

    // clear the inputs map, which is no longer needed
    inputs.clear();
}
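
The two-phase ordering above (sort by startup order, warm up every route, and only then start the consumers) can be illustrated with a small plain-Java sketch. `SafeStartSketch` and `start` are hypothetical names, routes are reduced to string ids, and the sketch assumes every route has a distinct startup order, so it performs no clash check.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model, not Camel code: records the order in which things happen.
final class SafeStartSketch {
    static List<String> start(Map<String, Integer> startupOrders, boolean startConsumer) {
        // a TreeMap iterates in ascending key order, so routes come out sorted by startup order
        Map<Integer, String> inputs = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : startupOrders.entrySet()) {
            inputs.put(entry.getValue(), entry.getKey());
        }

        List<String> events = new ArrayList<>();
        // phase 1: warm up every route before any consumer is started
        for (String routeId : inputs.values()) {
            events.add("warmup:" + routeId);
        }
        // phase 2: start the consumers, in the same order
        if (startConsumer) {
            for (String routeId : inputs.values()) {
                events.add("consumer:" + routeId);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Map<String, Integer> orders = new LinkedHashMap<>();
        orders.put("b", 2);
        orders.put("a", 1);
        System.out.println(start(orders, true));
    }
}
```

Passing `false` for `startConsumer` yields only the warm-up events, which corresponds to warming routes up without letting them consume.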
           

1.2.1.1 Warm Up Route Service

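The main purpose of warm-up is to start the endpoints and child services of the routes that belong to the RouteService.

Starting all processors and child services first, and only then starting the consumers, makes Camel more robust: once a consumer starts, it can begin consuming messages immediately.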

org.apache.camel.impl.RouteService#doWarmUp

protected synchronized void doWarmUp() throws Exception {
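    // start the endpoints
    if (endpointDone.compareAndSet(false, true)) {
        // endpoints are started only once and may be reused by other routes; once started, an endpoint is stopped only when Camel stops
        for (Route route : routes) {
            // ensure the endpoint is started before the route services (such as the consumer)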
            ServiceHelper.startService(route.getEndpoint());
        }
    }

    if (warmUpDone.compareAndSet(false, true)) {
        for (Route route : routes) {
            try (MDCHelper mdcHelper = new MDCHelper(route.getId())) {
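                // clear the service list
                route.warmUp();
                // get the service list
                List<Service> services = route.getServices();
                // build the service list
                route.onStartingServices(services);

                // collect every service to start (including all child services, gathered recursively)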
                Set<Service> list = new LinkedHashSet<Service>();
                for (Service service : services) {
                    list.addAll(ServiceHelper.getChildServices(service));
                }

                // separate the Consumers from the child services so that the child services start first
                List<Service> childServices = new ArrayList<Service>();
                for (Service service : list) {
                    // inject the route
                    if (service instanceof RouteAware) {
                        ((RouteAware) service).setRoute(route);
                    }

                    if (service instanceof Consumer) {
                        inputs.put(route, (Consumer) service);
                    } else {
                        childServices.add(service);
                    }
                }
                startChildService(route, childServices);

                // fire the route-added event
                EventHelper.notifyRouteAdded(camelContext, route);
            }
        }

        // ensure lifecycle strategy is invoked which among others enlist the route in JMX
        for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
            strategy.onRoutesAdd(routes);
        }

        // add routes to camel context
        camelContext.addRouteCollection(routes);

        // add the routes to the inflight registry so they are pre-installed
        for (Route route : routes) {
            camelContext.getInflightRepository().addRoute(route.getId());
        }
    }
}
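
Both guards in doWarmUp rely on `AtomicBoolean.compareAndSet(false, true)`, which succeeds for exactly one caller and so makes the guarded block run at most once. A minimal sketch of that idiom follows. `WarmUpOnceSketch` and its counter are hypothetical and exist only to make the effect observable.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the run-once guard; not Camel code.
final class WarmUpOnceSketch {
    private final AtomicBoolean warmUpDone = new AtomicBoolean(false);
    private final AtomicInteger warmUps = new AtomicInteger();

    void warmUp() {
        // only the first call flips false -> true; later calls see true and skip the body
        if (warmUpDone.compareAndSet(false, true)) {
            warmUps.incrementAndGet();
        }
    }

    int warmUpCount() {
        return warmUps.get();
    }

    public static void main(String[] args) {
        WarmUpOnceSketch service = new WarmUpOnceSketch();
        service.warmUp();
        service.warmUp();
        service.warmUp();
        System.out.println("warm-ups performed: " + service.warmUpCount());
    }
}
```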
           

org.apache.camel.impl.EventDrivenConsumerRoute#addServices

protected void addServices(List<Service> services) throws Exception {
    // create and add the consumer
    Endpoint endpoint = getEndpoint();
    consumer = endpoint.createConsumer(processor);
    if (consumer != null) {
        services.add(consumer);
        if (consumer instanceof RouteAware) {
            ((RouteAware) consumer).setRoute(this);
        }
    }
    // if the processor is an instance of Service, add it as well
    Processor processor = getProcessor();
    if (processor instanceof Service) {
        services.add((Service) processor);
    }
}
           

1.2.1.2 Start Or Resume Route Consumers

org.apache.camel.impl.DefaultCamelContext#doStartOrResumeRouteConsumers

private void doStartOrResumeRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean resumeOnly, boolean addingRoute) throws Exception {
    List<Endpoint> routeInputs = new ArrayList<Endpoint>();

    for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) {
        Integer order = entry.getKey();
        Route route = entry.getValue().getRoute();
        RouteService routeService = entry.getValue().getRouteService();
        // start the service
        for (Consumer consumer : routeService.getInputs().values()) {
            Endpoint endpoint = consumer.getEndpoint();
            if (resumeOnly && route.supportsSuspension()) {
                // if we are resuming and the route can be resumed
                ServiceHelper.resumeService(consumer);
            } else {
                // when starting we should invoke the lifecycle strategies
                for (LifecycleStrategy strategy : lifecycleStrategies) {
                    strategy.onServiceAdd(this, consumer, route);
                }
                
                startService(consumer);
            }

            routeInputs.add(endpoint);
        }

        if (resumeOnly) {
            routeService.resume();
        } else {
            routeService.start(false);
        }
    }
}
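
The routeInputs list in the excerpt collects the endpoints whose consumers have already been started, which is what allows a check that no two consumers read from the same endpoint. The check itself is not shown in the excerpt, so the following is only a sketch of the idea under stated assumptions: `ConsumerClashSketch` and `startConsumers` are hypothetical, endpoints are reduced to URI strings, and the sketch assumes no endpoint permits multiple consumers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a multiple-consumer check; not Camel code.
final class ConsumerClashSketch {
    // Returns the endpoints in the order their consumers were started.
    static List<String> startConsumers(List<String> endpointUris) {
        List<String> routeInputs = new ArrayList<>();
        for (String uri : endpointUris) {
            // reject a second consumer on an endpoint that is already being consumed
            if (routeInputs.contains(uri)) {
                throw new IllegalStateException(
                        "Multiple consumers for the same endpoint is not allowed: " + uri);
            }
            routeInputs.add(uri);
        }
        return routeInputs;
    }

    public static void main(String[] args) {
        System.out.println(startConsumers(Arrays.asList("direct:a", "direct:b")));
        try {
            startConsumers(Arrays.asList("direct:a", "direct:a"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```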
           

References

Camel route startup process