MongoSpark為入口類,調用MongoSpark.load,該方法返回一個MongoRDD類對象,Mongo Spark Connector框架本質上就是一個大號的自定義RDD,加了些自定義配置、適配幾種分區器規則、Sql的數據封裝等等,個人認為相對核心的也就是分區器的規則實現;弄清楚了其分析 ...
MongoSpark為入口類,調用MongoSpark.load,該方法返回一個MongoRDD類對象,Mongo Spark Connector框架本質上就是一個大號的自定義RDD,加了些自定義配置、適配幾種分區器規則、Sql的數據封裝等等,個人認為相對核心的也就是分區器的規則實現;弄清楚了其分析器也就搞明白了Mongo Spark Connector 。
當前實現的分區器(Partitioner):
MongoPaginateByCountPartitioner 基於總數的分頁分區器
MongoPaginateBySizePartitioner 基於大小的分頁分區器
MongoSamplePartitioner 基於採樣的分區器
MongoShardedPartitioner 基於分片的分區器
MongoSinglePartitioner 單分區分區器
MongoSplitVectorPartitioner 基於分割向量的分區器
這裡根據源碼簡單介紹MongoSinglePartitioner與MongoSamplePartitioner分區器,這或許就是用得最多的兩種分區器,他的預設分區器(DefaultMongoPartitioner)就是MongoSamplePartitioner分區器;
該分區預設的PartitionKey為_id、預設PartitionSizeMB為64MB、預設每個分區採樣為10;
MongoSamplePartitioner
該類的核心也是唯一的方法為:partitions方法,下麵為該方法的執行流程與核心邏輯;
1、檢查執行buildInfo指令檢查Mongo版本用於判斷是否支持隨機採樣聚合運算,版本大於3.2。 hasSampleAggregateOperator方法。Mongo3.2版本中才新增了數據採樣功能。
Mongodb中的語法為:
db.cName.aggregate([
{$sample:{ size: 10 } }
])
上示例N等於10,如果N大於collection中總數據的5%,那麼$sample將會執行collection掃描、sort,然後選擇top N條文檔;如果N小於5%,對於wiredTiger而言則會遍歷collection並使用“偽隨機”的方式選取N條文檔,對於MMAPv1引擎則在_id索引上隨機選取N條文檔。
2、執行collStats,用於獲取集合的存儲信息,如行數、大小、存儲大小等等信息;
matchQuery: 過濾條件
partitionerOptions: ReadConfig傳進去的分析器選項
partitionKey: 分區key,預設為_id
partitionSizeInBytes: 分區大小,預設64MB
samplesPerPartition: 每個分區預設採樣數量,預設10
count: 集合總條數
avgObjSizeInBytes: 對象平均位元組數
numDocumentsPerPartition: 每個分區文檔數, partitionSizeInBytes / avgObjSizeInBytes:分區大小/對象平均大小
numberOfSamples: 採樣數量,samplesPerPartition * count / numDocumentsPerPartition,每個分區採樣數*集合總數/每個分區文檔數
如每個分區文檔數大於集合總文檔數,則將直接創建單分區,不採取採樣數據方式創建分區,因為此時數據量太少單個分區已經可以容得下無需多個分區;
一、創建單分區
在MongoSinglePartitioner類中通過PartitionerHelper.createPartitions執行相關邏輯;
_id作為partitionKey,
二、通過採樣數據創建分區
指定採樣條件、採樣數據量、PartitionKey、排序條件等,獲取採樣數據;
集合拆分:
def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || !matchQuery.isEmpty && i == count – 1
右側邊界:
val rightHandBoundaries = samples.zipWithIndex.collect {
case (field, i) if collectSplit(i) => field.get(partitionKey)
}
獲取右側邊界,使用採樣數據數組索引對每個分區採樣數求餘等於0對採樣數據進行過濾取右側邊界(如匹配條件不為空則再取最後一條數據);
如採樣得到62條數據,並且沒有存在匹配條件,根據上述的採樣數據過濾條件最後取得7條數據,分別為數據數組索引為0、索引為10、20、30、40、50、60的7條數據,數據的值為PartitionKey預設就是集合中_id欄位的值;
創建分區(Partitions)
獲取得到PartitionKey、rightHandBoundaries後就可以調用PartitionerHelper.createPartitions創建Partition;下麵為創建Partition的具體邏輯;
使用PartitionKey創建查詢邊界,每個分區具有不同的查詢邊界,有最大、最小邊界; 此處創建分區Partition併在每個分區中指定了查詢邊界;
上面獲取得到了7條數據,此處將創建8個分區;下麵給出了簡單數據用於說明該分區邊界條件的基本邏輯與實現;
1、創建Min、1、3、5、7、9、11、13、Max的序列
2、創建1、3、5、7、9、11、13、Max序列
3、使用zip將兩個序列拉鏈式的合併:合併後的數據為:
4、Min,1、1,3、3,5、5,7、7,9、9,11、11,13、13,Max
Partition的邊界條件將使用上面的邊界條件,8條數據八個Partition一個對應;
0 Partition的邊界條件為:小於1
1 Partition的邊界條件為:大於等於1 小於 3
2 Partition的邊界條件為:大於等於3 小於 5
3 Partition的邊界條件為:大於等於5 小於 7
4 Partition的邊界條件為:大於等於7小於 9
5 Partition的邊界條件為:大於等於9 小於 11
6 Partition的邊界條件為:大於等於11 小於 13
7 Partition的邊界條件為:大於等於13
上面的8個Partition為8個MongoPartition對象,每個對象的index、查詢邊界與上面所說的一一對應;
在MongoRDD類的compute方法中可以看到根據對應的分區與上面創建分區時所建立的邊界條件用於計算(從Mongo中獲取對應數據);
MongoSinglePartitioner
創建單分區分區器時,直接調用PartitionerHelper.createPartitions方法創建分區,該類並無其他邏輯,並且固定的PartitionKey為_id,右側邊界條件為空集合,然後創建id為0的MongoPartition對象,並無查詢邊界;