OpenTSDB Source Code Analysis
- Parsing the OpenTSDB query route /api/query
  - 1. TSDMain.java
    - main()
  - 2. RpcManager.java
    - instance()
    - initializeBuiltinRpcs()
  - 3. QueryRpc.java
    - execute()
    - handleQuery()
    - GET: parseQuery()
    - GET: parseMTypeSubQuery()
  - 4. HttpJsonSerializer.java
    - POST: parseQueryV1()
  - 5. TSQuery.java
    - validateAndSetQuery()
  - 6. TSSubQuery.java
    - validateAndSetQuery()
Parsing the OpenTSDB query route /api/query
1. TSDMain.java
main()
The entry point of the whole project is the main() function in net/opentsdb/tools/TSDMain.java.
/**
* Main class of the TSD, the Time Series Daemon.
*/
final class TSDMain {
......
private static TSDB tsdb = null;
public static void main(String[] args) throws IOException {
// Bind the logger to this class so that log output identifies where each message came from
Logger log = LoggerFactory.getLogger(TSDMain.class);
log.info("Starting.");
log.info(BuildData.revisionString());
log.info(BuildData.buildString());
......
final ServerSocketChannelFactory factory;
int connections_limit = 0;
try {
connections_limit = config.getInt("tsd.core.connections.limit");
} catch (NumberFormatException nfe) {
usage(argp, "Invalid connections limit", 1);
}
if (config.getBoolean("tsd.network.async_io")) {
int workers = Runtime.getRuntime().availableProcessors() * 2;
if (config.hasProperty("tsd.network.worker_threads")) {
try {
workers = config.getInt("tsd.network.worker_threads");
} catch (NumberFormatException nfe) {
usage(argp, "Invalid worker thread count", 1);
}
}
final Executor executor = Executors.newCachedThreadPool();
final NioServerBossPool boss_pool =
new NioServerBossPool(executor, 1, new Threads.BossThreadNamer());
final NioWorkerPool worker_pool = new NioWorkerPool(executor,
workers, new Threads.WorkerThreadNamer());
factory = new NioServerSocketChannelFactory(boss_pool, worker_pool);
} else {
factory = new OioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
new Threads.PrependThreadNamer());
}
......
final ServerBootstrap server = new ServerBootstrap(factory);
final RpcManager manager = RpcManager.instance(tsdb);
server.setPipelineFactory(new PipelineFactory(tsdb, manager, connections_limit));
......
final InetSocketAddress addr = new InetSocketAddress(bindAddress,config.getInt("tsd.network.port"));
server.bind(addr);
......
2. RpcManager.java
Two lines in the code above matter most:
final RpcManager manager = RpcManager.instance(tsdb);
server.setPipelineFactory(new PipelineFactory(tsdb, manager, connections_limit));
We start with RpcManager.instance(tsdb).
instance()
The instance() method in RpcManager.java is as follows:
public static synchronized RpcManager instance(final TSDB tsdb) {
final RpcManager existing = INSTANCE.get();
if (existing != null) {
return existing;
}
final RpcManager manager = new RpcManager(tsdb);
final String mode = Strings.nullToEmpty(tsdb.getConfig().getString("tsd.mode"));
// Load any plugins that are enabled via Config. Fail if any plugin cannot be loaded.
final ImmutableList.Builder<RpcPlugin> rpcBuilder = ImmutableList.builder();
if (tsdb.getConfig().hasProperty("tsd.rpc.plugins")) {
final String[] plugins = tsdb.getConfig().getString("tsd.rpc.plugins").split(",");
manager.initializeRpcPlugins(plugins, rpcBuilder);
}
manager.rpc_plugins = rpcBuilder.build();
final ImmutableMap.Builder<String, TelnetRpc> telnetBuilder
= ImmutableMap.builder();
final ImmutableMap.Builder<String, HttpRpc> httpBuilder
= ImmutableMap.builder();
manager.initializeBuiltinRpcs(mode, telnetBuilder, httpBuilder);
manager.telnet_commands = telnetBuilder.build();
manager.http_commands = httpBuilder.build();
final ImmutableMap.Builder<String, HttpRpcPlugin> httpPluginsBuilder
= ImmutableMap.builder();
if (tsdb.getConfig().hasProperty("tsd.http.rpc.plugins")) {
final String[] plugins = tsdb.getConfig().getString("tsd.http.rpc.plugins").split(",");
manager.initializeHttpRpcPlugins(mode, plugins, httpPluginsBuilder);
}
manager.http_plugin_commands = httpPluginsBuilder.build();
INSTANCE.set(manager);
return manager;
}
Next, look closely at how manager.initializeBuiltinRpcs(mode, telnetBuilder, httpBuilder); populates telnetBuilder and httpBuilder.
initializeBuiltinRpcs()
private void initializeBuiltinRpcs(final String mode,
final ImmutableMap.Builder<String, TelnetRpc> telnet,
final ImmutableMap.Builder<String, HttpRpc> http) {
final Boolean enableApi = tsdb.getConfig().getString("tsd.core.enable_api").equals("true");
if (mode.equals("rw") || mode.equals("wo")) {
final PutDataPointRpc put = new PutDataPointRpc();
telnet.put("put", put);
if (enableApi) {
http.put("api/put", put);
}
}
if (mode.equals("rw") || mode.equals("ro")) {
......
if (enableApi) {
http.put("api/aggregators", aggregators);
http.put("api/annotation", annotation_rpc);
http.put("api/annotations", annotation_rpc);
http.put("api/config", new ShowConfig());
http.put("api/dropcaches", dropcaches);
http.put("api/query", new QueryRpc());
http.put("api/search", new SearchRpc());
http.put("api/serializers", new Serializers());
http.put("api/stats", stats);
http.put("api/suggest", suggest_rpc);
http.put("api/tree", new TreeRpc());
http.put("api/uid", new UniqueIdRpc());
http.put("api/version", version);
}
}
}
}
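In the factory method instance(), the RpcManager member fields telnet_commands and http_commands are initialized. Their types are ImmutableMap<String, TelnetRpc> and ImmutableMap<String, HttpRpc> respectively, that is, maps whose entries cannot be changed once built. The work is done by initializeBuiltinRpcs(final String mode, final ImmutableMap.Builder<String, TelnetRpc> telnet, final ImmutableMap.Builder<String, HttpRpc> http). Java passes object references by value, so the method receives references to the same builder objects that instance() created, and the entries it adds through http and telnet are still there after it returns. As the code shows, when the HTTP API is enabled (tsd.core.enable_api is "true"), the http builder is populated with the built-in routes. The entry we care about is api/query, which maps to a new QueryRpc.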
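The populate-then-freeze pattern described above can be sketched with the standard library alone. This is only an illustration, not OpenTSDB code: the class and method names are hypothetical, a HashMap stands in for Guava's ImmutableMap.Builder, Map.copyOf (Java 10+) stands in for build(), and handler objects are replaced by plain strings.

```java
import java.util.HashMap;
import java.util.Map;

final class BuilderPassingDemo {
  /** Hypothetical stand-in for initializeBuiltinRpcs(): adds routes to the caller's map. */
  static void registerBuiltins(final boolean enableApi, final Map<String, String> http) {
    if (enableApi) {
      // The parameter refers to the same object the caller created,
      // so these entries are visible to the caller afterwards.
      http.put("api/query", "QueryRpc");
      http.put("api/suggest", "SuggestRpc");
    }
  }

  /** Hypothetical stand-in for instance(): create, populate, then freeze. */
  static Map<String, String> buildHttpCommands(final boolean enableApi) {
    final Map<String, String> builder = new HashMap<>();
    registerBuiltins(enableApi, builder);
    return Map.copyOf(builder); // immutable snapshot, like ImmutableMap.Builder.build()
  }

  public static void main(final String[] args) {
    final Map<String, String> commands = buildHttpCommands(true);
    System.out.println(commands.get("api/query"));
    try {
      commands.put("api/extra", "Extra");
    } catch (UnsupportedOperationException e) {
      System.out.println("route table is immutable");
    }
  }
}
```

With enableApi set to false the resulting map is simply empty, which mirrors how no HTTP API routes are registered when tsd.core.enable_api is not "true".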
3. QueryRpc.java
execute()
final class QueryRpc implements HttpRpc {
private static final Logger LOG = LoggerFactory.getLogger(QueryRpc.class);
......
/**
* Implements the /api/query endpoint to fetch data from OpenTSDB.
* @param tsdb The TSDB to use for fetching data
* @param query The HTTP query for parsing and responding
*/
@Override
public void execute(final TSDB tsdb, final HttpQuery query)
throws IOException {
// only accept GET/POST/DELETE
if (query.method() != HttpMethod.GET && query.method() != HttpMethod.POST &&
query.method() != HttpMethod.DELETE) {
throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED,
"Method not allowed", "The HTTP method [" + query.method().getName() +
"] is not permitted for this endpoint");
}
if (query.method() == HttpMethod.DELETE &&
!tsdb.getConfig().getBoolean("tsd.http.query.allow_delete")) {
throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,
"Bad request",
"Deleting data is not enabled (tsd.http.query.allow_delete=false)");
}
final String[] uri = query.explodeAPIPath();
final String endpoint = uri.length > 1 ? uri[1] : "";
// the /api/query/last endpoint
if (endpoint.toLowerCase().equals("last")) {
handleLastDataPointQuery(tsdb, query);
// the /api/query/gexp endpoint
} else if (endpoint.toLowerCase().equals("gexp")){
handleQuery(tsdb, query, true);
// the /api/query/exp endpoint
} else if (endpoint.toLowerCase().equals("exp")) {
handleExpressionQuery(tsdb, query);
return;
} else {
// the plain /api/query endpoint
handleQuery(tsdb, query, false);
}
}
We focus on how the /api/query endpoint is handled.
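To make the routing in execute() concrete, here is a small sketch of the endpoint selection. It is an approximation under stated assumptions: explodeApiPath below is a hypothetical simplification of HttpQuery.explodeAPIPath() that returns the path segments after "api" and an optional version segment such as "v1", and the handlers are represented by descriptive strings rather than real calls.

```java
import java.util.Arrays;
import java.util.Locale;

final class QueryEndpointDemo {
  /** Hypothetical simplification: path segments after "api" and an optional "vN". */
  static String[] explodeApiPath(final String uri) {
    String path = uri;
    final int q = path.indexOf('?');
    if (q >= 0) {
      path = path.substring(0, q); // drop the query string
    }
    final String[] parts = path.replaceAll("^/+", "").split("/");
    if (parts.length == 0 || !parts[0].equalsIgnoreCase("api")) {
      throw new IllegalArgumentException("Not an API call: " + uri);
    }
    int idx = 1;
    if (idx < parts.length && parts[idx].matches("(?i)v\\d+")) {
      idx++; // skip the version segment
    }
    return Arrays.copyOfRange(parts, idx, parts.length);
  }

  /** Mirrors the if/else chain in execute(), returning a label for the chosen handler. */
  static String handlerFor(final String uri) {
    final String[] exploded = explodeApiPath(uri);
    final String endpoint =
        exploded.length > 1 ? exploded[1].toLowerCase(Locale.ROOT) : "";
    switch (endpoint) {
      case "last": return "handleLastDataPointQuery";
      case "gexp": return "handleQuery(allow_expressions=true)";
      case "exp":  return "handleExpressionQuery";
      default:     return "handleQuery(allow_expressions=false)";
    }
  }

  public static void main(final String[] args) {
    System.out.println(handlerFor("/api/query?start=1h-ago&m=sum:sys.cpu.user"));
    System.out.println(handlerFor("/api/v1/query/last"));
  }
}
```

Any sub-path that is not last, gexp, or exp falls through to the plain data point query, just as the final else branch does in execute().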
handleQuery()
Here is the handleQuery() method.
/**
* Processing for a data point query
* @param tsdb The TSDB to which we belong
* @param query The HTTP query to parse/respond
* @param allow_expressions Whether or not expressions should be parsed
* (based on the endpoint)
*/
private void handleQuery(final TSDB tsdb, final HttpQuery query,
final boolean allow_expressions) {
......
// POST requests: the query is in the request body
if (query.method() == HttpMethod.POST) {
switch (query.apiVersion()) {
case 0:
case 1:
data_query = query.serializer().parseQueryV1();
break;
default:
query_invalid.incrementAndGet();
throw new BadRequestException(HttpResponseStatus.NOT_IMPLEMENTED,
"Requested API version not implemented", "Version " +
query.apiVersion() + " is not implemented");
}
expressions = null;
}
// GET requests (DELETE also takes this branch): the query is in the URI
else {
expressions = new ArrayList<ExpressionTree>();
data_query = parseQuery(tsdb, query, expressions);
}
// DELETE requests
if (query.getAPIMethod() == HttpMethod.DELETE &&
tsdb.getConfig().getBoolean("tsd.http.query.allow_delete")) {
data_query.setDelete(true);
}
// validate and then compile the queries
try {
LOG.debug(data_query.toString());
// validate the query
data_query.validateAndSetQuery();
}catch (Exception e) {
throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,
e.getMessage(), data_query.toString(), e);
}
......
}
We look at the GET path first, that is, parseQuery(tsdb, query, expressions).
GET: parseQuery()
Here is the parseQuery(tsdb, query, expressions) method.
public static TSQuery parseQuery(final TSDB tsdb, final HttpQuery query,
final List<ExpressionTree> expressions) {
final TSQuery data_query = new TSQuery();
data_query.setStart(query.getRequiredQueryStringParam("start"));
data_query.setEnd(query.getQueryStringParam("end"));
if (query.hasQueryStringParam("padding")) {
data_query.setPadding(true);
}
if (query.hasQueryStringParam("no_annotations")) {
data_query.setNoAnnotations(true);
}
if (query.hasQueryStringParam("global_annotations")) {
data_query.setGlobalAnnotations(true);
}
if (query.hasQueryStringParam("show_tsuids")) {
data_query.setShowTSUIDs(true);
}
if (query.hasQueryStringParam("ms")) {
data_query.setMsResolution(true);
}
if (query.hasQueryStringParam("show_query")) {
data_query.setShowQuery(true);
}
if (query.hasQueryStringParam("show_stats")) {
data_query.setShowStats(true);
}
if (query.hasQueryStringParam("show_summary")) {
data_query.setShowSummary(true);
}
// handle tsuid queries first
if (query.hasQueryStringParam("tsuid")) {
final List<String> tsuids = query.getQueryStringParams("tsuid");
for (String q : tsuids) {
parseTsuidTypeSubQuery(q, data_query);
}
}
// parse the string that follows each m parameter
if (query.hasQueryStringParam("m")) {
final List<String> legacy_queries = query.getQueryStringParams("m");
for (String q : legacy_queries) {
parseMTypeSubQuery(q, data_query);
}
......
}
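As the last block of the code shows, if the query string contains m parameters, their values are collected and parseMTypeSubQuery(q, data_query) is called in a loop to parse each one. Let us look at how that parsing works.
GET: parseMTypeSubQuery()
Here is the parseMTypeSubQuery(q, data_query) method.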
private static void parseMTypeSubQuery(final String query_string,
TSQuery data_query) {
if (query_string == null || query_string.isEmpty()) {
throw new BadRequestException("The query string was empty");
}
// m is of the following forms:
// agg:[interval-agg:][rate:]metric[{tag=value,...}]
// where the parts in square brackets `[' .. `]' are optional.
final String[] parts = Tags.splitString(query_string, ':');
int i = parts.length;
if (i < 2 || i > 5) {
throw new BadRequestException("Invalid parameter m=" + query_string + " ("
+ (i < 2 ? "not enough" : "too many") + " :-separated parts)");
}
final TSSubQuery sub_query = new TSSubQuery();
// the aggregator is first
sub_query.setAggregator(parts[0]);
i--; // Move to the last part (the metric name).
List<TagVFilter> filters = new ArrayList<TagVFilter>();
sub_query.setMetric(Tags.parseWithMetricAndFilters(parts[i], filters));
sub_query.setFilters(filters);
// parse out the rate and downsampler
for (int x = 1; x < parts.length - 1; x++) {
if (parts[x].toLowerCase().startsWith("rate")) {
sub_query.setRate(true);
if (parts[x].indexOf("{") >= 0) {
sub_query.setRateOptions(QueryRpc.parseRateOptions(true, parts[x]));
}
} else if (Character.isDigit(parts[x].charAt(0))) {
sub_query.setDownsample(parts[x]);
} else if (parts[x].toLowerCase().startsWith("explicit_tags")) {
sub_query.setExplicitTags(true);
}
}
if (data_query.getQueries() == null) {
final ArrayList<TSSubQuery> subs = new ArrayList<TSSubQuery>(1);
data_query.setQueries(subs);
}
data_query.getQueries().add(sub_query);
}
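These are the main parsing steps. The value of m is first split on the colon. The first part is taken as the aggregator and the last part as the metric with its tag filters. The remaining parts in between are optional and are parsed afterwards: a part starting with "rate" enables rate conversion, a part starting with a digit is the downsampling specification, and a part starting with "explicit_tags" turns on explicit tags.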
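The steps above can be tried out on a sample value with the following sketch. It is a simplified re-implementation for illustration only: the Parsed holder and the class name are hypothetical, String.split replaces Tags.splitString, rate options in braces are not parsed, and the last part is kept as one string instead of being split into metric and filters as Tags.parseWithMetricAndFilters does. Unlike the original, it also skips empty middle parts instead of calling charAt(0) on them.

```java
import java.util.Locale;

final class MQueryDemo {
  /** Hypothetical holder for the pieces of one m=... sub query. */
  static final class Parsed {
    String aggregator;
    String metricAndTags;
    String downsample;
    boolean rate;
    boolean explicitTags;
  }

  static Parsed parse(final String m) {
    if (m == null || m.isEmpty()) {
      throw new IllegalArgumentException("The query string was empty");
    }
    final String[] parts = m.split(":");
    if (parts.length < 2 || parts.length > 5) {
      throw new IllegalArgumentException("Invalid parameter m=" + m);
    }
    final Parsed parsed = new Parsed();
    parsed.aggregator = parts[0];                   // head: the aggregator
    parsed.metricAndTags = parts[parts.length - 1]; // tail: metric[{tag=value,...}]
    // Everything in between is optional and may appear in any order.
    for (int x = 1; x < parts.length - 1; x++) {
      final String part = parts[x];
      if (part.isEmpty()) {
        continue; // departure from the original, which does not guard this case
      }
      final String lower = part.toLowerCase(Locale.ROOT);
      if (lower.startsWith("rate")) {
        parsed.rate = true;
      } else if (Character.isDigit(part.charAt(0))) {
        parsed.downsample = part;
      } else if (lower.startsWith("explicit_tags")) {
        parsed.explicitTags = true;
      }
    }
    return parsed;
  }

  public static void main(final String[] args) {
    final Parsed p = parse("sum:1h-avg:rate:sys.cpu.user{host=web01}");
    System.out.println(p.aggregator + " | " + p.downsample + " | " + p.rate
        + " | " + p.metricAndTags);
  }
}
```

Because the head and tail are fixed positions, only the middle parts need to be classified by their prefix, which is why their order in the query string does not matter.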
4. HttpJsonSerializer.java
Next, we look at how POST requests are handled.
POST: parseQueryV1()
/**
* Parses a timeseries data query
* @return A TSQuery with data ready to validate
* @throws JSONException if parsing failed
* @throws BadRequestException if the content was missing or parsing failed
*/
public TSQuery parseQueryV1() {
final String json = query.getContent();
if (json == null || json.isEmpty()) {
throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,
"Missing message content",
"Supply valid JSON formatted data in the body of your request");
}
try {
return JSON.parseToObject(json, TSQuery.class);
} catch (IllegalArgumentException iae) {
throw new BadRequestException("Unable to parse the given JSON", iae);
}
}
In short, it deserializes the JSON document in the request body into a TSQuery object.
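For reference, a request that would reach parseQueryV1() could be built as in the sketch below. The JSON field names (start, queries, aggregator, metric, downsample, tags) correspond to TSQuery and TSSubQuery properties. The base URL, the metric name, and the tag values are assumptions chosen for illustration, and the request is only constructed here, not sent. It uses java.net.http, which requires Java 11 or later.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class PostQueryDemo {
  // Example body; the metric and tag values are made up for illustration.
  static final String BODY =
      "{"
      + "\"start\":\"1h-ago\","
      + "\"queries\":[{"
      + "\"aggregator\":\"sum\","
      + "\"metric\":\"sys.cpu.user\","
      + "\"downsample\":\"1m-avg\","
      + "\"tags\":{\"host\":\"web01\"}"
      + "}]"
      + "}";

  /** Builds (but does not send) a POST to /api/query on the given TSD base URL. */
  static HttpRequest build(final String baseUrl) {
    return HttpRequest.newBuilder(URI.create(baseUrl + "/api/query"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(BODY))
        .build();
  }

  public static void main(final String[] args) {
    // Assumes a TSD listening locally on port 4242.
    final HttpRequest request = build("http://localhost:4242");
    System.out.println(request.method() + " " + request.uri());
    System.out.println(BODY);
  }
}
```

The same query expressed as a GET would carry these values in the m parameter instead, which is why the two code paths converge on a TSQuery before validation.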
5. TSQuery.java
The following code validates the query parameters.
validateAndSetQuery()
public void validateAndSetQuery() {
if (start == null || start.isEmpty()) {
throw new IllegalArgumentException("Missing start time");
}
start_time = DateTime.parseDateTimeString(start, timezone);
if (end != null && !end.isEmpty()) {
end_time = DateTime.parseDateTimeString(end, timezone);
} else {
end_time = System.currentTimeMillis();
}
if (end_time <= start_time) {
throw new IllegalArgumentException(
"End time [" + end_time + "] must be greater than the start time ["
+ start_time +"]");
}
if (queries == null || queries.isEmpty()) {
throw new IllegalArgumentException("Missing queries");
}
// validate queries
int i = 0;
for (TSSubQuery sub : queries) {
sub.validateAndSetQuery();
final DownsamplingSpecification ds = sub.downsamplingSpecification();
if (ds != null && timezone != null && !timezone.isEmpty() &&
ds != DownsamplingSpecification.NO_DOWNSAMPLER) {
final TimeZone tz = DateTime.timezones.get(timezone);
if (tz == null) {
throw new IllegalArgumentException(
"The timezone specification could not be found");
}
ds.setTimezone(tz);
}
if (ds != null && use_calendar &&
ds != DownsamplingSpecification.NO_DOWNSAMPLER) {
ds.setUseCalendar(true);
}
sub.setIndex(i++);
}
}
6. TSSubQuery.java
The following method validates the sub-query parameters.
validateAndSetQuery()
public void validateAndSetQuery() {
if (aggregator == null || aggregator.isEmpty()) {
throw new IllegalArgumentException("Missing the aggregation function");
}
try {
agg = Aggregators.get(aggregator);
} catch (NoSuchElementException nse) {
throw new IllegalArgumentException(
"No such aggregation function: " + aggregator);
}
// we must have at least one TSUID OR a metric
if ((tsuids == null || tsuids.isEmpty()) &&
(metric == null || metric.isEmpty())) {
throw new IllegalArgumentException(
"Missing the metric or tsuids, provide at least one");
}
// Make sure we have a filter list
if (filters == null) {
filters = new ArrayList<TagVFilter>();
}
// parse the downsampler if we have one
if (downsample != null && !downsample.isEmpty()) {
// downsampler given, so parse it
downsample_specifier = new DownsamplingSpecification(downsample);
} else {
// no downsampler
downsample_specifier = DownsamplingSpecification.NO_DOWNSAMPLER;
}
}
This completes the walkthrough of the query parsing code for the /api/query endpoint. A follow-up will look at how OpenTSDB actually executes the query.