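Following the brief analysis of the Hive metastore table schema in the previous post, this post summarizes the overall code structure, based on the entity objects in that data design. Let's start by opening the metadata directory; its structure is as follows:
As you can see, the hivemeta directory contains metastore (client and server call logic), events (listener implementations for checks, authorization and so on during the table lifecycle), hooks (here only the interfaces related to the JDO connection), parser (parsing of expression trees), spec (proxy classes for partitions), tools (methods related to JDO execute), plus txn and model. Next we walk through the metadata code piece by piece, with comments:
Even with the packages collapsed there are a lot of classes. Intimidating? It was for me too, but let's keep going. At first it may all look like a tangled mess. Once we calm down, a good place to start is the Hive class, because it is the entry point for metastore metadata calls. The lifecycle analysis proceeds as follows: creating and loading the HiveMetaStoreClient client, creating and loading the HiveMetaStore server, createTable, dropTable, AlterTable, createPartition, dropPartition, and alterPartition. Of course, this is only a small part of the complete metadata module.
1. Creating and loading the HiveMetaStoreClient client
Let's go through the Hive class bit by bit: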
private HiveConf conf = null;
private IMetaStoreClient metaStoreClient;
private UserGroupInformation owner;

// metastore calls timing information
private final Map<String, Long> metaCallTimeMap = new HashMap<String, Long>();

private static ThreadLocal<Hive> hiveDB = new ThreadLocal<Hive>() {
  @Override
  protected synchronized Hive initialValue() {
    return null;
  }

  @Override
  public synchronized void remove() {
    if (this.get() != null) {
      this.get().close();
    }
    super.remove();
  }
};
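Declared here are the HiveConf object, the metaStoreClient, the UserGroupInformation of the operating user, and a call-time map that records how long each metastore call takes. The class also maintains a ThreadLocal named hiveDB. If it holds no Hive object for the current thread (or a refresh is requested, or the current user is not the owner), a new Hive object is created: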
public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
  Hive db = hiveDB.get();
  if (db == null || needsRefresh || !db.isCurrentUserOwner()) {
    if (db != null) {
      LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
          ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
    }
    closeCurrent();
    c.set("fs.scheme.class", "dfs");
    Hive newdb = new Hive(c);
    hiveDB.set(newdb);
    return newdb;
  }
  db.conf = c;
  return db;
}
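The caching rule in Hive.get can be sketched on its own as follows. This is a minimal illustration, not Hive's code: the class ThreadLocalHandle and its fields are hypothetical, and the owner check is reduced to a string comparison.

```java
/** Hypothetical stand-in for a per-thread handle such as Hive; not part of Hive itself. */
final class ThreadLocalHandle {
    // One cached handle per thread, like the hiveDB ThreadLocal above.
    private static final ThreadLocal<ThreadLocalHandle> CURRENT = new ThreadLocal<>();

    final String owner;      // user who created this handle (assumed to be a plain string here)
    boolean closed = false;  // set when the handle is replaced

    private ThreadLocalHandle(String owner) {
        this.owner = owner;
    }

    /** Returns the cached handle, or replaces it when missing, stale, or owned by another user. */
    static ThreadLocalHandle get(String currentUser, boolean needsRefresh) {
        ThreadLocalHandle h = CURRENT.get();
        if (h == null || needsRefresh || !h.owner.equals(currentUser)) {
            if (h != null) {
                h.closed = true;  // mirrors closeCurrent() before the new object is built
            }
            h = new ThreadLocalHandle(currentUser);
            CURRENT.set(h);
        }
        return h;
    }
}
```

Repeated calls from the same thread and user return the same object; a refresh request or a different user closes the old handle and installs a new one.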
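Next we find that permanent functions are registered as soon as the Hive class is loaded, because the registration sits in a static initializer. What is a function here? From the earlier schema analysis, it can be understood as the stored metadata for UDFs and their jars. The code: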
// register all permanent functions. need improvement
static {
  try {
    reloadFunctions();
  } catch (Exception e) {
    LOG.warn("Failed to access metastore. This class should not accessed in runtime.",e);
  }
}

public static void reloadFunctions() throws HiveException {
  // Get the Hive object for the calls that follow
  Hive db = Hive.get();
  // Iterate over every dbName
  for (String dbName : db.getAllDatabases()) {
    // Look up all functions attached to this db
    for (String functionName : db.getFunctions(dbName, "*")) {
      Function function = db.getFunction(dbName, functionName);
      try {
        // Register the function data in a Map<String, FunctionInfo> inside the Registry class,
        // so the compute engine does not have to query the database again when it calls the function
        FunctionRegistry.registerPermanentFunction(
            FunctionUtils.qualifyFunctionName(functionName, dbName), function.getClassName(),
            false, FunctionTask.toFunctionResource(function.getResourceUris()));
      } catch (Exception e) {
        LOG.warn("Failed to register persistent function " +
            functionName + ":" + function.getClassName() + ". Ignore and continue.");
      }
    }
  }
}
getMSC() is then called to obtain the metastore client. The client itself is built by createMetaStoreClient(), shown below:
private IMetaStoreClient createMetaStoreClient() throws MetaException {

  // Implement the HiveMetaHookLoader interface here
  HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
    @Override
    public HiveMetaHook getHook(
        org.apache.hadoop.hive.metastore.api.Table tbl)
        throws MetaException {

      try {
        if (tbl == null) {
          return null;
        }
        // Load the storage handler named in the table's key/value parameters,
        // e.g. extended storage such as HBase or Redis, stored as external tables
        HiveStorageHandler storageHandler =
            HiveUtils.getStorageHandler(conf,
                tbl.getParameters().get(META_TABLE_STORAGE));
        if (storageHandler == null) {
          return null;
        }
        return storageHandler.getMetaHook();
      } catch (HiveException ex) {
        LOG.error(StringUtils.stringifyException(ex));
        throw new MetaException(
            "Failed to load storage handler: " + ex.getMessage());
      }
    }
  };
  return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap,
      SessionHiveMetaStoreClient.class.getName());
}
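2. Creating and loading the HiveMetaStore server
When HiveMetaStoreClient is initialized, it sets up its connection to HiveMetaStore, either embedded (local) or remote. The code: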
public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader)
    throws MetaException {

  this.hookLoader = hookLoader;
  if (conf == null) {
    conf = new HiveConf(HiveMetaStoreClient.class);
  }
  this.conf = conf;
  filterHook = loadFilterHooks();
  // Based on hive.metastore.uris in hive-site.xml: if it is set, the connection is remote;
  // otherwise it is local
  String msUri = conf.getVar(HiveConf.ConfVars.METASTOREURIS);
  localMetaStore = HiveConfUtil.isEmbeddedMetaStore(msUri);
  if (localMetaStore) {
    // Local mode connects to HiveMetaStore directly
    client = HiveMetaStore.newRetryingHMSHandler("hive client", conf, true);
    isConnected = true;
    snapshotActiveConf();
    return;
  }

  // Get the retry count and the retry delay from the configuration
  retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
  retryDelaySeconds = conf.getTimeVar(
      ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);

  // Build the array of metastore URIs
  if (conf.getVar(HiveConf.ConfVars.METASTOREURIS) != null) {
    String metastoreUrisString[] = conf.getVar(
        HiveConf.ConfVars.METASTOREURIS).split(",");
    metastoreUris = new URI[metastoreUrisString.length];
    try {
      int i = 0;
      for (String s : metastoreUrisString) {
        URI tmpUri = new URI(s);
        if (tmpUri.getScheme() == null) {
          throw new IllegalArgumentException("URI: " + s
              + " does not have a scheme");
        }
        metastoreUris[i++] = tmpUri;

      }
    } catch (IllegalArgumentException e) {
      throw (e);
    } catch (Exception e) {
      MetaStoreUtils.logAndThrowMetaException(e);
    }
  } else {
    LOG.error("NOT getting uris from conf");
    throw new MetaException("MetaStoreURIs not found in conf file");
  }
  // Call open() to establish the connection
  open();
}
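The local-versus-remote decision and the URI validation in the constructor can be tried in isolation. The sketch below is an assumption-laden illustration: MetastoreUriParser is a hypothetical class, the embedded check is assumed to mean "no URI configured", and the trimming of each entry is my addition rather than something the constructor above does.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper mirroring the URI handling in the constructor; not a Hive class. */
final class MetastoreUriParser {

    /** Assumption: an unset or blank URI list means embedded (local) mode. */
    static boolean isEmbedded(String uris) {
        return uris == null || uris.trim().isEmpty();
    }

    /** Splits a comma-separated list and rejects entries without a scheme such as thrift://. */
    static URI[] parse(String uris) {
        String[] parts = uris.split(",");
        URI[] result = new URI[parts.length];
        for (int i = 0; i < parts.length; i++) {
            URI uri;
            try {
                uri = new URI(parts[i].trim());  // trimming is an addition in this sketch
            } catch (URISyntaxException e) {
                throw new IllegalArgumentException("Malformed URI: " + parts[i], e);
            }
            if (uri.getScheme() == null) {
                throw new IllegalArgumentException("URI: " + parts[i] + " does not have a scheme");
            }
            result[i] = uri;
        }
        return result;
    }
}
```

For example, MetastoreUriParser.parse("thrift://ms1:9083,thrift://ms2:9083") yields two URIs whose host and port can be passed to a socket, while a bare host name with no scheme is rejected.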
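The code above shows that a remote connection requires hive.metastore.uris to be set in hive-site.xml. Familiar, isn't it? If your client and server are not on the same machine, you need this setting to connect remotely. Reading on, here is the open method that establishes the connection: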
private void open() throws MetaException {
  isConnected = false;
  TTransportException tte = null;
  // Whether to use SASL
  boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
  // If true, the metastore Thrift interface will use TFramedTransport.
  // When false (default) a standard TTransport is used.
  boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
  // If true, the metastore Thrift interface will use TCompactProtocol.
  // When false (default) TBinaryProtocol will be used.
  // The differences between them are left for a later post.
  boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);
  // Get the socket timeout
  int clientSocketTimeout = (int) conf.getTimeVar(
      ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);

  for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
    for (URI store : metastoreUris) {
      LOG.info("Trying to connect to metastore with URI " + store);
      try {
        transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
        if (useSasl) {
          // Wrap thrift connection with SASL for secure connection.
          try {
            // Create the HadoopThriftAuthBridge client
            HadoopThriftAuthBridge.Client authBridge =
                ShimLoader.getHadoopThriftAuthBridge().createClient();
            // Authentication-related logic:
            // check if we should use delegation tokens to authenticate
            // the call below gets hold of the tokens if they are set up by hadoop
            // this should happen on the map/reduce tasks if the client added the
            // tokens into hadoop's credential store in the front end during job
            // submission.
            String tokenSig = conf.get("hive.metastore.token.signature");
            // tokenSig could be null
            tokenStrForm = Utils.getTokenStrForm(tokenSig);
            if(tokenStrForm != null) {
              // authenticate using delegation tokens via the "DIGEST" mechanism
              transport = authBridge.createClientTransport(null, store.getHost(),
                  "DIGEST", tokenStrForm, transport,
                  MetaStoreUtils.getMetaStoreSaslProperties(conf));
            } else {
              String principalConfig =
                  conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
              transport = authBridge.createClientTransport(
                  principalConfig, store.getHost(), "KERBEROS", null,
                  transport, MetaStoreUtils.getMetaStoreSaslProperties(conf));
            }
          } catch (IOException ioe) {
            LOG.error("Couldn't create client transport", ioe);
            throw new MetaException(ioe.toString());
          }
        } else if (useFramedTransport) {
          transport = new TFramedTransport(transport);
        }
        final TProtocol protocol;
        // The difference between the two protocols will be covered later (I have not studied it yet)
        if (useCompactProtocol) {
          protocol = new TCompactProtocol(transport);
        } else {
          protocol = new TBinaryProtocol(transport);
        }
        // Create the ThriftHiveMetastore client
        client = new ThriftHiveMetastore.Client(protocol);
        try {
          transport.open();
          isConnected = true;
        } catch (TTransportException e) {
          tte = e;
          if (LOG.isDebugEnabled()) {
            LOG.warn("Failed to connect to the MetaStore Server...", e);
          } else {
            // Don't print full exception trace if DEBUG is not on.
            LOG.warn("Failed to connect to the MetaStore Server...");
          }
        }
        // Load the user and the user's groups
        if (isConnected && !useSasl && conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)){
          // Call set_ugi, only in unsecure mode.
          try {
            UserGroupInformation ugi = Utils.getUGI();
            client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames()));
          } catch (LoginException e) {
            LOG.warn("Failed to do login. set_ugi() is not successful, " +
                "Continuing without it.", e);
          } catch (IOException e) {
            LOG.warn("Failed to find ugi of client set_ugi() is not successful, " +
                "Continuing without it.", e);
          } catch (TException e) {
            LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. "
                + "Continuing without it.", e);
          }
        }
      } catch (MetaException e) {
        LOG.error("Unable to connect to metastore with URI " + store
            + " in attempt " + attempt, e);
      }
      if (isConnected) {
        break;
      }
    }
    // Wait before launching the next round of connection retries.
    if (!isConnected && retryDelaySeconds > 0) {
      try {
        LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
        Thread.sleep(retryDelaySeconds * 1000);
      } catch (InterruptedException ignore) {}
    }
  }

  if (!isConnected) {
    throw new MetaException("Could not connect to meta store using any of the URIs provided." +
        " Most recent failure: " + StringUtils.stringifyException(tte));
  }

  snapshotActiveConf();

  LOG.info("Connected to metastore.");
}
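Stripped of Thrift and SASL, the control flow of open() is "try every URI in order, and repeat the whole round up to a configured number of times with a delay in between". A minimal sketch of just that loop, with a hypothetical RetryingConnector class and a Predicate standing in for the real connection attempt:

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical illustration of the retry loop in open(); the connect step is injected. */
final class RetryingConnector {

    /**
     * Tries each endpoint in order for up to `retries` rounds.
     * Returns the first endpoint that connected, or null if all rounds failed.
     */
    static String connect(List<String> endpoints, int retries, long delayMillis,
                          Predicate<String> tryConnect) {
        for (int attempt = 0; attempt < retries; attempt++) {
            for (String endpoint : endpoints) {
                if (tryConnect.test(endpoint)) {
                    return endpoint;
                }
            }
            // Wait before the next round, as open() does with retryDelaySeconds.
            if (delayMillis > 0) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Unlike the original, this sketch restores the interrupt flag and gives up.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }
        return null;
    }
}
```

One consequence of this shape is that the URIs are always tried in their configured order, so the first URI in the list receives the first connection attempt of every round.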
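This post sets the protocol internals aside. The code shows that the client reaches the HiveMetaStore server through ThriftHiveMetastore. ThriftHiveMetastore is a class, but it defines the nested interfaces Iface and AsyncIface, which makes it convenient to provide implementations. Next, let's look at how HMSHandler is initialized. In the local (embedded) case, newRetryingHMSHandler is called directly, which initializes HMSHandler. The code: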
public HMSHandler(String name, HiveConf conf, boolean init) throws MetaException {
  super(name);
  hiveConf = conf;
  if (init) {
    init();
  }
}
Next, its init method:
public void init() throws MetaException {
  // Get the class name of the implementation that talks to the data store. This is ObjectStore,
  // the RawStore implementation responsible for JDO interaction with the database.
  rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
  // Load the init listeners from hive.metastore.init.hooks; you can implement and load your own
  initListeners = MetaStoreUtils.getMetaStoreListeners(
      MetaStoreInitListener.class, hiveConf,
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
  for (MetaStoreInitListener singleInitListener: initListeners) {
    MetaStoreInitContext context = new MetaStoreInitContext();
    singleInitListener.onInit(context);
  }
  // Initialize the alter handler implementation
  String alterHandlerName = hiveConf.get("hive.metastore.alter.impl",
      HiveAlterHandler.class.getName());
  alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass(
      alterHandlerName), hiveConf);
  // Initialize the warehouse
  wh = new Warehouse(hiveConf);
  // Create the default db, default roles and admin users, and record currentUrl
  synchronized (HMSHandler.class) {
    if (currentUrl == null || !currentUrl.equals(MetaStoreInit.getConnectionURL(hiveConf))) {
      createDefaultDB();
      createDefaultRoles();
      addAdminUsers();
      currentUrl = MetaStoreInit.getConnectionURL(hiveConf);
    }
  }
  // Initialize metrics
  if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) {
    try {
      Metrics.init();
    } catch (Exception e) {
      // log exception, but ignore inability to start
      LOG.error("error in Metrics init: " + e.getClass().getName() + " "
          + e.getMessage(), e);
    }
  }
  // Initialize the pre-event listeners, event listeners and end-function listeners
  preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
      hiveConf,
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS));
  listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf,
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
  listeners.add(new SessionPropertiesListener(hiveConf));
  endFunctionListeners = MetaStoreUtils.getMetaStoreListeners(
      MetaStoreEndFunctionListener.class, hiveConf,
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS));
  // Regex validation of partition names, configurable through
  // hive.metastore.partition.name.whitelist.pattern
  String partitionValidationRegex =
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN);
  if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
    partitionValidationPattern = Pattern.compile(partitionValidationRegex);
  } else {
    partitionValidationPattern = null;
  }

  long cleanFreq = hiveConf.getTimeVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ, TimeUnit.MILLISECONDS);
  if (cleanFreq > 0) {
    // In default config, there is no timer.
    Timer cleaner = new Timer("Metastore Events Cleaner Thread", true);
    cleaner.schedule(new EventCleanerTask(this), cleanFreq, cleanFreq);
  }
}
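It sets up the rawStore implementation class used for database interaction, the warehouse for physical storage operations, and the events and listeners. Table operations then go through the interface's metadata lifecycle methods.
3. createTable
We start with the createTable method. The code: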
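The listener lists in init() are built from comma-separated class names in the configuration. How MetaStoreUtils.getMetaStoreListeners does this internally is not shown in this post, so the following is only a generic sketch of the technique (reflection over a list of class names); ListenerLoader, InitListener and HelloListener are hypothetical names, and a no-argument constructor is assumed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of loading pluggable listeners by class name; not Hive's implementation. */
final class ListenerLoader {

    /** Minimal listener contract used only for this example. */
    interface InitListener {
        void onInit(StringBuilder log);
    }

    /** Example listener that records that it ran. */
    public static final class HelloListener implements InitListener {
        @Override
        public void onInit(StringBuilder log) {
            log.append("hello;");
        }
    }

    /** Instantiates every class in a comma-separated list and casts it to the given type. */
    static <T> List<T> load(Class<T> type, String classNames) {
        List<T> listeners = new ArrayList<>();
        if (classNames == null || classNames.trim().isEmpty()) {
            return listeners;  // nothing configured
        }
        for (String name : classNames.split(",")) {
            try {
                Class<?> clazz = Class.forName(name.trim());
                listeners.add(type.cast(clazz.getDeclaredConstructor().newInstance()));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Failed to instantiate listener " + name, e);
            }
        }
        return listeners;
    }
}
```

With this shape, adding a listener is purely a configuration change: the class only has to be on the classpath and implement the expected interface.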
public void createTable(String tableName, List<String> columns, List<String> partCols,
    Class<? extends InputFormat> fileInputFormat,
    Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols,
    Map<String, String> parameters) throws HiveException {
  if (columns == null) {
    throw new HiveException("columns not specified for table " + tableName);
  }

  Table tbl = newTable(tableName);
  // Storage descriptor (SD) attributes: set the input and output format class names.
  // The compute engine later loads these classes by name through reflection.
  tbl.setInputFormatClass(fileInputFormat.getName());
  tbl.setOutputFormatClass(fileOutputFormat.getName());

  // Wrap each column's name and type in a FieldSchema and add it to the SD's column list
  for (String col : columns) {
    FieldSchema field = new FieldSchema(col, STRING_TYPE_NAME, "default");
    tbl.getCols().add(field);
  }

  // If partition columns were given at creation time (for example dt), record them as
  // partition keys; they are eventually persisted in the metastore (PARTITION_KEYS)
  if (partCols != null) {
    for (String partCol : partCols) {
      FieldSchema part = new FieldSchema();
      part.setName(partCol);
      part.setType(STRING_TYPE_NAME); // default partition key
      tbl.getPartCols().add(part);
    }
  }
  // Set the serialization library
  tbl.setSerializationLib(LazySimpleSerDe.class.getName());
  // Set the bucketing information
  tbl.setNumBuckets(bucketCount);
  tbl.setBucketCols(bucketCols);
  // Set any extra key/value parameters for the table
  if (parameters != null) {
    tbl.setParamters(parameters);
  }
  createTable(tbl);
}
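As the code shows, Hive builds a Table object, which can be treated as a model: it carries the attributes of nearly every table that references TBLS as its parent through a table-id foreign key (see the Hive metastore schema for details). Once the object is populated, createTable is called; the next call in the chain is: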
public void createTable(Table tbl, boolean ifNotExists) throws HiveException {
  try {
    // As a safeguard, fall back to the current database from SessionState if no db name is set
    if (tbl.getDbName() == null || "".equals(tbl.getDbName().trim())) {
      tbl.setDbName(SessionState.get().getCurrentDatabase());
    }
    // If no columns were specified, derive the fields from the table's deserializer
    if (tbl.getCols().size() == 0 || tbl.getSd().getColsSize() == 0) {
      tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getTableName(),
          tbl.getDeserializer()));
    }
    // Validate the table's input format, output format and column attributes
    tbl.checkValidity();
    if (tbl.getParameters() != null) {
      tbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
    }
    org.apache.hadoop.hive.metastore.api.Table tTbl = tbl.getTTable();
    // Authorization starts here. It involves the settings
    // hive.security.authorization.createtable.user.grants,
    // hive.security.authorization.createtable.group.grants and
    // hive.security.authorization.createtable.role.grants,
    // which come from Hive's own notions of user, group and role.
    PrincipalPrivilegeSet principalPrivs = new PrincipalPrivilegeSet();
    SessionState ss = SessionState.get();
    if (ss != null) {
      CreateTableAutomaticGrant grants = ss.getCreateTableGrants();
      if (grants != null) {
        principalPrivs.setUserPrivileges(grants.getUserGrants());
        principalPrivs.setGroupPrivileges(grants.getGroupGrants());
        principalPrivs.setRolePrivileges(grants.getRoleGrants());
        tTbl.setPrivileges(principalPrivs);
      }
    }
    // Create the table on the server through the client connection
    getMSC().createTable(tTbl);
  } catch (AlreadyExistsException e) {
    if (!ifNotExists) {
      throw new HiveException(e);
    }
  } catch (Exception e) {
    throw new HiveException(e);
  }
}
Next, let's look at the createTable method that gets called in HiveMetaStoreClient:
public void createTable(Table tbl, EnvironmentContext envContext) throws AlreadyExistsException,
    InvalidObjectException, MetaException, NoSuchObjectException, TException {
  // Get the HiveMetaHook, which performs pre-create loading and validation
  // for the table's storage engine
  HiveMetaHook hook = getHook(tbl);
  if (hook != null) {
    hook.preCreateTable(tbl);
  }
  boolean success = false;
  try {
    // Then call HiveMetaStore so the server performs the create against the database
    create_table_with_environment_context(tbl, envContext);
    if (hook != null) {
      hook.commitCreateTable(tbl);
    }
    success = true;
  } finally {
    // Roll back if the create failed
    if (!success && (hook != null)) {
      hook.rollbackCreateTable(tbl);
    }
  }
}
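A brief note on what the hook does. HiveMetaHook is an interface whose methods include preCreateTable, rollbackCreateTable, preDropTable and so on. Its implementations handle pre-create loading and validation for different storage types, as well as rollback on failure. The code: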
public interface HiveMetaHook {
  /**
   * Called before a new table definition is added to the metastore
   * during CREATE TABLE.
   *
   * @param table new table definition
   */
  public void preCreateTable(Table table)
    throws MetaException;

  /**
   * Called after failure adding a new table definition to the metastore
   * during CREATE TABLE.
   *
   * @param table new table definition
   */
  public void rollbackCreateTable(Table table)
    throws MetaException;
  ...
  public void preDropTable(Table table)
    throws MetaException;
...............................
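The pre / commit / rollback sequence that the client wraps around the server call can be shown with a small self-contained example. HookedCreate, Hook and RecordingHook are hypothetical names; the metastore call is replaced by a Runnable so that success and failure can both be simulated.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the hook lifecycle around a create call; not Hive code. */
final class HookedCreate {

    /** Reduced version of a storage hook: one callback per lifecycle stage. */
    interface Hook {
        void pre(String table);
        void commit(String table);
        void rollback(String table);
    }

    /** Hook that records which callbacks fired, in order. */
    static final class RecordingHook implements Hook {
        final List<String> calls = new ArrayList<>();
        @Override public void pre(String table) { calls.add("pre"); }
        @Override public void commit(String table) { calls.add("commit"); }
        @Override public void rollback(String table) { calls.add("rollback"); }
    }

    /** Runs pre, then the metastore call, then commit; rollback fires if anything after pre fails. */
    static void create(String table, Hook hook, Runnable metastoreCall) {
        if (hook != null) {
            hook.pre(table);
        }
        boolean success = false;
        try {
            metastoreCall.run();
            if (hook != null) {
                hook.commit(table);
            }
            success = true;
        } finally {
            if (!success && hook != null) {
                hook.rollback(table);
            }
        }
    }
}
```

Because success is only set after commit returns, a failure inside the commit callback also triggers rollback, which matches the structure of the client code above.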
Next, the create path on the HiveMetaStore server side, create_table_core:
private void create_table_core(final RawStore ms, final Table tbl,
    final EnvironmentContext envContext)
    throws AlreadyExistsException, MetaException,
    InvalidObjectException, NoSuchObjectException {
  // Validate the name against a regex to reject illegal characters
  if (!MetaStoreUtils.validateName(tbl.getTableName())) {
    throw new InvalidObjectException(tbl.getTableName()
        + " is not a valid object name");
  }
  // Validation: check column names and types, and partition key names
  String validate = MetaStoreUtils.validateTblColumns(tbl.getSd().getCols());
  if (validate != null) {
    throw new InvalidObjectException("Invalid column " + validate);
  }
  if (tbl.getPartitionKeys() != null) {
    validate = MetaStoreUtils.validateTblColumns(tbl.getPartitionKeys());
    if (validate != null) {
      throw new InvalidObjectException("Invalid partition column " + validate);
    }
  }
  SkewedInfo skew = tbl.getSd().getSkewedInfo();
  if (skew != null) {
    validate = MetaStoreUtils.validateSkewedColNames(skew.getSkewedColNames());
    if (validate != null) {
      throw new InvalidObjectException("Invalid skew column " + validate);
    }
    validate = MetaStoreUtils.validateSkewedColNamesSubsetCol(
        skew.getSkewedColNames(), tbl.getSd().getCols());
    if (validate != null) {
      throw new InvalidObjectException("Invalid skew column " + validate);
    }
  }

  Path tblPath = null;
  boolean success = false, madeDir = false;
  try {
    // Fire the pre-create event. Listeners already implemented in the metastore include
    // DummyPreListener, AuthorizationPreEventListener, AlternateFailurePreListener and
    // MetaDataExportListener. What they do is explained when the metastore's design
    // patterns are analyzed.
    firePreEvent(new PreCreateTableEvent(tbl, this));

    // Open a transaction
    ms.openTransaction();

    // Throw if the db does not exist
    Database db = ms.getDatabase(tbl.getDbName());
    if (db == null) {
      throw new NoSuchObjectException("The database " + tbl.getDbName() + " does not exist");
    }

    // Check whether the table already exists in this db
    if (is_table_exists(ms, tbl.getDbName(), tbl.getTableName())) {
      throw new AlreadyExistsException("Table " + tbl.getTableName()
          + " already exists");
    }
    // If the table is not a view, build the full table path ->
    // fs.getUri().getScheme()+fs.getUri().getAuthority()+path.toUri().getPath()
    if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
      if (tbl.getSd().getLocation() == null
          || tbl.getSd().getLocation().isEmpty()) {
        tblPath = wh.getTablePath(
            ms.getDatabase(tbl.getDbName()), tbl.getTableName());
      } else {
        // A location was given for a table that is neither external nor backed by a
        // storage handler: only log a warning
        if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
          LOG.warn("Location: " + tbl.getSd().getLocation()
              + " specified for non-external table:" + tbl.getTableName());
        }
        tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
      }
      // Store the resolved tblPath as the SD location
      tbl.getSd().setLocation(tblPath.toString());
    }
    // Create the table directory
    if (tblPath != null) {
      if (!wh.isDir(tblPath)) {
        if (!wh.mkdirs(tblPath, true)) {
          throw new MetaException(tblPath
              + " is not a directory or unable to create one");
        }
        madeDir = true;
      }
    }
    // Check the hive.stats.autogather setting
    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) &&
        !MetaStoreUtils.isView(tbl)) {
      if (tbl.getPartitionKeysSize() == 0) { // Unpartitioned table
        MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, madeDir);
      } else { // Partitioned table with no partitions.
        MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, true);
      }
    }

    // set create time
    long time = System.currentTimeMillis() / 1000;
    tbl.setCreateTime((int) time);
    if (tbl.getParameters() == null ||
        tbl.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
      tbl.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
    }
    // Perform the createTable database operation
    ms.createTable(tbl);
    success = ms.commitTransaction();

  } finally {
    if (!success) {
      ms.rollbackTransaction();
      // If the create did not go through, delete the table directory that was made
      if (madeDir) {
        wh.deleteDir(tblPath, true);
      }
    }
    // Notify the listeners that the create has finished, e.g. for notifications
    for (MetaStoreEventListener listener : listeners) {
      CreateTableEvent createTableEvent =
          new CreateTableEvent(tbl, success, this);
      createTableEvent.setEnvironmentContext(envContext);
      listener.onCreateTable(createTableEvent);
    }
  }
}
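The listeners will be covered in detail later. Continuing straight down the call stack, we reach ms.createTable. ms is a RawStore, the interface that declares the methods for every lifecycle operation. Part of it: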
public abstract Database getDatabase(String name)
    throws NoSuchObjectException;

public abstract boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException;

public abstract boolean alterDatabase(String dbname, Database db) throws NoSuchObjectException, MetaException;

public abstract List<String> getDatabases(String pattern) throws MetaException;

public abstract List<String> getAllDatabases() throws MetaException;

public abstract boolean createType(Type type);

public abstract Type getType(String typeName);

public abstract boolean dropType(String typeName);

public abstract void createTable(Table tbl) throws InvalidObjectException,
    MetaException;

public abstract boolean dropTable(String dbName, String tableName)
    throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException;

public abstract Table getTable(String dbName, String tableName)
    throws MetaException;
..................
Now let's see how it is implemented. First, the Hive metastore calls getMS() to obtain the thread-local RawStore implementation:
public RawStore getMS() throws MetaException {
  // Get the RawStore already stored for this thread
  RawStore ms = threadLocalMS.get();
  // If there is none, create the implementation and store it in the ThreadLocal
  if (ms == null) {
    ms = newRawStore();
    ms.verifySchema();
    threadLocalMS.set(ms);
    ms = threadLocalMS.get();
  }
  return ms;
}
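At this point you probably want to know what newRawStore does. Let's continue with the getProxy method, which builds the RawStore instance: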
public static RawStore getProxy(HiveConf hiveConf, Configuration conf, String rawStoreClassName,
    int id) throws MetaException {
  // Load baseClass through reflection; the implementation object is created afterwards
  Class<? extends RawStore> baseClass = (Class<? extends RawStore>) MetaStoreUtils.getClass(
      rawStoreClassName);

  RawStoreProxy handler = new RawStoreProxy(hiveConf, conf, baseClass, id);

  // Look for interfaces on both the class and all base classes.
  return (RawStore) Proxy.newProxyInstance(RawStoreProxy.class.getClassLoader(),
      getAllInterfaces(baseClass), handler);
}
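getProxy relies on java.lang.reflect.Proxy: callers receive an object that implements the store interface, while every call is routed through an InvocationHandler first. The sketch below shows that mechanism only. StoreProxyDemo, Store and InMemoryStore are hypothetical, and the handler merely records method names; it makes no claim about what RawStoreProxy does inside its own invoke method.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demonstration of a JDK dynamic proxy in front of a store interface. */
final class StoreProxyDemo {

    /** Tiny stand-in for an interface such as RawStore. */
    interface Store {
        String getTable(String name);
    }

    /** Plain implementation that the proxy delegates to. */
    public static final class InMemoryStore implements Store {
        @Override
        public String getTable(String name) {
            return "table:" + name;
        }
    }

    /** Names of the methods that passed through the proxy (for demonstration only). */
    static final List<String> CALLS = new ArrayList<>();

    /** Wraps the target so that every interface call goes through the handler. */
    static Store proxyFor(Store target) {
        InvocationHandler handler = (proxy, method, args) -> {
            CALLS.add(method.getName());  // cross-cutting logic would go here
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();  // surface the real exception to the caller
            }
        };
        return (Store) Proxy.newProxyInstance(
            Store.class.getClassLoader(), new Class<?>[] {Store.class}, handler);
    }
}
```

The benefit of this design is that logic shared by every store method can live in one handler instead of being repeated in each method of the implementation.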
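So where does rawStoreClassName come from? It is loaded when HiveMetaStore initializes, from the METASTORE_RAW_STORE_IMPL setting in HiveConf, which names the RawStore implementation class, ObjectStore. Now that the RawStore implementation has been created, let's go deeper into ObjectStore: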
@Override
public void createTable(Table tbl) throws InvalidObjectException, MetaException {
  boolean commited = false;
  try {
    // Open a transaction
    openTransaction();
    // convertToMTable validates the db and table again (code not shown here);
    // why the check is repeated deserves more thought
    MTable mtbl = convertToMTable(tbl);
    // pm is the JDO PersistenceManager initialized when ObjectStore is created.
    // This is where the table object is persisted; see how JDO model objects
    // interact with the database for details.
    pm.makePersistent(mtbl);
    // Build the user, group and role privilege objects and persist them
    PrincipalPrivilegeSet principalPrivs = tbl.getPrivileges();
    List<Object> toPersistPrivObjs = new ArrayList<Object>();
    if (principalPrivs != null) {
      int now = (int)(System.currentTimeMillis()/1000);

      Map<String, List<PrivilegeGrantInfo>> userPrivs = principalPrivs.getUserPrivileges();
      putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, userPrivs, PrincipalType.USER);

      Map<String, List<PrivilegeGrantInfo>> groupPrivs = principalPrivs.getGroupPrivileges();
      putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, groupPrivs, PrincipalType.GROUP);

      Map<String, List<PrivilegeGrantInfo>> rolePrivs = principalPrivs.getRolePrivileges();
      putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, rolePrivs, PrincipalType.ROLE);
    }
    pm.makePersistentAll(toPersistPrivObjs);
    commited = commitTransaction();
  } finally {
    // Roll back on failure
    if (!commited) {
      rollbackTransaction();
    }
  }
}
4. dropTable
Straight to the code, starting from the Hive class:
public void dropTable(String tableName, boolean ifPurge) throws HiveException {
  // Hive splits the name into an array of {dbName, tableName}
  String[] names = Utilities.getDbTableName(tableName);
  dropTable(names[0], names[1], true, true, ifPurge);
}
Why is this needed? Because the SQL statement can be either drop table dbName.tableName or drop table tableName, so the dbName and tableName have to be resolved here. For drop table tableName, the dbName is taken from the current session. The code:
public static String[] getDbTableName(String dbtable) throws SemanticException {
  // Use the dbName of the current session as the default
  return getDbTableName(SessionState.get().getCurrentDatabase(), dbtable);
}

public static String[] getDbTableName(String defaultDb, String dbtable) throws SemanticException {
  if (dbtable == null) {
    return new String[2];
  }
  String[] names = dbtable.split("\\.");
  switch (names.length) {
    case 2:
      return names;
    // If there is only one part, pair it with the default db
    case 1:
      return new String [] {defaultDb, dbtable};
    default:
      throw new SemanticException(ErrorMsg.INVALID_TABLE_NAME, dbtable);
  }
}
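The name resolution is easy to exercise on its own. The sketch below repeats the same split logic in a standalone class so it runs without Hive on the classpath; DbTableNames is a hypothetical name, and IllegalArgumentException stands in for SemanticException.

```java
/** Standalone re-creation of the db.table splitting logic; not the Hive Utilities class. */
final class DbTableNames {

    /** Returns {db, table}, falling back to defaultDb when the name is unqualified. */
    static String[] split(String defaultDb, String dbTable) {
        if (dbTable == null) {
            return new String[2];
        }
        String[] names = dbTable.split("\\.");
        switch (names.length) {
            case 2:
                return names;                             // already qualified: db.table
            case 1:
                return new String[] {defaultDb, dbTable}; // unqualified: use the session db
            default:
                // IllegalArgumentException replaces SemanticException in this sketch
                throw new IllegalArgumentException("Invalid table name: " + dbTable);
        }
    }
}
```

So split("default", "sales.orders") resolves to the sales database, split("default", "orders") resolves to the default database, and a name with more than one dot is rejected.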
Then dropTable in HiveMetaStoreClient is called through getMSC():
public void dropTable(String dbname, String name, boolean deleteData,
    boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,
    NoSuchObjectException, UnsupportedOperationException {
  Table tbl;
  try {
    // Fetch the whole Table object by dbName and tableName, i.e. all metadata stored for the table
    tbl = getTable(dbname, name);
  } catch (NoSuchObjectException e) {
    if (!ignoreUnknownTab) {
      throw e;
    }
    return;
  }
  // Use the table type to decide whether this is an index table; index tables cannot be dropped here
  if (isIndexTable(tbl)) {
    throw new UnsupportedOperationException("Cannot drop index tables");
  }
  // Same getHook as in create: get the hook for the table's storage
  HiveMetaHook hook = getHook(tbl);
  if (hook != null) {
    hook.preDropTable(tbl);
  }
  boolean success = false;
  try {
    // Call the dropTable method on the HiveMetaStore server
    drop_table_with_environment_context(dbname, name, deleteData, envContext);
    if (hook != null) {
      hook.commitDropTable(tbl, deleteData);
    }
    success=true;
  } catch (NoSuchObjectException e) {
    if (!ignoreUnknownTab) {
      throw e;
    }
  } finally {
    if (!success && (hook != null)) {
      hook.rollbackDropTable(tbl);
    }
  }
}
Now let's focus on what the HiveMetaStore server does:
private boolean drop_table_core(final RawStore ms, final String dbname, final String name,
    final boolean deleteData, final EnvironmentContext envContext,
    final String indexName) throws NoSuchObjectException,
    MetaException, IOException, InvalidObjectException, InvalidInputException {
  boolean success = false;
  boolean isExternal = false;
  Path tblPath = null;
  List<Path> partPaths = null;
  Table tbl = null;
  boolean ifPurge = false;
  try {
    ms.openTransaction();
    // Fetch the whole Table object
    tbl = get_table_core(dbname, name);
    if (tbl == null) {
      throw new NoSuchObjectException(name + " doesn't exist");
    }
    // A missing SD means the table metadata is corrupted
    if (tbl.getSd() == null) {
      throw new MetaException("Table metadata is corrupted");
    }
    ifPurge = isMustPurge(envContext, tbl);

    firePreEvent(new PreDropTableEvent(tbl, deleteData, this));
    // An index table cannot be dropped directly (indexName == null);
    // it has to be removed through drop index
    boolean isIndexTable = isIndexTable(tbl);
    if (indexName == null && isIndexTable) {
      throw new RuntimeException(
          "The table " + name + " is an index table. Please do drop index instead.");
    }
    // For a regular table, drop the metadata of all its indexes first
    if (!isIndexTable) {
      try {
        List<Index> indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE);
        while (indexes != null && indexes.size() > 0) {
          for (Index idx : indexes) {
            this.drop_index_by_name(dbname, name, idx.getIndexName(), true);
          }
          indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE);
        }
      } catch (TException e) {
        throw new MetaException(e.getMessage());
      }
    }
    // Check whether this is an external table
    isExternal = isExternal(tbl);
    if (tbl.getSd().getLocation() != null) {
      tblPath = new Path(tbl.getSd().getLocation());
      if (!wh.isWritable(tblPath.getParent())) {
        String target = indexName == null ? "Table" : "Index table";
        throw new MetaException(target + " metadata not deleted since " +
            tblPath.getParent() + " is not writable by " +
            hiveConf.getUser());
      }
    }

    checkTrashPurgeCombination(tblPath, dbname + "." + name, ifPurge);
    // Get the location paths of all partitions. One odd thing: the Table object is not
    // passed in; the method fetches the table again and also checks read/write permission
    // on the parent directory
    partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath,
        tbl.getPartitionKeys(), deleteData && !isExternal);
    // Call ObjectStore to delete the metadata
    if (!ms.dropTable(dbname, name)) {
      String tableName = dbname + "." + name;
      throw new MetaException(indexName == null ? "Unable to drop table " + tableName:
          "Unable to drop index table " + tableName + " for index " + indexName);
    }
    success = ms.commitTransaction();
  } finally {
    if (!success) {
      ms.rollbackTransaction();
    } else if (deleteData && !isExternal) {
      // Delete the physical partition data
      deletePartitionData(partPaths, ifPurge);
      // Delete the table path
      deleteTableData(tblPath, ifPurge);
      // ok even if the data is not deleted
    }
    // Notify the listeners
    for (MetaStoreEventListener listener : listeners) {
      DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this);
      dropTableEvent.setEnvironmentContext(envContext);
      listener.onDropTable(dropTableEvent);
    }
  }
  return success;
}
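Going deeper into dropTable in ObjectStore, we find that the whole Table object is fetched yet again by dbName and tableName before its parts are deleted one by one. Perhaps the code was written by different people, or perhaps it is a safety measure? Many Table objects that could have been passed in through the interface are re-fetched instead; might this add load on the database? The ObjectStore code: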
1 public boolean dropTable(String dbName, String tableName) throws MetaException,
2 NoSuchObjectException, InvalidObjectException, InvalidInputException {
3 boolean success = false;
4 try {
5 openTransaction();
//重新擷取Table對象
6 MTable tbl = getMTable(dbName, tableName);
7 pm.retrieve(tbl);
8 if (tbl != null) {
9 //下列代碼查詢并删除所有的權限
10 List<MTablePrivilege> tabGrants = listAllTableGrants(dbName, tableName);
11 if (tabGrants != null && tabGrants.size() > 0) {
12 pm.deletePersistentAll(tabGrants);
13 }
14 List<MTableColumnPrivilege> tblColGrants = listTableAllColumnGrants(dbName,
15 tableName);
16 if (tblColGrants != null && tblColGrants.size() > 0) {
17 pm.deletePersistentAll(tblColGrants);
18 }
19
20 List<MPartitionPrivilege> partGrants = this.listTableAllPartitionGrants(dbName, tableName);
21 if (partGrants != null && partGrants.size() > 0) {
22 pm.deletePersistentAll(partGrants);
23 }
24
25 List<MPartitionColumnPrivilege> partColGrants = listTableAllPartitionColumnGrants(dbName,
26 tableName);
27 if (partColGrants != null && partColGrants.size() > 0) {
28 pm.deletePersistentAll(partColGrants);
29 }
30 // delete column statistics if present
31 try {
// Delete the column statistics rows
32 deleteTableColumnStatistics(dbName, tableName, null);
33 } catch (NoSuchObjectException e) {
34 LOG.info("Found no table level column statistics associated with db " + dbName +
35 " table " + tableName + " record to delete");
36 }
37 // Delete the column descriptor (mcd) data tied to the storage descriptor
38 preDropStorageDescriptor(tbl.getSd());
39 // Delete the table object and its related rows
40 pm.deletePersistentAll(tbl);
41 }
42 success = commitTransaction();
43 } finally {
44 if (!success) {
45 rollbackTransaction();
46 }
47 }
48 return success;
49 }
5、AlterTable
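Next, let's look at AlterTable. It contains quite a lot of logic because it involves changing paths on physical storage, so we will go through it step by step. Again we start from the Hive class; here is the code: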
1 public void alterTable(String tblName, Table newTbl, boolean cascade)
2 throws InvalidOperationException, HiveException {
3 String[] names = Utilities.getDbTableName(tblName);
4 try {
5 // Remove DDL_TIME from the table parameters; the table is being altered, so this timestamp will be set again
6 if (newTbl.getParameters() != null) {
7 newTbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
8 }
// Run validity checks (dbName, tableName, columns, input format class, output format class, and so on); a HiveException is thrown if any check fails
9 newTbl.checkValidity();
// Call alter_table on the metastore client
10 getMSC().alter_table(names[0], names[1], newTbl.getTTable(), cascade);
11 } catch (MetaException e) {
12 throw new HiveException("Unable to alter table. " + e.getMessage(), e);
13 } catch (TException e) {
14 throw new HiveException("Unable to alter table. " + e.getMessage(), e);
15 }
16 }
HiveMetaStoreClient does no special processing here, so let's go straight to what the HiveMetaStore server does:
1 private void alter_table_core(final String dbname, final String name, final Table newTable,
2 final EnvironmentContext envContext, final boolean cascade)
3 throws InvalidOperationException, MetaException {
4 startFunction("alter_table", ": db=" + dbname + " tbl=" + name
5 + " newtbl=" + newTable.getTableName());
6
7 // Set DDL_TIME to the current time if the client did not supply one
8 if (newTable.getParameters() == null ||
9 newTable.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
10 newTable.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
11 .currentTimeMillis() / 1000));
12 }
13 boolean success = false;
14 Exception ex = null;
15 try {
// Fetch the existing Table object
16 Table oldt = get_table_core(dbname, name);
// Fire the pre-event
17 firePreEvent(new PreAlterTableEvent(oldt, newTable, this));
// Do the actual alterTable work; explained in detail below
18 alterHandler.alterTable(getMS(), wh, dbname, name, newTable, cascade);
19 success = true;
20
// Notify the listeners
21 for (MetaStoreEventListener listener : listeners) {
22
23 AlterTableEvent alterTableEvent =
24 new AlterTableEvent(oldt, newTable, success, this);
25 alterTableEvent.setEnvironmentContext(envContext);
26 listener.onAlterTable(alterTableEvent);
27 }
28 } catch (NoSuchObjectException e) {
29 // thrown when the table to be altered does not exist
30 ex = e;
31 throw new InvalidOperationException(e.getMessage());
32 } catch (Exception e) {
33 ex = e;
34 if (e instanceof MetaException) {
35 throw (MetaException) e;
36 } else if (e instanceof InvalidOperationException) {
37 throw (InvalidOperationException) e;
38 } else {
39 throw newMetaException(e);
40 }
41 } finally {
42 endFunction("alter_table", success, ex, name);
43 }
44 }
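The DDL_TIME handling is split between the two sides: the client removes the old value, and the server writes the current time in seconds only when no value is present. The following is a minimal sketch of that handshake on a plain map. The class and method names are hypothetical, and the key string is an assumed stand-in for hive_metastoreConstants.DDL_TIME.

```java
import java.util.HashMap;
import java.util.Map;

class DdlTimeSketch {
    // Assumed stand-in for hive_metastoreConstants.DDL_TIME.
    static final String DDL_TIME = "transient_lastDdlTime";

    // Client side (hypothetical helper): drop the stale timestamp before sending the table.
    static void clientPrepare(Map<String, String> params) {
        if (params != null) {
            params.remove(DDL_TIME);
        }
    }

    // Server side (hypothetical helper): stamp the time in seconds only if none is present.
    static void serverStamp(Map<String, String> params, long nowMillis) {
        if (params.get(DDL_TIME) == null) {
            params.put(DDL_TIME, Long.toString(nowMillis / 1000));
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put(DDL_TIME, "1400000000");
        clientPrepare(params);
        serverStamp(params, System.currentTimeMillis());
        System.out.println(params.get(DDL_TIME));
    }
}
```

Because the client always removes the key, the server-side null check is what actually refreshes the timestamp on every alter.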
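Now let's focus on what alterHandler actually does. First, a brief note on how it is initialized: during HiveMetaStore init, the class name is read from the hive.metastore.alter.impl parameter, which is the name of HiveAlterHandler. With that in mind, let's look at its alterTable implementation. Brace yourself :)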
1 public void alterTable(RawStore msdb, Warehouse wh, String dbname,
2 String name, Table newt, boolean cascade) throws InvalidOperationException, MetaException {
3 if (newt == null) {
4 throw new InvalidOperationException("New table is invalid: " + newt);
5 }
6 // Check that the new table name is valid
7 if (!MetaStoreUtils.validateName(newt.getTableName())) {
8 throw new InvalidOperationException(newt.getTableName()
9 + " is not a valid object name");
10 }
// Check that the new column names and types are valid
11 String validate = MetaStoreUtils.validateTblColumns(newt.getSd().getCols());
12 if (validate != null) {
13 throw new InvalidOperationException("Invalid column " + validate);
14 }
15
16 Path srcPath = null;
17 FileSystem srcFs = null;
18 Path destPath = null;
19 FileSystem destFs = null;
20
21 boolean success = false;
22 boolean moveData = false;
23 boolean rename = false;
24 Table oldt = null;
25 List<ObjectPair<Partition, String>> altps = new ArrayList<ObjectPair<Partition, String>>();
26
27 try {
28 msdb.openTransaction();
// The names are lowercased directly here, which suggests the code was not all written by one person
29 name = name.toLowerCase();
30 dbname = dbname.toLowerCase();
31
32 // If the table is being renamed, check that the new name is not already taken
33 if (!newt.getTableName().equalsIgnoreCase(name)
34 || !newt.getDbName().equalsIgnoreCase(dbname)) {
35 if (msdb.getTable(newt.getDbName(), newt.getTableName()) != null) {
36 throw new InvalidOperationException("new table " + newt.getDbName()
37 + "." + newt.getTableName() + " already exists");
38 }
39 rename = true;
40 }
41
42 // Fetch the old table object
43 oldt = msdb.getTable(dbname, name);
44 if (oldt == null) {
45 throw new InvalidOperationException("table " + newt.getDbName() + "."
46 + newt.getTableName() + " doesn't exist");
47 }
48 // Read METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES; when it is true, incompatible column type changes are rejected with an exception (false is the default passed here)
49 if (HiveConf.getBoolVar(hiveConf,
50 HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES,
51 false)) {
52 // Throws InvalidOperationException if the new column types are not
53 // compatible with the current column types.
54 MetaStoreUtils.throwExceptionIfIncompatibleColTypeChange(
55 oldt.getSd().getCols(), newt.getSd().getCols());
56 }
57 // cascade is passed down from the caller of Hive.alterTable (set by the engine); it decides whether the partitions must be altered as well
58 if (cascade) {
59 // Check whether the new columns match the old ones; a mismatch means the columns were changed (for example added or removed)
60 if(MetaStoreUtils.isCascadeNeededInAlterTable(oldt, newt)) {
// Fetch all partitions of the table by dbName and tableName
61 List<Partition> parts = msdb.getPartitions(dbname, name, -1);
62 for (Partition part : parts) {
63 List<FieldSchema> oldCols = part.getSd().getCols();
64 part.getSd().setCols(newt.getSd().getCols());
65 String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues());
// If the columns differ, delete the existing column statistics
66 updatePartColumnStatsForAlterColumns(msdb, part, oldPartName, part.getValues(), oldCols, part);
// Update the partition metadata
67 msdb.alterPartition(dbname, name, part.getValues(), part);
68 }
69 } else {
70 LOG.warn("Alter table does not cascade changes to its partitions.");
71 }
72 }
73
74 // Check whether the partition keys (for example dt or hour) have changed
76 boolean partKeysPartiallyEqual = checkPartialPartKeysEqual(oldt.getPartitionKeys(),
77 newt.getPartitionKeys());
78
// If the existing table is not a view and the old and new partition keys differ, throw an exception
79 if(!oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())){
80 if (oldt.getPartitionKeys().size() != newt.getPartitionKeys().size()
81 || !partKeysPartiallyEqual) {
82 throw new InvalidOperationException(
83 "partition keys can not be changed.");
84 }
85 }
86
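// If this is a rename, the table is not a view, the location is unchanged or the new location is empty, and the table is not external,
// then the user wants the data moved to the location for the new name: this is an alter table rename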
91 if (rename
92 && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())
93 && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0
94 || StringUtils.isEmpty(newt.getSd().getLocation()))
95 && !MetaStoreUtils.isExternalTable(oldt)) {
96 // Take the old table location as the source path
97 srcPath = new Path(oldt.getSd().getLocation());
98 srcFs = wh.getFs(srcPath);
99
100 // that means user is asking metastore to move data to new location
101 // corresponding to the new name
102 // get new location
103 Database db = msdb.getDatabase(newt.getDbName());
104 Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath);
105 destPath = new Path(databasePath, newt.getTableName());
106 destFs = wh.getFs(destPath);
107 // Set the new table location; it is used by the update that follows
108 newt.getSd().setLocation(destPath.toString());
109 moveData = true;
110
// Source and destination must be on the same file system, and the destination must not exist yet; overwriting existing data is not allowed
114 if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
115 throw new InvalidOperationException("table new location " + destPath
116 + " is on a different file system than the old location "
117 + srcPath + ". This operation is not supported");
118 }
119 try {
120 srcFs.exists(srcPath); // check that src exists and also checks
121 // permissions necessary
122 if (destFs.exists(destPath)) {
123 throw new InvalidOperationException("New location for this table "
124 + newt.getDbName() + "." + newt.getTableName()
125 + " already exists : " + destPath);
126 }
127 } catch (IOException e) {
128 throw new InvalidOperationException("Unable to access new location "
129 + destPath + " for table " + newt.getDbName() + "."
130 + newt.getTableName());
131 }
132 String oldTblLocPath = srcPath.toUri().getPath();
133 String newTblLocPath = destPath.toUri().getPath();
134
135 // Fetch all partitions of the old table
136 List<Partition> parts = msdb.getPartitions(dbname, name, -1);
137 for (Partition part : parts) {
138 String oldPartLoc = part.getSd().getLocation();
// Swap the old table path for the new one in each partition location and update the partition metadata
139 if (oldPartLoc.contains(oldTblLocPath)) {
140 URI oldUri = new Path(oldPartLoc).toUri();
141 String newPath = oldUri.getPath().replace(oldTblLocPath, newTblLocPath);
142 Path newPartLocPath = new Path(oldUri.getScheme(), oldUri.getAuthority(), newPath);
143 altps.add(ObjectPair.create(part, part.getSd().getLocation()));
144 part.getSd().setLocation(newPartLocPath.toString());
145 String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues());
146 try {
147 //existing partition column stats is no longer valid, remove them
148 msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, part.getValues(), null);
149 } catch (InvalidInputException iie) {
150 throw new InvalidOperationException("Unable to update partition stats in table rename." + iie);
151 }
152 msdb.alterPartition(dbname, name, part.getValues(), part);
153 }
154 }
// Otherwise, update the table statistics (non-partitioned tables only)
155 } else if (MetaStoreUtils.requireCalStats(hiveConf, null, null, newt) &&
156 (newt.getPartitionKeysSize() == 0)) {
157 Database db = msdb.getDatabase(newt.getDbName());
158 // Update table stats. For partitioned table, we update stats in
159 // alterPartition()
160 MetaStoreUtils.updateUnpartitionedTableStatsFast(db, newt, wh, false, true);
161 }
162 updateTableColumnStatsForAlterTable(msdb, oldt, newt);
163 // now finally call alter table
164 msdb.alterTable(dbname, name, newt);
165 // commit the changes
166 success = msdb.commitTransaction();
167 } catch (InvalidObjectException e) {
168 LOG.debug(e);
169 throw new InvalidOperationException(
170 "Unable to change partition or table."
171 + " Check metastore logs for detailed stack." + e.getMessage());
172 } catch (NoSuchObjectException e) {
173 LOG.debug(e);
174 throw new InvalidOperationException(
175 "Unable to change partition or table. Database " + dbname + " does not exist"
176 + " Check metastore logs for detailed stack." + e.getMessage());
177 } finally {
178 if (!success) {
179 msdb.rollbackTransaction();
180 }
181 if (success && moveData) {
// The metadata is committed; now rename the old HDFS path to the new one via FileSystem.rename
185 try {
186 if (srcFs.exists(srcPath) && !srcFs.rename(srcPath, destPath)) {
187 throw new IOException("Renaming " + srcPath + " to " + destPath + " failed");
188 }
189 } catch (IOException e) {
190 LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e);
191 boolean revertMetaDataTransaction = false;
192 try {
193 msdb.openTransaction();
// alterTable is called on the metadata a second time here: the HDFS rename failed, so the old table metadata is written back in a new transaction
194 msdb.alterTable(newt.getDbName(), newt.getTableName(), oldt);
195 for (ObjectPair<Partition, String> pair : altps) {
196 Partition part = pair.getFirst();
197 part.getSd().setLocation(pair.getSecond());
198 msdb.alterPartition(newt.getDbName(), name, part.getValues(), part);
199 }
200 revertMetaDataTransaction = msdb.commitTransaction();
201 } catch (Exception e1) {
202 // we should log this for manual rollback by administrator
203 LOG.error("Reverting metadata by HDFS operation failure failed During HDFS operation failed", e1);
204 LOG.error("Table " + Warehouse.getQualifiedName(newt) +
205 " should be renamed to " + Warehouse.getQualifiedName(oldt));
206 LOG.error("Table " + Warehouse.getQualifiedName(newt) +
207 " should have path " + srcPath);
208 for (ObjectPair<Partition, String> pair : altps) {
209 LOG.error("Partition " + Warehouse.getQualifiedName(pair.getFirst()) +
210 " should have path " + pair.getSecond());
211 }
212 if (!revertMetaDataTransaction) {
213 msdb.rollbackTransaction();
214 }
215 }
216 throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name +
217 " failed to move data due to: '" + getSimpleMessage(e) + "' See hive log file for details.");
218 }
219 }
220 }
221 if (!success) {
222 throw new MetaException("Committing the alter table transaction was not successful.");
223 }
224 }
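The partition loop in the rename branch rewrites each partition location by swapping the old table path for the new one while keeping the original scheme and authority. A minimal sketch of that string manipulation with java.net.URI instead of Hadoop's Path follows. The class name, method name, and sample paths are hypothetical, and the containment check is done on the URI path rather than on the full location string.

```java
import java.net.URI;
import java.net.URISyntaxException;

class PartitionRelocateSketch {
    // Returns the partition location with the old table path replaced by the new one.
    // Locations that do not live under the old table path are returned unchanged.
    static String relocate(String oldPartLoc, String oldTblPath, String newTblPath) {
        URI oldUri = URI.create(oldPartLoc);
        if (!oldUri.getPath().contains(oldTblPath)) {
            return oldPartLoc;
        }
        String newPath = oldUri.getPath().replace(oldTblPath, newTblPath);
        try {
            // Keep scheme and authority, swap only the path component.
            return new URI(oldUri.getScheme(), oldUri.getAuthority(), newPath, null, null).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(relocate(
            "hdfs://nn:8020/warehouse/db.db/old_t/dt=2020-01-01",
            "/warehouse/db.db/old_t",
            "/warehouse/db.db/new_t"));
    }
}
```

Note that String.replace substitutes every occurrence of the old table path, so a path that happens to contain the same substring twice would be rewritten in both places.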
6、createPartition
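Before partition data is written, the partition's metadata is registered and its physical directory is created (for managed tables). The code in the Hive class is as follows: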
1 public Partition createPartition(Table tbl, Map<String, String> partSpec) throws HiveException {
2 try {
// Build a new Partition from the Table and the metastore Partition returned by add_partition; the Partition constructor initializes the partition information
3 return new Partition(tbl, getMSC().add_partition(
4 Partition.createMetaPartitionObject(tbl, partSpec, null)));
5 } catch (Exception e) {
6 LOG.error(StringUtils.stringifyException(e));
7 throw new HiveException(e);
8 }
9 }
createMetaPartitionObject validates the incoming partition spec and wraps it in a metastore Partition object. The code is as follows:
1 public static org.apache.hadoop.hive.metastore.api.Partition createMetaPartitionObject(
2 Table tbl, Map<String, String> partSpec, Path location) throws HiveException {
3 List<String> pvals = new ArrayList<String>();
// Walk through all partition columns and check that each one has a matching entry in the partition spec map
4 for (FieldSchema field : tbl.getPartCols()) {
5 String val = partSpec.get(field.getName());
6 if (val == null || val.isEmpty()) {
7 throw new HiveException("partition spec is invalid; field "
8 + field.getName() + " does not exist or is empty");
9 }
10 pvals.add(val);
11 }
12 // Set the related attributes: DbName, TableName, partition values, and the sd information
13 org.apache.hadoop.hive.metastore.api.Partition tpart =
14 new org.apache.hadoop.hive.metastore.api.Partition();
15 tpart.setDbName(tbl.getDbName());
16 tpart.setTableName(tbl.getTableName());
17 tpart.setValues(pvals);
18
19 if (!tbl.isView()) {
20 tpart.setSd(cloneSd(tbl));
21 tpart.getSd().setLocation((location != null) ? location.toString() : null);
22 }
23 return tpart;
24 }
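The validation loop above turns an unordered partition spec map into a value list ordered by the table's partition columns, and rejects missing or empty values. A minimal sketch of the same idea on plain collections follows. The class and method names are hypothetical, and IllegalArgumentException stands in for HiveException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartSpecSketch {
    // Orders the spec values by partition key and fails on a missing or empty value.
    static List<String> orderedValues(List<String> partKeys, Map<String, String> partSpec) {
        List<String> vals = new ArrayList<>();
        for (String key : partKeys) {
            String val = partSpec.get(key);
            if (val == null || val.isEmpty()) {
                throw new IllegalArgumentException(
                    "partition spec is invalid; field " + key + " does not exist or is empty");
            }
            vals.add(val);
        }
        return vals;
    }

    public static void main(String[] args) {
        Map<String, String> spec = new HashMap<>();
        spec.put("hour", "08");
        spec.put("dt", "2020-01-01");
        System.out.println(orderedValues(Arrays.asList("dt", "hour"), spec));
    }
}
```

The order of the result follows the key list, not the map, which is why the values can later be matched positionally against the table's partition keys.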
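The metastore client (HiveMetaStoreClient) then calls add_partition on the metastore service with this object and makes a deep copy of it. We will not go into the details here, so let's look directly at what the server does: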
1 private Partition add_partition_core(final RawStore ms,
2 final Partition part, final EnvironmentContext envContext)
3 throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
4 boolean success = false;
5 Table tbl = null;
6 try {
7 ms.openTransaction();
// Fetch the whole Table object by DbName and TableName
8 tbl = ms.getTable(part.getDbName(), part.getTableName());
9 if (tbl == null) {
10 throw new InvalidObjectException(
11 "Unable to add partition because table or database do not exist");
12 }
13 // Fire the pre-event
14 firePreEvent(new PreAddPartitionEvent(tbl, part, this));
15 // Before creating the partition, first check whether it already exists in the metadata
16 boolean shouldAdd = startAddPartition(ms, part, false);
17 assert shouldAdd; // start would throw if it already existed here
// Create the partition directory
18 boolean madeDir = createLocationForAddedPartition(tbl, part);
19 try {
// Initialize some key-value parameters on the partition
20 initializeAddedPartition(tbl, part, madeDir);
// Write the metadata
21 success = ms.addPartition(part);
22 } finally {
23 if (!success && madeDir) {
// If the metadata write failed, delete the directory that was just created
24 wh.deleteDir(new Path(part.getSd().getLocation()), true);
25 }
26 }
27 // we proceed only if we'd actually succeeded anyway, otherwise,
28 // we'd have thrown an exception
29 success = success && ms.commitTransaction();
30 } finally {
31 if (!success) {
32 ms.rollbackTransaction();
33 }
34 fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success);
35 }
36 return part;
37 }
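One design point is worth mentioning here: as seen in the earlier look at the table structure, the partition keys and values are stored separately in key-value tables, so the partition path has to be assembled from them. Let's look at createLocationForAddedPartition: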
1 private boolean createLocationForAddedPartition(
2 final Table tbl, final Partition part) throws MetaException {
3 Path partLocation = null;
4 String partLocationStr = null;
// If sd is not null, take the partition location from it
5 if (part.getSd() != null) {
6 partLocationStr = part.getSd().getLocation();
7 }
8 // If no location was given, build the default partition location
9 if (partLocationStr == null || partLocationStr.isEmpty()) {
10 // set default location if not specified and this is
11 // a physical table partition (not a view)
12 if (tbl.getSd().getLocation() != null) {
// If the table has a location, append the partition name to it to form the full partition location
13 partLocation = new Path(tbl.getSd().getLocation(), Warehouse
14 .makePartName(tbl.getPartitionKeys(), part.getValues()));
15 }
16 } else {
17 if (tbl.getSd().getLocation() == null) {
18 throw new MetaException("Cannot specify location for a view partition");
19 }
20 partLocation = wh.getDnsPath(new Path(partLocationStr));
21 }
22
23 boolean result = false;
// Set the location on the partition's sd and create the directory if it does not exist yet
24 if (partLocation != null) {
25 part.getSd().setLocation(partLocation.toString());
26
27 // Check to see if the directory already exists before calling
28 // mkdirs() because if the file system is read-only, mkdirs will
29 // throw an exception even if the directory already exists.
30 if (!wh.isDir(partLocation)) {
31 if (!wh.mkdirs(partLocation, true)) {
32 throw new MetaException(partLocation
33 + " is not a directory or unable to create one");
34 }
35 result = true;
36 }
37 }
38 return result;
39 }
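The location logic above can be illustrated with a small sketch: an explicit location wins, otherwise the default is the table location plus a name of the form key=value/key=value. This is a simplified illustration, not Warehouse.makePartName itself: the class and method names are hypothetical, and any escaping of special characters in keys or values is deliberately left out.

```java
import java.util.Arrays;
import java.util.List;

class PartLocationSketch {
    // Simplified partition name: key1=val1/key2=val2 (no escaping, unlike a real implementation).
    static String makePartName(List<String> keys, List<String> vals) {
        if (keys.size() != vals.size()) {
            throw new IllegalArgumentException("keys and values must have the same size");
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < keys.size(); i++) {
            if (i > 0) {
                sb.append('/');
            }
            sb.append(keys.get(i)).append('=').append(vals.get(i));
        }
        return sb.toString();
    }

    // Explicit location wins; otherwise fall back to <table location>/<partition name>.
    // Returns null when neither is available (the view case in the listing above).
    static String resolveLocation(String explicit, String tableLocation,
                                  List<String> keys, List<String> vals) {
        if (explicit != null && !explicit.isEmpty()) {
            return explicit;
        }
        if (tableLocation == null) {
            return null;
        }
        return tableLocation + "/" + makePartName(keys, vals);
    }

    public static void main(String[] args) {
        System.out.println(resolveLocation(null, "hdfs://nn/warehouse/t",
            Arrays.asList("dt", "hour"), Arrays.asList("2020-01-01", "08")));
    }
}
```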
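Summary: the server checks that the table exists and that the partition is not already registered, creates the partition directory when needed, writes the metadata, and deletes the directory again if the metadata write fails.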
7、dropPartition
For dropping a partition we will not start from the Hive class again; let's look directly at what the HiveMetaStore server does:
1 private boolean drop_partition_common(RawStore ms, String db_name, String tbl_name,
2 List<String> part_vals, final boolean deleteData, final EnvironmentContext envContext)
3 throws MetaException, NoSuchObjectException, IOException, InvalidObjectException,
4 InvalidInputException {
5 boolean success = false;
6 Path partPath = null;
7 Table tbl = null;
8 Partition part = null;
9 boolean isArchived = false;
10 Path archiveParentDir = null;
11 boolean mustPurge = false;
12
13 try {
14 ms.openTransaction();
// Fetch the partition by dbName, tableName, and part_vals
15 part = ms.getPartition(db_name, tbl_name, part_vals);
// Fetch the Table object
16 tbl = get_table_core(db_name, tbl_name);
17 firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this));
18 mustPurge = isMustPurge(envContext, tbl);
19
20 if (part == null) {
21 throw new NoSuchObjectException("Partition doesn't exist. "
22 + part_vals);
23 }
24 // I have not yet looked into archived partitions in depth
25 isArchived = MetaStoreUtils.isArchived(part);
26 if (isArchived) {
27 archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
28 verifyIsWritablePath(archiveParentDir);
29 checkTrashPurgeCombination(archiveParentDir, db_name + "." + tbl_name + "." + part_vals, mustPurge);
30 }
31 if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
32 throw new MetaException("Unable to drop partition");
33 }
34 success = ms.commitTransaction();
35 if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
36 partPath = new Path(part.getSd().getLocation());
37 verifyIsWritablePath(partPath);
38 checkTrashPurgeCombination(partPath, db_name + "." + tbl_name + "." + part_vals, mustPurge);
39 }
40 } finally {
41 if (!success) {
42 ms.rollbackTransaction();
43 } else if (deleteData && ((partPath != null) || (archiveParentDir != null))) {
44 if (tbl != null && !isExternal(tbl)) {
45 if (mustPurge) {
46 LOG.info("dropPartition() will purge " + partPath + " directly, skipping trash.");
47 }
48 else {
49 LOG.info("dropPartition() will move " + partPath + " to trash-directory.");
50 }
// Delete the partition data
51 // Archived partitions have har:/to_har_file as their location.
52 // The original directory was saved in params
53 if (isArchived) {
54 assert (archiveParentDir != null);
55 wh.deleteDir(archiveParentDir, true, mustPurge);
56 } else {
57 assert (partPath != null);
58 wh.deleteDir(partPath, true, mustPurge);
59 deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge);
60 }
61 // ok even if the data is not deleted
62 }
63 }
64 for (MetaStoreEventListener listener : listeners) {
65 DropPartitionEvent dropPartitionEvent =
66 new DropPartitionEvent(tbl, part, success, deleteData, this);
67 dropPartitionEvent.setEnvironmentContext(envContext);
68 listener.onDropPartition(dropPartitionEvent);
69 }
70 }
71 return true;
72 }
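After the partition directory is removed, the listing calls deleteParentRecursive with a depth of part_vals.size() - 1. Its body is not shown here, so the following is only a sketch under the assumption that it removes parent directories that have become empty, at most that many levels up. It uses java.nio.file on the local file system instead of the Warehouse API, and all names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

class PruneParentsSketch {
    // Deletes 'parent' if it is an empty directory, then repeats one level up,
    // visiting at most 'depth' levels in total.
    static void pruneEmptyParents(Path parent, int depth) {
        if (depth <= 0 || parent == null || !Files.isDirectory(parent)) {
            return;
        }
        try {
            try (Stream<Path> children = Files.list(parent)) {
                if (children.findAny().isPresent()) {
                    return; // still holds other partitions, stop here
                }
            }
            Files.delete(parent);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        pruneEmptyParents(parent.getParent(), depth - 1);
    }

    // Small demo on a temp directory: table/dt=1/hour=2 with two partition values.
    static String demo() {
        try {
            Path table = Files.createTempDirectory("tbl");
            Path leaf = Files.createDirectories(table.resolve("dt=1").resolve("hour=2"));
            Files.delete(leaf);                        // the partition directory itself
            pruneEmptyParents(leaf.getParent(), 2 - 1); // part_vals.size() - 1
            return "dt removed=" + !Files.exists(table.resolve("dt=1"))
                + ", table kept=" + Files.exists(table);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Bounding the recursion by the number of partition values minus one means the table directory itself is never a candidate for removal.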
8、alterPartition
alterPartition involves validation and changes to file directories. Let's look directly at rename_partition in HiveMetaStore:
1 private void rename_partition(final String db_name, final String tbl_name,
2 final List<String> part_vals, final Partition new_part,
3 final EnvironmentContext envContext)
4 throws InvalidOperationException, MetaException,
5 TException {
// Logging
6 startTableFunction("alter_partition", db_name, tbl_name);
7
8 if (LOG.isInfoEnabled()) {
9 LOG.info("New partition values:" + new_part.getValues());
10 if (part_vals != null && part_vals.size() > 0) {
11 LOG.info("Old Partition values:" + part_vals);
12 }
13 }
14
15 Partition oldPart = null;
16 Exception ex = null;
17 try {
18 firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, part_vals, new_part, this));
19 // Validate that the new partition values contain only allowed characters
20 if (part_vals != null && !part_vals.isEmpty()) {
21 MetaStoreUtils.validatePartitionNameCharacters(new_part.getValues(),
22 partitionValidationPattern);
23 }
24 // Call alterHandler.alterPartition to rename the partition directory and update the metadata
25 oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part);
26
27 // Only fetch the table if we actually have a listener
28 Table table = null;
29 for (MetaStoreEventListener listener : listeners) {
30 if (table == null) {
31 table = getMS().getTable(db_name, tbl_name);
32 }
33 AlterPartitionEvent alterPartitionEvent =
34 new AlterPartitionEvent(oldPart, new_part, table, true, this);
35 alterPartitionEvent.setEnvironmentContext(envContext);
36 listener.onAlterPartition(alterPartitionEvent);
37 }
38 } catch (InvalidObjectException e) {
39 ex = e;
40 throw new InvalidOperationException(e.getMessage());
41 } catch (AlreadyExistsException e) {
42 ex = e;
43 throw new InvalidOperationException(e.getMessage());
44 } catch (Exception e) {
45 ex = e;
46 if (e instanceof MetaException) {
47 throw (MetaException) e;
48 } else if (e instanceof InvalidOperationException) {
49 throw (InvalidOperationException) e;
50 } else if (e instanceof TException) {
51 throw (TException) e;
52 } else {
53 throw newMetaException(e);
54 }
55 } finally {
56 endFunction("alter_partition", oldPart != null, ex, tbl_name);
57 }
58 return;
59 }
Here let's focus on the alterHandler.alterPartition method. Brace yourself:
1 public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname,
2 final String name, final List<String> part_vals, final Partition new_part)
3 throws InvalidOperationException, InvalidObjectException, AlreadyExistsException,
4 MetaException {
5 boolean success = false;
6
7 Path srcPath = null;
8 Path destPath = null;
9 FileSystem srcFs = null;
10 FileSystem destFs = null;
11 Partition oldPart = null;
12 String oldPartLoc = null;
13 String newPartLoc = null;
14
15 // Set the DDL time of the new partition if it is missing or zero
16 if (new_part.getParameters() == null ||
17 new_part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null ||
18 Integer.parseInt(new_part.getParameters().get(hive_metastoreConstants.DDL_TIME)) == 0) {
19 new_part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
20 .currentTimeMillis() / 1000));
21 }
22 // Fetch the whole Table object by dbName and tableName
23 Table tbl = msdb.getTable(dbname, name);
24 // If part_vals is null or empty, only other partition metadata is changing and the partition values are not, so msdb.alterPartition updates the metadata directly
25 if (part_vals == null || part_vals.size() == 0) {
26 try {
27 oldPart = msdb.getPartition(dbname, name, new_part.getValues());
28 if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
29 MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
30 }
31 updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
32 msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
33 } catch (InvalidObjectException e) {
34 throw new InvalidOperationException("alter is not possible");
35 } catch (NoSuchObjectException e){
36 //old partition does not exist
37 throw new InvalidOperationException("alter is not possible");
38 }
39 return oldPart;
40 }
41 //rename partition
42 try {
43 msdb.openTransaction();
44 try {
// Fetch the old partition object
45 oldPart = msdb.getPartition(dbname, name, part_vals);
46 } catch (NoSuchObjectException e) {
47 // this means there is no existing partition
48 throw new InvalidObjectException(
49 "Unable to rename partition because old partition does not exist");
50 }
51 Partition check_part = null;
52 try {
// Look up whether a partition with the new values already exists
53 check_part = msdb.getPartition(dbname, name, new_part.getValues());
54 } catch(NoSuchObjectException e) {
55 // this means there is no existing partition
56 check_part = null;
57 }
// If the lookup succeeded, the partition already exists, so throw AlreadyExistsException
58 if (check_part != null) {
59 throw new AlreadyExistsException("Partition already exists:" + dbname + "." + name + "." +
60 new_part.getValues());
61 }
// Validate that the table exists
62 if (tbl == null) {
63 throw new InvalidObjectException(
64 "Unable to rename partition because table or database do not exist");
65 }
66
67 // For a partition of an external table, the file system is left untouched; only the metadata is updated
68 if (tbl.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
69 new_part.getSd().setLocation(oldPart.getSd().getLocation());
70 String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
71 try {
72 //existing partition column stats is no longer valid, remove
73 msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);
74 } catch (NoSuchObjectException nsoe) {
75 //ignore
76 } catch (InvalidInputException iie) {
77 throw new InvalidOperationException("Unable to update partition stats in table rename." + iie);
78 }
79 msdb.alterPartition(dbname, name, part_vals, new_part);
80 } else {
81 try {
// Build the default partition path under the table directory
82 destPath = new Path(wh.getTablePath(msdb.getDatabase(dbname), name),
83 Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues()));
// Assemble the new partition path (constructRenamedPath combines it with the location currently set on new_part)
84 destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation()));
85 } catch (NoSuchObjectException e) {
86 LOG.debug(e);
87 throw new InvalidOperationException(
88 "Unable to change partition or table. Database " + dbname + " does not exist"
89 + " Check metastore logs for detailed stack." + e.getMessage());
90 }
// If destPath is not null, the partition directory has to move
91 if (destPath != null) {
92 newPartLoc = destPath.toString();
93 oldPartLoc = oldPart.getSd().getLocation();
94 // Derive the old partition path from the old sd location
95 srcPath = new Path(oldPartLoc);
96
97 LOG.info("srcPath:" + oldPartLoc);
98 LOG.info("descPath:" + newPartLoc);
99 srcFs = wh.getFs(srcPath);
100 destFs = wh.getFs(destPath);
101 // Check that srcFs and destFs are the same file system
102 if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
103 throw new InvalidOperationException("table new location " + destPath
104 + " is on a different file system than the old location "
105 + srcPath + ". This operation is not supported");
106 }
107 try {
// If the new partition path differs from the old one, it must not already exist
108 srcFs.exists(srcPath); // check that src exists and also checks
109 if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) {
110 throw new InvalidOperationException("New location for this table "
111 + tbl.getDbName() + "." + tbl.getTableName()
112 + " already exists : " + destPath);
113 }
114 } catch (IOException e) {
115 throw new InvalidOperationException("Unable to access new location "
116 + destPath + " for partition " + tbl.getDbName() + "."
117 + tbl.getTableName() + " " + new_part.getValues());
118 }
119 new_part.getSd().setLocation(newPartLoc);
120 if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
121 MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
122 }
// Build oldPartName, delete the old partition's column statistics, then write the new partition information
123 String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
124 try {
125 //existing partition column stats is no longer valid, remove
126 msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);
127 } catch (NoSuchObjectException nsoe) {
128 //ignore
129 } catch (InvalidInputException iie) {
130 throw new InvalidOperationException("Unable to update partition stats in table rename." + iie);
131 }
132 msdb.alterPartition(dbname, name, part_vals, new_part);
133 }
134 }
135
136 success = msdb.commitTransaction();
137 } finally {
138 if (!success) {
139 msdb.rollbackTransaction();
140 }
141 if (success && newPartLoc != null && newPartLoc.compareTo(oldPartLoc) != 0) {
142 //rename the data directory
143 try{
144 if (srcFs.exists(srcPath)) {
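145 // If the parent directory has not been created yet, create it; for example, when the engine calls alterTable and then alterPartition, the partition's parent directory may not exist yet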
146 Path destParentPath = destPath.getParent();
147 if (!wh.mkdirs(destParentPath, true)) {
148 throw new IOException("Unable to create path " + destParentPath);
149 }
// Rename the source path to the destination path
150 wh.renameDir(srcPath, destPath, true);
151 LOG.info("rename done!");
152 }
153 } catch (IOException e) {
154 boolean revertMetaDataTransaction = false;
155 try {
156 msdb.openTransaction();
157 msdb.alterPartition(dbname, name, new_part.getValues(), oldPart);
158 revertMetaDataTransaction = msdb.commitTransaction();
159 } catch (Exception e1) {
160 LOG.error("Reverting metadata opeation failed During HDFS operation failed", e1);
161 if (!revertMetaDataTransaction) {
162 msdb.rollbackTransaction();
163 }
164 }
165 throw new InvalidOperationException("Unable to access old location "
166 + srcPath + " for partition " + tbl.getDbName() + "."
167 + tbl.getTableName() + " " + part_vals);
168 }
169 }
170 }
171 return oldPart;
172 }
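Both alterTable and alterPartition follow the same order: commit the metadata first, rename the directory afterwards, and if the rename fails, write the old metadata back in a second transaction. A minimal sketch of that compensation pattern follows. The class, interface, and method names are hypothetical, and it returns false on failure where the Hive code throws InvalidOperationException.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class RenameCompensationSketch {
    // A file system step that may fail, for example a directory rename.
    interface FsStep {
        boolean run() throws IOException;
    }

    // Commits metadata, then runs the file system step; reverts the metadata if that step fails.
    static boolean commitThenRename(Runnable commitMetadata, FsStep rename, Runnable revertMetadata) {
        commitMetadata.run();
        try {
            if (!rename.run()) {
                throw new IOException("rename returned false");
            }
            return true;
        } catch (IOException e) {
            // Compensating action: the metadata already points at the new path, so put it back.
            revertMetadata.run();
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        boolean ok = commitThenRename(
            () -> log.add("commit"),
            () -> false,
            () -> log.add("revert"));
        System.out.println(ok + " " + log);
    }
}
```

The two steps cannot share one transaction because the file system is outside the metastore database, which is why a failed revert can only be logged for manual repair.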
That's all for now; we'll keep exploring in later posts.