
Hive Metastore: Overall Code Analysis and Walkthrough

  Following the brief analysis of the Hive metastore table structure in the previous post, I'll now work from the entity objects behind that schema design and walk through the overall code structure. Let's start by opening the metadata directory; its structure is shown below:

(Figure: directory structure of the hivemeta package)

  As you can see, the hivemeta directory contains metastore (client and server invocation logic), events (listeners for checks and authorization across a table's life cycle), hooks (currently only the JDO connection-related interfaces), parser (parsing of expression trees), spec (partition proxy classes), tools (JDO execute helpers), plus txn and model. Next, let's go through the metadata code piece by piece with analysis and comments:

  Opening the packages reveals a daunting number of classes — feeling overwhelmed? Me too; let's push on. It's easy to panic at what looks like a tangle, but stay calm and start from the Hive class, since it is the entry point for metastore metadata calls. The life cycle we'll walk through is: creation and loading of the HiveMetaStoreClient, creation and loading of the HiveMetaStore server, then createTable, dropTable, alterTable, createPartition, dropPartition and alterPartition. Of course, this is only a small slice of the full metadata picture.

  1. Creation and loading of the HiveMetaStoreClient

  Let's start working through the Hive class, bit by bit:

private HiveConf conf = null;
private IMetaStoreClient metaStoreClient;
private UserGroupInformation owner;

// metastore calls timing information
private final Map<String, Long> metaCallTimeMap = new HashMap<String, Long>();

private static ThreadLocal<Hive> hiveDB = new ThreadLocal<Hive>() {
  @Override
  protected synchronized Hive initialValue() {
    return null;
  }

  @Override
  public synchronized void remove() {
    if (this.get() != null) {
      this.get().close();
    }
    super.remove();
  }
};

  Declared here are the HiveConf object, the metaStoreClient, the owning user's UserGroupInformation, and a call-timing map, metaCallTimeMap, which records how long each metastore call takes. The class also maintains a thread-local Hive handle, hiveDB; when the thread-local db is null (or a refresh is needed), a new Hive object is created, as follows:

public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
  Hive db = hiveDB.get();
  if (db == null || needsRefresh || !db.isCurrentUserOwner()) {
    if (db != null) {
      LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
        ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
    }
    closeCurrent();
    c.set("fs.scheme.class", "dfs");
    Hive newdb = new Hive(c);
    hiveDB.set(newdb);
    return newdb;
  }
  db.conf = c;
  return db;
}

  Next we find that when the Hive class is loaded, all permanent functions are registered. What is a function here? From the earlier table-structure analysis, think of it as the stored metadata for UDF jars and the like. The code:

// register all permanent functions. need improvement
static {
  try {
    reloadFunctions();
  } catch (Exception e) {
    LOG.warn("Failed to access metastore. This class should not accessed in runtime.",e);
  }
}

public static void reloadFunctions() throws HiveException {
  // obtain the Hive object used for the calls below
  Hive db = Hive.get();
  // iterate over every database name
  for (String dbName : db.getAllDatabases()) {
    // look up all functions registered under this database
    for (String functionName : db.getFunctions(dbName, "*")) {
      Function function = db.getFunction(dbName, functionName);
      try {
        // "register" here stores the function metadata in a
        // Map<String, FunctionInfo> inside the Registry class, so the
        // execution engine does not have to query the database again
        FunctionRegistry.registerPermanentFunction(
            FunctionUtils.qualifyFunctionName(functionName, dbName), function.getClassName(),
            false, FunctionTask.toFunctionResource(function.getResourceUris()));
      } catch (Exception e) {
        LOG.warn("Failed to register persistent function " +
            functionName + ":" + function.getClassName() + ". Ignore and continue.");
      }
    }
  }
}

  Calling getMSC() creates the metastore client, as follows:

private IMetaStoreClient createMetaStoreClient() throws MetaException {

  // anonymous implementation of the HiveMetaHookLoader interface
  HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
      @Override
      public HiveMetaHook getHook(
        org.apache.hadoop.hive.metastore.api.Table tbl)
        throws MetaException {

        try {
          if (tbl == null) {
            return null;
          }
          // load a storage handler instance according to the table's
          // key/value properties, e.g. HBase, Redis and other extended
          // storage backends used for external tables
          HiveStorageHandler storageHandler =
            HiveUtils.getStorageHandler(conf,
              tbl.getParameters().get(META_TABLE_STORAGE));
          if (storageHandler == null) {
            return null;
          }
          return storageHandler.getMetaHook();
        } catch (HiveException ex) {
          LOG.error(StringUtils.stringifyException(ex));
          throw new MetaException(
            "Failed to load storage handler:  " + ex.getMessage());
        }
      }
    };
  return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap,
      SessionHiveMetaStoreClient.class.getName());
}

  2. Creation and loading of the HiveMetaStore server

  When a HiveMetaStoreClient is constructed, it initializes its connection to the HiveMetaStore server, as follows:

public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader)
  throws MetaException {

  this.hookLoader = hookLoader;
  if (conf == null) {
    conf = new HiveConf(HiveMetaStoreClient.class);
  }
  this.conf = conf;
  filterHook = loadFilterHooks();
  // driven by hive.metastore.uris in hive-site.xml: if it is set, this is
  // treated as a remote connection, otherwise as an embedded (local) one
  String msUri = conf.getVar(HiveConf.ConfVars.METASTOREURIS);
  localMetaStore = HiveConfUtil.isEmbeddedMetaStore(msUri);
  if (localMetaStore) {
    // a local connection talks to HiveMetaStore directly
    client = HiveMetaStore.newRetryingHMSHandler("hive client", conf, true);
    isConnected = true;
    snapshotActiveConf();
    return;
  }

  // read the retry count and timeout from the configuration
  retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
  retryDelaySeconds = conf.getTimeVar(
      ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);

  // assemble the metastore URIs
  if (conf.getVar(HiveConf.ConfVars.METASTOREURIS) != null) {
    String metastoreUrisString[] = conf.getVar(
        HiveConf.ConfVars.METASTOREURIS).split(",");
    metastoreUris = new URI[metastoreUrisString.length];
    try {
      int i = 0;
      for (String s : metastoreUrisString) {
        URI tmpUri = new URI(s);
        if (tmpUri.getScheme() == null) {
          throw new IllegalArgumentException("URI: " + s
              + " does not have a scheme");
        }
        metastoreUris[i++] = tmpUri;

      }
    } catch (IllegalArgumentException e) {
      throw (e);
    } catch (Exception e) {
      MetaStoreUtils.logAndThrowMetaException(e);
    }
  } else {
    LOG.error("NOT getting uris from conf");
    throw new MetaException("MetaStoreURIs not found in conf file");
  }
  // call open() to establish the connection
  open();
}

  As the code above shows, a remote connection requires hive.metastore.uris to be set in hive-site.xml — look familiar? If your client and server are not on the same machine, this is the setting that enables the remote connection. A quick sketch of the configuration follows.
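  As a quick, hedged illustration (host and port are invented), the same remote setting can also be applied programmatically on a HiveConf before constructing the client:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;

public class RemoteClientDemo {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // thrift://<host>:<port>; 9083 is the conventional metastore port
    conf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://metastore-host.example.com:9083");
    IMetaStoreClient client = new HiveMetaStoreClient(conf, null); // null hook loader for brevity
    System.out.println(client.getAllDatabases());
    client.close();
  }
}

  With that in place, let's continue with the open() method that establishes the connection: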

private void open() throws MetaException {
  isConnected = false;
  TTransportException tte = null;
  // whether to use SASL
  boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
  // if true, the metastore Thrift interface will use TFramedTransport;
  // when false (default) a standard TTransport is used
  boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
  // if true, the metastore Thrift interface will use TCompactProtocol;
  // when false (default) TBinaryProtocol is used — the difference between
  // the two is discussed later
  boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);
  // read the socket timeout
  int clientSocketTimeout = (int) conf.getTimeVar(
      ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);

  for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
    for (URI store : metastoreUris) {
      LOG.info("Trying to connect to metastore with URI " + store);
      try {
        transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
        if (useSasl) {
          // Wrap thrift connection with SASL for secure connection.
          try {
            // create the HadoopThriftAuthBridge client
            HadoopThriftAuthBridge.Client authBridge =
              ShimLoader.getHadoopThriftAuthBridge().createClient();
            // authentication handling:
            // check if we should use delegation tokens to authenticate
            // the call below gets hold of the tokens if they are set up by hadoop
            // this should happen on the map/reduce tasks if the client added the
            // tokens into hadoop's credential store in the front end during job
            // submission.
            String tokenSig = conf.get("hive.metastore.token.signature");
            // tokenSig could be null
            tokenStrForm = Utils.getTokenStrForm(tokenSig);
            if(tokenStrForm != null) {
              // authenticate using delegation tokens via the "DIGEST" mechanism
              transport = authBridge.createClientTransport(null, store.getHost(),
                  "DIGEST", tokenStrForm, transport,
                      MetaStoreUtils.getMetaStoreSaslProperties(conf));
            } else {
              String principalConfig =
                  conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
              transport = authBridge.createClientTransport(
                  principalConfig, store.getHost(), "KERBEROS", null,
                  transport, MetaStoreUtils.getMetaStoreSaslProperties(conf));
            }
          } catch (IOException ioe) {
            LOG.error("Couldn't create client transport", ioe);
            throw new MetaException(ioe.toString());
          }
        } else if (useFramedTransport) {
          transport = new TFramedTransport(transport);
        }
        final TProtocol protocol;
        // the difference between the two protocols is covered later
        // (because I haven't read into it yet, ha)
        if (useCompactProtocol) {
          protocol = new TCompactProtocol(transport);
        } else {
          protocol = new TBinaryProtocol(transport);
        }
        // create the ThriftHiveMetastore client
        client = new ThriftHiveMetastore.Client(protocol);
        try {
          transport.open();
          isConnected = true;
        } catch (TTransportException e) {
          tte = e;
          if (LOG.isDebugEnabled()) {
            LOG.warn("Failed to connect to the MetaStore Server...", e);
          } else {
            // Don't print full exception trace if DEBUG is not on.
            LOG.warn("Failed to connect to the MetaStore Server...");
          }
        }
        // load the user and group information
        if (isConnected && !useSasl && conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)){
          // Call set_ugi, only in unsecure mode.
          try {
            UserGroupInformation ugi = Utils.getUGI();
            client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames()));
          } catch (LoginException e) {
            LOG.warn("Failed to do login. set_ugi() is not successful, " +
                     "Continuing without it.", e);
          } catch (IOException e) {
            LOG.warn("Failed to find ugi of client set_ugi() is not successful, " +
                "Continuing without it.", e);
          } catch (TException e) {
            LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. "
                + "Continuing without it.", e);
          }
        }
      } catch (MetaException e) {
        LOG.error("Unable to connect to metastore with URI " + store
                  + " in attempt " + attempt, e);
      }
      if (isConnected) {
        break;
      }
    }
    // Wait before launching the next round of connection retries.
    if (!isConnected && retryDelaySeconds > 0) {
      try {
        LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
        Thread.sleep(retryDelaySeconds * 1000);
      } catch (InterruptedException ignore) {}
    }
  }

  if (!isConnected) {
    throw new MetaException("Could not connect to meta store using any of the URIs provided." +
      " Most recent failure: " + StringUtils.stringifyException(tte));
  }

  snapshotActiveConf();

  LOG.info("Connected to metastore.");
}

  This post sets the inner workings of the two protocols aside for now. From the code we can see that the HiveMetaStore server side is exposed through ThriftHiveMetastore: it is itself a class, but it defines the nested interfaces Iface and AsyncIface, a layout whose benefit is that it is easy to extend and implement. A rough sketch of that pattern follows.
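  Below is a rough, hedged sketch of the generated-Thrift layout (DemoThriftService and its methods are invented; they only mirror the shape of ThriftHiveMetastore):

public class DemoThriftService {              // analogous to ThriftHiveMetastore
  public interface Iface {                    // synchronous service contract
    String getVersion() throws Exception;
  }

  public interface AsyncIface {               // asynchronous variant of the contract
    void getVersion(ResultHandler handler) throws Exception;
  }

  public interface ResultHandler {            // stand-in for Thrift's AsyncMethodCallback
    void onComplete(String version);
  }

  public static class Client implements Iface { // a real generated Client marshals each call over a TProtocol
    @Override
    public String getVersion() {
      return "demo-1.0";
    }
  }
}

  Back to Hive: next, let's look at how HMSHandler is initialized. On the local (embedded) path, newRetryingHMSHandler is called directly, which initializes the HMSHandler. The code: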

public HMSHandler(String name, HiveConf conf, boolean init) throws MetaException {
  super(name);
  hiveConf = conf;
  if (init) {
    init();
  }
}

  Next, let's continue with its init() method:

public void init() throws MetaException {
  // the class name of the metadata-access implementation; this is ObjectStore,
  // the RawStore implementation responsible for the JDO interaction with the database
  rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
  // load the init listeners from hive.metastore.init.hooks; you can implement
  // and plug in your own
  initListeners = MetaStoreUtils.getMetaStoreListeners(
      MetaStoreInitListener.class, hiveConf,
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
  for (MetaStoreInitListener singleInitListener: initListeners) {
      MetaStoreInitContext context = new MetaStoreInitContext();
      singleInitListener.onInit(context);
  }
  // initialize the alter implementation class
  String alterHandlerName = hiveConf.get("hive.metastore.alter.impl",
      HiveAlterHandler.class.getName());
  alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass(
      alterHandlerName), hiveConf);
  // initialize the warehouse
  wh = new Warehouse(hiveConf);
  // create the default db and users, and record the current connection URL
  synchronized (HMSHandler.class) {
    if (currentUrl == null || !currentUrl.equals(MetaStoreInit.getConnectionURL(hiveConf))) {
      createDefaultDB();
      createDefaultRoles();
      addAdminUsers();
      currentUrl = MetaStoreInit.getConnectionURL(hiveConf);
    }
  }
  // initialize metrics
  if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) {
    try {
      Metrics.init();
    } catch (Exception e) {
      // log exception, but ignore inability to start
      LOG.error("error in Metrics init: " + e.getClass().getName() + " "
          + e.getMessage(), e);
    }
  }
  // initialize the pre-event listeners and event listeners
  preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
      hiveConf,
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS));
  listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf,
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
  listeners.add(new SessionPropertiesListener(hiveConf));
  endFunctionListeners = MetaStoreUtils.getMetaStoreListeners(
      MetaStoreEndFunctionListener.class, hiveConf,
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS));
  // regex validation for partition names, configurable via
  // hive.metastore.partition.name.whitelist.pattern
  String partitionValidationRegex =
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN);
  if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
    partitionValidationPattern = Pattern.compile(partitionValidationRegex);
  } else {
    partitionValidationPattern = null;
  }

  long cleanFreq = hiveConf.getTimeVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ, TimeUnit.MILLISECONDS);
  if (cleanFreq > 0) {
    // In default config, there is no timer.
    Timer cleaner = new Timer("Metastore Events Cleaner Thread", true);
    cleaner.schedule(new EventCleanerTask(this), cleanFreq, cleanFreq);
  }
}

  So init() sets up the RawStore implementation that interacts with the database, the Warehouse that performs the physical operations, and the events and listeners; table operations then go through these interfaces via the corresponding meta life-cycle methods. As an aside, the listener hook is easy to extend; a sketch follows.
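  As a concrete illustration of the listener hook, here is a hedged sketch of a custom event listener (the demo.AuditListener class is invented; depending on your Hive version, the other on* callbacks may be abstract and need empty overrides too). It would be registered through hive.metastore.event.listeners:

package demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;

// register with: hive.metastore.event.listeners=demo.AuditListener
public class AuditListener extends MetaStoreEventListener {

  public AuditListener(Configuration config) {
    super(config);
  }

  @Override
  public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
    // fires after create_table_core completes, successfully or not
    System.out.println("table created: " + tableEvent.getTable().getTableName());
  }
}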

  3. createTable

  Starting from the createTable method. On to the code:

public void createTable(String tableName, List<String> columns, List<String> partCols,
                        Class<? extends InputFormat> fileInputFormat,
                        Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols,
                        Map<String, String> parameters) throws HiveException {
  if (columns == null) {
    throw new HiveException("columns not specified for table " + tableName);
  }

  Table tbl = newTable(tableName);
  // SD (storage descriptor) attributes: record the table's input and output
  // format class names; at execution time the engine pulls these names and
  // loads the classes via reflection
  tbl.setInputFormatClass(fileInputFormat.getName());
  tbl.setOutputFormatClass(fileOutputFormat.getName());

  // wrap each column name and type into a FieldSchema and add it to the
  // column list of the SD
  for (String col : columns) {
    FieldSchema field = new FieldSchema(col, STRING_TYPE_NAME, "default");
    tbl.getCols().add(field);
  }

  // if partition columns were specified at creation time (e.g. a dt column),
  // record them; they eventually end up in the partition tables
  if (partCols != null) {
    for (String partCol : partCols) {
      FieldSchema part = new FieldSchema();
      part.setName(partCol);
      part.setType(STRING_TYPE_NAME); // default partition key
      tbl.getPartCols().add(part);
    }
  }
  // set the serialization (SerDe) library
  tbl.setSerializationLib(LazySimpleSerDe.class.getName());
  // set bucketing information
  tbl.setNumBuckets(bucketCount);
  tbl.setBucketCols(bucketCols);
  // set any extra key/value parameters on the table
  if (parameters != null) {
    tbl.setParamters(parameters);
  }
  createTable(tbl);
}

  As the code shows, Hive builds a Table object that can be treated as a model: it carries nearly all the attributes of the tables that hang off the TBLS main table via the table_id foreign key (see the Hive metastore table structure for details). Once assembled, createTable is invoked, and the call continues as follows:

public void createTable(Table tbl, boolean ifNotExists) throws HiveException {
  try {
    // fetch the current database from the SessionState again and set it as
    // the dbName (for safety)
    if (tbl.getDbName() == null || "".equals(tbl.getDbName().trim())) {
      tbl.setDbName(SessionState.get().getCurrentDatabase());
    }
    // validate each column attribute, e.g. check for illegal characters
    if (tbl.getCols().size() == 0 || tbl.getSd().getColsSize() == 0) {
      tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getTableName(),
          tbl.getDeserializer()));
    }
    // validate the table's input/output formats and column attributes
    tbl.checkValidity();
    if (tbl.getParameters() != null) {
      tbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
    }
    org.apache.hadoop.hive.metastore.api.Table tTbl = tbl.getTTable();
    // authorization starts here, driven by the configuration parameters
    // hive.security.authorization.createtable.user.grants,
    // hive.security.authorization.createtable.group.grants and
    // hive.security.authorization.createtable.role.grants — Hive's own
    // notion of users, roles and groups
    PrincipalPrivilegeSet principalPrivs = new PrincipalPrivilegeSet();
    SessionState ss = SessionState.get();
    if (ss != null) {
      CreateTableAutomaticGrant grants = ss.getCreateTableGrants();
      if (grants != null) {
        principalPrivs.setUserPrivileges(grants.getUserGrants());
        principalPrivs.setGroupPrivileges(grants.getGroupGrants());
        principalPrivs.setRolePrivileges(grants.getRoleGrants());
        tTbl.setPrivileges(principalPrivs);
      }
    }
    // the client connects to the server to create the table
    getMSC().createTable(tTbl);
  } catch (AlreadyExistsException e) {
    if (!ifNotExists) {
      throw new HiveException(e);
    }
  } catch (Exception e) {
    throw new HiveException(e);
  }
}
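  Before moving down the stack, here is a hedged usage sketch of the convenience overload shown earlier (the table name and columns are invented):

import java.util.Arrays;
import java.util.HashMap;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.mapred.TextInputFormat;

public class CreateTableDemo {
  public static void main(String[] args) throws Exception {
    Hive db = Hive.get(new HiveConf());
    // demo_tbl: two string columns, partitioned by dt, no bucketing
    db.createTable("demo_tbl",
        Arrays.asList("id", "name"),            // columns (string-typed by default)
        Arrays.asList("dt"),                    // partition columns
        TextInputFormat.class,                  // input format recorded in the SD
        HiveIgnoreKeyTextOutputFormat.class,    // output format recorded in the SD
        -1,                                     // bucket count (-1 = unbucketed)
        null,                                   // bucket columns
        new HashMap<String, String>());         // extra table parameters
  }
}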

  Next, let's look at the createTable method that gets invoked on HiveMetaStoreClient:

public void createTable(Table tbl, EnvironmentContext envContext) throws AlreadyExistsException,
    InvalidObjectException, MetaException, NoSuchObjectException, TException {
  // fetch the HiveMetaHook for this table, so that storage-specific loading
  // and validation can run before creation
  HiveMetaHook hook = getHook(tbl);
  if (hook != null) {
    hook.preCreateTable(tbl);
  }
  boolean success = false;
  try {
    // then call HiveMetaStore so the server performs the database interaction
    create_table_with_environment_context(tbl, envContext);
    if (hook != null) {
      hook.commitCreateTable(tbl);
    }
    success = true;
  } finally {
    // if creation failed, roll back
    if (!success && (hook != null)) {
      hook.rollbackCreateTable(tbl);
    }
  }
}

  A brief word on the hook's role: HiveMetaHook is an interface whose methods include preCreateTable, rollbackCreateTable, preDropTable and so on; its implementations perform pre-creation loading and validation for different storage types, as well as rollback on failure. The code:

public interface HiveMetaHook {
  /**
   * Called before a new table definition is added to the metastore
   * during CREATE TABLE.
   *
   * @param table new table definition
   */
  public void preCreateTable(Table table)
    throws MetaException;

  /**
   * Called after failure adding a new table definition to the metastore
   * during CREATE TABLE.
   *
   * @param table new table definition
   */
  public void rollbackCreateTable(Table table)
    throws MetaException;

  public void preDropTable(Table table)
    throws MetaException;
...............................
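  To make the contract concrete, here is a hedged sketch of a custom hook (DemoStoreMetaHook and the external store it manages are invented; the HBase storage handler in the Hive tree is a real example of this pattern):

import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;

// provisions a namespace in an external store before the table metadata is
// written, and cleans it up again if the metadata write fails
public class DemoStoreMetaHook implements HiveMetaHook {
  @Override
  public void preCreateTable(Table table) throws MetaException {
    // validate table parameters and provision external storage here
  }

  @Override
  public void rollbackCreateTable(Table table) throws MetaException {
    // undo whatever preCreateTable provisioned
  }

  @Override
  public void commitCreateTable(Table table) throws MetaException { }

  @Override
  public void preDropTable(Table table) throws MetaException { }

  @Override
  public void rollbackDropTable(Table table) throws MetaException { }

  @Override
  public void commitDropTable(Table table, boolean deleteData) throws MetaException { }
}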

  Next, the HiveMetaStore server's createTable path:

private void create_table_core(final RawStore ms, final Table tbl,
    final EnvironmentContext envContext)
    throws AlreadyExistsException, MetaException,
    InvalidObjectException, NoSuchObjectException {
  // regex validation of the name, checking for illegal characters
  if (!MetaStoreUtils.validateName(tbl.getTableName())) {
    throw new InvalidObjectException(tbl.getTableName()
        + " is not a valid object name");
  }
  // validation of the column names, column types and partition key names
  String validate = MetaStoreUtils.validateTblColumns(tbl.getSd().getCols());
  if (validate != null) {
    throw new InvalidObjectException("Invalid column " + validate);
  }
  if (tbl.getPartitionKeys() != null) {
    validate = MetaStoreUtils.validateTblColumns(tbl.getPartitionKeys());
    if (validate != null) {
      throw new InvalidObjectException("Invalid partition column " + validate);
    }
  }
  SkewedInfo skew = tbl.getSd().getSkewedInfo();
  if (skew != null) {
    validate = MetaStoreUtils.validateSkewedColNames(skew.getSkewedColNames());
    if (validate != null) {
      throw new InvalidObjectException("Invalid skew column " + validate);
    }
    validate = MetaStoreUtils.validateSkewedColNamesSubsetCol(
        skew.getSkewedColNames(), tbl.getSd().getCols());
    if (validate != null) {
      throw new InvalidObjectException("Invalid skew column " + validate);
    }
  }

  Path tblPath = null;
  boolean success = false, madeDir = false;
  try {
    // fire the pre-create events; the pre-listeners shipped with the metastore
    // include DummyPreListener, AuthorizationPreEventListener,
    // AlternateFailurePreListener and MetaDataExportListener.
    // What are these listeners for? A detailed explanation will come with the
    // analysis of the metastore's design patterns.
    firePreEvent(new PreCreateTableEvent(tbl, this));

    // open the transaction
    ms.openTransaction();

    // if the database does not exist, throw
    Database db = ms.getDatabase(tbl.getDbName());
    if (db == null) {
      throw new NoSuchObjectException("The database " + tbl.getDbName() + " does not exist");
    }

    // check whether the table already exists in this db
    if (is_table_exists(ms, tbl.getDbName(), tbl.getTableName())) {
      throw new AlreadyExistsException("Table " + tbl.getTableName()
          + " already exists");
    }
    // if the table is not a view, assemble the full table path:
    // fs.getUri().getScheme() + fs.getUri().getAuthority() + path.toUri().getPath()
    if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
      if (tbl.getSd().getLocation() == null
          || tbl.getSd().getLocation().isEmpty()) {
        tblPath = wh.getTablePath(
            ms.getDatabase(tbl.getDbName()), tbl.getTableName());
      } else {
        // if the table is not external and storage_handler is absent from
        // its parameters, an explicit location only triggers a warning
        if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
          LOG.warn("Location: " + tbl.getSd().getLocation()
              + " specified for non-external table:" + tbl.getTableName());
        }
        tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
      }
      // store the assembled tblPath in the SD's location
      tbl.getSd().setLocation(tblPath.toString());
    }
    // create the table directory
    if (tblPath != null) {
      if (!wh.isDir(tblPath)) {
        if (!wh.mkdirs(tblPath, true)) {
          throw new MetaException(tblPath
              + " is not a directory or unable to create one");
        }
        madeDir = true;
      }
    }
    // honor the hive.stats.autogather setting
    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) &&
        !MetaStoreUtils.isView(tbl)) {
      if (tbl.getPartitionKeysSize() == 0)  { // Unpartitioned table
        MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, madeDir);
      } else { // Partitioned table with no partitions.
        MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, true);
      }
    }

    // set create time
    long time = System.currentTimeMillis() / 1000;
    tbl.setCreateTime((int) time);
    if (tbl.getParameters() == null ||
        tbl.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
      tbl.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
    }
    // perform the createTable database operation
    ms.createTable(tbl);
    success = ms.commitTransaction();

  } finally {
    if (!success) {
      ms.rollbackTransaction();
      // if the table was not created for some reason, delete the directory
      // that was already made
      if (madeDir) {
        wh.deleteDir(tblPath, true);
      }
    }
    // fire the post-create listeners, e.g. notify notifications
    for (MetaStoreEventListener listener : listeners) {
      CreateTableEvent createTableEvent =
          new CreateTableEvent(tbl, success, this);
      createTableEvent.setEnvironmentContext(envContext);
      listener.onCreateTable(createTableEvent);
    }
  }
}

  The listeners here will be explained in detail later; for now let's keep drilling straight down into ms.createTable. ms is the RawStore interface object, which gathers the unified method calls for the whole metadata life cycle. Part of it:

public abstract Database getDatabase(String name)
    throws NoSuchObjectException;

public abstract boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException;

public abstract boolean alterDatabase(String dbname, Database db) throws NoSuchObjectException, MetaException;

public abstract List<String> getDatabases(String pattern) throws MetaException;

public abstract List<String> getAllDatabases() throws MetaException;

public abstract boolean createType(Type type);

public abstract Type getType(String typeName);

public abstract boolean dropType(String typeName);

public abstract void createTable(Table tbl) throws InvalidObjectException,
    MetaException;

public abstract boolean dropTable(String dbName, String tableName)
    throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException;

public abstract Table getTable(String dbName, String tableName)
    throws MetaException;
..................

  So how is this implemented concretely? First, the metastore calls getMS() to obtain the thread-local RawStore implementation:

public RawStore getMS() throws MetaException {
  // fetch the RawStore already bound to this thread
  RawStore ms = threadLocalMS.get();
  // if there is none, create the implementation and bind it to the thread
  if (ms == null) {
    ms = newRawStore();
    ms.verifySchema();
    threadLocalMS.set(ms);
    ms = threadLocalMS.get();
  }
  return ms;
}

  Reading this, aren't you curious what newRawStore actually does? Let's continue:

public static RawStore getProxy(HiveConf hiveConf, Configuration conf, String rawStoreClassName,
    int id) throws MetaException {
  // load baseClass by reflection, then build the proxy implementing it
  Class<? extends RawStore> baseClass = (Class<? extends RawStore>) MetaStoreUtils.getClass(
      rawStoreClassName);

  RawStoreProxy handler = new RawStoreProxy(hiveConf, conf, baseClass, id);

  // Look for interfaces on both the class and all base classes.
  return (RawStore) Proxy.newProxyInstance(RawStoreProxy.class.getClassLoader(),
      getAllInterfaces(baseClass), handler);
}
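  If Proxy.newProxyInstance looks opaque, here is a minimal, generic sketch of the JDK dynamic-proxy mechanism it relies on (Greeter and the timing logic are invented; RawStoreProxy uses the same mechanism to wrap every RawStore call):

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

interface Greeter {
  String greet(String name);
}

public class ProxyDemo {
  public static void main(String[] args) {
    Greeter real = name -> "hello " + name;
    // every call on the proxy is routed through invoke(), where cross-cutting
    // logic (timing, retries, reconnection...) can wrap the real call
    InvocationHandler handler = (proxy, method, methodArgs) -> {
      long start = System.nanoTime();
      Object result = method.invoke(real, methodArgs);
      System.out.println(method.getName() + " took " + (System.nanoTime() - start) + " ns");
      return result;
    };
    Greeter proxied = (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(), new Class<?>[] { Greeter.class }, handler);
    System.out.println(proxied.greet("metastore"));
  }
}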

  The question, then: where does rawStoreClassName come from? It is loaded when HiveMetaStore initializes, from the METASTORE_RAW_STORE_IMPL configuration parameter in HiveConf — that is, the RawStore implementation class ObjectStore. Now that the RawStore implementation has been created, let's dig into ObjectStore:

@Override
public void createTable(Table tbl) throws InvalidObjectException, MetaException {
  boolean commited = false;
  try {
    // open the transaction
    openTransaction();
    // db and table are validated here once more (validation code not shown);
    // why the validation is repeated deserves further thought
    MTable mtbl = convertToMTable(tbl);
    // pm is the JDO PersistenceManager initialized when the ObjectStore was
    // created; this is where the Table object is persisted — see the JDO
    // model-to-database interaction for details
    pm.makePersistent(mtbl);
    // assemble and persist the privilege objects for users, roles and groups
    PrincipalPrivilegeSet principalPrivs = tbl.getPrivileges();
    List<Object> toPersistPrivObjs = new ArrayList<Object>();
    if (principalPrivs != null) {
      int now = (int)(System.currentTimeMillis()/1000);

      Map<String, List<PrivilegeGrantInfo>> userPrivs = principalPrivs.getUserPrivileges();
      putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, userPrivs, PrincipalType.USER);

      Map<String, List<PrivilegeGrantInfo>> groupPrivs = principalPrivs.getGroupPrivileges();
      putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, groupPrivs, PrincipalType.GROUP);

      Map<String, List<PrivilegeGrantInfo>> rolePrivs = principalPrivs.getRolePrivileges();
      putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, rolePrivs, PrincipalType.ROLE);
    }
    pm.makePersistentAll(toPersistPrivObjs);
    commited = commitTransaction();
  } finally {
    // roll back on failure
    if (!commited) {
      rollbackTransaction();
    }
  }
}
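  For readers unfamiliar with JDO, here is a minimal, generic sketch of how makePersistent writes a mapped object inside a transaction (DemoRecord and the properties are invented; Hive's real properties come from the javax.jdo.option.* settings in hive-site.xml, and its mapping lives in package.jdo):

import java.util.Properties;
import javax.jdo.JDOHelper;
import javax.jdo.PersistenceManager;
import javax.jdo.PersistenceManagerFactory;
import javax.jdo.Transaction;
import javax.jdo.annotations.PersistenceCapable;

@PersistenceCapable
class DemoRecord {
  String name;
  DemoRecord(String name) { this.name = name; }
}

public class JdoSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // assumption: DataNucleus as the JDO implementation, as Hive uses;
    // connection URL/driver properties are omitted in this sketch
    props.setProperty("javax.jdo.PersistenceManagerFactoryClass",
        "org.datanucleus.api.jdo.JDOPersistenceManagerFactory");
    PersistenceManagerFactory pmf = JDOHelper.getPersistenceManagerFactory(props);
    PersistenceManager pm = pmf.getPersistenceManager();
    Transaction tx = pm.currentTransaction();
    try {
      tx.begin();
      pm.makePersistent(new DemoRecord("t1")); // INSERTs the row(s) mapped for DemoRecord
      tx.commit();
    } finally {
      if (tx.isActive()) {
        tx.rollback(); // roll back if commit never happened
      }
      pm.close();
    }
  }
}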

  4. dropTable

  No preamble — straight to the code in the Hive class:

public void dropTable(String tableName, boolean ifPurge) throws HiveException {
  // Hive splits the name into a {dbName, tableName} array
  String[] names = Utilities.getDbTableName(tableName);
  dropTable(names[0], names[1], true, true, ifPurge);
}

  Why handle it this way? When dropping a table, the SQL statement may be either drop table dbName.tableName or drop table tableName. The table and database names are assembled here; for the drop table tableName form, the dbName is taken from the current session, as follows:

public static String[] getDbTableName(String dbtable) throws SemanticException {
  // fetch the current session's database name
  return getDbTableName(SessionState.get().getCurrentDatabase(), dbtable);
}

public static String[] getDbTableName(String defaultDb, String dbtable) throws SemanticException {
  if (dbtable == null) {
    return new String[2];
  }
  String[] names =  dbtable.split("\\.");
  switch (names.length) {
    case 2:
      return names;
    // if there is only one component, prepend the default database
    case 1:
      return new String [] {defaultDb, dbtable};
    default:
      throw new SemanticException(ErrorMsg.INVALID_TABLE_NAME, dbtable);
  }
}
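  A quick, hedged illustration of the splitting behavior (database and table names invented):

// assuming the current database is "warehouse_db"
String[] qualified   = Utilities.getDbTableName("warehouse_db", "logs_db.access_logs");
// -> ["logs_db", "access_logs"]
String[] unqualified = Utilities.getDbTableName("warehouse_db", "access_logs");
// -> ["warehouse_db", "access_logs"]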

  Then, through getMSC(), the dropTable in HiveMetaStoreClient is called:

public void dropTable(String dbname, String name, boolean deleteData,
    boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,
    NoSuchObjectException, UnsupportedOperationException {
  Table tbl;
  try {
    // fetch the whole Table object by dbName and tableName — that is, all
    // the metadata stored for this table
    tbl = getTable(dbname, name);
  } catch (NoSuchObjectException e) {
    if (!ignoreUnknownTab) {
      throw e;
    }
    return;
  }
  // judged by the table type: index tables must not be dropped directly
  if (isIndexTable(tbl)) {
    throw new UnsupportedOperationException("Cannot drop index tables");
  }
  // same getHook as during create: fetch the hook for this table's storage
  HiveMetaHook hook = getHook(tbl);
  if (hook != null) {
    hook.preDropTable(tbl);
  }
  boolean success = false;
  try {
    // call the HiveMetaStore server's dropTable
    drop_table_with_environment_context(dbname, name, deleteData, envContext);
    if (hook != null) {
      hook.commitDropTable(tbl, deleteData);
    }
    success=true;
  } catch (NoSuchObjectException e) {
    if (!ignoreUnknownTab) {
      throw e;
    }
  } finally {
    if (!success && (hook != null)) {
      hook.rollbackDropTable(tbl);
    }
  }
}

  Now the key part — what the HiveMetaStore server does:

private boolean drop_table_core(final RawStore ms, final String dbname, final String name,
    final boolean deleteData, final EnvironmentContext envContext,
    final String indexName) throws NoSuchObjectException,
    MetaException, IOException, InvalidObjectException, InvalidInputException {
  boolean success = false;
  boolean isExternal = false;
  Path tblPath = null;
  List<Path> partPaths = null;
  Table tbl = null;
  boolean ifPurge = false;
  try {
    ms.openTransaction();
    // fetch the full Table object
    tbl = get_table_core(dbname, name);
    if (tbl == null) {
      throw new NoSuchObjectException(name + " doesn't exist");
    }
    // if the SD data is missing, the table metadata is considered corrupted
    if (tbl.getSd() == null) {
      throw new MetaException("Table metadata is corrupted");
    }
    ifPurge = isMustPurge(envContext, tbl);

    firePreEvent(new PreDropTableEvent(tbl, deleteData, this));
    // if the table has indexes, they must be dropped first
    boolean isIndexTable = isIndexTable(tbl);
    if (indexName == null && isIndexTable) {
      throw new RuntimeException(
          "The table " + name + " is an index table. Please do drop index instead.");
    }
    // if this is not an index table, delete its index metadata
    if (!isIndexTable) {
      try {
        List<Index> indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE);
        while (indexes != null && indexes.size() > 0) {
          for (Index idx : indexes) {
            this.drop_index_by_name(dbname, name, idx.getIndexName(), true);
          }
          indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE);
        }
      } catch (TException e) {
        throw new MetaException(e.getMessage());
      }
    }
    // determine whether the table is external
    isExternal = isExternal(tbl);
    if (tbl.getSd().getLocation() != null) {
      tblPath = new Path(tbl.getSd().getLocation());
      if (!wh.isWritable(tblPath.getParent())) {
        String target = indexName == null ? "Table" : "Index table";
        throw new MetaException(target + " metadata not deleted since " +
            tblPath.getParent() + " is not writable by " +
            hiveConf.getUser());
      }
    }

    checkTrashPurgeCombination(tblPath, dbname + "." + name, ifPurge);
    // collect the location paths of all partitions; oddly, instead of taking
    // the Table object as an argument, this method does getTable again itself,
    // while also checking write permission on the parent directories
    partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath,
        tbl.getPartitionKeys(), deleteData && !isExternal);
    // call ObjectStore to delete the metadata
    if (!ms.dropTable(dbname, name)) {
      String tableName = dbname + "." + name;
      throw new MetaException(indexName == null ? "Unable to drop table " + tableName:
          "Unable to drop index table " + tableName + " for index " + indexName);
    }
    success = ms.commitTransaction();
  } finally {
    if (!success) {
      ms.rollbackTransaction();
    } else if (deleteData && !isExternal) {
      // delete the physical partition data
      deletePartitionData(partPaths, ifPurge);
      // delete the table directory
      deleteTableData(tblPath, ifPurge);
      // ok even if the data is not deleted
    }
    // listener handling
    for (MetaStoreEventListener listener : listeners) {
      DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this);
      dropTableEvent.setEnvironmentContext(envContext);
      listener.onDropTable(dropTableEvent);
    }
  }
  return success;
}

  Drilling further into ObjectStore's dropTable, we find that the whole Table object is fetched yet again by dbName and tableName, and then deleted piece by piece. Maybe the code wasn't written by one person, or maybe it's defensive; either way, many Table objects that could be passed in through the interface are re-fetched instead — doesn't that add load on the database? The ObjectStore code:

public boolean dropTable(String dbName, String tableName) throws MetaException,
  NoSuchObjectException, InvalidObjectException, InvalidInputException {
  boolean success = false;
  try {
    openTransaction();
    // fetch the Table object again
    MTable tbl = getMTable(dbName, tableName);
    pm.retrieve(tbl);
    if (tbl != null) {
      // the code below looks up and deletes all privileges
      List<MTablePrivilege> tabGrants = listAllTableGrants(dbName, tableName);
      if (tabGrants != null && tabGrants.size() > 0) {
        pm.deletePersistentAll(tabGrants);
      }

      List<MTableColumnPrivilege> tblColGrants = listTableAllColumnGrants(dbName,
          tableName);
      if (tblColGrants != null && tblColGrants.size() > 0) {
        pm.deletePersistentAll(tblColGrants);
      }

      List<MPartitionPrivilege> partGrants = this.listTableAllPartitionGrants(dbName, tableName);
      if (partGrants != null && partGrants.size() > 0) {
        pm.deletePersistentAll(partGrants);
      }

      List<MPartitionColumnPrivilege> partColGrants = listTableAllPartitionColumnGrants(dbName,
          tableName);
      if (partColGrants != null && partColGrants.size() > 0) {
        pm.deletePersistentAll(partColGrants);
      }
      // delete column statistics if present
      try {
        // delete the column-statistics table rows
        deleteTableColumnStatistics(dbName, tableName, null);
      } catch (NoSuchObjectException e) {
        LOG.info("Found no table level column statistics associated with db " + dbName +
        " table " + tableName + " record to delete");
      }
      // delete the CDS (storage descriptor) rows
      preDropStorageDescriptor(tbl.getSd());
      // delete all rows related to the Table object itself
      pm.deletePersistentAll(tbl);
    }
    success = commitTransaction();
  } finally {
    if (!success) {
      rollbackTransaction();
    }
  }
  return success;
}

  5. alterTable

  Next, alterTable. It carries more logic, since it can involve changing paths on physical storage; let's go through it bit by bit, again starting from the Hive class:

public void alterTable(String tblName, Table newTbl, boolean cascade)
    throws InvalidOperationException, HiveException {
  String[] names = Utilities.getDbTableName(tblName);
  try {
    // remove DDL_TIME from the table's parameters; it will change because
    // we are altering the table
    if (newTbl.getParameters() != null) {
      newTbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
    }
    // validation of dbName, tableName, columns, input/output classes and so
    // on; a HiveException is thrown if anything fails
    newTbl.checkValidity();
    // invoke alter_table
    getMSC().alter_table(names[0], names[1], newTbl.getTTable(), cascade);
  } catch (MetaException e) {
    throw new HiveException("Unable to alter table. " + e.getMessage(), e);
  } catch (TException e) {
    throw new HiveException("Unable to alter table. " + e.getMessage(), e);
  }
}
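  A hedged usage sketch (the table and parameter are invented): the common pattern is to fetch the table, mutate the metadata object, then hand it back to alterTable:

Hive db = Hive.get(new HiveConf());
Table t = db.getTable("warehouse_db.access_logs");
// tweak a table parameter, then write the change back through the metastore
t.getParameters().put("comment", "rotated weekly");
db.alterTable("warehouse_db.access_logs", t, false); // no cascade to partitions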

  HiveMetaStoreClient does no extra processing here, so let's look directly at what the HiveMetaStore server does:

private void alter_table_core(final String dbname, final String name, final Table newTable,
    final EnvironmentContext envContext, final boolean cascade)
    throws InvalidOperationException, MetaException {
  startFunction("alter_table", ": db=" + dbname + " tbl=" + name
      + " newtbl=" + newTable.getTableName());

  // update DDL_TIME
  if (newTable.getParameters() == null ||
      newTable.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
    newTable.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
        .currentTimeMillis() / 1000));
  }
  boolean success = false;
  Exception ex = null;
  try {
    // fetch the existing, full Table object
    Table oldt = get_table_core(dbname, name);
    // fire the pre-events
    firePreEvent(new PreAlterTableEvent(oldt, newTable, this));
    // perform the alter; explained in detail below
    alterHandler.alterTable(getMS(), wh, dbname, name, newTable, cascade);
    success = true;

    // fire the post-event listeners
    for (MetaStoreEventListener listener : listeners) {

      AlterTableEvent alterTableEvent =
          new AlterTableEvent(oldt, newTable, success, this);
      alterTableEvent.setEnvironmentContext(envContext);
      listener.onAlterTable(alterTableEvent);
    }
  } catch (NoSuchObjectException e) {
    // thrown when the table to be altered does not exist
    ex = e;
    throw new InvalidOperationException(e.getMessage());
  } catch (Exception e) {
    ex = e;
    if (e instanceof MetaException) {
      throw (MetaException) e;
    } else if (e instanceof InvalidOperationException) {
      throw (InvalidOperationException) e;
    } else {
      throw newMetaException(e);
    }
  } finally {
    endFunction("alter_table", success, ex, name);
  }
}

  Now for the heart of it: what alterHandler actually does. First, a brief note on its initialization: during HiveMetaStore init, the class name is read from the hive.metastore.alter.impl parameter, which defaults to HiveAlterHandler. Let's look at its alterTable implementation — brace yourself :)

public void alterTable(RawStore msdb, Warehouse wh, String dbname,
    String name, Table newt, boolean cascade) throws InvalidOperationException, MetaException {
  if (newt == null) {
    throw new InvalidOperationException("New table is invalid: " + newt);
  }
  // validate the new table name
  if (!MetaStoreUtils.validateName(newt.getTableName())) {
    throw new InvalidOperationException(newt.getTableName()
        + " is not a valid object name");
  }
  // validate the new column names and types
  String validate = MetaStoreUtils.validateTblColumns(newt.getSd().getCols());
  if (validate != null) {
    throw new InvalidOperationException("Invalid column " + validate);
  }

  Path srcPath = null;
  FileSystem srcFs = null;
  Path destPath = null;
  FileSystem destFs = null;

  boolean success = false;
  boolean moveData = false;
  boolean rename = false;
  Table oldt = null;
  List<ObjectPair<Partition, String>> altps = new ArrayList<ObjectPair<Partition, String>>();

  try {
    msdb.openTransaction();
    // lower-cased inline here — you can tell the code was not written by one person
    name = name.toLowerCase();
    dbname = dbname.toLowerCase();

    // check whether the new table name already exists
    if (!newt.getTableName().equalsIgnoreCase(name)
        || !newt.getDbName().equalsIgnoreCase(dbname)) {
      if (msdb.getTable(newt.getDbName(), newt.getTableName()) != null) {
        throw new InvalidOperationException("new table " + newt.getDbName()
            + "." + newt.getTableName() + " already exists");
      }
      rename = true;
    }

    // fetch the old table object
    oldt = msdb.getTable(dbname, name);
    if (oldt == null) {
      throw new InvalidOperationException("table " + newt.getDbName() + "."
          + newt.getTableName() + " doesn't exist");
    }
    // when METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES is true,
    // incompatible column type changes are rejected (the default here is false)
    if (HiveConf.getBoolVar(hiveConf,
          HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES,
          false)) {
      // Throws InvalidOperationException if the new column types are not
      // compatible with the current column types.
      MetaStoreUtils.throwExceptionIfIncompatibleColTypeChange(
          oldt.getSd().getCols(), newt.getSd().getCols());
    }
    // cascade is passed through from Hive.alterTable, i.e. set by the calling
    // engine; it decides whether partition metadata must be altered too
    if (cascade) {
      // check whether the new columns match the old ones; a mismatch means
      // columns were added or removed
      if(MetaStoreUtils.isCascadeNeededInAlterTable(oldt, newt)) {
        // fetch all partitions for this dbName/tableName
        List<Partition> parts = msdb.getPartitions(dbname, name, -1);
        for (Partition part : parts) {
          List<FieldSchema> oldCols = part.getSd().getCols();
          part.getSd().setCols(newt.getSd().getCols());
          String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues());
          // if the columns differ, drop the existing column statistics
          updatePartColumnStatsForAlterColumns(msdb, part, oldPartName, part.getValues(), oldCols, part);
          // update the whole partition
          msdb.alterPartition(dbname, name, part.getValues(), part);
        }
      } else {
        LOG.warn("Alter table does not cascade changes to its partitions.");
      }
    }

    // check whether the partition keys changed, i.e. part names such as dt or hour
    boolean partKeysPartiallyEqual = checkPartialPartKeysEqual(oldt.getPartitionKeys(),
        newt.getPartitionKeys());

    // for non-view tables, if the old and new partition keys differ, raise an error
    if(!oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())){
      if (oldt.getPartitionKeys().size() != newt.getPartitionKeys().size()
          || !partKeysPartiallyEqual) {
        throw new InvalidOperationException(
            "partition keys can not be changed.");
      }
    }

    // if this is a rename, the table is not a view, its location is unchanged
    // (or the new location is empty), and the table is not external, then the
    // user is asking the metastore to move the data to a new location — an
    // alter table rename operation
    if (rename
        && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())
        && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0
          || StringUtils.isEmpty(newt.getSd().getLocation()))
        && !MetaStoreUtils.isExternalTable(oldt)) {
      // the current location becomes the source path
      srcPath = new Path(oldt.getSd().getLocation());
      srcFs = wh.getFs(srcPath);

      // that means user is asking metastore to move data to new location
      // corresponding to the new name
      // get new location
      Database db = msdb.getDatabase(newt.getDbName());
      Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath);
      destPath = new Path(databasePath, newt.getTableName());
      destFs = wh.getFs(destPath);
      // record the new table location for the update below
      newt.getSd().setLocation(destPath.toString());
      moveData = true;

      // verify the physical destination; if it already exists, the move would
      // override existing data, which is not allowed
      if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
        throw new InvalidOperationException("table new location " + destPath
            + " is on a different file system than the old location "
            + srcPath + ". This operation is not supported");
      }
      try {
        srcFs.exists(srcPath); // check that src exists and also checks
                               // permissions necessary
        if (destFs.exists(destPath)) {
          throw new InvalidOperationException("New location for this table "
              + newt.getDbName() + "." + newt.getTableName()
              + " already exists : " + destPath);
        }
      } catch (IOException e) {
        throw new InvalidOperationException("Unable to access new location "
            + destPath + " for table " + newt.getDbName() + "."
            + newt.getTableName());
      }
      String oldTblLocPath = srcPath.toUri().getPath();
      String newTblLocPath = destPath.toUri().getPath();

      // fetch all of the old table's partitions
      List<Partition> parts = msdb.getPartitions(dbname, name, -1);
      for (Partition part : parts) {
        String oldPartLoc = part.getSd().getLocation();
        // swap the old partition locations for the new ones in the metadata
        if (oldPartLoc.contains(oldTblLocPath)) {
          URI oldUri = new Path(oldPartLoc).toUri();
          String newPath = oldUri.getPath().replace(oldTblLocPath, newTblLocPath);
          Path newPartLocPath = new Path(oldUri.getScheme(), oldUri.getAuthority(), newPath);
          altps.add(ObjectPair.create(part, part.getSd().getLocation()));
          part.getSd().setLocation(newPartLocPath.toString());
          String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues());
          try {
            //existing partition column stats is no longer valid, remove them
            msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, part.getValues(), null);
          } catch (InvalidInputException iie) {
            throw new InvalidOperationException("Unable to update partition stats in table rename." + iie);
          }
          msdb.alterPartition(dbname, name, part.getValues(), part);
        }
      }
      // otherwise, update the stats-related information
    } else if (MetaStoreUtils.requireCalStats(hiveConf, null, null, newt) &&
      (newt.getPartitionKeysSize() == 0)) {
        Database db = msdb.getDatabase(newt.getDbName());
        // Update table stats. For partitioned table, we update stats in
        // alterPartition()
        MetaStoreUtils.updateUnpartitionedTableStatsFast(db, newt, wh, false, true);
    }
    updateTableColumnStatsForAlterTable(msdb, oldt, newt);
    // now finally call alter table
    msdb.alterTable(dbname, name, newt);
    // commit the changes
    success = msdb.commitTransaction();
  } catch (InvalidObjectException e) {
    LOG.debug(e);
    throw new InvalidOperationException(
        "Unable to change partition or table."
            + " Check metastore logs for detailed stack." + e.getMessage());
  } catch (NoSuchObjectException e) {
    LOG.debug(e);
    throw new InvalidOperationException(
        "Unable to change partition or table. Database " + dbname + " does not exist"
            + " Check metastore logs for detailed stack." + e.getMessage());
  } finally {
    if (!success) {
      msdb.rollbackTransaction();
    }
    if (success && moveData) {
      // update the HDFS paths: rename the old path to the new one through
      // the FileSystem rename operation
      try {
        if (srcFs.exists(srcPath) && !srcFs.rename(srcPath, destPath)) {
          throw new IOException("Renaming " + srcPath + " to " + destPath + " failed");
        }
      } catch (IOException e) {
        LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e);
        boolean revertMetaDataTransaction = false;
        try {
          msdb.openTransaction();
          // note that the metadata alterTable runs once more here, restoring
          // the old table after the HDFS failure — perhaps a JDO quirk, or
          // just defensiveness?
          msdb.alterTable(newt.getDbName(), newt.getTableName(), oldt);
          for (ObjectPair<Partition, String> pair : altps) {
            Partition part = pair.getFirst();
            part.getSd().setLocation(pair.getSecond());
            msdb.alterPartition(newt.getDbName(), name, part.getValues(), part);
          }
          revertMetaDataTransaction = msdb.commitTransaction();
        } catch (Exception e1) {
          // we should log this for manual rollback by administrator
          LOG.error("Reverting metadata by HDFS operation failure failed During HDFS operation failed", e1);
          LOG.error("Table " + Warehouse.getQualifiedName(newt) +
              " should be renamed to " + Warehouse.getQualifiedName(oldt));
          LOG.error("Table " + Warehouse.getQualifiedName(newt) +
              " should have path " + srcPath);
          for (ObjectPair<Partition, String> pair : altps) {
            LOG.error("Partition " + Warehouse.getQualifiedName(pair.getFirst()) +
                " should have path " + pair.getSecond());
          }
          if (!revertMetaDataTransaction) {
            msdb.rollbackTransaction();
          }
        }
        throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name +
          " failed to move data due to: '" + getSimpleMessage(e) + "' See hive log file for details.");
      }
    }
  }
  if (!success) {
    throw new MetaException("Committing the alter table transaction was not successful.");
  }
}

  6. createPartition

  Before partition data is written, the partition's metadata is registered and its physical file path is created (for managed tables). The Hive class code:

public Partition createPartition(Table tbl, Map<String, String> partSpec) throws HiveException {
  try {
    // build a new Partition object from the Table; the Partition constructor
    // initializes the partition's information
    return new Partition(tbl, getMSC().add_partition(
        Partition.createMetaPartitionObject(tbl, partSpec, null)));
  } catch (Exception e) {
    LOG.error(StringUtils.stringifyException(e));
    throw new HiveException(e);
  }
}
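  A hedged usage sketch (names invented): partSpec maps each partition column to its value:

Hive db = Hive.get(new HiveConf());
Table tbl = db.getTable("warehouse_db.access_logs");
Map<String, String> partSpec = new HashMap<String, String>();
partSpec.put("dt", "2015-07-01");  // one entry per partition column
db.createPartition(tbl, partSpec);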

  createMetaPartitionObject here validates the incoming Partition pieces and assembles the object:

public static org.apache.hadoop.hive.metastore.api.Partition createMetaPartitionObject(
    Table tbl, Map<String, String> partSpec, Path location) throws HiveException {
  List<String> pvals = new ArrayList<String>();
  // walk the table's partition columns and check each has a value in partSpec
  for (FieldSchema field : tbl.getPartCols()) {
    String val = partSpec.get(field.getName());
    if (val == null || val.isEmpty()) {
      throw new HiveException("partition spec is invalid; field "
          + field.getName() + " does not exist or is empty");
    }
    pvals.add(val);
  }
  // set the related attributes: db name, table name, partition values and SD
  org.apache.hadoop.hive.metastore.api.Partition tpart =
      new org.apache.hadoop.hive.metastore.api.Partition();
  tpart.setDbName(tbl.getDbName());
  tpart.setTableName(tbl.getTableName());
  tpart.setValues(pvals);

  if (!tbl.isView()) {
    tpart.setSd(cloneSd(tbl));
    tpart.getSd().setLocation((location != null) ? location.toString() : null);
  }
  return tpart;
}

  The metastore client then calls the metastore service's addPartition with a deep copy of this object; we'll skip those details and look directly at what the server does:

    private Partition add_partition_core(final RawStore ms,
        final Partition part, final EnvironmentContext envContext)
        throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
      boolean success = false;
      Table tbl = null;
      try {
        ms.openTransaction();
        // Look up the full Table object by DbName and TableName.
        tbl = ms.getTable(part.getDbName(), part.getTableName());
        if (tbl == null) {
          throw new InvalidObjectException(
              "Unable to add partition because table or database do not exist");
        }
        // Event handling: fire the pre-add-partition listeners.
        firePreEvent(new PreAddPartitionEvent(tbl, part, this));
        // Before creating the partition, verify that it does not already
        // exist in the metadata.
        boolean shouldAdd = startAddPartition(ms, part, false);
        assert shouldAdd; // start would throw if it already existed here
        // Create the partition directory.
        boolean madeDir = createLocationForAddedPartition(tbl, part);
        try {
          // Populate some key-value parameters.
          initializeAddedPartition(tbl, part, madeDir);
          // Write the metadata.
          success = ms.addPartition(part);
        } finally {
          if (!success && madeDir) {
            // If the metadata write failed, remove the directory we just created.
            wh.deleteDir(new Path(part.getSd().getLocation()), true);
          }
        }
        // we proceed only if we'd actually succeeded anyway, otherwise,
        // we'd have thrown an exception
        success = success && ms.commitTransaction();
      } finally {
        if (!success) {
          ms.rollbackTransaction();
        }
        fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success);
      }
      return part;
    }
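
  One pattern worth pausing on: add_partition_core, like every write path below, wraps its work in a RawStore transaction and rolls back in a finally block unless the commit succeeded. Stripped to its skeleton (a sketch, not actual Hive source):

// Skeleton of the metastore write pattern used throughout HiveMetaStore.
private void writeWithTransaction(RawStore ms) throws MetaException {
  boolean success = false;
  try {
    ms.openTransaction();            // begin the JDO/DataNucleus transaction
    // ... validate inputs, mutate metadata, touch the file system ...
    success = ms.commitTransaction();
  } finally {
    if (!success) {
      ms.rollbackTransaction();      // undo the metadata changes on any failure
    }
    // event listeners still fire here, carrying the final success flag
  }
}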

  One design point to call out: as the earlier table-schema analysis showed, the partition name is not stored directly; the partition keys and values are stored separately in a key-value table. With that in mind, let's look at createLocationForAddedPartition:

    private boolean createLocationForAddedPartition(
        final Table tbl, final Partition part) throws MetaException {
      Path partLocation = null;
      String partLocationStr = null;
      // If the partition's sd is present, take its location as the candidate path.
      if (part.getSd() != null) {
        partLocationStr = part.getSd().getLocation();
      }
      // If no location was supplied, derive the partition path.
      if (partLocationStr == null || partLocationStr.isEmpty()) {
        // set default location if not specified and this is
        // a physical table partition (not a view)
        if (tbl.getSd().getLocation() != null) {
          // Append the generated partition name to the table location to
          // form the complete partition location.
          partLocation = new Path(tbl.getSd().getLocation(), Warehouse
              .makePartName(tbl.getPartitionKeys(), part.getValues()));
        }
      } else {
        if (tbl.getSd().getLocation() == null) {
          throw new MetaException("Cannot specify location for a view partition");
        }
        partLocation = wh.getDnsPath(new Path(partLocationStr));
      }

      boolean result = false;
      // Record the resolved location in the partition's sd.
      if (partLocation != null) {
        part.getSd().setLocation(partLocation.toString());

        // Check to see if the directory already exists before calling
        // mkdirs() because if the file system is read-only, mkdirs will
        // throw an exception even if the directory already exists.
        if (!wh.isDir(partLocation)) {
          if (!wh.mkdirs(partLocation, true)) {
            throw new MetaException(partLocation
                + " is not a directory or unable to create one");
          }
          result = true;
        }
      }
      return result;
    }
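
  The generated path segment comes from Warehouse.makePartName, which turns partition keys and values into the familiar key=value/key=value layout, escaping characters that are unsafe in file-system paths. A quick illustration with hypothetical columns:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;

public class PartNameDemo {
  public static void main(String[] args) throws Exception {
    // Hypothetical partition columns ds and hr.
    List<FieldSchema> partCols = Arrays.asList(
        new FieldSchema("ds", "string", null),
        new FieldSchema("hr", "string", null));

    // Prints "ds=2015-01-01/hr=08", the segment appended to the table location.
    System.out.println(Warehouse.makePartName(partCols, Arrays.asList("2015-01-01", "08")));
  }
}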

  Summary: add_partition_core looks up the table, fires the pre-add event, verifies the partition does not already exist, creates its directory when needed, and writes the metadata inside a transaction; on failure the transaction is rolled back and any directory it created is removed.

  7. dropPartition

  For dropping a partition we will not start from the Hive class this time; let's go straight to what the HiveMetaStore server does:

    private boolean drop_partition_common(RawStore ms, String db_name, String tbl_name,
      List<String> part_vals, final boolean deleteData, final EnvironmentContext envContext)
      throws MetaException, NoSuchObjectException, IOException, InvalidObjectException,
      InvalidInputException {
      boolean success = false;
      Path partPath = null;
      Table tbl = null;
      Partition part = null;
      boolean isArchived = false;
      Path archiveParentDir = null;
      boolean mustPurge = false;

      try {
        ms.openTransaction();
        // Fetch the full Partition by dbName, tableName and part_vals.
        part = ms.getPartition(db_name, tbl_name, part_vals);
        // Fetch the full Table object.
        tbl = get_table_core(db_name, tbl_name);
        firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this));
        mustPurge = isMustPurge(envContext, tbl);

        if (part == null) {
          throw new NoSuchObjectException("Partition doesn't exist. "
              + part_vals);
        }
        // Archived partitions are not examined in depth here.
        isArchived = MetaStoreUtils.isArchived(part);
        if (isArchived) {
          archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
          verifyIsWritablePath(archiveParentDir);
          checkTrashPurgeCombination(archiveParentDir, db_name + "." + tbl_name + "." + part_vals, mustPurge);
        }
        if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
          throw new MetaException("Unable to drop partition");
        }
        success = ms.commitTransaction();
        if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
          partPath = new Path(part.getSd().getLocation());
          verifyIsWritablePath(partPath);
          checkTrashPurgeCombination(partPath, db_name + "." + tbl_name + "." + part_vals, mustPurge);
        }
      } finally {
        if (!success) {
          ms.rollbackTransaction();
        } else if (deleteData && ((partPath != null) || (archiveParentDir != null))) {
          if (tbl != null && !isExternal(tbl)) {
            if (mustPurge) {
              LOG.info("dropPartition() will purge " + partPath + " directly, skipping trash.");
            }
            else {
              LOG.info("dropPartition() will move " + partPath + " to trash-directory.");
            }
            // Delete the partition data.
            // Archived partitions have har:/to_har_file as their location.
            // The original directory was saved in params
            if (isArchived) {
              assert (archiveParentDir != null);
              wh.deleteDir(archiveParentDir, true, mustPurge);
            } else {
              assert (partPath != null);
              wh.deleteDir(partPath, true, mustPurge);
              deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge);
            }
            // ok even if the data is not deleted
          }
        }
        for (MetaStoreEventListener listener : listeners) {
          DropPartitionEvent dropPartitionEvent =
            new DropPartitionEvent(tbl, part, success, deleteData, this);
          dropPartitionEvent.setEnvironmentContext(envContext);
          listener.onDropPartition(dropPartitionEvent);
        }
      }
      return true;
    }
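
  From a caller's perspective all of this sits behind one client call. A minimal sketch (hypothetical names; deleteData=true asks the server to remove the directory too, subject to the external-table check above):

import java.util.Arrays;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class DropPartitionDemo {
  public static void main(String[] args) throws Exception {
    HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
    // Drops partition ds=2015-01-01 of default.logs and deletes its data;
    // for an external table the files would be left in place.
    client.dropPartition("default", "logs", Arrays.asList("2015-01-01"), true);
    client.close();
  }
}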

  8. alterPartition

  alterPartition involves both validation and file-directory changes, so let's look directly at rename_partition in HiveMetaStore:

    private void rename_partition(final String db_name, final String tbl_name,
        final List<String> part_vals, final Partition new_part,
        final EnvironmentContext envContext)
        throws InvalidOperationException, MetaException,
        TException {
      // Log the start of the operation.
      startTableFunction("alter_partition", db_name, tbl_name);

      if (LOG.isInfoEnabled()) {
        LOG.info("New partition values:" + new_part.getValues());
        if (part_vals != null && part_vals.size() > 0) {
          LOG.info("Old Partition values:" + part_vals);
        }
      }

      Partition oldPart = null;
      Exception ex = null;
      try {
        firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, part_vals, new_part, this));
        // Validate the new partition values against the name whitelist pattern.
        if (part_vals != null && !part_vals.isEmpty()) {
          MetaStoreUtils.validatePartitionNameCharacters(new_part.getValues(),
              partitionValidationPattern);
        }
        // Delegate to alterHandler.alterPartition for the physical rename
        // and the metadata update.
        oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part);

        // Only fetch the table if we actually have a listener
        Table table = null;
        for (MetaStoreEventListener listener : listeners) {
          if (table == null) {
            table = getMS().getTable(db_name, tbl_name);
          }
          AlterPartitionEvent alterPartitionEvent =
              new AlterPartitionEvent(oldPart, new_part, table, true, this);
          alterPartitionEvent.setEnvironmentContext(envContext);
          listener.onAlterPartition(alterPartitionEvent);
        }
      } catch (InvalidObjectException e) {
        ex = e;
        throw new InvalidOperationException(e.getMessage());
      } catch (AlreadyExistsException e) {
        ex = e;
        throw new InvalidOperationException(e.getMessage());
      } catch (Exception e) {
        ex = e;
        if (e instanceof MetaException) {
          throw (MetaException) e;
        } else if (e instanceof InvalidOperationException) {
          throw (InvalidOperationException) e;
        } else if (e instanceof TException) {
          throw (TException) e;
        } else {
          throw newMetaException(e);
        }
      } finally {
        endFunction("alter_partition", oldPart != null, ex, tbl_name);
      }
      return;
    }
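
  The whitelist check above is driven by hive.metastore.partition.name.whitelist.pattern; when set, every partition value must match the configured regex. A sketch of the behavior, assuming MetaStoreUtils.validatePartitionNameCharacters(List<String>, Pattern) as in Hive 1.x (the pattern and values are hypothetical):

import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.hadoop.hive.metastore.MetaStoreUtils;

public class PartNameWhitelistDemo {
  public static void main(String[] args) throws Exception {
    // As if hive.metastore.partition.name.whitelist.pattern were set to this.
    Pattern whitelist = Pattern.compile("[A-Za-z0-9_\\-]+");

    // Passes: the value matches the pattern.
    MetaStoreUtils.validatePartitionNameCharacters(Arrays.asList("2015-01-01"), whitelist);

    // Throws MetaException: '/' is not allowed by the pattern.
    MetaStoreUtils.validatePartitionNameCharacters(Arrays.asList("2015/01/01"), whitelist);
  }
}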

  Now the part that deserves the closest look, alterHandler.alterPartition; brace yourself, it is long:

  public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname,
      final String name, final List<String> part_vals, final Partition new_part)
      throws InvalidOperationException, InvalidObjectException, AlreadyExistsException,
      MetaException {
    boolean success = false;

    Path srcPath = null;
    Path destPath = null;
    FileSystem srcFs = null;
    FileSystem destFs = null;
    Partition oldPart = null;
    String oldPartLoc = null;
    String newPartLoc = null;

    // Refresh the new partition's DDL time.
    if (new_part.getParameters() == null ||
        new_part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null ||
        Integer.parseInt(new_part.getParameters().get(hive_metastoreConstants.DDL_TIME)) == 0) {
      new_part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
          .currentTimeMillis() / 1000));
    }
    // Look up the full Table object by dbName and tableName.
    Table tbl = msdb.getTable(dbname, name);
    // If the incoming part_vals is null or empty, only the partition's other
    // metadata is being modified, not the partition key values, so
    // msdb.alterPartition updates the metadata directly.
    if (part_vals == null || part_vals.size() == 0) {
      try {
        oldPart = msdb.getPartition(dbname, name, new_part.getValues());
        if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
          MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
        }
        updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
        msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
      } catch (InvalidObjectException e) {
        throw new InvalidOperationException("alter is not possible");
      } catch (NoSuchObjectException e){
        //old partition does not exist
        throw new InvalidOperationException("alter is not possible");
      }
      return oldPart;
    }
    //rename partition
    try {
      msdb.openTransaction();
      try {
        // Fetch the old Partition object.
        oldPart = msdb.getPartition(dbname, name, part_vals);
      } catch (NoSuchObjectException e) {
        // this means there is no existing partition
        throw new InvalidObjectException(
            "Unable to rename partition because old partition does not exist");
      }
      Partition check_part = null;
      try {
        // Probe for an existing partition with the new partition's values.
        check_part = msdb.getPartition(dbname, name, new_part.getValues());
      } catch(NoSuchObjectException e) {
        // this means there is no existing partition
        check_part = null;
      }
      // If check_part was found, the target partition already exists,
      // so report "already exists".
      if (check_part != null) {
        throw new AlreadyExistsException("Partition already exists:" + dbname + "." + name + "." +
            new_part.getValues());
      }
      // Validate the table.
      if (tbl == null) {
        throw new InvalidObjectException(
            "Unable to rename partition because table or database do not exist");
      }

      // For an external table's partition change there is no file-system
      // work to do; just update the metadata.
      if (tbl.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
        new_part.getSd().setLocation(oldPart.getSd().getLocation());
        String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
        try {
          //existing partition column stats is no longer valid, remove
          msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);
        } catch (NoSuchObjectException nsoe) {
          //ignore
        } catch (InvalidInputException iie) {
          throw new InvalidOperationException("Unable to update partition stats in table rename." + iie);
        }
        msdb.alterPartition(dbname, name, part_vals, new_part);
      } else {
        try {
          // Get the table's root path and append the new partition name
          // to build the destination path.
          destPath = new Path(wh.getTablePath(msdb.getDatabase(dbname), name),
            Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues()));
          destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation()));
        } catch (NoSuchObjectException e) {
          LOG.debug(e);
          throw new InvalidOperationException(
            "Unable to change partition or table. Database " + dbname + " does not exist"
              + " Check metastore logs for detailed stack." + e.getMessage());
        }
        // A non-null destPath means the file path changes.
        if (destPath != null) {
          newPartLoc = destPath.toString();
          oldPartLoc = oldPart.getSd().getLocation();
          // The old partition path comes from the old sd's location.
          srcPath = new Path(oldPartLoc);

          LOG.info("srcPath:" + oldPartLoc);
          LOG.info("descPath:" + newPartLoc);
          srcFs = wh.getFs(srcPath);
          destFs = wh.getFs(destPath);
          // Check whether srcFs and destFs are the same file system.
          if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
            throw new InvalidOperationException("table new location " + destPath
              + " is on a different file system than the old location "
              + srcPath + ". This operation is not supported");
          }
          try {
            // Check that the source path exists, and that the destination
            // does not already exist when the location actually changes.
            srcFs.exists(srcPath); // check that src exists and also checks
            if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) {
              throw new InvalidOperationException("New location for this table "
                + tbl.getDbName() + "." + tbl.getTableName()
                + " already exists : " + destPath);
            }
          } catch (IOException e) {
            throw new InvalidOperationException("Unable to access new location "
              + destPath + " for partition " + tbl.getDbName() + "."
              + tbl.getTableName() + " " + new_part.getValues());
          }
          new_part.getSd().setLocation(newPartLoc);
          if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
            MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
          }
          // Build oldPartName, drop the old partition's column stats,
          // and write the new partition metadata.
          String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
          try {
            //existing partition column stats is no longer valid, remove
            msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);
          } catch (NoSuchObjectException nsoe) {
            //ignore
          } catch (InvalidInputException iie) {
            throw new InvalidOperationException("Unable to update partition stats in table rename." + iie);
          }
          msdb.alterPartition(dbname, name, part_vals, new_part);
        }
      }

      success = msdb.commitTransaction();
    } finally {
      if (!success) {
        msdb.rollbackTransaction();
      }
      if (success && newPartLoc != null && newPartLoc.compareTo(oldPartLoc) != 0) {
        //rename the data directory
        try{
          if (srcFs.exists(srcPath)) {
            // If the destination's parent directory does not exist yet, create
            // it first; e.g. the engine may have called alterTable and then
            // alterPartition before the new partition's parent path exists.
            Path destParentPath = destPath.getParent();
            if (!wh.mkdirs(destParentPath, true)) {
                throw new IOException("Unable to create path " + destParentPath);
            }
            // Rename the source path to the destination path.
            wh.renameDir(srcPath, destPath, true);
            LOG.info("rename done!");
          }
        } catch (IOException e) {
          boolean revertMetaDataTransaction = false;
          try {
            msdb.openTransaction();
            msdb.alterPartition(dbname, name, new_part.getValues(), oldPart);
            revertMetaDataTransaction = msdb.commitTransaction();
          } catch (Exception e1) {
            LOG.error("Reverting metadata opeation failed During HDFS operation failed", e1);
            if (!revertMetaDataTransaction) {
              msdb.rollbackTransaction();
            }
          }
          throw new InvalidOperationException("Unable to access old location "
              + srcPath + " for partition " + tbl.getDbName() + "."
              + tbl.getTableName() + " " + part_vals);
        }
      }
    }
    return oldPart;
  }
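
  Seen from the client, the whole dance reduces to a single call. A minimal sketch, assuming IMetaStoreClient.renamePartition as in Hive 1.x (names and values hypothetical): the server runs rename_partition, alterHandler.alterPartition updates the metadata in a transaction, renames the directory, and reverts the metadata if the HDFS rename fails.

import java.util.Arrays;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;

public class RenamePartitionDemo {
  public static void main(String[] args) throws Exception {
    HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());

    // Fetch the existing partition ds=2015-01-01 and give it the new value.
    Partition newPart = client.getPartition("default", "logs", Arrays.asList("2015-01-01"));
    newPart.setValues(Arrays.asList("2015-01-02"));

    // Renames both the metadata and, for a managed table, the directory.
    client.renamePartition("default", "logs", Arrays.asList("2015-01-01"), newPart);
    client.close();
  }
}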

  That's it for now; we'll take our time with the rest in later posts.