天天看點

Mongo Spark Connector中的分區器(一)

  MongoSpark為入口類,調用MongoSpark.load,該方法傳回一個MongoRDD類對象,Mongo Spark Connector架構本質上就是一個大号的自定義RDD,加了些自定義配置、适配幾種分區器規則、Sql的資料封裝等等,個人認為相對核心的也就是分區器的規則實作;弄清楚了其分析器也就搞明白了Mongo Spark Connector 。

目前實作的分區器(Partitioner):

  MongoPaginateByCountPartitioner 基于總數的分頁分區器

  MongoPaginateBySizePartitioner 基于大小的分頁分區器

  MongoSamplePartitioner 基于采樣的分區器

  MongoShardedPartitioner 基于分片的分區器

  MongoSinglePartitioner 單分區分區器

  MongoSplitVectorPartitioner 基于分割向量的分區器

  這裡根據源碼簡單介紹MongoSinglePartitioner與MongoSamplePartitioner分區器,這或許就是用得最多的兩種分區器,他的預設分區器(DefaultMongoPartitioner)就是MongoSamplePartitioner分區器;

該分區預設的PartitionKey為_id、預設PartitionSizeMB為64MB、預設每個分區采樣為10;

MongoSamplePartitioner

  該類的核心也是唯一的方法為:partitions方法,下面為該方法的執行流程與核心邏輯;

  1、檢查執行buildInfo指令檢查Mongo版本用于判斷是否支援随機采樣聚合運算,版本大于3.2。 hasSampleAggregateOperator方法。Mongo3.2版本中才新增了資料采樣功能。

  Mongodb中的文法為:

db.cName.aggregate([
  {$sample:{ size: 10 } }
])
           

  上示例N等于10,如果N大于collection中總資料的5%,那麼$sample将會執行collection掃描、sort,然後選擇top N條文檔;如果N小于5%,對于wiredTiger而言則會周遊collection并使用“僞随機”的方式選取N條文檔,對于MMAPv1引擎則在_id索引上随機選取N條文檔。

  2、執行collStats,用于擷取集合的存儲資訊,如行數、大小、存儲大小等等資訊;

  matchQuery: 過濾條件

  partitionerOptions: ReadConfig傳進去的分析器選項

  partitionKey: 分區key,預設為_id

  partitionSizeInBytes: 分區大小,預設64MB

  samplesPerPartition: 每個分區預設采樣數量,預設10

  count: 集合總條數

  avgObjSizeInBytes: 對象平均位元組數

  numDocumentsPerPartition: 每個分區文檔數,   partitionSizeInBytes / avgObjSizeInBytes:分區大小/對象平均大小

  numberOfSamples: 采樣數量,samplesPerPartition * count / numDocumentsPerPartition,每個分區采樣數*集合總數/每個分區文檔數

Mongo Spark Connector中的分區器(一)

  如每個分區文檔數大于集合總文檔數,則将直接建立單分區,不采取采樣資料方式建立分區,因為此時資料量太少單個分區已經可以容得下無需多個分區;

一、建立單分區

  在MongoSinglePartitioner類中通過PartitionerHelper.createPartitions執行相關邏輯;

  _id作為partitionKey,

二、通過采樣資料建立分區

Mongo Spark Connector中的分區器(一)

  指定采樣條件、采樣資料量、PartitionKey、排序條件等,擷取采樣資料;

集合拆分:

def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || !matchQuery.isEmpty && i == count – 1
           

右側邊界:

val rightHandBoundaries = samples.zipWithIndex.collect {
case (field, i) if collectSplit(i) => field.get(partitionKey)
}
           

  擷取右側邊界,使用采樣資料數組索引對每個分區采樣數求餘等于0對采樣資料進行過濾取右側邊界(如比對條件不為空則再取最後一條資料);

  如采樣得到62條資料,并且沒有存在比對條件,根據上述的采樣資料過濾條件最後取得7條資料,分别為資料數組索引為0、索引為10、20、30、40、50、60的7條資料,資料的值為PartitionKey預設就是集合中_id字段的值;

Mongo Spark Connector中的分區器(一)

建立分區(Partitions)

Mongo Spark Connector中的分區器(一)

  擷取得到PartitionKey、rightHandBoundaries後就可以調用PartitionerHelper.createPartitions建立Partition;下面為建立Partition的具體邏輯;

  使用PartitionKey建立查詢邊界,每個分區具有不同的查詢邊界,有最大、最小邊界; 此處建立分區Partition并在每個分區中指定了查詢邊界;

  上面擷取得到了7條資料,此處将建立8個分區;下面給出了簡單資料用于說明該分區邊界條件的基本邏輯與實作;

  1、建立Min、1、3、5、7、9、11、13、Max的序列

  2、建立1、3、5、7、9、11、13、Max序列

  3、使用zip将兩個序列拉鍊式的合并:合并後的資料為:

  4、Min,1、1,3、3,5、5,7、7,9、9,11、11,13、13,Max

  Partition的邊界條件将使用上面的邊界條件,8條資料八個Partition一個對應;

  0 Partition的邊界條件為:小于1

  1 Partition的邊界條件為:大于等于1 小于 3

  2 Partition的邊界條件為:大于等于3 小于 5

  3 Partition的邊界條件為:大于等于5 小于 7

  4 Partition的邊界條件為:大于等于7小于 9

  5 Partition的邊界條件為:大于等于9 小于 11

  6 Partition的邊界條件為:大于等于11 小于 13

  7 Partition的邊界條件為:大于等于13

  上面的8個Partition為8個MongoPartition對象,每個對象的index、查詢邊界與上面所說的一一對應;

  在MongoRDD類的compute方法中可以看到根據對應的分區與上面建立分區時所建立的邊界條件用于計算(從Mongo中擷取對應資料);

Mongo Spark Connector中的分區器(一)

MongoSinglePartitioner

  建立單分區分區器時,直接調用PartitionerHelper.createPartitions方法建立分區,該類并無其他邏輯,并且固定的PartitionKey為_id,右側邊界條件為空集合,然後建立id為0的MongoPartition對象,并無查詢邊界;