天天看点

elasticsearch源码分析--transport模块

在InternalNode中启动了transportservice,这个服务最终的实现是绑定到transport模块中的local transport或者netty transport。

在transportservice中的doStart方法中:

protected void doStart() throws ElasticsearchException {
        adapter.rxMetric.clear();
        adapter.txMetric.clear();
        transport.transportServiceAdapter(adapter);
        transport.start();
        if (transport.boundAddress() != null && logger.isInfoEnabled()) {
            logger.info("{}", transport.boundAddress());
        }
    }
           

transport实例是在transport module中注册的:

public Iterable<? extends Module> spawnModules() {
        Class<? extends Module> defaultTransportModule;
        if (DiscoveryNode.localNode(settings)) {
            defaultTransportModule = LocalTransportModule.class;
        } else {
            defaultTransportModule = NettyTransportModule.class;
        }
        return ImmutableList.of(Modules.createModule(settings.getAsClass(TRANSPORT_TYPE_KEY, defaultTransportModule, "org.elasticsearch.transport.", "TransportModule"), settings));
    }
           
protected void configure() {
        Class<? extends TransportService> transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, TransportService.class, "org.elasticsearch.transport.", "TransportService");
        if (!TransportService.class.equals(transportService)) {
            bind(TransportService.class).to(transportService).asEagerSingleton();
        } else {
            bind(TransportService.class).asEagerSingleton();
        }
    }
           

最终transport service中的transport实例落到了transport 模块上。

transport模块分为local transport 和 netty transport。这里我们只看netty transport。

这个模块完全是一个运用netty在节点之间进行通信的模块,处理消息的收发,异常的处理等。

在模块构造函数中,通过从配置文件或者获取默认值来对一些设置进行赋值,比如tcp的buffer_size等等。

this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", settings.getAsBoolean(TCP_NO_DELAY, true));
this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", settings.getAsBoolean(TCP_KEEP_ALIVE, true));
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
           

模块启动主要启动了一个客户端ClientBootstrap和一个服务器ServerBootstrap,因为es节点之间要通信互联,所以每一个节点是一个服务器节点,同时也是一个客户端节点。所以默认情况下,客户端跟服务器都会启动,注意doStart中这行代码:

if (!settings.getAsBoolean("network.server", true)) {
            return;
 }
           

注意看network.server的设置,这里默认是true。那什么时候设置成为false呢?当节点仅仅是客户端节点的时候,比如transportclient。在transportclient.java的构造函数中:

Settings settings = settingsBuilder().put(tuple.v1())
                .put("network.server", false)
                .put("node.client", true)
                .build();
           

在ClientBootStrap和ServerBootStrap中都注册了消息处理的类 MessageChannelHandler:

pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
           

MessageChannelHandler类负责消息接受及处理逻辑,在其他模块中会对不同的消息(Action)注册对应的处理程序(handler)。在对收到的内容进行解析的过程中获取到action,找到对应的handler进行处理,这个过程会交给threadpool中的线程来操作。

transportService.registerHandler(SearchAction.NAME, new TransportHandler());
           

以上是对SearchAction的注册代码

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 
           

该方法负责消息的接受,里边会调用handleRequest/handleResponse来处理各种不同的消息(Action)

继续阅读