where col1 = 100 and abs(col2) > 0在Hive中的處理過程
where過濾條件稱為謂詞predicate。
以上where過濾條件在經過Hive的文法解析後,生成如下的文法樹:
TOK_WHERE
AND
=
TOK_TABLE_OR_COL
c1
100
>
TOK_FUNCTION
ABS
TOK_TABLE_OR_COL
c2
有了文法樹之後,最終的目的是生成predicate每個節點對應的ExprNodeDesc,即描述對應的節點:
public Map<ASTNode, ExprNodeDesc> genAllExprNodeDesc(ASTNode expr, RowResolver input,
TypeCheckCtx tcCtx) throws SemanticException {
…
Map<ASTNode, ExprNodeDesc> nodeOutputs =
TypeCheckProcFactory.genExprNode(expr, tcCtx);
…
生成的過程是對上述文法樹的一個深度優先周遊的過程,Hive中大量對樹的周遊的代碼,在周遊過程中根據指定的規則或對文法樹進行修改,或輸出相應的結果。
Hive中有一個預設的深度優先周遊的實作DefaultGraphWalker。
這個周遊器的實作部分代碼如下:
public void walk(Node nd) throws SemanticException {
// Push the node in the stack
opStack.push(nd);
// While there are still nodes to dispatch...
while (!opStack.empty()) {
Node node = opStack.peek();
if (node.getChildren() == null ||
getDispatchedList().containsAll(node.getChildren())) {
// Dispatch current node
if (!getDispatchedList().contains(node)) {
dispatch(node, opStack);
opQueue.add(node);
}
opStack.pop();
continue;
}
// Add a single child and restart the loop
for (Node childNode : node.getChildren()) {
if (!getDispatchedList().contains(childNode)) {
opStack.push(childNode);
break;
}
}
} // end while
}
先将目前節點放到待處理的棧opStack中,然後從opStack取節點出來,如果取出來的節點沒有Children,或者Children已經全部處理完畢,才對目前節點進行處理(dispatch),如果目前節點有Children且還沒有處理完,則将目前節點的Children放到棧頂,然後重新從棧中取節點進行處理。這是很基礎的深度優先周遊的實作。
那在周遊的過程中,如何針對不同的節點進行不同的處理呢?
在周遊之前,先預置一些針對不同的節點不同規則的處理器,然後在周遊過程中,通過分發器Dispatcher選擇最合适的處理器進行處理。
生成ExprNodeDesc的周遊中一共先預置了8個規則Rule,每個規則對應一個處理器NodeProcessor:
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
opRules.put(new RuleRegExp("R1", HiveParser.TOK_NULL + "%"),
tf.getNullExprProcessor());
opRules.put(new RuleRegExp("R2", HiveParser.Number + "%|" +
HiveParser.TinyintLiteral + "%|" +
HiveParser.SmallintLiteral + "%|" +
HiveParser.BigintLiteral + "%|" +
HiveParser.DecimalLiteral + "%"),
tf.getNumExprProcessor());
opRules
.put(new RuleRegExp("R3", HiveParser.Identifier + "%|"
+ HiveParser.StringLiteral + "%|" + HiveParser.TOK_CHARSETLITERAL + "%|"
+ HiveParser.TOK_STRINGLITERALSEQUENCE + "%|"
+ "%|" + HiveParser.KW_IF + "%|" + HiveParser.KW_CASE + "%|"
+ HiveParser.KW_WHEN + "%|" + HiveParser.KW_IN + "%|"
+ HiveParser.KW_ARRAY + "%|" + HiveParser.KW_MAP + "%|"
+ HiveParser.KW_STRUCT + "%|" + HiveParser.KW_EXISTS + "%|"
+ HiveParser.KW_GROUPING + "%|"
+ HiveParser.TOK_SUBQUERY_OP_NOTIN + "%"),
tf.getStrExprProcessor());
opRules.put(new RuleRegExp("R4", HiveParser.KW_TRUE + "%|"
+ HiveParser.KW_FALSE + "%"), tf.getBoolExprProcessor());
opRules.put(new RuleRegExp("R5", HiveParser.TOK_DATELITERAL + "%|"
+ HiveParser.TOK_TIMESTAMPLITERAL + "%"), tf.getDateTimeExprProcessor());
opRules.put(new RuleRegExp("R6",
HiveParser.TOK_INTERVAL_YEAR_MONTH_LITERAL + "%|"
+ HiveParser.TOK_INTERVAL_DAY_TIME_LITERAL + "%|"
+ HiveParser.TOK_INTERVAL_YEAR_LITERAL + "%|"
+ HiveParser.TOK_INTERVAL_MONTH_LITERAL + "%|"
+ HiveParser.TOK_INTERVAL_DAY_LITERAL + "%|"
+ HiveParser.TOK_INTERVAL_HOUR_LITERAL + "%|"
+ HiveParser.TOK_INTERVAL_MINUTE_LITERAL + "%|"
+ HiveParser.TOK_INTERVAL_SECOND_LITERAL + "%"), tf.getIntervalExprProcessor());
opRules.put(new RuleRegExp("R7", HiveParser.TOK_TABLE_OR_COL + "%"),
tf.getColumnExprProcessor());
opRules.put(new RuleRegExp("R8", HiveParser.TOK_SUBQUERY_OP + "%"),
tf.getSubQueryExprProcessor());
這裡使用的分發器Dispatcher是DefaultRuleDispatcher,DefaultRuleDispatcher選擇處理器的邏輯如下:
// find the firing rule
// find the rule from the stack specified
Rule rule = null;
int minCost = Integer.MAX_VALUE;
for (Rule r : procRules.keySet()) {
int cost = r.cost(ndStack);
if ((cost >= 0) && (cost <= minCost)) {
minCost = cost;
rule = r;
}
}
NodeProcessor proc;
if (rule == null) {
proc = defaultProc;
} else {
proc = procRules.get(rule);
}
// Do nothing in case proc is null
if (proc != null) {
// Call the process function
return proc.process(nd, ndStack, procCtx, nodeOutputs);
} else {
return null;
}
周遊所有的規則Rule,調用每個規則的cost方法計算cost,找其中cost最小的規則對應的處理器,如果沒有找到,則使用預設處理器,如果沒有設定預設處理器,則不做任何事情。
那麼每個規則的cost是如何計算的?
– 沒太看懂==|| (後續再理理)
WHERE條件文法樹每個節點對應的處理器如下:
TOK_WHERE
AND --> TypeCheckProcFactory.DefaultExprProcessor
= --> TypeCheckProcFactory.DefaultExprProcessor
TOK_TABLE_OR_COL --> TypeCheckProcFactory.ColumnExprProcessor
c1 --> TypeCheckProcFactory.StrExprProcessor
100 --> TypeCheckProcFactory.NumExprProcessor
> --> TypeCheckProcFactory.DefaultExprProcessor
TOK_FUNCTION --> TypeCheckProcFactory.DefaultExprProcessor
ABS --> TypeCheckProcFactory.StrExprProcessor
TOK_TABLE_OR_COL --> TypeCheckProcFactory.ColumnExprProcessor
c2 --> TypeCheckProcFactory.StrExprProcessor
0 --> TypeCheckProcFactory.NumExprProcessor
TypeCheckProcFactory.StrExprProcessor 生成ExprNodeConstantDesc
TypeCheckProcFactory.ColumnExprProcessor 處理column,生成ExprNodeColumnDesc
TypeCheckProcFactory.NumExprProcessor生成ExprNodeConstantDesc
TypeCheckProcFactory.DefaultExprProcessor生成ExprNodeGenericFuncDesc
在深度優先周遊完WHERE文法樹後,每個節點都會生成一個ExprNodeDesc,但是其實除了最頂層的AND節點生成的ExprNodeDesc有用,其他的節點生成的都是中間結果,最終都會包含在AND節點生成的ExprNodeDesc中。是以在周遊WHERE樹後,通過AND節點生成的ExprNodeDesc構造FilterDesc:
new FilterDesc(genExprNodeDesc(condn, inputRR, useCaching), false)
有了FilterDesc後,就能夠構造出FilterOperator了,然後再将生成的FilterOperator加入到Operator樹中:
Operator ret = get((Class) conf.getClass());
ret.setConf(conf);
至此,where過濾條件對應的FilterOperator構造完畢。
接下來仔細看下AND生成的ExprNodeDesc,它其實是一個ExprNodeGenericFuncDesc:
// genericUDF是GenericUDFOPAnd,就是對應AND操作符
private GenericUDF genericUDF;
// AND是一個二進制操作符,children裡存的是對應的操作符
// 根據WHERE文法樹,可以知道children[0]肯定又是一個ExprNodeGenericFuncDesc,而且是一個=函 // 數,而children[1]也是一個肯定又是一個ExprNodeGenericFuncDesc,而且是一個>函數,以此類 // 推,每個ExprNodeGenericFuncDesc都有對應的children
private List chidren;
// UDF的名字,這裡是and
private transient String funcText;
/**
- This class uses a writableObjectInspector rather than a TypeInfo to store
-
the canonical type information for this NodeDesc.
*/
private transient ObjectInspector writableObjectInspector;
每個ExprNodeDesc都對應有一個ExprNodeEvaluator,來對每個ExprNodeDesc進行實際的計算。看下ExprNodeEvaluator類的基本方法:
public abstract class ExprNodeEvaluator {
// 對應的ExprNodeDesc
protected final T expr;
// 在經過這個Evaluator計算後,它的輸出值該如何解析的ObjectInspector
protected ObjectInspector outputOI;
…
/**
- Initialize should be called once and only once. Return the ObjectInspector
- for the return value, given the rowInspector.
- 初始化方法,傳入一個ObjectInspector,即傳入的資料應該如何解析的ObjectInspector
-
而需要傳回經過這個Evaluator計算後的輸出值的解析ObjectInspector
*/
public abstract ObjectInspector initialize(ObjectInspector rowInspector) throws HiveException;
// evaluate方法,調用來對row資料進行解析
public Object evaluate(Object row) throws HiveException {
return evaluate(row, -1);
}
/**
- Evaluate the expression given the row. This method should use the
- rowInspector passed in from initialize to inspect the row object. The
- return value will be inspected by the return value of initialize.
-
If this evaluator is referenced by others, store it for them
*/
protected Object evaluate(Object row, int version) throws HiveException {
if (version < 0 || version != this.version) {
this.version = version;
return evaluation = _evaluate(row, version);
}
return evaluation;
}
// 由各個子類實作的方法的_evaluate方法,結合上面的evaluate方法,這裡實際使用了設計模式的模闆 // 方法模式
protected abstract Object _evaluate(Object row, int version) throws HiveException;
…
}
通過ExprNodeEvaluatorFactory擷取到每個ExprNodeDesc對應的ExprNodeEvaluator:
public static ExprNodeEvaluator get(ExprNodeDesc desc) throws HiveException {
// Constant node
if (desc instanceof ExprNodeConstantDesc) {
return new ExprNodeConstantEvaluator((ExprNodeConstantDesc) desc);
}
// Column-reference node, e.g. a column in the input row
if (desc instanceof ExprNodeColumnDesc) {
return new ExprNodeColumnEvaluator((ExprNodeColumnDesc) desc);
}
// Generic Function node, e.g. CASE, an operator or a UDF node
if (desc instanceof ExprNodeGenericFuncDesc) {
return new ExprNodeGenericFuncEvaluator((ExprNodeGenericFuncDesc) desc);
}
// Field node, e.g. get a.myfield1 from a
if (desc instanceof ExprNodeFieldDesc) {
return new ExprNodeFieldEvaluator((ExprNodeFieldDesc) desc);
}
throw new RuntimeException(
"Cannot find ExprNodeEvaluator for the exprNodeDesc = " + desc);
}
看下FilterOperator中如何使用ExprNodeEvaluator對資料進行過濾的。
首先在FilterOperator的initializeOp方法中,擷取到ExprNodeEvaluator:
conditionEvaluator = ExprNodeEvaluatorFactory.get(conf.getPredicate());
然後在process方法中,調用initialize方法後,調用eveluate方法擷取到整個where過濾的結果:
conditionInspector = (PrimitiveObjectInspector) conditionEvaluator
.initialize(rowInspector);
…
Object condition = conditionEvaluator.evaluate(row);
…
Boolean ret = (Boolean) conditionInspector
.getPrimitiveJavaObject(condition);
// 如果結果是true,則forward到下一個operator繼續處理
if (Boolean.TRUE.equals(ret)) {
forward(row, rowInspector);
}
再來看下GenericUDFOPAnd的evaluate方法實作:
Object a1 = arguments[1].get();
if (a1 != null) {
bool_a1 = boi1.get(a1);
if (bool_a1 == false) {
result.set(false);
return result;
}
}
if ((a0 != null && bool_a0 == true) && (a1 != null && bool_a1 == true)) {
result.set(true);
return result;
}
return null;