天天看點

MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

MongoDB聚合查詢

  • MongoDB聚合查詢
    • 什麼是聚合查詢
      • Pipeline聚合管道方法
        • 聚合流程
        • 詳細流程
        • 聚合文法
        • 常用聚合管道
          • $count
          • $group
          • $match
          • $unwind
          • $project
          • $limit
          • $skip
          • $sort
          • $sortByCount
      • MapReduce
        • 執行階段
        • 文法
        • 使用示例
        • 程式設計文法
          • map
          • reduce
          • out
          • action
          • db
          • sharded
          • nonAtomic
          • query
          • sort
          • limit
          • finalize
          • scope
          • jsMode
    • 一些練習

MongoDB聚合查詢

什麼是聚合查詢

聚合操作主要用于處理資料并傳回計算結果。聚合操作将來自多個文檔的值組合在一起,按條件分組後,再進行一系列操作(如求和、平均值、最大值、最小值)以傳回單個結3果。

聚合是MongoDB的進階查詢語言,它允許我們通過轉化合并由多個文檔的資料來生成新的在單個文檔裡不存在的文檔資訊。MongoDB中聚合(aggregate)主要用于處理資料(例如分組統計平均值、求和、最大值等),并傳回計算後的資料結果,有點類似sql語句中的 count(*)、group by。

在MongoDB中,有兩種方式計算聚合:Pipeline 和 MapReduce。Pipeline查詢速度快于MapReduce,但是MapReduce的強大之處在于能夠在多台Server上并行執行複雜的聚合邏輯。MongoDB不允許Pipeline的單個聚合操作占用過多的系統記憶體。

Pipeline聚合管道方法

MongoDB 的聚合架構就是将文檔輸入處理管道,在管道内完成對文檔的操作,最終将文檔轉換為聚合結果,MongoDB的聚合管道将MongoDB文檔在一個管道處理完畢後将結果傳遞給下一個管道處理,管道操作是可以重複的。

最基本的管道階段提供過濾器,其操作類似查詢和文檔轉換,可以修改輸出文檔的形式。其他管道操作提供了按特定字段對文檔進行分組和排序的工具,以及用于聚合數組内容(包括文檔數組)的工具。

此外,在管道階段還可以使用運算符來執行諸如計算平均值或連接配接字元串之類的任務。聚合管道可以在分片集合上運作。

聚合流程

db.collection.aggregate()是基于資料處理的聚合管道,每個文檔通過一個由多個階段(stage)組成的管道,可以對每個階段的管道進行分組、過濾等功能,然後經過一系列的處理,輸出相應的結果。

聚合管道方法的流程參見下圖

MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

上圖的聚合操作相當于 MySQL 中的以下語句:

select cust_id as _id, sum(amount) as total from orders where status like "%A%" 
group by cust_id; 
           

詳細流程

  1. db.collection.aggregate() 可以用多個構件建立一個管道,對于一連串的文檔進行處理。這些構件包括:篩選操作的 match 、映射操作的 project 、分組操作的 group 、排序操作的 sort 、限制操作的 limit 、和跳過操作的 skip 。
  2. db.collection.aggregate() 使用了MongoDB内置的原生操作,聚合效率非常高,支援類似于SQL Group By操作的功能,而不再需要使用者編寫自定義的JavaScript例程。
  3. 每個階段管道限制為100MB的記憶體。如果一個節點管道超過這個極限,MongoDB将産生一個錯誤。為了能夠在處理大型資料集,可以設定 allowDiskUse 為 true 來在聚合管道節點把資料寫入臨時檔案。這樣就可以解決100MB的記憶體的限制。
  4. db.collection.aggregate() 可以作用在分片集合,但結果不能輸在分片集合,MapReduce 可 以 作用在分片集合,結果也可以輸在分片集合。
  5. db.collection.aggregate() 方法可以傳回一個指針( cursor ),資料放在記憶體中,直接操

    作。跟Mongo shell 一樣指針操作。

  6. db.collection.aggregate() 輸出的結果隻能儲存在一個文檔中, BSON Document 大小限制為16M。可以通過傳回指針解決,版本2.6中: DB.collect.aggregate() 方法傳回一個指針,可以傳回任何結果集的大小。

聚合文法

參數說明

聚合管道操作符

MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

注意事項

使用db.collection.aggregate()直接查詢會提示錯誤,但是傳一個空數組如

db.collection.aggregate([])則不會報錯,且會和find一樣傳回所有文檔。

常用聚合管道

與mysql聚合類比

為了便于了解,先将常見的mongo的聚合操作和mysql的查詢做下類比

MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢
$count

傳回包含輸入到stage的文檔的計數,了解為傳回與表或視圖的find()查詢比對的文檔的計數。

db.collection.count()方法不執行find()操作,而是計數并傳回與查詢比對的結果數。

文法

c o u n t 階 段 相 當 于 下 面 count階段相當于下面 count階段相當于下面group+$project的序列:

db.collection.aggregate([ 
{ 
"$group": { 
"_id": null, 
"count": {// 這裡count自定義,相當于mysql的select count(*) as tables
"$sum": 1 
} 
} 
},
{ 
"$project": {// 傳回不顯示_id字段 
"_id": 0 
} 
} 
])

           

示例

查詢人數是 100000 以上的城市的數量

  • $match:階段排除pop小于等于100000的文檔,将大于100000的文檔傳到下個階段
  • $count:階段傳回聚合管道中剩餘文檔的計數,并将該值配置設定給名為 count 的字段
db.zips.aggregate([ 
{ 
"$match": { 
"pop": { 
"$gt": 100000 
} 
} 
},
{ 
"$count": "count" 
} 
])
           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢
$group

**按指定的表達式對文檔進行分組,并将每個不同分組的文檔輸出到下一個階段。**輸出文檔包含一個_id字段,該字段按鍵包含不同的組。

輸出文檔還可以包含計算字段,該字段儲存由$group的_id字段分組的一些accumulator表達式的值。 $group不會輸出具體的文檔而隻是統計資訊。

文法

{ $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... 
} }

           
  • _id 字段是必填的;但是,可以指定_id值為null來為整個輸入文檔計算累計值。
  • 剩餘的計算字段是可選的,并使用 運算符進行計算。
  • _id 和 表達式可以接受任何有效的表達式。

accumulator操作符

MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

g r o u p 階 段 的 内 存 限 制 為 100 M , 默 認 情 況 下 , 如 果 s t a g e 超 過 此 限 制 , group階段的記憶體限制為100M,預設情況下,如果stage超過此限制, group階段的記憶體限制為100M,預設情況下,如果stage超過此限制,group将産生錯誤,但是,要允許處理大型資料集,請将allowDiskUse選項設定為true以啟用$group操作以寫入臨時檔案。

注意:

  • “$addToSet”:expr,如果目前數組中不包含expr,那就将它添加到數組中。
  • “$push”:expr,不管expr是什麼值,都将它添加到數組中,傳回包含所有值的數組。

示例

按照 state 分組,并計算每一個state分組的總人數,平均人數以及每個分組的數量

db.zips.aggregate([ 
{ 
"$group": { 
"_id": "$state", 
"totalPop": { 
"$sum": "$pop" 
},
"avglPop": { 
"$avg": "$pop" 
},
"count": { 
"$sum": 1 
} 
} 
} 
])

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

統計美國的所有人數以及每個州的平均人數

db.zips.aggregate([ 
{ 
"$group": { 
"_id": null, 
"totalPop": { 
"$sum": "$pop" 
},
"avglPop": { 
"$avg": "$pop" 
},
"count": { 
"$sum": 1 
} 
} 
} 
]).pretty();
           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

查找不重複的所有的 state 的值

db.zips.aggregate([ 
{ 
"$group": { 
"_id": "$state" 
} 
} 
])
           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

按照 city 分組,并且分組内的 state 字段清單加入到 stateItem 并顯示

db.zips.aggregate([ 
{ 
"$group": { 
"_id": "$city", 
"stateItem": { 
"$push": "$state" 
} 
} 
} 
])

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

下面聚合操作使用系統變量$$ROOT按item對文檔進行分組,生成的文檔不得超過BSON文檔大小限制

db.zips.aggregate([ 
{ 
"$group": { 
"_id": "$city", 
"item": { 
"$push": "$$ROOT" 
} 
} 
} 
]).pretty();
           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢
$match

過濾文檔,僅将符合指定條件的文檔傳遞到下一個管道階段。

$match接受一個指定查詢條件的文檔,查詢文法與讀操作查詢文法相同。

文法

管道優化

m a t c h 用 于 對 文 檔 進 行 篩 選 , 之 後 可 以 在 得 到 的 文 檔 子 集 上 做 聚 合 , match用于對文檔進行篩選,之後可以在得到的文檔子集上做聚合, match用于對文檔進行篩選,之後可以在得到的文檔子集上做聚合,match可以使用除了地理空間之外的所有正常查詢操作符,在實際應用中盡可能将$match放在管道的前面位置。這樣有兩個好處:

  • 一是可以快速将不需要的文檔過濾掉,以減少管道的工作量;
  • 二是如果再投射和分組之前執行$match,查詢可以使用索引。

使用限制

不能在 $match 查詢中使用 $ 作為聚合管道的一部分。

要在 $match 階段使用 $text , $match 階段必須是管道的第一階段。

視圖不支援文本搜尋

示例

使用 $match做簡單的比對查詢,查詢縮寫是 NY 的城市資料

db.zips.aggregate([ 
{ 
"$match": { 
"state": "NY" 
} 
} 
]).pretty();

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

使用 m a t c h 管 道 選 擇 要 處 理 的 文 檔 , 然 後 将 結 果 輸 出 到 match管道選擇要處理的文檔,然後将結果輸出到 match管道選擇要處理的文檔,然後将結果輸出到group管道以計算文檔的計數

db.zips.aggregate([ 
{ 
"$match": { 
"state": "NY" 
} 
},
{ 
"$group": { 
"_id": null, 
"sum": { 
"$sum": "$pop" 
},
"avg": { 
"$avg": "$pop" 
},
"count": { 
"$sum": 1 
} 
} 
} 
]).pretty();
           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢
$unwind

從輸入文檔解構數組字段以輸出每個元素的文檔,簡單說就是 可以将數組拆分為單獨的文檔。

文法

要指定字段路徑,在字段名稱前加上$符并用引号括起來。

v3.2+支援如下文法

{ 
$unwind: 
{ 
path: <field path>, 
#可選,一個新字段的名稱用于存放元素的數組索引。該名稱不能以$開頭。 
includeArrayIndex: <string>, 
#可選,default :false,若為true,如果路徑為空,缺少或為空數組,則$unwind輸出文檔 
preserveNullAndEmptyArrays: <boolean> 
} } 
           

如果為輸入文檔中不存在的字段指定路徑,或者該字段為空數組,則$unwind預設會忽略輸入文檔,并且不會輸出該輸入文檔的文檔。

版本3.2中的新功能:要輸出數組字段丢失的文檔,null或空數組,請使用選項

preserveNullAndEmptyArrays。

示例

以下聚合使用$unwind為loc數組中的每個元素輸出一個文檔:

db.zips.aggregate([ 
{ 
"$match": { 
"_id": "01002" 
} 
},
{ 
"$unwind": "$loc" 
} 
]).pretty();

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢
$project

$project可以從文檔中選擇想要的字段,和不想要的字段(指定的字段可以是來自輸入文檔或新計算字段的現有字段),也可以通過管道表達式進行一些複雜的操作,例如數學操作,日期操作,字元串操作,邏輯操作。

文法

$project 管道符的作用是選擇字段(指定字段,添加字段,不顯示字段,_id:0,排除字段等),重命名字段,派生字段。

specifications有以下形式:

: <1 or true> 是否包含該字段,field:1/0,表示選擇/不選擇 field

_id : <0 or false> 是否指定_id字段

: 添加新字段或重置現有字段的值。 在版本3.6中更改:MongoDB 3.6添加變量REMOVE。如果表達式的計算結果為$$REMOVE,則該字段将排除在輸出中。

: <0 or false> v3.4新增功能,指定排除字段

  • 預設情況下,_id字段包含在輸出文檔中。要在輸出文檔中包含輸入文檔中的任何其他字段,必須明确指定 p r o j e c t 中 的 包 含 。 如 果 指 定 包 含 文 檔 中 不 存 在 的 字 段 , project中的包含。如果指定包含文檔中不存在的字段, project中的包含。如果指定包含文檔中不存在的字段,project将忽略該字段包含,并且不會将該字段添加到文檔中。
  • 預設情況下,id字段包含在輸出文檔中。要從輸出文檔中排除id字段,必須明确指定$project中的_id字段為0。
  • v3.4版新增功能-如果指定排除一個或多個字段,則所有其他字段将在輸出文檔中傳回。

    如果指定排除_id以外的字段,則不能使用任何其他$project規範表單:即,如果排除字段,則不能指定包含字段,重置現有字段的值或添加新字段。此限制不适用于使用REMOVE變量條件排除字段。

  • v3.6版本中的新功能- 從MongoDB 3.6開始,可以在聚合表達式中使用變量REMOVE來有條件地禁止一個字段。
  • 要添加新字段或重置現有字段的值,請指定字段名稱并将其值設定為某個表達式。
  • 要将字段值直接設定為數字或布爾文本,而不是将字段設定為解析為文字的表達式,請使用 l i t e r a l 操 作 符 。 否 則 , literal操作符。否則, literal操作符。否則,project會将數字或布爾文字視為包含或排除該字段的标志。
  • 通過指定新字段并将其值設定為現有字段的字段路徑,可以有效地重命名字段。
  • 從MongoDB3.2開始,$project階段支援使用方括号[]直接建立新的數組字段。如果數組規範包含文檔中不存在的字段,則該操作會将空值替換為該字段的值。
  • 在版本3.4中更改-如果$project 是一個空文檔,MongoDB 3.4和更高版本會産生一個錯誤。
  • 投影或添加/重置嵌入文檔中的字段時,可以使用點符号

示例

以下$project階段的輸出文檔中隻包含_id,city和state字段

db.zips.aggregate([ 
{ 
"$project": { 
"_id": 1, 
"city": 1, 
"state": 1 
} 
} 
]).pretty();

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

_id 字段預設包含在内。要從$ project階段的輸出文檔中排除 _id 字段,請在project文檔中将

_id 字段設定為0來指定排除_id字段。

db.zips.aggregate([ 
{ 
"$project": { 
"_id": 0, 
"city": 1, 
"state": 1 
} 
} 
]).pretty();
           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

以下$ project階段從輸出中排除loc字段

db.zips.aggregate([ 
{ 
"$project": { 
"loc": 0 
} 
} 
]).pretty();
           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

可以在聚合表達式中使用變量REMOVE來有條件地禁止一個字段,

db.zips.aggregate([ 
{ 
"$project": { 
"_id": 1, 
"city": 1, 
"state": 1, 
"pop": 1, 
"loc": { 
"$cond": { 
"if": { 
"$gt": [ 
"$pop", 
1000 
] 
},
"then": "$$REMOVE", 
"else": "$loc" 
} 
} 
} 
} 
]).pretty();

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

我們還可以改變資料,将人數大于1000的城市坐标重置為0

db.zips.aggregate([ 
{ 
"$project": { 
"_id": 1, 
"city": 1, 
"state": 1, 
"pop": 1, 
"loc": { 
"$cond": { 
"if": { 
"$gt": [ 
"$pop", 
1000 
] 
},
"then": [ 
0,
0 
],
"else": "$loc" 
} 
} 
} 
} 
]).pretty();

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

新增字段列

db.zips.aggregate([ 
{ 
"$project": { 
"_id": 1, 
"city": 1, 
"state": 1, 
"pop": 1, 
"disc": { 
"$cond": { 
"if": { 
"$gt": [ 
"$pop", 
1000 
]
},
"then": "人數過多", 
"else": "人數過少" 
} 
},
"loc": { 
"$cond": { 
"if": { 
"$gt": [ 
"$pop", 
1000 
] 
},
"then": [ 
0,
0 
],
"else": "$loc" 
} 
} 
} 
} 
]).pretty(); 
           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢
$limit

限制傳遞到管道中下一階段的文檔數

文法

示例,此操作僅傳回管道傳遞給它的前5個文檔。 $limit對其傳遞的文檔内容沒有影響

db.zips.aggregate({ 
"$limit": 5 
});
           

注意

當 s o r t 在 管 道 中 的 sort在管道中的 sort在管道中的limit之前立即出現時,$sort操作隻會在過程中維持前n個結果,其中n是指定的限制,而MongoDB隻需要将n個項存儲在記憶體中。當allowDiskUse為true并且n個項目超過聚合記憶體限制時,此優化仍然适用。

$skip

跳過進入stage的指定數量的文檔,并将其餘文檔傳遞到管道中的下一個階段

文法

示例,此操作将跳過管道傳遞給它的前5個文檔, $skip對沿着管道傳遞的文檔的内容沒有影響。

db.zips.aggregate({ 
"$skip": 5 
}); 
           
$sort

對所有輸入文檔進行排序,并按排序順序将它們傳回到管道。

文法

$sort指定要排序的字段和相應的排序順序的文檔。 可以具有以下值之一:

  • 1指定升序。
  • -1指定降序。
  • {$meta:“textScore”}按照降序排列計算出的textScore中繼資料。

示例

要對字段進行排序,請将排序順序設定為1或-1,以分别指定升序或降序排序,如下例所示:

db.zips.aggregate([ 
{ 
"$sort": { 
"pop": -1, 
"city": 1 
} 
} 
])

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢
$sortByCount

根據指定表達式的值對傳入文檔分組,然後計算每個不同組中文檔的數量。每個輸出文檔都包含兩個字段:包含不同分組值的_id字段和包含屬于該分組或類别的文檔數的計數字段,檔案按降序排列。

文法

使用示例

下面舉了一些常用的mongo聚合例子和mysql對比,假設有一條如下的資料庫記錄(表名:zips)作為例子:

統計所有資料

SQL的文法格式如下

mongoDB的文法格式

db.zips.aggregate([ 
{ 
"$group": { 
"_id": null, 
"count": { 
"$sum": 1 
} 
} 
} 
])

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

對所有城市人數求合

SQL的文法格式如下

mongoDB的文法格式

db.zips.aggregate([ 
{ 
"$group": { 
"_id": null, 
"total": { 
"$sum": "$pop" 
} 
} 
} 
])

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

對城市縮寫相同的城市人數求合

SQL的文法格式如下

mongoDB的文法格式

db.zips.aggregate([ 
{ 
"$group": { 
"_id": "$state", 
"total": { 
"$sum": "$pop" 
} 
} 
} 
])

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

state重複的城市個數

SQL的文法格式如下

mongoDB的文法格式

db.zips.aggregate([ 
{ 
"$group": { 
"_id": "$state", 
"total": { 
"$sum": 1 
} 
} 
} 
])

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

state重複個數大于100的城市

SQL的文法格式如下

mongoDB的文法格式

db.zips.aggregate([ 
{ 
"$group": { 
"_id": "$state", 
"total": { 
"$sum": 1 
} 
} 
},
{ 
"$match": { 
"total": { 
"$gt": 100 
} 
} 
} 
])

           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

MapReduce

MongoDB的聚合操作主要是對資料的批量處理,一般都是将記錄按條件分組之後進行一系列求最大值,最小值,平均值的簡單操作,也可以對記錄進行資料統計,資料挖掘的複雜操作,聚合操作的輸入是集中的文檔,輸出可以是一個文檔也可以是多個文檔。

Pipeline查詢速度快于MapReduce,但是MapReduce的強大之處在于能夠在多台Server上并行執行複雜的聚合邏輯,MongoDB不允許Pipeline的單個聚合操作占用過多的系統記憶體,如果一個聚合操作消耗20%以上的記憶體,那麼MongoDB直接停止操作,并向用戶端輸出錯誤消息。

MapReduce是一種計算模型,簡單的說就是将大批量的工作(資料)分解(MAP)執行,然後再将結果合并成最終結果(REDUCE)

mapreduce使用javascript文法編寫,其内部也是基于javascript V8引擎解析并執行,javascript語言的靈活性也讓mapreduce可以處理更加複雜的業務場景;當然這相對于aggreation pipleine而言,意味着需要書寫大量的腳本,而且調試也将更加困難。(調試可以基于javascript調試,成功後再嵌入到mongodb中)

執行階段

mapreduce有2個階段:map和reduce;

  • mapper處理每個document,然後emits一個或者多個objects,object為key-value對;
  • reducer将map操作的結果進行聯合操作(combine)。此外mapreduce還可以有一個finalize階段,這是可選的,它可以調整reducer計算的結果。在進行mapreduce之前,mongodb支援使用query來篩選文檔,也支援sort排序和limit。

文法

MapReduce 的基本文法如下:

db.collection.mapReduce( 
function() {emit(key,value);}, //map 函數 
function(key,values) {return reduceFunction}, //reduce 函數 
{ 
out: collection, 
query: document, 
sort: document, 
limit: number, 
finalize: <function>, 
scope: <document>, 
jsMode: <boolean>, 
verbose: <boolean> 
} 
)

           

使用 MapReduce 要實作兩個函數 Map 函數和 Reduce 函數,Map 函數調用 emit(key, value), 遍 曆 collection 中所有的記錄, 将 key 與 value 傳遞給 Reduce 函數進行處理。

參數說明

  • map:是JavaScript 函數,負責将每一個輸入文檔轉換為零或多個文檔,通過key進行分組,生成鍵值對序列,作為 reduce函數參數
  • reduce:是JavaScript 函數,對map操作的輸出做合并的化簡的操作(将key-values變成key-value,也就是把values數組變成一個單一的值value)
  • out:統計結果存放集合 (不指定則使用臨時集合,在用戶端斷開後自動删除)。
  • query: 一個篩選條件,隻有滿足條件的文檔才會調用map函數。(query。limit,sort可以随意組合)
  • sort: 和limit結合的sort排序參數(也是在發往map函數前給文檔排序),可以優化分組機制
  • limit: 發往map函數的文檔數量的上限(要是沒有limit,單獨使用sort的用處不大)
  • finalize:可以對reduce輸出結果再一次修改,跟group的finalize一樣,不過MapReduce沒有group的4MB文檔的輸出限制
  • scope:向map、reduce、finalize導入外部變量
  • verbose:是否包括結果資訊中的時間資訊,預設為fasle

使用示例

按照state分組統計

樣例SQL

mapReduce寫法

這是統計每一個作者的部落格分數是100以上的文章數

db.blog.mapReduce( 
function(){ 
emit(this.by,1); 
},
function(key,values){ 
return Array.sum(values); 
},
{ 
query:{likes:{$gt:100}}, 
out:"result001", 
} 
)

           

輸出結果

将結果輸出

# 顯示集合 
show tables; 
# 查詢結果集資料 
db.result001.find({})
           
MongoDB聚合查詢 Pipeline 和 MapReduceMongoDB聚合查詢

程式設計文法

在mongodb中,mapreduce除了包含mapper和reducer之外,還包含其他的一些選項,不過整體遵循mapreduce的規則:

map

javascript方法,此方法中可以使用emit(key,value),一次map調用中允許傳回調用多次emit(也可以不調用),它不需要傳回值;其中key用來分組,value将來會被傳遞給reducer用于“聚合計算”。每條document都會調用一次map方法。

mapper中輸入的是目前document,可以通過this.來擷取字段的值。mapper應該是封閉的,它不能通路外部資源,比如collection、database,不能修改外部的值,但允許通路“scope”中的變量。emit的值不能大于16M,即document最大的尺寸,否則mongodb将會抛出錯誤。

function() { 
this.items.forEach(function(item) {emit(item.sku,1);}); //多次emit 
}

           
reduce

javascript方法,此方法接收key和values兩個參數,經過mapper處理和“歸并之後”,一個key将會對應一組values(分組,key:values),此values将會在reduce中進行**“聚合計算”,比如:sum、平均數、資料分揀等等。**

reducer和mapper一樣是封閉的,它内部不允許通路database、collection等外部資源,不能修改外部值,但可以通路“scope”中的變量;如果一個key隻有一個value,那麼mongodb就不會調用reduce方法。可能一個key對應的values條數很多,将會調用多次reduce,即前一次reduce的結果可能被包含在values中再次傳遞給reduce方法,這也要求,reduce傳回的結果需要和value的結構保持一緻。同樣,reduce傳回的資料尺寸不能大于8M(document最大尺寸的一半,因為reduce的結果可能會作為input再次reduce)。

//mapper 
function() { 
emit(this.categoryId,{'count' : 1}); 
}
//reducer 
function(key,values) { 
var current = {'count' : 0}; 
values.forEach(function(item) { current.count += item.count;}); 
return current; 
}

           

此外reduce内的算法需要是幂等的,且與輸入values的順序無關的,因為即使相同的input文檔,也無法保證map-reduce的每個過程都是逐位元組相同的,但應該確定計算的結果是一緻的。

out

document結構,包含一些配置選項;用于指定reduce的結果最終如何儲存。可以将結果以inline的方式直接輸出(cursor),或者寫入一個collection中。

out : { 
<action> : <collectionName> 
[,db:<dbName>] 
[,sharded:<boolean>] 
[,nonAtomic:<boolean>]

           

out方式預設為inline,即不儲存資料,而是傳回一個cursor,用戶端直接讀取資料即可。

action

表示如果儲存結果的已經存在時,将如何處理:

  • replace:替換,替換原collection中的内容;先将資料儲存在臨時collection,此後rename,再将舊collection删除
  • merge:将結果與原有内容合并,如果原有文檔中持有相同的key(即_id字段),則直接覆寫原值
  • reduce:将結果與原有内容合并,如果原有文檔中有相同的key,則将新值、舊值合并後再次應用reduce方法,并将得到的值覆寫原值(對于“使用者留存”、“資料增量統計”非常有用)。
db

結果資料儲存在哪個database中,預設為目前db;開發者可能為了進一步使用資料,将統計結果統一放在單獨的database中

sharded

輸出結果的collection将使用sharding模式,使用_id作為shard key;不過首先需要開發者對

所在的database開啟sharding,否則将無法執行。

nonAtomic

“非原子性”,僅對“merge”和“replace”有效,控制output collection,預設為false,即“原子性”; 即mapreduce在輸出階段将會對output collection所在的資料庫加鎖,直到輸出結束,可能性能會有影響;

如果為true,則不會對db加鎖,其他用戶端可以讀取到output collection的中間狀态資料。我們通常将ouput collection單獨放在一個db中,和application資料分離開,而且nonAtomic為false,我們也不希望使用者讀到“中間狀态資料”。

可以通過指定“out:{inline : 1}”将輸出結果儲存在記憶體中,并傳回一個cursor,用戶端可以直接讀取即可。

query

篩選文檔,隻需要将符合條件的documents傳遞給mapper

sort

對刷選之後的文檔排序,然後才傳遞給mapper。如果根據map的key進行排序,則可以減少

reduce的操作次數。排序必須能夠使用index。

limit

限定輸入到map的文檔條數

finalize

終結操作,在輸出之前調整reduce的結果。它和map、reduce一樣,也是一個javascript方法,接收key和value,其中value為reduce輸出結果,finalize方法中可以修改value的值作為最終的輸出結果:

function(key,value) { 
var final = {count : 0,key:""}; 
final.key = key; 
return final; 
}
           
scope

document結構,儲存一些global級别的變量值,它們可以在map、reduce、finalize中被通路。

jsMode

可選值為true或者false;表示是否将map執行的中間結果資料由javascript對象轉換成BSON對象,預設為false。

  • false表示,在mapper中emit最終輸出的是javascript對象,因為是javascript引擎處理的,不過mapper可能産生大量的資料,這些資料将會被儲存在臨時的存儲中(collection),是以需要将javascript對象轉換成BSON;在reduce階段,這些BSON結果再被轉換成javascript對象,傳遞給reduce方法,轉換意味着性能消耗和慢速,它解決的問題就是“臨時存儲”以适應較大資料集的資料分析。
  • 如果為true,将不會進行類型轉換,資料被暫存在記憶體中,reduce階段直接使用mapper的結果即可,但是key的個數不能超過50W個。在production環境中,此值建議為false。

mongo特性、 搭建 、 springboot 、 索引調優 、 explain分析工具、索引設計,進階特性: geo 、 聚合查詢 、 叢集,mongodbshell

一些練習

// 篩選人數大于10萬的文檔

match = {

KaTeX parse error: Expected '}', got 'EOF' at end of input: match: {pop: {gt: 10}}

}

pipeline = [match]

db.zips.aggregate(pipeline);

// 不顯示id字段,如果人數pop大于1000,則修改loc地理位置為[0,0],否則顯示原位置資訊?

project = {

KaTeX parse error: Expected '}', got 'EOF' at end of input: … city: {literal: 1},

state: 1,

pop: 1,

newPop: “$pop”,

loc: {

KaTeX parse error: Expected '}', got 'EOF' at end of input: … if: {gt: [“KaTeX parse error: Expected 'EOF', got '}' at position 12: pop", 1000]}̲, …loc”

}

}

}

}

pipeline = [project]

db.zips.aggregate(pipeline);

// 計算美國每個州(state)的總人數(totalPop),平均人數(avgPop),數量(count) 并将每個州包含的城市(city)輸出到cities字段中?

group = {

KaTeX parse error: Expected '}', got 'EOF' at end of input: … _id: "state",

totalPop: { s u m : " sum: " sum:"pop"},

avgPop: { a v g : " avg: " avg:"pop"},

count: {KaTeX parse error: Expected 'EOF', got '}' at position 7: sum: 1}̲, citie…addToSet: “$city”}

}

}

pipeline = [group]

db.zips.aggregate(pipeline);

// 根據state進行升序排序,state相同的按照pop進行降序排序

sort = {

$sort: {

state: 1,

pop: -1

}

}

pipeline = [sort]

db.zips.aggregate(pipeline);

// 輸出前5個文檔

limit = {

$limit: 5

}

pipeline = [limit]

db.zips.aggregate(pipeline);

// 跳過前5個文檔

skip = {

$skip: 5

}

pipeline = [skip]

db.zips.aggregate(pipeline);

// 統計文檔的數量

count = {

$count: “totalCount”

}

pipeline = [count]

db.zips.aggregate(pipeline);

// 将loc數組中每個元素輸出一個文檔

match = {

$match: {

_id: “01001”

}

}

unwind = {

KaTeX parse error: Expected '}', got 'EOF' at end of input: … path: "loc",

includeArrayIndex: “loc_index”,

preserveNullAndEmptyArrays: true

}

}

pipeline = [match, unwind]

db.zips.aggregate(pipeline);

// 按州進行分組計數

sortByCount = {

s o r t B y C o u n t : " sortByCount: " sortByCount:"state"

}

pipeline = [sortByCount]

db.zips.aggregate(pipeline);

// 按州進行分組計數

function map() {

// this 代表的是集合中每一個文檔

key = this.state

value = {count: 1}

emit(key, value);

}

//{“MA”, {count: 1}}

//{“MA”, {count: 1}}

//{“MA”, {count: 1}}

//{“MA”, {count: 1}}

function reduce(key, values) {

// key = “MA”,

// values = [{count: 1}, {count: 1}, {count: 3}]

var totalCount = 0;

values.forEach(function(obj){

totalCount += obj.count

});

value = {count: totalCount};

return value;// {count: 3}

}

optionsOrOutString = {

out: “result”,

query: {state: {$in: [“MA”, “IN”, “DE”]}},

sort: {state: 1},

limit: 9000,

finalize: function(key, value) {

return custom_var + value.count;

},

scope: {custom_var: "val = "},

jsMode: true,

verbose: true

}

db.zips.mapReduce(map, reduce, optionsOrOutString);

db.result.find();

// 按州進行分組計數

match = {

KaTeX parse error: Expected '}', got 'EOF' at end of input: …atch: {state: {in: [“IN”, “DE”]}}

}

group = {

KaTeX parse error: Expected '}', got 'EOF' at end of input: …oup: { _id: "state",

count: {$sum: 1}

}

}

pipeline = [match, group]

db.zips.aggregate(pipeline);