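In the last post I covered the lifecycle of a partition in the Hive metastore, but only skimmed over alter_partition. This post fills in that gap, because as the project has progressed I have found that alter_partition shows up in many places: for example, insert into calls it when the partition path already exists, and insert overwrite calls it as well.
The entry point is still the Hive.java class: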
public void alterPartition(String dbName, String tblName, Partition newPart)
    throws InvalidOperationException, HiveException {
  try {
    // Remove the DDL time so that it gets refreshed
    if (newPart.getParameters() != null) {
      newPart.getParameters().remove(hive_metastoreConstants.DDL_TIME);
    }
    newPart.checkValidity();
    getMSC().alter_partition(dbName, tblName, newPart.getTPartition());

  } catch (MetaException e) {
    throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
  } catch (TException e) {
    throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
  }
}
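To make the DDL-time handling concrete, here is a minimal sketch of the round trip: the client removes the timestamp, and the server fills in the current time when the value is missing or zero (the server-side check appears in alterHandler.alterPartition further down). It uses a plain Map in place of the partition parameters. The class and method names are hypothetical, and the key string is my assumption about the value of hive_metastoreConstants.DDL_TIME.

```java
import java.util.HashMap;
import java.util.Map;

public class DdlTimeSketch {
    // Assumption: stands in for hive_metastoreConstants.DDL_TIME.
    static final String DDL_TIME = "transient_lastDdlTime";

    // Client side: drop the old timestamp so the server regenerates it.
    static void clearDdlTime(Map<String, String> params) {
        if (params != null) {
            params.remove(DDL_TIME);
        }
    }

    // Server side: set the timestamp (epoch seconds) only when it is missing or zero.
    static void ensureDdlTime(Map<String, String> params, long nowSeconds) {
        String current = params.get(DDL_TIME);
        if (current == null || Long.parseLong(current) == 0L) {
            params.put(DDL_TIME, Long.toString(nowSeconds));
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put(DDL_TIME, "1505088000");
        clearDdlTime(params);                                      // what Hive.alterPartition does
        ensureDdlTime(params, System.currentTimeMillis() / 1000);  // what the handler does
        System.out.println(params);
    }
}
```

The sketch parses the stored value with Long.parseLong, whereas the handler code shown below uses Integer.parseInt; the behaviour is the same for values that fit in an int.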
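The request then goes through HiveMetaStoreClient.alter_partition to the server, carrying the new partition as an argument, and the server calls its rename_partition method. I won't repeat the details, since the previous post covered the general flow; here we start directly at alterHandler.alterPartition, where the partition is actually changed.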
public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname,
    final String name, final List<String> part_vals, final Partition new_part)
    throws InvalidOperationException, InvalidObjectException, AlreadyExistsException,
    MetaException {
  boolean success = false;

  Path srcPath = null;
  Path destPath = null;
  FileSystem srcFs = null;
  FileSystem destFs = null;
  Partition oldPart = null;
  String oldPartLoc = null;
  String newPartLoc = null;

  // Set DDL time to now if not specified
  if (new_part.getParameters() == null ||
      new_part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null ||
      Integer.parseInt(new_part.getParameters().get(hive_metastoreConstants.DDL_TIME)) == 0) {
    new_part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
        .currentTimeMillis() / 1000));
  }

  Table tbl = msdb.getTable(dbname, name);
  //alter partition
  if (part_vals == null || part_vals.size() == 0) {
    try {
      oldPart = msdb.getPartition(dbname, name, new_part.getValues());
      if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
        MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
      }
      updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
      msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
    } catch (InvalidObjectException e) {
      throw new InvalidOperationException("alter is not possible");
    } catch (NoSuchObjectException e){
      //old partition does not exist
      throw new InvalidOperationException("alter is not possible");
    }
    return oldPart;
  }
  ......
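From the code we can see:
1. Table tbl = msdb.getTable(dbname, name); fetches the full metadata object for the table.
2. Next, oldPart = msdb.getPartition(dbname, name, new_part.getValues()); looks up the existing partition's metadata by dbName, tableName and values, where the values are the partition values carried by the new partition, e.g. (2017-09-11). Then MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl) checks whether the partition's basic statistics need to be recalculated (for example, because they are missing); if they do, updatePartitionStatsFast is called to update them. (I won't go into detail here, because I don't yet know what the StatsSetupConst settings inside it are for, haha, a bit embarrassing, but one step at a time.)
3. Then updatePartColumnStats is called to bring the partition's column statistics in line with the change. Let's go through it step by step; the code is as follows: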
private void updatePartColumnStats(RawStore msdb, String dbName, String tableName,
    List<String> partVals, Partition newPart) throws MetaException, InvalidObjectException {
  dbName = HiveStringUtils.normalizeIdentifier(dbName);
  tableName = HiveStringUtils.normalizeIdentifier(tableName);
  String newDbName = HiveStringUtils.normalizeIdentifier(newPart.getDbName());
  String newTableName = HiveStringUtils.normalizeIdentifier(newPart.getTableName());

  Table oldTable = msdb.getTable(dbName, tableName);
  if (oldTable == null) {
    return;
  }

  try {
    String oldPartName = Warehouse.makePartName(oldTable.getPartitionKeys(), partVals);
    String newPartName = Warehouse.makePartName(oldTable.getPartitionKeys(), newPart.getValues());
    if (!dbName.equals(newDbName) || !tableName.equals(newTableName)
        || !oldPartName.equals(newPartName)) {
      msdb.deletePartitionColumnStatistics(dbName, tableName, oldPartName, partVals, null);
    } else {
      Partition oldPartition = msdb.getPartition(dbName, tableName, partVals);
      if (oldPartition == null) {
        return;
      }
      if (oldPartition.getSd() != null && newPart.getSd() != null) {
        List<FieldSchema> oldCols = oldPartition.getSd().getCols();
        if (!MetaStoreUtils.areSameColumns(oldCols, newPart.getSd().getCols())) {
          updatePartColumnStatsForAlterColumns(msdb, oldPartition, oldPartName, partVals, oldCols, newPart);
        }
      }
    }
  } catch (NoSuchObjectException nsoe) {
    LOG.debug("Could not find db entry." + nsoe);
    //ignore
  } catch (InvalidInputException iie) {
    throw new InvalidObjectException("Invalid input to update partition column stats." + iie);
  }
}
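As a rough illustration of the name comparison in updatePartColumnStats above, the sketch below builds a key=value partition name and decides whether an alter counts as a rename (in which case the method drops all column statistics of the old partition). Both helpers are hypothetical stand-ins written for this post: the name builder only imitates the shape of what Warehouse.makePartName returns and skips the escaping of special characters that a real implementation would need.

```java
import java.util.Arrays;
import java.util.List;

public class PartNameSketch {
    // Joins partition keys and values as key=value segments separated by '/'.
    // Simplified: no escaping of special characters in the values.
    static String makePartName(List<String> keys, List<String> vals) {
        if (keys.size() != vals.size()) {
            throw new IllegalArgumentException("keys and values must have the same length");
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < keys.size(); i++) {
            if (i > 0) {
                sb.append('/');
            }
            sb.append(keys.get(i)).append('=').append(vals.get(i));
        }
        return sb.toString();
    }

    // True when the db, table or partition name changed, i.e. the old stats no longer apply.
    static boolean isRename(String db, String tbl, String oldPartName,
                            String newDb, String newTbl, String newPartName) {
        return !db.equals(newDb) || !tbl.equals(newTbl) || !oldPartName.equals(newPartName);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("dt", "hour");
        String oldName = makePartName(keys, Arrays.asList("2017-09-11", "1"));
        String newName = makePartName(keys, Arrays.asList("2017-09-12", "1"));
        System.out.println(oldName + " -> " + newName + " rename? "
            + isRename("db", "t", oldName, "db", "t", newName));
    }
}
```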
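5. Table oldTable = msdb.getTable(dbName, tableName); fetches all the metadata of the old table. makePartName then builds the partition names of the old and new partitions (e.g. dt=2017-09-11/hour=1) so the two can be compared. Because alterPartition can be triggered by operations such as alter table or a table rename, if the old dbName, tableName or partition name differs from the new one, the column statistics stored in the metadata for the old partition have to be cleared, which is done by calling deletePartitionColumnStatistics with null as the column name. The client then re-creates the partition.
6. If they are the same, the change is to the partition's column information, which is checked with MetaStoreUtils.areSameColumns(oldCols, newPart.getSd().getCols()) (I won't paste the code of that helper).
7. If the columns differ, updatePartColumnStatsForAlterColumns is called to update the column statistics. This code is worth pasting so we can play with it together: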
private void updatePartColumnStatsForAlterColumns(RawStore msdb, Partition oldPartition,
    String oldPartName, List<String> partVals, List<FieldSchema> oldCols, Partition newPart)
    throws MetaException, InvalidObjectException {
  String dbName = oldPartition.getDbName();
  String tableName = oldPartition.getTableName();
  try {
    List<String> oldPartNames = Lists.newArrayList(oldPartName);
    List<String> oldColNames = new ArrayList<String>(oldCols.size());
    for (FieldSchema oldCol : oldCols) {
      oldColNames.add(oldCol.getName());
    }
    List<FieldSchema> newCols = newPart.getSd().getCols();
    List<ColumnStatistics> partsColStats = msdb.getPartitionColumnStatistics(dbName, tableName,
        oldPartNames, oldColNames);
    assert (partsColStats.size() <= 1);
    for (ColumnStatistics partColStats : partsColStats) { //actually only at most one loop
      List<ColumnStatisticsObj> statsObjs = partColStats.getStatsObj();
      for (ColumnStatisticsObj statsObj : statsObjs) {
        boolean found = false;
        for (FieldSchema newCol : newCols) {
          if (statsObj.getColName().equals(newCol.getName())
              && statsObj.getColType().equals(newCol.getType())) {
            found = true;
            break;
          }
        }
        if (!found) {
          msdb.deletePartitionColumnStatistics(dbName, tableName, oldPartName, partVals,
              statsObj.getColName());
        }
      }
    }
  } catch (NoSuchObjectException nsoe) {
    LOG.debug("Could not find db entry." + nsoe);
    //ignore
  } catch (InvalidInputException iie) {
    throw new InvalidObjectException
        ("Invalid input to update partition column stats in alter table change columns" + iie);
  }
}
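Here you can see that it queries the metadata and gets back ColumnStatistics objects, which mainly wrap information such as tableName, partName and colName. It then walks through them and compares each entry against the new columns. Note that the comparison is on both colName and type: if no new column matches on both, the statistics for that old column are deleted.
OK, at this point all the stale old data has been removed. Back in alterPartition, msdb.alterPartition(dbname, name, new_part.getValues(), new_part) is then called to register the new partition data in the metastore. Everything above only covers what happens to oldPart when rename_partition is called with part_vals null (or empty). What about when it is not null? Feeling desperate yet? Let's suffer through it slowly, haha...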
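The matching rule can be tried out in isolation. Below is a small sketch, with a hypothetical Col class standing in for both FieldSchema and ColumnStatisticsObj, that returns the columns whose statistics would be deleted: those for which no new column has the same name and the same type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StaleColumnStatsSketch {
    // Hypothetical stand-in for a column (or a stats entry): just a name and a type.
    static final class Col {
        final String name;
        final String type;

        Col(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // Returns the names of stats entries that have no new column with identical name AND type.
    static List<String> staleStatColumns(List<Col> statsCols, List<Col> newCols) {
        List<String> stale = new ArrayList<>();
        for (Col stat : statsCols) {
            boolean found = false;
            for (Col newCol : newCols) {
                if (stat.name.equals(newCol.name) && stat.type.equals(newCol.type)) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                stale.add(stat.name);
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        List<Col> stats = Arrays.asList(
            new Col("id", "int"), new Col("name", "string"), new Col("price", "int"));
        // "name" changed type and "price" was dropped, so both lose their stats.
        List<Col> newCols = Arrays.asList(new Col("id", "int"), new Col("name", "varchar(64)"));
        System.out.println(staleStatColumns(stats, newCols));
    }
}
```

A column that keeps its name but changes type is treated the same as a dropped column, which is why a type change alone is enough to lose its statistics.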
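8. When part_vals is not null, oldPart is looked up by dbName, tableName and part_vals, and validated.
9. The table type is then checked. If the table is an external table, the location stored in oldPart's storage descriptor (sd) is assigned to newPart, so the files stay where they are. Note that this is the location held in the storage descriptor. deletePartitionColumnStatistics is then called to delete the old partition's column statistics outright.
10. If the table is a managed (internal) table, the code computes the new partition path under the table directory, runs some checks on it, and then deletes the old column statistics. (To be honest there are parts in the middle I haven't understood yet, haha... and it's getting late, so I'll fill them in later.) The code is as follows: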
try {
  destPath = new Path(wh.getTablePath(msdb.getDatabase(dbname), name),
      Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues()));
  destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation()));
} catch (NoSuchObjectException e) {
  LOG.debug(e);
  throw new InvalidOperationException(
      "Unable to change partition or table. Database " + dbname + " does not exist"
      + " Check metastore logs for detailed stack." + e.getMessage());
}
if (destPath != null) {
  newPartLoc = destPath.toString();
  oldPartLoc = oldPart.getSd().getLocation();

  srcPath = new Path(oldPartLoc);

  LOG.info("srcPath:" + oldPartLoc);
  LOG.info("descPath:" + newPartLoc);
  srcFs = wh.getFs(srcPath);
  destFs = wh.getFs(destPath);
  // check that src and dest are on the same file system
  if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
    throw new InvalidOperationException("table new location " + destPath
        + " is on a different file system than the old location "
        + srcPath + ". This operation is not supported");
  }
  try {
    srcFs.exists(srcPath); // check that src exists and also checks
    if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) {
      throw new InvalidOperationException("New location for this table "
          + tbl.getDbName() + "." + tbl.getTableName()
          + " already exists : " + destPath);
    }
  } catch (IOException e) {
    throw new InvalidOperationException("Unable to access new location "
        + destPath + " for partition " + tbl.getDbName() + "."
        + tbl.getTableName() + " " + new_part.getValues());
  }
  new_part.getSd().setLocation(newPartLoc);
  if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
    MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
  }
  String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
  try {
    //existing partition column stats is no longer valid, remove
    msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);
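The two pre-checks in the fragment above (source and destination must be on the same file system, and the destination must not already exist unless it is the same path as the source) can be sketched without Hadoop on the classpath. This is only an approximation under my own assumptions: java.net.URI stands in for Path, comparing scheme and authority stands in for FileUtils.equalsFileSystem, and the exists predicate is a hypothetical replacement for destFs.exists.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Objects;
import java.util.function.Predicate;

public class RenameCheckSketch {
    private static String lower(String s) {
        return s == null ? null : s.toLowerCase(Locale.ROOT);
    }

    // Approximation: two locations share a file system when scheme and authority match.
    static boolean sameFileSystem(URI a, URI b) {
        return Objects.equals(lower(a.getScheme()), lower(b.getScheme()))
            && Objects.equals(lower(a.getAuthority()), lower(b.getAuthority()));
    }

    // Returns null when the move may proceed, otherwise a short reason for rejecting it.
    static String validate(URI src, URI dest, Predicate<URI> exists) {
        if (!sameFileSystem(src, dest)) {
            return "new location is on a different file system";
        }
        if (!src.equals(dest) && exists.test(dest)) {
            return "new location already exists";
        }
        return null;
    }

    public static void main(String[] args) {
        URI src = URI.create("hdfs://nn1:8020/warehouse/t/dt=2017-09-11");
        URI dest = URI.create("hdfs://nn1:8020/warehouse/t/dt=2017-09-12");
        // Pretend nothing exists at the destination yet.
        System.out.println(validate(src, dest, uri -> false));
    }
}
```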
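Overall, in the code walked through here, alterPartition is not tightly coupled to physical file operations; it mostly looks up, updates and deletes ColumnStats metadata. As for the partition metadata itself, what a call to alterPartition really updates is the partition's sd information, most importantly its location.
There are quite a few related operations, and this is only a rough analysis written while reading the source. If anything here is wrong, I hope the experts out there will point it out. Thanks~ Off to bed~~ back to the grind tomorrow~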