
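Elasticsearch DSL: source code walkthrough

Source code for parsing DSL query conditions

Reference article: https://www.jianshu.com/p/b77e80d6c18e

First, get to know the ES call chain by looking at the sequence diagram of a single search

A query is handled in three parts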

  1. Forward the user's request (the RestRequest) to the RestAction that should handle it.

Taking a search query as an example, one call chain looks like this:

Netty4HttpRequestHandler#channelRead0() calls Netty4HttpServerTransport#dispatchRequest()

Netty4HttpServerTransport#dispatchRequest() goes through an interface at this point

RestController#dispatchRequest() implements that interface

It then calls RestController#tryAllHandlers()

RestController#tryAllHandlers() calls RestController#dispatchRequest()

RestController#dispatchRequest() calls BaseRestHandler#handleRequest()

BaseRestHandler#handleRequest() calls the subclass method RestSearchAction#prepareRequest()
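
The shape of this chain can be modeled in a few lines. The sketch below is only an illustration: `Dispatcher`, `ToyRestController`, `ToyHandler` and `ToySearchHandler` are hypothetical stand-ins, not the real Elasticsearch classes. It shows the two ideas in the chain: the transport only talks to an interface that the controller implements, and the base handler's handleRequest() delegates to a prepareRequest() hook that the subclass overrides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DispatchSketch {
    /** Hypothetical stand-in for the interface the HTTP transport calls. */
    interface Dispatcher {
        void dispatchRequest(String path, List<String> trace);
    }

    /** Hypothetical stand-in for BaseRestHandler: handleRequest() is a template method. */
    abstract static class ToyHandler {
        final void handleRequest(String path, List<String> trace) {
            trace.add("handleRequest");
            prepareRequest(path, trace); // hook implemented by the subclass
        }

        abstract void prepareRequest(String path, List<String> trace);
    }

    /** Hypothetical stand-in for RestSearchAction. */
    static final class ToySearchHandler extends ToyHandler {
        @Override
        void prepareRequest(String path, List<String> trace) {
            trace.add("prepareRequest:" + path);
        }
    }

    /** Hypothetical stand-in for RestController: implements the interface, then routes. */
    static final class ToyRestController implements Dispatcher {
        private final Map<String, ToyHandler> handlers = new HashMap<>();

        void register(String path, ToyHandler handler) {
            handlers.put(path, handler);
        }

        @Override
        public void dispatchRequest(String path, List<String> trace) {
            trace.add("dispatchRequest");
            ToyHandler handler = handlers.get(path);
            if (handler == null) {
                throw new IllegalArgumentException("no handler for " + path);
            }
            handler.handleRequest(path, trace);
        }
    }

    /** Runs one request through the toy chain and returns the order of calls. */
    static List<String> run(String path) {
        ToyRestController controller = new ToyRestController();
        controller.register("/_search", new ToySearchHandler());
        Dispatcher dispatcher = controller; // the transport only sees the interface
        List<String> trace = new ArrayList<>();
        dispatcher.dispatchRequest(path, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("/_search"));
    }
}
```

The recorded trace lists the calls in the same order as the chain described above: the dispatcher first, then the base handler, then the subclass hook.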

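  2. Convert the RestRequest (the HTTP-level request) into a SearchRequest (the request type ES understands and can process internally), then call search(). At this point we have effectively entered the controller layer.
  3. Based on the request, find the service layer that should handle it. In ES this is TransportSearchAction, whose execute() method is called to run the search.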

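With the sequence diagram clear, we can look at the part that parses the DSL query conditions.

This corresponds to the second part of the sequence diagram, and the code lives in the RestSearchAction class.

RestSearchAction extends BaseRestHandler and overrides the prepareRequest method.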

@Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.remoteAddress(new TransportAddress((InetSocketAddress)request.getRemoteAddress()));
        IntConsumer setSize = size -> searchRequest.source().size(size);
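        // The `parser ->` below is a lambda expression, which can be hard to follow at first.
        // The key point: the parser is built inside RestRequest#withContentOrSourceParamParserOrNull().
        // It parses the conditions in the request and is passed as an argument to
        // parseSearchRequest(), which uses it to build the query conditions.
        request.withContentOrSourceParamParserOrNull(parser ->
            // the important step: this converts the RestRequest into a SearchRequest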
            parseSearchRequest(searchRequest, request, parser, setSize));

        return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
    }      
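
One detail of prepareRequest that is easy to miss is timing: the parsing lambda runs immediately, while the returned lambda runs only later, when a channel is supplied. Below is a minimal sketch of that pattern. `ChannelConsumer`, `ToySearchRequest` and `withBody` are hypothetical names invented for illustration, and the "channel" is just a list of strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class PrepareSketch {
    /** Hypothetical stand-in for RestChannelConsumer. */
    interface ChannelConsumer {
        void accept(List<String> channel);
    }

    /** Hypothetical stand-in for SearchRequest. */
    static final class ToySearchRequest {
        String source;
    }

    /** Invokes the callback right away, as the parser callback in the original does. */
    static void withBody(String body, Consumer<String> withParser) {
        withParser.accept(body);
    }

    static ChannelConsumer prepareRequest(String body, List<String> log) {
        ToySearchRequest request = new ToySearchRequest();
        // Runs now: the body is copied into the request before this method returns.
        withBody(body, parser -> {
            log.add("parsed");
            request.source = parser;
        });
        // Runs later: only when the caller hands over a channel.
        return channel -> {
            log.add("executed");
            channel.add("search(" + request.source + ")");
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ChannelConsumer consumer = prepareRequest("{\"size\":1}", log);
        System.out.println(log); // only the parse step has run so far
        List<String> channel = new ArrayList<>();
        consumer.accept(channel);
        System.out.println(log + " " + channel);
    }
}
```

Because the request is fully built before the consumer is returned, a malformed body would fail during preparation rather than during execution.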

Now take a look at the RestRequest#withContentOrSourceParamParserOrNull() method.

It reads the request content from the RestRequest object and turns it into an XContentParser.

public final void withContentOrSourceParamParserOrNull(CheckedConsumer<XContentParser, IOException> withParser) throws IOException {
        if (hasContentOrSourceParam()) {
            Tuple<XContentType, BytesReference> tuple = contentOrSourceParam();
            BytesReference content = tuple.v2();
            XContentType xContentType = tuple.v1();
            try (InputStream stream = content.streamInput();
                 // note: this is where the parser is created
                 XContentParser parser = xContentType.xContent()
                     .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
                withParser.accept(parser);
            }
        } else {
            withParser.accept(null);
        }
    }      
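
The method above combines three things: a callback that may throw a checked exception, a resource that is valid only while the callback runs, and a null argument when there is no body. The following sketch reproduces that combination with standard library types only. `withBodyReaderOrNull` and `firstLineOrDefault` are hypothetical helpers, and a BufferedReader stands in for the XContentParser.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

final class ParserCallbackSketch {
    /** Like java.util.function.Consumer, but allowed to throw a checked exception. */
    @FunctionalInterface
    interface CheckedConsumer<T, E extends Exception> {
        void accept(T value) throws E;
    }

    /**
     * Hypothetical analogue of withContentOrSourceParamParserOrNull: opens a reader
     * over the body, hands it to the callback, and closes it afterwards.
     * Passes null to the callback when there is no body.
     */
    static void withBodyReaderOrNull(String body,
                                     CheckedConsumer<BufferedReader, IOException> withReader)
            throws IOException {
        if (body != null && !body.isEmpty()) {
            try (BufferedReader reader = new BufferedReader(new StringReader(body))) {
                withReader.accept(reader); // the reader is closed once the callback returns
            }
        } else {
            withReader.accept(null);
        }
    }

    /** Example caller: the callback must handle the null case itself. */
    static String firstLineOrDefault(String body) {
        String[] result = new String[1]; // holder, since lambdas capture only effectively final locals
        try {
            withBodyReaderOrNull(body, reader ->
                result[0] = (reader == null) ? "<no body>" : reader.readLine());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(firstLineOrDefault("{\"from\":0}"));
        System.out.println(firstLineOrDefault(null));
    }
}
```

This is why parseSearchRequest() checks the parser for null before using it: a search without a body still goes through the same callback.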
The parser is then handed to RestSearchAction#parseSearchRequest(), shown here with its Javadoc:

/**
     * Parses the rest request on top of the SearchRequest, preserving values that are not overridden by the rest request.
     *
     * @param requestContentParser body of the request to read. This method does not attempt to read the body from the {@code request}
     *        parameter
     * @param setSize how the size url parameter is handled. {@code update_by_query} and regular search differ here.
     */
    public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
                                          XContentParser requestContentParser,
                                          IntConsumer setSize) throws IOException {

        if (searchRequest.source() == null) {
            searchRequest.source(new SearchSourceBuilder());
        }
        // read the target indices from the "index" request parameter
        searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));
        if (requestContentParser != null) {
            // This is the key step: this method turns the query conditions from the HTTP request into ES query conditions.
            searchRequest.source().parseXContent(requestContentParser, true);
        }

        final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
        searchRequest.setBatchedReduceSize(batchedReduceSize);
        searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", searchRequest.getPreFilterShardSize()));

        if (request.hasParam("max_concurrent_shard_requests")) {
            // only set if we have the parameter since we auto adjust the max concurrency on the coordinator
            // based on the number of nodes in the cluster
            final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests",
                searchRequest.getMaxConcurrentShardRequests());
            searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
        }

        if (request.hasParam("allow_partial_search_results")) {
            // only set if we have the parameter passed to override the cluster-level default
            searchRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", null));
        }

        // do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
        // from the REST layer. these modes are an internal optimization and should
        // not be specified explicitly by the user.
        String searchType = request.param("search_type");
        if ("query_and_fetch".equals(searchType) ||
                "dfs_query_and_fetch".equals(searchType)) {
            throw new IllegalArgumentException("Unsupported search type [" + searchType + "]");
        } else {
            searchRequest.searchType(searchType);
        }
        parseSearchSource(searchRequest.source(), request, setSize);
        searchRequest.requestCache(request.paramAsBoolean("request_cache", null));

        String scroll = request.param("scroll");
        if (scroll != null) {
            searchRequest.scroll(new Scroll(parseTimeValue(scroll, null, "scroll")));
        }

        searchRequest.types(Strings.splitStringByCommaToArray(request.param("type")));
        searchRequest.routing(request.param("routing"));
        searchRequest.preference(request.param("preference"));
        searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
    }      
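
Two patterns recur in parseSearchRequest(): URL parameters override an existing value only when they are present, and internal-only search types are rejected. The sketch below shows both with a plain Map standing in for the request parameters. `paramAsInt` and `checkSearchType` here are simplified, hypothetical re-implementations for illustration, not the Elasticsearch methods, and the numeric values are made up.

```java
import java.util.HashMap;
import java.util.Map;

final class ParamOverrideSketch {
    /** Returns the URL parameter as an int, or the supplied current value if it is absent. */
    static int paramAsInt(Map<String, String> params, String key, int defaultValue) {
        String raw = params.get(key);
        return raw == null ? defaultValue : Integer.parseInt(raw);
    }

    /** Mirrors the search_type check: the two internal-only modes are rejected. */
    static String checkSearchType(String searchType) {
        if ("query_and_fetch".equals(searchType) || "dfs_query_and_fetch".equals(searchType)) {
            throw new IllegalArgumentException("Unsupported search type [" + searchType + "]");
        }
        return searchType;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("batched_reduce_size", "64");
        int current = 512; // assumed existing value, for illustration only
        // Present in the URL: the parameter wins.
        System.out.println(paramAsInt(params, "batched_reduce_size", current));
        // Absent from the URL: the existing value is preserved.
        System.out.println(paramAsInt(params, "pre_filter_shard_size", current));
        System.out.println(checkSearchType("dfs_query_then_fetch"));
    }
}
```

Passing the current value as the default is what the Javadoc means by "preserving values that are not overridden by the rest request".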
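searchRequest.source() is a SearchSourceBuilder, and its parseXContent() method does the actual parsing of the request body:

/**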
     * Parse some xContent into this SearchSourceBuilder, overwriting any values specified in the xContent. Use this if you need to set up
     * different defaults than a regular SearchSourceBuilder would have and use {@link #fromXContent(XContentParser, boolean)} if you have
     * normal defaults.
     *
     * @param parser The xContent parser.
     * @param checkTrailingTokens If true throws a parsing exception when extra tokens are found after the main object.
     */
    public void parseXContent(XContentParser parser, boolean checkTrailingTokens) throws IOException {
        XContentParser.Token token = parser.currentToken();
        String currentFieldName = null;
        if (token != XContentParser.Token.START_OBJECT && (token = parser.nextToken()) != XContentParser.Token.START_OBJECT) {
            throw new ParsingException(parser.getTokenLocation(), "Expected [" + XContentParser.Token.START_OBJECT +
                    "] but found [" + token + "]", parser.getTokenLocation());
        }
        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
            if (token == XContentParser.Token.FIELD_NAME) {
                currentFieldName = parser.currentName();
                // scalar parameters of the search, e.g. from, size
            } else if (token.isValue()) {
                if (FROM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    from = parser.intValue();
                } else if (SIZE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    size = parser.intValue();
                } else if (TIMEOUT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    timeout = TimeValue.parseTimeValue(parser.text(), null, TIMEOUT_FIELD.getPreferredName());
                } else if (TERMINATE_AFTER_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    terminateAfter = parser.intValue();
                } else if (MIN_SCORE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    minScore = parser.floatValue();
                } else if (VERSION_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    version = parser.booleanValue();
                } else if (SEQ_NO_PRIMARY_TERM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    seqNoAndPrimaryTerm = parser.booleanValue();
                } else if (EXPLAIN_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    explain = parser.booleanValue();
                } else if (TRACK_SCORES_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    trackScores = parser.booleanValue();
                } else if (TRACK_TOTAL_HITS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    trackTotalHits = parser.booleanValue();
                } else if (_SOURCE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    fetchSourceContext = FetchSourceContext.fromXContent(parser);
                } else if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    storedFieldsContext =
                        StoredFieldsContext.fromXContent(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), parser);
                } else if (SORT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    sort(parser.text());
                } else if (PROFILE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    profile = parser.booleanValue();
                } else {
                    throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
                            parser.getTokenLocation());
                }
                // query conditions given as objects, e.g. query, aggs
            } else if (token == XContentParser.Token.START_OBJECT) {
                if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    // build the query condition
                    queryBuilder = parseInnerQueryBuilder(parser);
                } else if (POST_FILTER_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    postQueryBuilder = parseInnerQueryBuilder(parser);
                } else if (_SOURCE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    fetchSourceContext = FetchSourceContext.fromXContent(parser);
                } else if (SCRIPT_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    scriptFields = new ArrayList<>();
                    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                        scriptFields.add(new ScriptField(parser));
                    }
                } else if (INDICES_BOOST_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    DEPRECATION_LOGGER.deprecated(
                        "Object format in indices_boost is deprecated, please use array format instead");
                    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                        if (token == XContentParser.Token.FIELD_NAME) {
                            currentFieldName = parser.currentName();
                        } else if (token.isValue()) {
                            indexBoosts.add(new IndexBoost(currentFieldName, parser.floatValue()));
                        } else {
                            throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token +
                                " in [" + currentFieldName + "].", parser.getTokenLocation());
                        }
                    }
                } else if (AGGREGATIONS_FIELD.match(currentFieldName, parser.getDeprecationHandler())
                        || AGGS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    aggregations = AggregatorFactories.parseAggregators(parser);
                } else if (HIGHLIGHT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    highlightBuilder = HighlightBuilder.fromXContent(parser);
                } else if (SUGGEST_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    suggestBuilder = SuggestBuilder.fromXContent(parser);
                } else if (SORT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    sorts = new ArrayList<>(SortBuilder.fromXContent(parser));
                } else if (RESCORE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    rescoreBuilders = new ArrayList<>();
                    rescoreBuilders.add(RescorerBuilder.parseFromXContent(parser));
                } else if (EXT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    extBuilders = new ArrayList<>();
                    String extSectionName = null;
                    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                        if (token == XContentParser.Token.FIELD_NAME) {
                            extSectionName = parser.currentName();
                        } else {
                            SearchExtBuilder searchExtBuilder = parser.namedObject(SearchExtBuilder.class, extSectionName, null);
                            if (searchExtBuilder.getWriteableName().equals(extSectionName) == false) {
                                throw new IllegalStateException("The parsed [" + searchExtBuilder.getClass().getName() + "] object has a "
                                        + "different writeable name compared to the name of the section that it was parsed from: found ["
                                        + searchExtBuilder.getWriteableName() + "] expected [" + extSectionName + "]");
                            }
                            extBuilders.add(searchExtBuilder);
                        }
                    }
                } else if (SLICE.match(currentFieldName, parser.getDeprecationHandler())) {
                    sliceBuilder = SliceBuilder.fromXContent(parser);
                } else if (COLLAPSE.match(currentFieldName, parser.getDeprecationHandler())) {
                    collapse = CollapseBuilder.fromXContent(parser);
                } else {
                    throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
                            parser.getTokenLocation());
                }
            } else if (token == XContentParser.Token.START_ARRAY) {
                if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    storedFieldsContext = StoredFieldsContext.fromXContent(STORED_FIELDS_FIELD.getPreferredName(), parser);
                } else if (DOCVALUE_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    docValueFields = new ArrayList<>();
                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                        docValueFields.add(FieldAndFormat.fromXContent(parser));
                    }
                } else if (INDICES_BOOST_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                        indexBoosts.add(new IndexBoost(parser));
                    }
                } else if (SORT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    sorts = new ArrayList<>(SortBuilder.fromXContent(parser));
                } else if (RESCORE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    rescoreBuilders = new ArrayList<>();
                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                        rescoreBuilders.add(RescorerBuilder.parseFromXContent(parser));
                    }
                } else if (STATS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    stats = new ArrayList<>();
                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                        if (token == XContentParser.Token.VALUE_STRING) {
                            stats.add(parser.text());
                        } else {
                            throw new ParsingException(parser.getTokenLocation(), "Expected [" + XContentParser.Token.VALUE_STRING +
                                    "] in [" + currentFieldName + "] but found [" + token + "]", parser.getTokenLocation());
                        }
                    }
                } else if (_SOURCE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    fetchSourceContext = FetchSourceContext.fromXContent(parser);
                } else if (SEARCH_AFTER.match(currentFieldName, parser.getDeprecationHandler())) {
                    searchAfterBuilder = SearchAfterBuilder.fromXContent(parser);
                } else {
                    throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
                            parser.getTokenLocation());
                }
            } else {
                throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
                        parser.getTokenLocation());
            }
        }
        if (checkTrailingTokens) {
            boolean success;
            try {
                token = parser.nextToken();
                success = token == null;
            } catch (JsonParseException exc) {
                success = false;
            }
            if (success == false) {
                DEPRECATION_LOGGER.deprecated("Found extra tokens after the _search request body, " +
                    "an error will be thrown in the next major version");
            }
        }
    }      
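
The core of parseXContent() is a pull-parsing loop: remember the last field name, then dispatch on the kind of token that follows it. The sketch below reduces that loop to its skeleton. It does not use the Elasticsearch parser: the token stream is a hand-built list, and `Event`, `ToySource` and `collectFieldNames` are hypothetical names. Only `from`, `size` and `query` are recognized, and unknown keys are rejected as in the original.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class TokenLoopSketch {
    enum Token { START_OBJECT, END_OBJECT, FIELD_NAME, VALUE }

    /** One parser event: a token plus its text (field name or scalar value). */
    static final class Event {
        final Token token;
        final String text;

        Event(Token token, String text) {
            this.token = token;
            this.text = text;
        }
    }

    /** Toy result holder; the real SearchSourceBuilder has many more fields. */
    static final class ToySource {
        int from = -1;
        int size = -1;
        final List<String> queryFields = new ArrayList<>();
    }

    static ToySource parse(List<Event> events) {
        Iterator<Event> it = events.iterator();
        if (it.next().token != Token.START_OBJECT) {
            throw new IllegalArgumentException("Expected START_OBJECT");
        }
        ToySource source = new ToySource();
        String currentFieldName = null;
        Event e;
        while ((e = it.next()).token != Token.END_OBJECT) {
            if (e.token == Token.FIELD_NAME) {
                currentFieldName = e.text; // remember the key for the next token
            } else if (e.token == Token.VALUE) { // scalar parameters
                if ("from".equals(currentFieldName)) {
                    source.from = Integer.parseInt(e.text);
                } else if ("size".equals(currentFieldName)) {
                    source.size = Integer.parseInt(e.text);
                } else {
                    throw new IllegalArgumentException("Unknown key [" + currentFieldName + "]");
                }
            } else if (e.token == Token.START_OBJECT) { // object-valued sections
                if ("query".equals(currentFieldName)) {
                    collectFieldNames(it, source.queryFields);
                } else {
                    throw new IllegalArgumentException("Unknown key [" + currentFieldName + "]");
                }
            }
        }
        return source;
    }

    /** Consumes a nested object up to its matching END_OBJECT, recording field names. */
    static void collectFieldNames(Iterator<Event> it, List<String> out) {
        int depth = 1;
        while (depth > 0) {
            Event e = it.next();
            if (e.token == Token.START_OBJECT) {
                depth++;
            } else if (e.token == Token.END_OBJECT) {
                depth--;
            } else if (e.token == Token.FIELD_NAME) {
                out.add(e.text);
            }
        }
    }

    /** Token stream for the body {"from":0,"size":10,"query":{"match_all":{}}}. */
    static List<Event> example() {
        return Arrays.asList(
            new Event(Token.START_OBJECT, null),
            new Event(Token.FIELD_NAME, "from"), new Event(Token.VALUE, "0"),
            new Event(Token.FIELD_NAME, "size"), new Event(Token.VALUE, "10"),
            new Event(Token.FIELD_NAME, "query"), new Event(Token.START_OBJECT, null),
            new Event(Token.FIELD_NAME, "match_all"), new Event(Token.START_OBJECT, null),
            new Event(Token.END_OBJECT, null),
            new Event(Token.END_OBJECT, null),
            new Event(Token.END_OBJECT, null));
    }

    public static void main(String[] args) {
        ToySource source = parse(example());
        System.out.println(source.from + " " + source.size + " " + source.queryFields);
    }
}
```

In the real method the nested object is not skipped but handed to a dedicated parser such as parseInnerQueryBuilder(parser), which consumes exactly that sub-object and leaves the outer loop positioned on the next top-level key.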
