如果需要進行資料分析,那麼可以使用MongoDB的聚合架構,可以對一個或多個集合中的文檔進行分析。聚合架構基于管道的概念。使用聚合管道可以從 MongoDB 集合擷取輸入,并将該集合中的文檔傳遞到一個或多個階段,每個階段對其輸入執行不同的操作。每個階段都将之前階段輸出的内容作為輸入。所有階段的輸入和輸出都是文檔——可以稱為文檔流。

如果你熟悉 Linux shell 中的管道,比如 bash,那麼這是一個非常相似的概念。每個階段都有其特定的工作。它會接收特定形式的文檔并産生特定的輸出,該輸出本身就是文檔流。可以在管道的終點對輸出進行通路,這與執行 find 查詢的方式非常相似。也就是說,我們擷取一個文檔流,然後對其做一些處理,無論是建立某種類型的報告、生成一個網站,還是其他類型的任務。
現在來更深入地研究各個階段。在聚合管道中,一個階段就是一個資料處理單元。它一次接收一個輸入文檔流,一次處理一個文檔,并且一次産生一個輸出文檔流。
每個階段都會提供一組旋鈕或可調參數(tunables),可以通過控制它們來設定該階段的參數,以執行任何感興趣的任務。一個階段會執行某種類型的通用任務,我們會為正在使用的特定集合以及希望該階段如何處理這些文檔設定階段的參數。這些可調參數通常采用運算符的形式,可以使用這些運算符來修改字段、執行算術運算、調整文檔形狀、執行某種累加任務或其他各種操作。
特别注意:通常,我們希望在單個管道中包含多個相同類型的階段。例如,我們可能希望執行一個初始過濾器,這樣就不必将整個集合都傳遞到管道中了。稍後,在進行一些其他處理之後,我們可能希望應用一系列不同的條件進一步進行過濾。
概括來說,管道是與 MongoDB 集合一起使用的。它們由階段組成,每個階段對其輸入執行不同的資料處理任務,并生成文檔以作為輸出傳遞到下一個階段。最終,在處理結束時,管道會産生一些輸出,這些輸出可以用來在應用程式中執行某些操作,或者被發送到某個集合以供後續使用。在許多情況下,為了執行所需的分析,我們會在單個管道中包含多個相同類型的階段。
先看一個最簡單的聚合管道示例:
db.orders.insertMany( [
{ _id: 0, name: "Pepperoni", size: "small", price: 19,quantity: 10, date: ISODate( "2021-03-13T08:14:30Z" ) },
{ _id: 1, name: "Pepperoni", size: "medium", price: 20,quantity: 20, date : ISODate( "2021-03-13T09:13:24Z" ) },
{ _id: 2, name: "Pepperoni", size: "large", price: 21,quantity: 30, date : ISODate( "2021-03-17T09:22:12Z" ) },
{ _id: 3, name: "Cheese", size: "small", price: 12,quantity: 15, date : ISODate( "2021-03-13T11:21:39.736Z" ) },
{ _id: 4, name: "Cheese", size: "medium", price: 13,quantity:50, date : ISODate( "2022-01-12T21:23:13.331Z" ) },
{ _id: 5, name: "Cheese", size: "large", price: 14,quantity: 10, date : ISODate( "2022-01-12T05:08:13Z" ) },
{ _id: 6, name: "Vegan", size: "small", price: 17,quantity: 10, date : ISODate( "2021-01-13T05:08:13Z" ) },
{ _id: 7, name: "Vegan", size: "medium", price: 18,quantity: 10, date : ISODate( "2021-01-13T05:10:13Z" ) },
{ _id: 8, name: "Vegan", size: "medium", price: 18,quantity: 10, date : ISODate( "2021-01-13T05:14:13Z" ) }
] )
db.orders.aggregate( [
// Stage 1: 隻查詢medium大小的 pizza
{
$match: { size: "medium" }
},
// Stage 2: 按照name分組并且對quantity求和
{
$group: { _id: "$name", totalQuantity: { $sum: "$quantity" } }
}
] )
1.階段入門(stages):常見操作
為了開發聚合管道,我們将研究如何建構一些管道,其中包含你已經熟悉的操作。下面會介紹比對(match)、投射(project)、排序(sort)、跳過(skip)和限制(limit)這 5個階段。
要完成這些聚合示例,需要使用一個公司的資料集合。該集合中有許多字段,這些字段指定了有關公司的詳細資訊,比如公司名稱、公司簡介以及公司成立的時間。還有一些字段描述了公司已進行的數輪融資、公司的重要裡程碑,以及該公司是否進行了首次公開發行(IPO),如果是,那麼其 IPO 的詳細情況是什麼。下面是一個包含 Facebook公司資料的示例文檔:
db.companies.insert(
{
"name": "Facebook",
"category_code": "social",
"founded_year": 2004,
"description": "Social network",
"funding_rounds": [{
"id": 4,
"round_code": "b",
"raised_amount": 27500000,
"raised_currency_code": "USD",
"funded_year": 2006,
"investments": [
{
"company": null,
"financial_org": {
"name": "Greylock Partners",
"permalink": "greylock"
},
"person": null
},
{
"company": null,
"financial_org": {
"name": "Meritech Capital Partners",
"permalink": "meritech-capital-partners"
},
"person": null
},
{
"company": null,
"financial_org": {
"name": "Founders Fund",
"permalink": "founders-fund"
},
"person": null
},
{
"company": null,
"financial_org": {
"name": "SV Angel",
"permalink": "sv-angel"
},
"person": null
}
]
},
{
"id": 2197,
"round_code": "c",
"raised_amount": 15000000,
"raised_currency_code": "USD",
"funded_year": 2008,
"investments": [
{
"company": null,
"financial_org": {
"name": "European Founders Fund",
"permalink": "european-founders-fund"
},
"person": null
}
]
}],
"ipo": {
"valuation_amount": NumberLong("104000000000"),
"valuation_currency_code": "USD",
"pub_year": 2012,
"pub_month": 5,
"pub_day": 18,
"stock_symbol": "NASDAQ:FB"
}
}
)
(1).對 2004 年成立的所有公司進行簡單的過濾
db.companies.aggregate([
{$match: {founded_year: 2004}},
])
這相當于使用 find 執行以下操作:
db.companies.find({founded_year: 2004})
現在在管道中添加一個投射階段來将每個文檔的輸出減少到幾個字段。排除 "_id" 字段,但将 "name" 字段和 "founded_year" 字段包含在内。管道如下所示:
db.companies.aggregate([
{ $match: { founded_year: 2004 } },
{
$project: {
_id: 0,
name: 1,
founded_year: 1
}
}
])
運作聚合查詢時調用的方法。要進行聚合,就需要傳入一個聚合管道。管道是一個以文檔為元素的數組。每個文檔必須規定一個特定的階段運算符。本例中使用了包含兩個階段的管道: 一個是用于過濾的比對階段,另一個是投射階段。在投射階段中,每個文檔的輸出被限制為隻有兩個字段。
比對階段會對集合進行過濾,并将結果文檔一次一個地傳遞到投射階段。然後投射階段會執行其操作,調整文檔形狀,并從管道中将輸出傳遞回來。
(2).把結果集限制為 5,然後投射出想要的字段,為簡單起見,将輸出限制為每個公司的名稱
db.companies.aggregate([
{$match: {founded_year: 2004}},
{$limit: 5},
{$project: {
_id: 0,
name: 1}}
])
注意,建構的這條管道已在投射階段之前進行限制。如果先運作投射階段,然後再進行限制,那麼就像下面的查詢一樣,将得到完全相同的結果,但這樣就必須在投射階段傳遞數百個文檔,最後才能将結果限制為 5 個。
db.companies.aggregate([
{$match: {founded_year: 2004}},
{$project: {
_id: 0,
name: 1}},
{$limit: 5}
])
無論 MongoDB 查詢規劃器在給定版本中進行何種類型的優化,都應該始終注意聚合管道的效率。確定在建構管道時限制從一個階段傳遞到另一個階段的文檔數量。
2.表達式
在建構聚合管道時,了解可以使用的不同類型的表達式是很重要的。聚合架構支援許多表達式類型。具體如下:
(1).布爾表達式允許使用 AND、OR 和 NOT。
(2).集合表達式允許将數組作為集合來處理。特别地,可以取兩個或多個集合的交集或并集,也可以取兩個集合的內插補點并執行一些其他的集合運算。
(3).比較表達式能夠表達許多不同類型的範圍過濾器
(4).算術表達式能夠計算上限(ceiling)、下限(floor)、自然對數和對數,以及執行簡單的算術運算,比如乘法、除法、加法和減法。甚至可以執行更複雜的運算,比如計算值的平方根。
(5).字元串表達式允許連接配接、查找子字元串,以及執行與大小寫和文本搜尋相關的操作。
(6).數組表達式為操作數組提供了強大的功能,包括過濾數組元素、對數組進行分割或從特定數組中擷取某一個範圍的值。
(7).變量表達式這類表達式允許處理文字、解析日期值及條件表達式。
(8).累加器提供了計算總和、描述性統計和許多其他類型值的能力。
3.
$project
(1).首先看一下如何提取嵌套字段。在以下管道中進行一個比對操作
我們正在篩選 Greylock Partners 參與融資的所有公司。permalink 值為 "greylock",它是此類文檔的唯一辨別符。
db.companies.aggregate([
{ $match: { "funding_rounds.investments.financial_org.permalink": "greylock" } },
{
$project: {
_id: 0,
name: 1,
ipo: "$ipo.pub_year",
valuation: "$ipo.valuation_amount",
funders: "$funding_rounds.investments.financial_org.permalink"
}
}
]).pretty()
在輸出中,每個文檔都有一個 "name" 字段和 "funders" 字段。對于那些已經進行過 IPO的公司,"ipo" 字段包含公司上市的年份,"valuation" 字段包含公司在 IPO 時的估值。
注意,在所有文檔中,這些都是頂級字段,這些字段的值是從嵌套的文檔和數組中提升上來的。你可能已經注意到 funders 顯示出了多個值。實際上,我們看到的是數組的數組。
4.
$unwind
在聚合管道中處理數組字段時,通常需要包含一個或多個展開(unwind)階段。這允許我們将指定數組字段中的每個元素都形成一個輸出文檔,如下圖所示:
在上圖有一個輸入文檔,它有 3 個鍵及其相應的值。第三個鍵的值是一個包含 3 個元素的數組。如果在這種類型的輸入文檔中運作 $unwind,并配置為展開 key3 字段,那麼将生成類似如上圖下部所示的文檔。這點可能不太直覺,在每個輸出文檔中都會有一個key3 字段,但是該字段包含的是一個值而不是數組,并且該數組中的每個元素都将有一個單獨的文檔。換句話說,如果數組中有 10 個元素,則展開階段将生成 10 個輸出文檔。
(1).回到 companies 的例子,看看展開階段的使用
首先先看看沒有使用unwind的資料
db.companies.aggregate([
{$match: {"funding_rounds.investments.financial_org.permalink": "greylock"} },
{$project: {
_id: 0,
name: 1,
amount: "$funding_rounds.raised_amount",
year: "$funding_rounds.funded_year"
}}
])
該查詢生成了同時具有 "amount" 數組和 "year" 數組的文檔,因為我們正在通路 "funding_rounds" 數組中每個元素的 "raised_amount" 字段和 "funded_year" 字段。
為了解決這個問題,可以在聚合管道中的投射階段之前包含一個展開階段,并通過指定應該展開的 "funding_rounds" 數組來參數化這個階段
以下是更新後的聚合查詢:
db.companies.aggregate([
{ $match: {"funding_rounds.investments.financial_org.permalink": "greylock"} },
{ $unwind: "$funding_rounds" },
{ $project: {
_id: 0,
name: 1,
amount: "$funding_rounds.raised_amount",
year: "$funding_rounds.funded_year"
} }
])
5.數組表達式
(1).現在把注意力轉向數組表達式。我們會嘗試在投射階段中使用數組表達式,這個是需要深入研究的。首先要介紹的是過濾器表達式。過濾器表達式根據過濾條件選擇數組中的元素子集。再次使用 companies 資料集,用相同的條件比對 Greylock 參與的融資輪。下面看一下這個管道中的 rounds 字段:
db.companies.aggregate([
{ $match: { "funding_rounds.investments.financial_org.permalink": "greylock" } },
{
$project: {
_id: 0,
name: 1,
founded_year: 1,
rounds: {
$filter: {
input: "$funding_rounds",
as: "round",
cond: { $gte: ["$$round.raised_amount", 20000000] }
}
}
}
},
{ $match: { "rounds.investments.financial_org.permalink": "greylock" } },
]).pretty()
rounds 字段使用了一個過濾器表達式。 $filter 運算符用來處理數組字段,并指定必須提供的選項。
$filter
的第一個選項是 input。對于 input,隻需為其指定一個數組。本例使用了一個字段路徑說明符來辨別在 companies 集合的文檔中找到的 "funding_rounds" 數組。接下來指定這個 "funding_rounds" 數組在過濾器表達式的其餘部分中使用的名稱。然後,作為第三個選項,需要指定一個條件。這個條件應該提供用于過濾作為輸入的任何數組的條件,選擇一個子集。在本例中,所過濾的是隻選擇那些 "funding_rounds" 的"raised_amount" 大于或等于 20000000 的元素。
(2).
$arrayElemAt
運算符允許選擇數組中特定位置的元素。下面的管道提供了一個使用
$arrayElemAt
的例子
> db.companies.aggregate([
... { $match: { "founded_year": 2004 } },
... { $project: {
... _id: 0,
... name: 1,
... founded_year: 1,
... first_round: { $arrayElemAt: [ "$funding_rounds", 0 ] },
... last_round: { $arrayElemAt: [ "$funding_rounds", -1 ] }
... } }
... ]).pretty()
{
"name" : "Facebook",
"founded_year" : 2004,
"first_round" : {
"id" : 4,
"round_code" : "b",
"raised_amount" : 27500000,
"raised_currency_code" : "USD",
"funded_year" : 2006,
"investments" : [
{
"company" : null,
"financial_org" : {
"name" : "Greylock Partners",
"permalink" : "greylock"
},
"person" : null
},
{
"company" : null,
"financial_org" : {
"name" : "Meritech Capital Partners",
"permalink" : "meritech-capital-partners"
},
"person" : null
},
{
"company" : null,
"financial_org" : {
"name" : "Founders Fund",
"permalink" : "founders-fund"
},
"person" : null
},
{
"company" : null,
"financial_org" : {
"name" : "SV Angel",
"permalink" : "sv-angel"
},
"person" : null
}
]
},
"last_round" : {
"id" : 2197,
"round_code" : "c",
"raised_amount" : 15000000,
"raised_currency_code" : "USD",
"funded_year" : 2008,
"investments" : [
{
"company" : null,
"financial_org" : {
"name" : "European Founders Fund",
"permalink" : "european-founders-fund"
},
"person" : null
}
]
}
}
>
注意在投射階段中使用
$arrayElemAt
的文法。這裡定義了一個想要投射出來的字段,并指定了一個文檔,以
$arrayElemAt
作為字段名,以一個雙元素數組作為值。第一個元素應該是一個字段路徑,用于指定要從中選擇的數組字段。第二個元素辨別了數組中的位置。記住數組是從 0 開始索引的。
在許多情況下,數組的長度不容易獲得。選擇從數組末尾開始的數組位置可以使用負整數。數組中的最後一個元素用 -1 辨別。
(3).與
$arrayElemAt
相關的是
$slice
表達式,其允許在數組中從一個特定的索引開始按順序傳回多個元素
> db.companies.aggregate([
... { $match: { "founded_year": 2004 } },
... { $project: {
... _id: 0,
... name: 1,
... founded_year: 1,
... early_rounds: { $slice: [ "$funding_rounds", 1, 3 ] }
... } }
... ]).pretty()
{
"name" : "Facebook",
"founded_year" : 2004,
"early_rounds" : [
{
"id" : 2197,
"round_code" : "c",
"raised_amount" : 15000000,
"raised_currency_code" : "USD",
"funded_year" : 2008,
"investments" : [
{
"company" : null,
"financial_org" : {
"name" : "European Founders Fund",
"permalink" : "european-founders-fund"
},
"person" : null
}
]
}
]
}
>
(4).過濾和選擇數組的單個元素或片段是對數組執行的常見操作之一。然而,最常見的操作可能是确定數組的大小或長度。可以使用 $size 運算符執行此操作
db.companies.aggregate([
{ $match: { "founded_year": 2004 } },
{ $project: {
_id: 0,
name: 1,
founded_year: 1,
total_rounds: { $size: "$funding_rounds" }
} }
]).pretty()
在投射階段中使用時,
$size
表達式隻是簡單地提供了一個值,即數組中的元素個數。
6.累加器
累加器本質上是另一種類型的表達式,因為它的值是從多個文檔中的字段計算得來的。
在投射階段使用累加器,下面從一個在投射階段使用累加器的例子開始。注意,比對階段用于過濾包含 "funding_rounds" 字段且 funding_rounds 數組不為空的文檔:
db.companies.aggregate([
{ $match: { "funding_rounds": { $exists: true, $ne: [ ]} } },
{ $project: {
_id: 0,
name: 1,
largest_round: { $max: "$funding_rounds.raised_amount" }
} }
])
再舉一個例子,使用
$sum
累加器來計算集合中每個公司的總資金:
db.companies.aggregate([
{ $match: { "funding_rounds": { $exists: true, $ne: [ ]} } },
{ $project: {
_id: 0,
name: 1,
total_funding: { $sum: "$funding_rounds.raised_amount" }
}
}
])
7.分組簡介
(1).分組階段中的_id字段
db.companies.aggregate([
{ $match: { founded_year: { $gte: 2000 } } },
{
$group: {
_id: { founded_year: "$founded_year" },
companies: { $push: "$name" }
}
},
{ $sort: { "_id.founded_year": 1 } }
]).pretty()
在輸出的文檔中有兩個字段:"_id" 和 "companies"。每個文檔都包含一個在 "founded_year" 内成立的公司清單,"companies" 是由公司名稱組成的數組。
在某些情況下可能需要使用另一種方法,其中 _id 的值是由多個字段組成的文檔。
db.companies.aggregate([
{ $match: { founded_year: { $gte: 2000 } } },
{
$group: {
_id: { founded_year: "$founded_year", category_code: "$category_code" },
companies: { $push: "$name" }
}
},
{ $sort: { "_id.founded_year": 1 } }
]).pretty()
當分組階段在其輸入流中處理文檔時,
$push
表達式會将結果的值添加到其在運作過程中所建構的數組中。在前面的管道中,分組階段建立了一個由公司名稱組成的數組。
db.companies.aggregate([
{
$group: {
_id: { ipo_year: "$ipo.pub_year" },
companies: { $push: "$name" }
}
},
{ $sort: { "_id.ipo_year": 1 } }
]).pretty()
在看一個完整的例子:
db.companies.aggregate([
{ $match: { "relationships.person": { $ne: null } } },
{ $project: { relationships: 1, _id: 0 } },
{ $unwind: "$relationships" },
{
$group: {
_id: "$relationships.person",
count: { $sum: 1 }
}
},
{ $sort: { count: -1 } }
])
(2).分組與投射
db.companies.aggregate([
{ $match: { funding_rounds: { $ne: [] } } },
{ $unwind: "$funding_rounds" },
{
$sort: {
"funding_rounds.funded_year": 1,
"funding_rounds.funded_month": 1,
"funding_rounds.funded_day": 1
}
},
{
$group: {
_id: { company: "$name" },
funding: {
$push: {
amount: "$funding_rounds.raised_amount",
year: "$funding_rounds.funded_year"
}
}
}
},
]).pretty()
這裡,首先将 funding_rounds 數組不為空的文檔過濾出來,然後展開 funding_rounds。這樣,每個公司的 funding_rounds 數組中的每個元素在排序階段和分組階段都會有一個文檔。這個管道中的排序階段按照年、月、日進行排序,全部都是升序。這意味着,這一階段會首先輸出最早的幾輪融資。在排序之後的分組階段,根據公司名稱進行分組,并使用
$push
累加器來構造排序後的融資輪數組。由于在排序階段已經對所有融資輪進行了全局排序,是以每個公司的 funding_rounds 數組都是排好序的。
在這個管道中使用了
$push
來生成一個數組。本例指定了
$push
表達式将文檔添加到數組的末尾。由于各輪融資都是按時間順序進行的,是以将其排在末尾可以保證每家公司的融資金額是按時間順序進行排序的。
$push
表達式僅适用于分組階段。這是因為分組階段被設計成了接受文檔的輸入流,并通過依次處理每個文檔來對值進行累加操作。另外,投射階段會單獨處理輸入流中的每個文檔。
再看另一個例子。這個管道有點兒長,但它其實是建立在前面例子基礎上的:
db.companies.aggregate([
{ $match: { funding_rounds: { $exists: true, $ne: [] } } },
{ $unwind: "$funding_rounds" },
{
$sort: {
"funding_rounds.funded_year": 1,
"funding_rounds.funded_month": 1,
"funding_rounds.funded_day": 1
}
},
{
$group: {
_id: { company: "$name" },
first_round: { $first: "$funding_rounds" },
last_round: { $last: "$funding_rounds" },
num_rounds: { $sum: 1 },
total_raised: { $sum: "$funding_rounds.raised_amount" }
}
},
{
$project: {
_id: 0,
company: "$_id.company",
first_round: {
amount: "$first_round.raised_amount",
article: "$first_round.source_url",
year: "$first_round.funded_year"
},
last_round: {
amount: "$last_round.raised_amount",
article: "$last_round.source_url",
year: "$last_round.funded_year"
},
num_rounds: 1,
total_raised: 1,
}
},
{ $sort: { total_raised: -1 } }
]).pretty()
同樣,還是展開 funding_rounds 并按照時間排序。然而,本例并未将 funding_rounds 作為數組累積到一起,而是使用了兩個尚未介紹過的累加器: first和last 。
$first
表達式隻是儲存通過輸入流傳入階段的第一個值。
$last
表達式則會跟蹤所有傳入分組階段的值并保留最後一個。
與
$push
一樣, last 是不能在投射階段使用的,因為投射階段的目的并不是基于經過的多個文檔來對值進行累加。相反,它們是用來調整單個文檔形狀的。
除了 last ,本例還使用了 sum 表達式用來計算它在每個分組中所看到的文檔數量。
最後,這個管道包含了一個相當複雜的投射階段。然而,它真正的作用隻是讓輸出變得更美觀。這個投射階段既沒有展示 first_round 的值,也沒有展示首輪和末輪融資的整個文檔,而是建立了一個摘要。注意,這種做法維護了良好的語義,因為每個值都被清楚地進行了标記。對于 first_round,我們将生成一個簡單的内嵌文檔,其中隻包含金額、年份等基本細節,這些值是從原始的融資輪文檔中提取出來的,并最終形成了
$first_round
。投射階段中的
$last_round
也做了類似的操作。最後,此投射階段将基于輸入文檔計算出的 num_rounds 值和 total_raised 值傳遞到輸出文檔。