天天看點

OpenTSDB查詢代碼解析OpenTSDB查詢路由/api/query的解析

OpenTSDB源碼解析

  • OpenTSDB查詢路由/api/query的解析
    • 1. TSDMain.java
      • main()
    • 2. RpcManager.java
      • instance()
      • initializeBuiltinRpcs()
    • 3. QueryRpc.java
      • execute()
      • handleQuery()
      • GET方式-parseQuery()
      • GET方式-parseMTypeSubQuery()
    • 4. HttpJsonSerializer.java
      • POST方式- parseQueryV1()
    • 5. TSQuery.java
      • validateAndSetQuery()
    • 6. TSSubQuery.java
      • validateAndSetQuery()

OpenTSDB查詢路由/api/query的解析

1. TSDMain.java

main()

整個project的入口在net/opentsdb/tools/TSDMain.java檔案的main()函數中。

/**
 * Main class of the TSD, the Time Series Daemon.
 */
final class TSDMain {
	......
	private static TSDB tsdb = null;
	public static void main(String[] args) throws IOException {
	//可以在列印日志時,擷取類的具體資訊
    Logger log = LoggerFactory.getLogger(TSDMain.class);
    log.info("Starting.");
    log.info(BuildData.revisionString());
    log.info(BuildData.buildString());
    ......
    final ServerSocketChannelFactory factory;
    int connections_limit = 0;
    try {
      connections_limit = config.getInt("tsd.core.connections.limit");
    } catch (NumberFormatException nfe) {
      usage(argp, "Invalid connections limit", 1);
    }
	if (config.getBoolean("tsd.network.async_io")) {
      int workers = Runtime.getRuntime().availableProcessors() * 2;
      if (config.hasProperty("tsd.network.worker_threads")) {
        try {
        workers = config.getInt("tsd.network.worker_threads");
        } catch (NumberFormatException nfe) {
          usage(argp, "Invalid worker thread count", 1);
        }
      }
      final Executor executor = Executors.newCachedThreadPool();
      final NioServerBossPool boss_pool = 
          new NioServerBossPool(executor, 1, new Threads.BossThreadNamer());
      final NioWorkerPool worker_pool = new NioWorkerPool(executor, 
          workers, new Threads.WorkerThreadNamer());
      **factory = new NioServerSocketChannelFactory(boss_pool, worker_pool);**
    } else {
      factory = new OioServerSocketChannelFactory(
          Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 
          new Threads.PrependThreadNamer());
    }
    ......
    final ServerBootstrap server = new ServerBootstrap(factory);
    final RpcManager manager = RpcManager.instance(tsdb);
    server.setPipelineFactory(new PipelineFactory(tsdb, manager, connections_limit));
    ......
   final InetSocketAddress addr = new InetSocketAddress(bindAddress,config.getInt("tsd.network.port"));
   server.bind(addr);
   ......
           

2. RpcManager.java

上面代碼中比較重要的是final RpcManager manager = RpcManager.instance(tsdb);

server.setPipelineFactory(new PipelineFactory(tsdb, manager, connections_limit));

我們先分析final RpcManager manager = RpcManager.instance(tsdb);

instance()

RpcManager.java中的instance方法如下:

public static synchronized RpcManager instance(final TSDB tsdb) {
    final RpcManager existing = INSTANCE.get();
    if (existing != null) {
      return existing;
    }

    final RpcManager manager = new RpcManager(tsdb);
    final String mode = Strings.nullToEmpty(tsdb.getConfig().getString("tsd.mode"));

    // Load any plugins that are enabled via Config.  Fail if any plugin cannot be loaded.
    final ImmutableList.Builder<RpcPlugin> rpcBuilder = ImmutableList.builder();
    if (tsdb.getConfig().hasProperty("tsd.rpc.plugins")) {
      final String[] plugins = tsdb.getConfig().getString("tsd.rpc.plugins").split(",");
      manager.initializeRpcPlugins(plugins, rpcBuilder);
    }
    manager.rpc_plugins = rpcBuilder.build();
	final ImmutableMap.Builder<String, TelnetRpc> telnetBuilder 
	= ImmutableMap.builder();
	final ImmutableMap.Builder<String, HttpRpc> httpBuilder 
	= ImmutableMap.builder();
    manager.initializeBuiltinRpcs(mode, telnetBuilder, httpBuilder);
    manager.telnet_commands = telnetBuilder.build();
    manager.http_commands = httpBuilder.build();
	final ImmutableMap.Builder<String, HttpRpcPlugin> httpPluginsBuilder 
	= ImmutableMap.builder();
    if (tsdb.getConfig().hasProperty("tsd.http.rpc.plugins")) {
      final String[] plugins = tsdb.getConfig().getString("tsd.http.rpc.plugins").split(",");
      manager.initializeHttpRpcPlugins(mode, plugins, httpPluginsBuilder);
    }
    manager.http_plugin_commands = httpPluginsBuilder.build();
    INSTANCE.set(manager);
    return manager;
  }

           

再仔細看manager.initializeBuiltinRpcs(mode, telnetBuilder, httpBuilder); 是如何初始化telnetBuilder和httpBuilder的。

initializeBuiltinRpcs()

private void initializeBuiltinRpcs(final String mode,
        final ImmutableMap.Builder<String, TelnetRpc> telnet,
        final ImmutableMap.Builder<String, HttpRpc> http) {
        final Boolean enableApi = tsdb.getConfig().getString("tsd.core.enable_api").equals("true");
        if (mode.equals("rw") || mode.equals("wo")) {
		      final PutDataPointRpc put = new PutDataPointRpc();
		      telnet.put("put", put);
		      if (enableApi) {
		        http.put("api/put", put);
		      }
		      if(mode.equals("rw") || mode.equals("ro")) {
		    	 ......
		    	 if (enableApi) {
			        http.put("api/aggregators", aggregators);
			        http.put("api/annotation", annotation_rpc);
			        http.put("api/annotations", annotation_rpc);
			        http.put("api/config", new ShowConfig());
			        http.put("api/dropcaches", dropcaches);
			        http.put("api/query", new QueryRpc());
			        http.put("api/search", new SearchRpc());
			        http.put("api/serializers", new Serializers());
			        http.put("api/stats", stats);
			        http.put("api/suggest", suggest_rpc);
			        http.put("api/tree", new TreeRpc());
			        http.put("api/uid", new UniqueIdRpc());
			        http.put("api/version", version);
    		    }
    	     }
	     }
     }

           

在執行個體化函數instance()中,我們對RpcManager類的成員變量telnet_commands和http_commands進行了初始化,他們的類型分别是ImmutableMap<String, TelnetRpc>和ImmutableMap<String, HttpRpc>,可以看出是Map類型,且Map鍵值不可變。而初始化的主要方法是 initializeBuiltinRpcs(final String mode, final ImmutableMap.Builder<String, TelnetRpc> telnet, final ImmutableMap.Builder<String, HttpRpc> http), 由于java是引用傳遞,是以對Map類型的http 和telnet的改變在函數執行結束之後會保留,從上面的代碼可以看出,如果http服務打開了的話,那麼就對Map類型的http進行初始化。我們主要關注的是/api/query這一項。

3. QueryRpc.java

execute()

final class QueryRpc implements HttpRpc {
  private static final Logger LOG = LoggerFactory.getLogger(QueryRpc.class);
  ......
  /**
   * Implements the /api/query endpoint to fetch data from OpenTSDB.
   * @param tsdb The TSDB to use for fetching data
   * @param query The HTTP query for parsing and responding
   */
  @Override
  public void execute(final TSDB tsdb, final HttpQuery query) 
    throws IOException {
    // only accept GET/POST/DELETE
    if (query.method() != HttpMethod.GET && query.method() != HttpMethod.POST &&
        query.method() != HttpMethod.DELETE) {
      throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED, 
          "Method not allowed", "The HTTP method [" + query.method().getName() +
          "] is not permitted for this endpoint");
    }
    if (query.method() == HttpMethod.DELETE && 
        !tsdb.getConfig().getBoolean("tsd.http.query.allow_delete")) {
      throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,
               "Bad request",
               "Deleting data is not enabled (tsd.http.query.allow_delete=false)");
    }
    
    final String[] uri = query.explodeAPIPath();
    final String endpoint = uri.length > 1 ? uri[1] : ""; 
    /api/query/last這個端點
	if (endpoint.toLowerCase().equals("last")) {
	      handleLastDataPointQuery(tsdb, query);
	    /api/query/gexp這個端點
	    } else if (endpoint.toLowerCase().equals("gexp")){
	      handleQuery(tsdb, query, true);
	    /api/query/exp這個端點
	    } else if (endpoint.toLowerCase().equals("exp")) {
	      handleExpressionQuery(tsdb, query);
	      return;
	} else {
	  /api/query 這個端點
	      handleQuery(tsdb, query, false);
	    }
	  }
           

我們主要關注/api/query這個端點的處理方法。

handleQuery()

下面就是handleQuery()方法。

* Processing for a data point query
   * @param tsdb The TSDB to which we belong
   * @param query The HTTP query to parse/respond
   * @param allow_expressions Whether or not expressions should be parsed
   * (based on the endpoint)
   */
  private void handleQuery(final TSDB tsdb, final HttpQuery query, 
      final boolean allow_expressions) {
	  	......
		// POST方式
		if (query.method() == HttpMethod.POST) {
		      switch (query.apiVersion()) {
		      case 0:
		      case 1:
		        data_query = query.serializer().parseQueryV1();
		        break;
		      default:
		        query_invalid.incrementAndGet();
		        throw new BadRequestException(HttpResponseStatus.NOT_IMPLEMENTED, 
		            "Requested API version not implemented", "Version " + 
		            query.apiVersion() + " is not implemented");
		      }
		      expressions = null;
		}
		// GET方式
		 else {
		      expressions = new ArrayList<ExpressionTree>();
		      data_query = parseQuery(tsdb, query, expressions);
		}
		//DELETE方式
	   if (query.getAPIMethod() == HttpMethod.DELETE &&
		     tsdb.getConfig().getBoolean("tsd.http.query.allow_delete")) {
		     data_query.setDelete(true);
      }
       // validate and then compile the queries
    try {
      LOG.debug(data_query.toString());
     //驗證查詢
 	 data_query.validateAndSetQuery();
 	 }catch (Exception e) {
      throw new BadRequestException(HttpResponseStatus.BAD_REQUEST, 
          e.getMessage(), data_query.toString(), e);
    }
    ......
}
           

我們關注GET方式時的處理方式,parseQuery(tsdb, query, expressions)。

GET方式-parseQuery()

以下是parseQuery(tsdb, query, expressions);方法。

public static TSQuery parseQuery(final TSDB tsdb, final HttpQuery query,
      final List<ExpressionTree> expressions) {
      final TSQuery data_query = new TSQuery();
    
    data_query.setStart(query.getRequiredQueryStringParam("start"));
    data_query.setEnd(query.getQueryStringParam("end"));
    
    if (query.hasQueryStringParam("padding")) {
      data_query.setPadding(true);
    }
    
    if (query.hasQueryStringParam("no_annotations")) {
      data_query.setNoAnnotations(true);
    }
    
    if (query.hasQueryStringParam("global_annotations")) {
      data_query.setGlobalAnnotations(true);
    }
    
    if (query.hasQueryStringParam("show_tsuids")) {
      data_query.setShowTSUIDs(true);
    }
    
    if (query.hasQueryStringParam("ms")) {
      data_query.setMsResolution(true);
    }
    
    if (query.hasQueryStringParam("show_query")) {
      data_query.setShowQuery(true);
    }  
    
    if (query.hasQueryStringParam("show_stats")) {
      data_query.setShowStats(true);
    }    
    
    if (query.hasQueryStringParam("show_summary")) {
        data_query.setShowSummary(true);
    }
    
    // handle tsuid queries first
    if (query.hasQueryStringParam("tsuid")) {
      final List<String> tsuids = query.getQueryStringParams("tsuid");     
      for (String q : tsuids) {
        parseTsuidTypeSubQuery(q, data_query);
      }
    }
    //解析m參數後面的字元串
    if (query.hasQueryStringParam("m")) {
      final List<String> legacy_queries = query.getQueryStringParams("m");      
      for (String q : legacy_queries) {
        parseMTypeSubQuery(q, data_query);
      }
      ......
}
           

可以從标黃部分看出,如果查詢裡面有m參數的話,那麼就得到m參數的值。利用parseMTypeSubQuery(q, data_query)方法循環對每個參數m的值進行解析。我們再具體看看如何解析的。

GET方式-parseMTypeSubQuery()

以下是parseMTypeSubQuery(q, data_query);方法

private static void parseMTypeSubQuery(final String query_string, 
      TSQuery data_query) {
    if (query_string == null || query_string.isEmpty()) {
      throw new BadRequestException("The query string was empty");
    }
    // m is of the following forms:
    // agg:[interval-agg:][rate:]metric[{tag=value,...}]
    // where the parts in square brackets `[' .. `]' are optional.
    final String[] parts = Tags.splitString(query_string, ':');
    int i = parts.length;
    if (i < 2 || i > 5) {
      throw new BadRequestException("Invalid parameter m=" + query_string + " ("
          + (i < 2 ? "not enough" : "too many") + " :-separated parts)");
    }
    final TSSubQuery sub_query = new TSSubQuery();
    
    // the aggregator is first
    sub_query.setAggregator(parts[0]);
    
    i--; // Move to the last part (the metric name).
    List<TagVFilter> filters = new ArrayList<TagVFilter>();
    sub_query.setMetric(Tags.parseWithMetricAndFilters(parts[i], filters));
    sub_query.setFilters(filters);
    
    // parse out the rate and downsampler 
    for (int x = 1; x < parts.length - 1; x++) {
      if (parts[x].toLowerCase().startsWith("rate")) {
        sub_query.setRate(true);
        if (parts[x].indexOf("{") >= 0) {
          sub_query.setRateOptions(QueryRpc.parseRateOptions(true, parts[x]));
        }
      } else if (Character.isDigit(parts[x].charAt(0))) {
        sub_query.setDownsample(parts[x]);
      } else if (parts[x].toLowerCase().startsWith("explicit_tags")) {
        sub_query.setExplicitTags(true);
      }
    }
    
    if (data_query.getQueries() == null) {
      final ArrayList<TSSubQuery> subs = new ArrayList<TSSubQuery>(1);
      data_query.setQueries(subs);
    }
    data_query.getQueries().add(sub_query);
  }
           

以上标就是主要的解析步驟了。先按冒号:進行切分。先得到頭和尾。再解析剩餘的可選部分。

4. HttpJsonSerializer.java

我們繼續分析POST請求方式的處理邏輯。

POST方式- parseQueryV1()

/**
   * Parses a timeseries data query
   * @return A TSQuery with data ready to validate
   * @throws JSONException if parsing failed
   * @throws BadRequestException if the content was missing or parsing failed
   */
  public TSQuery parseQueryV1() {
    final String json = query.getContent();
    if (json == null || json.isEmpty()) {
      throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,
          "Missing message content",
          "Supply valid JSON formatted data in the body of your request");
    }
    try {
      return JSON.parseToObject(json, TSQuery.class);
    } catch (IllegalArgumentException iae) {
      throw new BadRequestException("Unable to parse the given JSON", iae);
    }
  }
           

主要就是将json文檔轉成了TSQuery的對象的格式。

5. TSQuery.java

以下是驗證查詢參數的代碼。

validateAndSetQuery()

public void validateAndSetQuery() {
    if (start == null || start.isEmpty()) {
      throw new IllegalArgumentException("Missing start time");
    }
    start_time = DateTime.parseDateTimeString(start, timezone);
    
    if (end != null && !end.isEmpty()) {
      end_time = DateTime.parseDateTimeString(end, timezone);
    } else {
      end_time = System.currentTimeMillis();
    }
    if (end_time <= start_time) {
      throw new IllegalArgumentException(
          "End time [" + end_time + "] must be greater than the start time ["
          + start_time +"]");
    }
    
    if (queries == null || queries.isEmpty()) {
      throw new IllegalArgumentException("Missing queries");
    }
    
    // validate queries
    int i = 0;
    for (TSSubQuery sub : queries) {
      sub.validateAndSetQuery();
      final DownsamplingSpecification ds = sub.downsamplingSpecification();
      if (ds != null && timezone != null && !timezone.isEmpty() && 
          ds != DownsamplingSpecification.NO_DOWNSAMPLER) {
        final TimeZone tz = DateTime.timezones.get(timezone);
        if (tz == null) {
          throw new IllegalArgumentException(
              "The timezone specification could not be found");
        }
        ds.setTimezone(tz);
      }
      if (ds != null && use_calendar && 
          ds != DownsamplingSpecification.NO_DOWNSAMPLER) {
        ds.setUseCalendar(true);
      }
      
      sub.setIndex(i++);
    }
  }
           

6. TSSubQuery.java

以下是驗證子查詢參數的方法。

validateAndSetQuery()

public void validateAndSetQuery() {
    if (aggregator == null || aggregator.isEmpty()) {
      throw new IllegalArgumentException("Missing the aggregation function");
    }
    try {
      agg = Aggregators.get(aggregator);
    } catch (NoSuchElementException nse) {
      throw new IllegalArgumentException(
          "No such aggregation function: " + aggregator);
    }
    
    // we must have at least one TSUID OR a metric
    if ((tsuids == null || tsuids.isEmpty()) && 
        (metric == null || metric.isEmpty())) {
      throw new IllegalArgumentException(
          "Missing the metric or tsuids, provide at least one");
    }
    
    // Make sure we have a filter list
    if (filters == null) {
      filters = new ArrayList<TagVFilter>();
    }
    // parse the downsampler if we have one
    if (downsample != null && !downsample.isEmpty()) {
      // downsampler given, so parse it
      downsample_specifier = new DownsamplingSpecification(downsample);
    } else {
      // no downsampler
      downsample_specifier = DownsamplingSpecification.NO_DOWNSAMPLER;
    }
  }
           

到此,/api/query端口的查詢解析代碼已經分析完畢。後面将研究OpenTSDB是如何執行查詢操作的。