天天看點

elasticsearch源碼分析--transport子產品

在InternalNode中啟動了transportservice,這個服務最終的實作是綁定到transport子產品中的local transport或者netty transport。

在transportservice中的doStart方法中:

protected void doStart() throws ElasticsearchException {
        adapter.rxMetric.clear();
        adapter.txMetric.clear();
        transport.transportServiceAdapter(adapter);
        transport.start();
        if (transport.boundAddress() != null && logger.isInfoEnabled()) {
            logger.info("{}", transport.boundAddress());
        }
    }
           

transport執行個體是在transport module中注冊的:

public Iterable<? extends Module> spawnModules() {
        Class<? extends Module> defaultTransportModule;
        if (DiscoveryNode.localNode(settings)) {
            defaultTransportModule = LocalTransportModule.class;
        } else {
            defaultTransportModule = NettyTransportModule.class;
        }
        return ImmutableList.of(Modules.createModule(settings.getAsClass(TRANSPORT_TYPE_KEY, defaultTransportModule, "org.elasticsearch.transport.", "TransportModule"), settings));
    }
           
protected void configure() {
        Class<? extends TransportService> transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, TransportService.class, "org.elasticsearch.transport.", "TransportService");
        if (!TransportService.class.equals(transportService)) {
            bind(TransportService.class).to(transportService).asEagerSingleton();
        } else {
            bind(TransportService.class).asEagerSingleton();
        }
    }
           

最終transport service中的transport執行個體落到了transport 子產品上。

transport子產品分為local transport 和 netty transport。這裡我們隻看netty transport。

這個子產品完全是一個運用netty在節點之間進行通信的子產品,處理消息的收發,異常的處理等。

在子產品構造函數中,通過從配置檔案或者擷取預設值來對一些設定進行指派,比如tcp的buffer_size等等。

this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", settings.getAsBoolean(TCP_NO_DELAY, true));
this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", settings.getAsBoolean(TCP_KEEP_ALIVE, true));
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
           

子產品啟動主要啟動了一個用戶端ClientBootstrap和一個伺服器ServerBootstrap,因為es節點之間要通信互聯,是以每一個節點是一個伺服器節點,同時也是一個用戶端節點。是以預設情況下,用戶端跟伺服器都會啟動,注意doStart中這行代碼:

if (!settings.getAsBoolean("network.server", true)) {
            return;
 }
           

注意看network.server的設定,這裡預設是true。那什麼時候設定成為false呢?當節點僅僅是用戶端節點的時候,比如transportclient。在transportclient.java的構造函數中:

Settings settings = settingsBuilder().put(tuple.v1())
                .put("network.server", false)
                .put("node.client", true)
                .build();
           

在ClientBootStrap和ServerBootStrap中都注冊了消息處理的類 MessageChannelHandler:

pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
           

MessageChannelHandler類負責消息接受及處理邏輯,在其他子產品中會對不同的消息(Action)注冊對應的處理程式(handler)。在對收到的内容進行解析的過程中擷取到action,找到對應的handler進行處理,這個過程會交給threadpool中的線程來操作。

transportService.registerHandler(SearchAction.NAME, new TransportHandler());
           

以上是對SearchAction的注冊代碼

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 
           

該方法負責消息的接受,裡邊會調用handleRequest/handleResponse來處理各種不同的消息(Action)

繼續閱讀