在InternalNode中啟動了transportservice,這個服務最終的實作是綁定到transport子產品中的local transport或者netty transport。
在transportservice中的doStart方法中:
protected void doStart() throws ElasticsearchException {
adapter.rxMetric.clear();
adapter.txMetric.clear();
transport.transportServiceAdapter(adapter);
transport.start();
if (transport.boundAddress() != null && logger.isInfoEnabled()) {
logger.info("{}", transport.boundAddress());
}
}
transport執行個體是在transport module中注冊的:
public Iterable<? extends Module> spawnModules() {
Class<? extends Module> defaultTransportModule;
if (DiscoveryNode.localNode(settings)) {
defaultTransportModule = LocalTransportModule.class;
} else {
defaultTransportModule = NettyTransportModule.class;
}
return ImmutableList.of(Modules.createModule(settings.getAsClass(TRANSPORT_TYPE_KEY, defaultTransportModule, "org.elasticsearch.transport.", "TransportModule"), settings));
}
protected void configure() {
Class<? extends TransportService> transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, TransportService.class, "org.elasticsearch.transport.", "TransportService");
if (!TransportService.class.equals(transportService)) {
bind(TransportService.class).to(transportService).asEagerSingleton();
} else {
bind(TransportService.class).asEagerSingleton();
}
}
最終transport service中的transport執行個體落到了transport 子產品上。
transport子產品分為local transport 和 netty transport。這裡我們隻看netty transport。
這個子產品完全是一個運用netty在節點之間進行通信的子產品,處理消息的收發,異常的處理等。
在子產品構造函數中,通過從配置檔案或者擷取預設值來對一些設定進行指派,比如tcp的buffer_size等等。
this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", settings.getAsBoolean(TCP_NO_DELAY, true));
this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", settings.getAsBoolean(TCP_KEEP_ALIVE, true));
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
子產品啟動主要啟動了一個用戶端ClientBootstrap和一個伺服器ServerBootstrap,因為es節點之間要通信互聯,是以每一個節點是一個伺服器節點,同時也是一個用戶端節點。是以預設情況下,用戶端跟伺服器都會啟動,注意doStart中這行代碼:
if (!settings.getAsBoolean("network.server", true)) {
return;
}
注意看network.server的設定,這裡預設是true。那什麼時候設定成為false呢?當節點僅僅是用戶端節點的時候,比如transportclient。在transportclient.java的構造函數中:
Settings settings = settingsBuilder().put(tuple.v1())
.put("network.server", false)
.put("node.client", true)
.build();
在ClientBootStrap和ServerBootStrap中都注冊了消息處理的類 MessageChannelHandler:
pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
MessageChannelHandler類負責消息接受及處理邏輯,在其他子產品中會對不同的消息(Action)注冊對應的處理程式(handler)。在對收到的内容進行解析的過程中擷取到action,找到對應的handler進行處理,這個過程會交給threadpool中的線程來操作。
transportService.registerHandler(SearchAction.NAME, new TransportHandler());
以上是對SearchAction的注冊代碼
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
該方法負責消息的接受,裡邊會調用handleRequest/handleResponse來處理各種不同的消息(Action)