我們前面採集的日誌數據已經保存到 Kafka 中,作為日誌數據的 ODS 層,從 Kafka 的ODS 層讀取的日誌數據分為 3 類, 頁面日誌、啟動日誌和曝光日誌。這三類數據雖然都是用戶行為數據,但是有著完全不一樣的數據結構,所以要拆分處理。將拆分後的不同的日誌寫回 Kafka 不同主題中,作為日 ...
前言、官方文檔、MongoTemplate中的概念
前言
最近在做基於SpringBoot的MongoDB的聚合管道操作,JSON語句不難寫,但是理清楚邏輯、順序很麻煩,而且在Java(Springboot)上操作聚合管道,部分操作符的使用不清楚,加之網上可以參考的示例很零散,很多不夠直觀全面。
所以在翻閱了官方文檔和一些個人分享的技術文章後,自己做了測試驗證,彙總了這篇筆記,分享一下基於SpringBoot的MongoDB的聚合管道操作。
主要是聚焦於理解MongoDB Template提供的兩種實現聚合管道的操作,重點基於$group,$lookup, $unwind, $facet, 這幾個操作符,實際代碼中也有涉及到$match, $count, $sortByCount的使用。
同時梳理一下MongoDB template中的幾個定義(見“MongoTemplate中的概念”),希望有助於大家。
請大家配合標題導航食用,風味更佳~
禁止轉載!!!!!!
官方文檔
https://www.mongodb.com/docs/manual/reference/operator/aggregation/facet/
MongoTemplate中的概念
- MongoTemplate:官方提供的操作MongoDB的對象。位於: org.springframework.data.mongodb.core。 使用的時候,需要註入。
- Query:用於創建查詢條件的對象。 位於:package org.springframework.data.mongodb.core.query。 使用時一般需要傳入如"Criteria"構建的查詢條件。
- Criteria: 構建具體查詢條件的對象,和Query位於同個包下。
- AggregationOperation:聚合管道的操作對象,這是適用於Aggregate Pipeline Stages的操作,比如$group/$lookup/$unwind/$sort.......使用的時候,需要先構建對應的聚合操作,比如$group(需要構建具體操作), 可以創建多個,最後一併傳入到Aggregation對象中,再交給template去執行管道聚合。 位於:
- Aggregation:Pipeline stage的集合,也就是上面AggregationOperation的集合,把上面的所有聚合操作存在一起,template調用aggregate方法的時候,傳入該對象。
- 以上類位於 package org.springframework.data.mongodb.core.aggregation;
- Aggregates: Pipeline stage操作對象。 和Aggregation有幾乎一樣的功能,但是會更加靈活,一般除了預先提供的操作符,還可以自己傳入Bson操作對象去靈活實現。 整體的使用難度,比Aggregation可能高一些。
- Bson、BsonDocument、BsonField: Bson我理解就是靈活的表達式,查詢條件、聚合操作符之類的構建定義,都可以由它接收,並最後傳給template的aggregate方法去執行聚合操作。BsonDocument則是Bson的具體實現,用於靈活構建表達式的對象。 關於這部分,具體可以往下看。BsonField也是構建靈活的聚合表達式的一個類,比如快速地定義{"count": { $sum: 1 } ,作為聚合操作的一部分傳入到具體的聚合階段中。
- 以上類位於 package com.mongodb.client.model; Bson/BsonDocument則是另外的包中。org.bson中。感興趣自行去源碼中查找。
開發環境和參考文檔
JDK1.8 + Maven
SpringBoot(Springboot-starter-parent): 2.7.5
Mongodb(spring-boot-starter-data-mongodb) 4.6.1
參考文檔:
http://www.mydlq.club/article/85/#1maven-%E5%BC%95%E5%85%A5%E7%9B%B8%E5%85%B3%E4%BE%9D%E8%B5%96
https://learnku.com/articles/61052
參考了以上網友的分享案例。
代碼和案例
$count 和$match操作符
官方定義
$count: Passes a document to the next stage that contains a count of the number of documents input to the stage.
https://www.mongodb.com/docs/manual/reference/operator/aggregation/count/
就是統計當前stage(聚合管道操作的階段)存在的文檔數量。
$match: Filters the documents to pass only the documents that match the specified condition(s) to the next pipeline stage.
過濾符合條件的數據到下個pipeline stage。
語法
{ $count: <string> // 這裡的名稱隨便寫,最後顯示出來的結果就是 xxx : 總數 }
// match語法
{ $match: { <query> } } // 就是傳入查詢語句Json格式
在MongoDB中操作的官方示例
// 數據
{ "_id" : 1, "subject" : "History", "score" : 88 }
{ "_id" : 2, "subject" : "History", "score" : 92 }
{ "_id" : 3, "subject" : "History", "score" : 97 }
{ "_id" : 4, "subject" : "History", "score" : 71 }
{ "_id" : 5, "subject" : "History", "score" : 79 }
{ "_id" : 6, "subject" : "History", "score" : 83 }
// 執行
db.scores.aggregate(
// 先用match查找匹配的文檔,然後直接用count統計當前match階段存在的文檔數量。
[{$match: {score: {$gt: 80}}},
{$count: "passing_scores" // 這裡的passing_scores 也可以是其他任意名稱
}]
)
// 返回結果
{ "passing_scores" : 4 }
MongoTemplate中實現的Java代碼
數據參考上面官方示例。 以下分別是Aggregation和Aggregates的實現。 任意一種都可以。
/** * @Author zaoyu */ @Autowired private MongoTemplate mongoTemplate; private String DEMO_COLLECTION = "demo"; /** * 用Aggregates和Bson構建聚合操作對象,用預先生成的MongoCollection對象調用aggregate執行即可。 */ @Test public void testCountWithAggregates(){ MongoCollection<Document> collection = mongoTemplate.getCollection(DEMO_COLLECTION); // Aggregates提供各種操作符,返回一個Bson對象。這裡用match,然後用Filters來實現過濾條件的構建,也是返回一個Bson對象。 Bson matchBson = Aggregates.match(Filters.gt("score", 80)); // 直接用Aggregates的count方法,如果不傳自定義的名稱,預設用“count”接收。 Bson countBson = Aggregates.count("myCount"); // 構建一個List<Bson>, 並把每一個聚合操作Bson加進去,最後傳入aggregate方法中執行。 List<Bson> bsonList = new ArrayList<>(); bsonList.add(matchBson); bsonList.add(countBson); AggregateIterable<Document> resultList = collection.aggregate(bsonList); for (Document document : resultList) { System.out.println("result is :" + document); } } /** * 用Aggregation集合接收聚合操作,用MongoTemplate對象直接調用aggregate,傳入聚合操作集合、表名、映射對象。 */ @Test public void testCountWithAggregation(){ // 構建查詢match條件:分數大於80 MatchOperation matchOperation = Aggregation.match(Criteria.where("score").gt(80)); // 構建count操作,用“myCount”名稱接收 CountOperation countOperation = Aggregation.count().as("myCount"); // 傳入多個aggregation(聚合操作),用Aggregation對象接收。 Aggregation aggregation = Aggregation.newAggregation(matchOperation, countOperation); // 直接用mongoTemplate調用aggregate方法,傳入aggregation集合,表名,還有用什麼對象接收數據,這裡我用Document接收,不再建類。 AggregationResults<Document> resultList = mongoTemplate.aggregate(aggregation, DEMO_COLLECTION, Document.class); for (Document document : resultList) { System.out.println("result is :" + document); } }
// 以上2個方法的輸出結果一樣,如下。
result is :Document{{myCount=4}}
$group 操作符
官方定義
The $group stage separates documents into groups according to a "group key". The output is one document for each unique group key. A group key is often a field, or group of fields. The group key can also be the result of an expression. Use the _id field in the $group pipeline stage to set the group key.
大概意思就是把文檔做分組。 輸出的格式是一條數據有一個唯一的分組鍵。 這裡可以簡單類比mysql 的group by分組。
語法
{ $group: { _id: <expression>, // 用來分組的欄位 <field1>: { <accumulator1> : <expression1> }, // 對某欄位做處理 accumulator操作。 ... } }
在MongoDB中操作的官方示例
// 插入數據 db.sales.insertMany([ { "_id" : 1, "item" : "abc", "price" : NumberDecimal("10"), "quantity" : NumberInt("2"), "date" : ISODate("2014-03-01T08:00:00Z") }, { "_id" : 2, "item" : "jkl", "price" : NumberDecimal("20"), "quantity" : NumberInt("1"), "date" : ISODate("2014-03-01T09:00:00Z") }, { "_id" : 3, "item" : "xyz", "price" : NumberDecimal("5"), "quantity" : NumberInt( "10"), "date" : ISODate("2014-03-15T09:00:00Z") }, { "_id" : 4, "item" : "xyz", "price" : NumberDecimal("5"), "quantity" : NumberInt("20") , "date" : ISODate("2014-04-04T11:21:39.736Z") }, { "_id" : 5, "item" : "abc", "price" : NumberDecimal("10"), "quantity" : NumberInt("10") , "date" : ISODate("2014-04-04T21:23:13.331Z") }, { "_id" : 6, "item" : "def", "price" : NumberDecimal("7.5"), "quantity": NumberInt("5" ) , "date" : ISODate("2015-06-04T05:08:13Z") }, { "_id" : 7, "item" : "def", "price" : NumberDecimal("7.5"), "quantity": NumberInt("10") , "date" : ISODate("2015-09-10T08:43:00Z") }, { "_id" : 8, "item" : "abc", "price" : NumberDecimal("10"), "quantity" : NumberInt("5" ) , "date" : ISODate("2016-02-06T20:20:13Z") }, ])
// 執行group,這裡還加上了match 和 project和sort。 一併使用展示。
db.getCollection("sales").aggregate( // 第一個聚合管道:過濾出日期在2014-01-01到2015-01-01之間的數據 [ { $match : { "date": { $gte: new ISODate("2014-01-01"), $lt: new ISODate("2015-01-01") } } }, // 第二個聚合管道:處理一下日期格式,方便等下做group。 {$project: {quantity:1, price:1, myDate:{"$dateToString":{format: "%Y-%m-%d", date: "$date"}} }}, // 第三個聚合管道:分組統計,先按照日期分組,再統計每天的銷售數量。 {$group:{_id:"$myDate", perDayQuantity:{$sum:"$quantity"}, myCount: {$sum:1} }}, // 第四個聚合管道:按照每日銷售數量降序排序 {$sort:{"perDayQuantity":-1}} ])
MongoTemplate中實現的Java代碼
/**
* @Author zaoyu * Aggregation 實現match, group, sort。 */ @Test public void testGroupAggregations(){ // 第一階段,過濾查詢日期介於14-1-1~15-1-1之間的數據,用Aggregation實現類MatchOperation接收。 MatchOperation match = Aggregation.match(Criteria .where("date").gte(Instant.parse("2014-01-01T08:00:00.000Z")) .andOperator(Criteria.where("date").lte(Instant.parse("2015-01-01T08:00:00.000Z")))); // 第二階段,處理一下日期格式,方便等下做group,用ProjectionOperation接收,也是Aggregation的實現類。 ProjectionOperation project = Aggregation.project("quantity", "price") .andExpression("{\"$dateToString\":{format: \"%Y-%m-%d\", date: \"$date\"}}").as("myDate"); // 第三階段,分組統計,先按照日期分組,再統計每天的銷售數量。 GroupOperation group = Aggregation.group("myDate") .sum("quantity").as("perDayQuantity") // 這裡是計算文檔條數 .count().as("myCount"); // 第四階段,排序。按照perDayQuantity欄位升序展示。 SortOperation sort = Aggregation.sort(Sort.Direction.ASC, "perDayQuantity"); // 用newAggregation接收以上多個階段的管道聚合指令,執行,得到結果。 Aggregation aggregations =Aggregation.newAggregation(match, project, group, sort); AggregationResults<Document> resultList = mongoTemplate.aggregate(aggregations, SALES_COLLECTION, Document.class); for (Document document : resultList) { System.out.println("result is :" + document); } }
// 返回結果
result is :Document{{_id=2014-03-01, perDayQuantity=3, myCount=2}}
result is :Document{{_id=2014-03-15, perDayQuantity=10, myCount=1}}
result is :Document{{_id=2014-04-04, perDayQuantity=30, myCount=2}}
$unwind 操作符
官方定義
Deconstructs an array field from the input documents to output a document for each element. Each output document is the input document with the value of the array field replaced by the element.
大概意思就是把輸入文檔的數組欄位按元素一個個拆分出來,並和原來的數據一併形成一條新文檔輸出。 好比原來10條數據,其中每條數據都有長度為3的數組,那麼拆出來(在元素不重覆的情況下),會得到30條數據。
語法
{ $unwind: <field path> } 和 { $unwind: { path: <field path>, // path是固定名稱,沿用即可。 <field path> 是數組欄位,就是你要拆分的欄位(值得是一個數組,不然沒有意義) includeArrayIndex: <string>, preserveNullAndEmptyArrays: <boolean> } }
在MongoDB中操作的官方示例
// 插入數據 db.inventory.insertOne({ "_id" : 1, "item" : "ABC1", sizes: [ "S", "M", "L"] }) // 執行unwind操作,這裡是把 sizes欄位的數組拆分出來 db.inventory.aggregate( [ { $unwind : "$sizes" } ] ) // 執行結果 可以看到每條數據的sizes不再是list,而是具體的元素。 { "_id" : 1, "item" : "ABC1", "sizes" : "S" } { "_id" : 1, "item" : "ABC1", "sizes" : "M" } { "_id" : 1, "item" : "ABC1", "sizes" : "L" }
註意,如果要拆分的欄位是一個空數組或者null,那麼實際輸出的數據,不會包含那條數據。 如下示例。
// 插入多條數據,這裡還放了三條特殊數據,一個是空集合,一個是null,一個是沒有要拆分的欄位 sizes db.clothing.insertMany([ { "_id" : 1, "item" : "Shirt", "sizes": [ "S", "M", "L"] }, { "_id" : 2, "item" : "Shorts", "sizes" : [ ] }, { "_id" : 3, "item" : "Hat", "sizes": "M" }, { "_id" : 4, "item" : "Gloves" }, { "_id" : 5, "item" : "Scarf", "sizes" : null } ]) // 執行$unwind db.clothing.aggregate( [ { $unwind: { path: "$sizes" } } ] ) // 返回結果, 可以看到,sizes值為空數組和null的id=2以及id=5的數據都沒有展示出來,同時沒有該欄位的id=4,也沒有展示出來。 { _id: 1, item: 'Shirt', sizes: 'S' }, { _id: 1, item: 'Shirt', sizes: 'M' }, { _id: 1, item: 'Shirt', sizes: 'L' }, { _id: 3, item: 'Hat', sizes: 'M' }
MongoTemplate中實現的Java代碼
以下分別是Aggregation和Aggregates的實現
/** * @Author zaoyu * Aggregation 實現$unwind */ @Test public void testUnwindAggregations() { String CLOTHING_COLLECTION = "clothing"; // 調用Aggregation中的unwind的聚合操作符 UnwindOperation unwind = Aggregation.unwind("sizes"); // 用newAggregation接收管道聚合指令,執行,得到結果。 Aggregation aggregations =Aggregation.newAggregation(unwind); // mongoTemplate 直接調用aggregate方法,傳入Aggregation對象,基於的表,映射類(這裡簡單化,我用Document) AggregationResults<Document> resultList = mongoTemplate.aggregate(aggregations, CLOTHING_COLLECTION, Document.class); for (Document document : resultList) { System.out.println("result is :" + document); } } /** * @Author zaoyu * Aggregates/Bson 實現$unwind */ @Test public void testUnwindAggregates(){ String CLOTHING_COLLECTION = "clothing"; // 調用Aggregates的unwind聚合操作符 註意,Aggregates這裡需要傳入$ Bson unwindBson = Aggregates.unwind("$sizes"); // 建一個List<Bson> 把unwindBson傳進去 List<Bson> bsonList = new ArrayList<>(); bsonList.add(unwindBson); // mongoTemplate先獲得對應的collection對象,然後調用aggregate,傳入List<Bson> 獲得結果 MongoCollection<Document> collection = mongoTemplate.getCollection(CLOTHING_COLLECTION); AggregateIterable<Document> resultList = collection.aggregate(bsonList); for (Document document : resultList) { System.out.println("result is :" + document); } }
$lookup 操作符
官方定義
Performs a left outer join to a collection in the same database to filter in documents from the "joined" collection for processing. The $lookup stage adds a new array field to each input document. The new array field contains the matching documents from the "joined" collection.
The $lookup stage passes these reshaped documents to the next stage.
Starting in MongoDB 5.1, $lookup works across sharded collections.
其實可以簡單理解類比Mysql的子查詢。 會把另外一張表匹配的數據,作為一個數組存入到當前數據中,需要自定義一個欄位來接收顯示。
類比如下的sql語句
SELECT *, <output array field> FROM collection WHERE <output array field> IN ( SELECT * FROM <collection to join> WHERE <foreignField> = <collection.localField> );
【特別註意】如果當前DB是集群部署,那麼在DB版本為5.1之前的情況,$lookup是不會生效的。 如果你資料庫是集群的,然後又要用$lookup,一定要檢查版本是否大於等於5.1,否則是查不出來的。 前陣子不知道這個,一直沒有頭緒為什麼數據查不出來。
語法
{ $lookup: { from: <collection to join>, // 要聯表查的表名 localField: <field from the input documents>, // 當前表的要和聯表關聯的欄位 foreignField: <field from the documents of the "from" collection>, // 要被關聯表的外鍵欄位 as: <output array field> // 定義一個欄位接收匹配關聯的數據 } }
在MongoDB中操作的官方示例
// 插入表orders數據 db.orders.insertMany( [ { "_id" : 1, "item" : "almonds", "price" : 12, "quantity" : 2 }, { "_id" : 2, "item" : "pecans", "price" : 20, "quantity" : 1 }, { "_id" : 3 } ] ) // 插入表inventory數據 db.inventory.insertMany( [ { "_id" : 1, "sku" : "almonds", "description": "product 1", "instock" : 120 }, { "_id" : 2, "sku" : "bread", "description": "product 2", "instock" : 80 }, { "_id" : 3, "sku" : "cashews", "description": "product 3", "instock" : 60 }, { "_id" : 4, "sku" : "pecans", "description": "product 4", "instock" : 70 }, { "_id" : 5, "sku": null, "description": "Incomplete" }, { "_id" : 6 } ] )
執行代碼
db.orders.aggregate( [ // db.orders 表示基於orders做聚合操作 { $lookup: { from: "inventory", // 聯表inventory localField: "item", // 當前orders的欄位 foreignField: "sku", // inventory中的sku欄位,和orders的item關聯 as: "inventory_docs" // 定義一個欄位名接收 inventory中sku 和orders的item相同的數據,數組形式。 } } ] )
返回結果
{ "_id" : 1, "item" : "almonds", "price" : 12, "quantity" : 2, "inventory_docs" : [ // 這個inventory_docs欄位就是自己命名的欄位,存儲著來自inventory的數據 { "_id" : 1, "sku" : "almonds", "description" : "product 1", "instock" : 120 } ] } { "_id" : 2, "item" : "pecans", "price" : 20, "quantity" : 1, "inventory_docs" : [ { "_id" : 4, "sku" : "pecans", "description" : "product 4", "instock" : 70 } ] } { "_id" : 3, "inventory_docs" : [ { "_id" : 5, "sku" : null, "description" : "Incomplete" }, { "_id" : 6 } ] }
MongoTemplate中實現的Java代碼
以下分別是Aggregation和Aggregates的實現
/** * @Author zaoyu * Aggregation 實現$lookup */ @Test public void testLookupAggregations(){ String INVENTORY_COLLECTION = "inventory"; String ORDERS_COLLECTION = "orders"; // Aggregation類,直接可以調用lookup方法,傳入要關聯的表、當前表和關聯表關聯的欄位、要關聯的表的欄位、自定義名稱接收關聯匹配的數據 LookupOperation lookup = Aggregation.lookup(INVENTORY_COLLECTION, "item", "sku", "inventory_docs"); // 用newAggregation接收管道聚合指令,執行,得到結果。 Aggregation aggregations =Aggregation.newAggregation(lookup); // mongoTemplate 直接調用aggregate方法,傳入Aggregation對象,基於的表,映射類(這裡簡單化,我用Document) AggregationResults<Document> resultList = mongoTemplate.aggregate(aggregations, ORDERS_COLLECTION, Document.class); for (Document document : resultList) { System.out.println("result is :" + document); } } /** * @Author zaoyu * Aggregates/Bson 實現$lookup */ @Test public void testLookupAggregates(){ String INVENTORY_COLLECTION = "inventory"; String ORDERS_COLLECTION = "orders"; // 這裡用Aggregates類直接調用lookup,傳入的參數和上面的Aggregations的lookup是一樣的,只不過這裡返回的結果是一個Bson對象。 Bson lookupBson = Aggregates.lookup(INVENTORY_COLLECTION, "item", "sku", "inventory_docs"); // 建一個List<Bson> 把lookupBson傳進去 List<Bson> bsonList = new ArrayList<>(); bsonList.add(lookupBson); // mongoTemplate先獲得對應的collection對象,然後調用aggregate,傳入List<Bson> 獲得結果 MongoCollection<Document> collection = mongoTemplate.getCollection(ORDERS_COLLECTION); AggregateIterable<Document> resultList = collection.aggregate(bsonList); for (Document document : resultList) { System.out.println("result is :" + document); } }
返回結果
result is :Document{{_id=1.0, item=almonds, price=12.0, quantity=2.0, inventory_docs=[Document{{_id=1.0, sku=almonds, description=product 1, instock=120.0}}]}} result is :Document{{_id=2.0, item=pecans, price=20.0, quantity=1.0, inventory_docs=[Document{{_id=4.0, sku=pecans, description=product 4, instock=70.0}}]}} result is :Document{{_id=3.0, inventory_docs=[Document{{_id=5.0, sku=null, description=Incomplete}}, Document{{_id=6.0}}]}}
$facet 操作符
官方定義
Processes multiple aggregation pipelines within a single stage on the same set of input documents. Each sub-pipeline has its own field in the output document where its results are stored as an array of documents.
Input documents are passed to the $facet stage only once. $facet enables various aggregations on the same set of input documents, without needing to retrieve the input documents multiple times.
簡單來說,就是facet可以實現在facet管道操作完成多個stage管道操作。減少獲取輸入文檔的次數。
我個人覺得有種場景很適合使用facet:分頁查詢文檔數據的同時,把符合查詢條件的總數也查詢出來的場景下,如果使用$facet,同時獲取分頁數據和總數,不用做兩次資料庫查詢(分別查詢分頁數據和總數)。
語法
{ $facet: { <outputField1>: [ <stage1>, <stage2>, ... ], // 這裡outputpufield 是自己定義的用來接收stage集合返回的文檔數據。 <outputField2>: [ <stage1>, <stage2>, ... ], // 可以基於上一個Facet繼續做facet ... } }
在MongoDB中操作的官方示例
// 數據 插入artwork 表中 { "_id" : 1, "title" : "The Pillars of Society", "artist" : "Grosz", "year" : 1926, "price" : NumberDecimal("199.99"), "tags" : [ "painting", "satire", "Expressionism", "caricature" ] } { "_id" : 2, "title" : "Melancholy III", "artist" : "Munch", "year" : 1902, "price" : NumberDecimal("280.00"), "tags" : [ "woodcut", "Expressionism" ] } { "_id" : 3, "title" : "Dancer", "artist" : "Miro", "year" : 1925, "price" : NumberDecimal("76.04"), "tags" : [ "oil", "Surrealism", "painting" ] } { "_id" : 4, "title" : "The Great Wave off Kanagawa", "artist" : "Hokusai", "price" : NumberDecimal("167.30"), "tags" : [ "woodblock", "ukiyo-e" ] } { "_id" : 5, "title" : "The Persistence of Memory", "artist" : "Dali", "year" : 1931, "price" : NumberDecimal("483.00"), "tags" : [ "Surrealism", "painting", "oil" ] } { "_id" : 6, "title" : "Composition VII", "artist" : "Kandinsky", "year" : 1913, "price" : NumberDecimal("385.00"), "tags" : [ "oil", "painting", "abstract" ] } { "_id" : 7, "title" : "The Scream", "artist" : "Munch", "year" : 1893, "tags" : [ "Expressionism", "painting", "oil" ] } { "_id" : 8, "title" : "Blue Flower", "artist" : "O'Keefe", "year" : 1918, "price" : NumberDecimal("118.42"), "tags" : [ "abstract", "painting" ] } // 執行$facet聚合 db.artwork.aggregate( [ { $facet: { // 第一個Facet操作,按照tag分類:先用unwind拆分tags欄位的數組值,交給下一個聚合 $sortByCount, 按照tags的個數排序。 "categorizedByTags": [ { $unwind: "$tags" }, { $sortByCount: "$tags" } ], // 第二個Facet操作,按照price分類:先過濾數據(只處理存在price數據的文檔),然後執行$bucket按照價格區間分組 0~150,151~200, 201~300, 301~400這樣。 "categorizedByPrice": [ { $match: { price: { $exists: 1 } } }, { $bucket: { groupBy: "$price", boundaries: [ 0, 150, 200, 300, 400 ], default: "Other", output: { "count": { $sum: 1 }, "titles": { $push: "$title" } } } } ], // 第三個Facet, 按照years分類。 分成4個區間。 "categorizedByYears(Auto)": [ { $bucketAuto: { groupBy: "$year", buckets: 4 } } ] } } ])
MongoTemplate中實現的Java代碼
註:以下代碼的實現,數據來源參考上邊的官方示例的數據, artwork表。 請自行插入數據。
1. 使用Aggregation對象實現
/** * @Author zaoyu * Aggregation 實現$facet */ @Test public void testFacetAggregations(){ String ARTWORK_COLLECTION = "artwork"; // Facet中第一組分類(categorizedByTags)的兩個聚合操作unwind 和 sortByCount UnwindOperation unwindForByTags = Aggregation.unwind("$tags"); SortByCountOperation sortByCountForByTags = Aggregation.sortByCount("$tags"); // Facet中第二組分類(categorizedByPrice)的聚合操作match 和 match MatchOperation matchForByPrice = Aggregation.match(Criteria.where("price").exists(true)); // 分別傳入bucket分組的欄位price,設置區間值,並設置桶內條數統計和值(這裡用titles接收title的值) BucketOperation bucketForByPrice = Aggregation.bucket("$price") .withBoundaries(0, 150, 200, 300, 400) .withDefaultBucket("Other") .andOutput("count").sum(1).as("count") .andOutput("$title").push().as("titles"); // Facet中第三組分類 (categorizedByYears(Auto))的聚合操作,按年自動分成4個區間。 BucketAutoOperation bucketForByYears = Aggregation.bucketAuto("$year", 4); // Aggregation調用facet方法,按照組別分類順序,把每一組的聚合操作和輸出的名稱傳進去。 FacetOperation facetOperation = Aggregation.facet(unwindForByTags, sortByCountForByTags).as("categorizedByTags") .and(matchForByPrice, bucketForByPrice).as("categorizedByPrice") .and(bucketForByYears).as("categorizedByYears(Auto)"); // 把facetOperation傳入newAggregation得到Aggregation對象,調用mongoTemplate的Aggregate方法執行得到結果 Aggregation aggregation = Aggregation.newAggregation(facetOperation); AggregationResults<Document> resultList = mongoTemplate.aggregate(aggregation, ARTWORK_COLLECTION, Document.class); for (Document document : resultList) { System.out.println("result is :" + document); } }
2. 使用Aggregates實現
/** * @Author zaoyu * Aggregates 實現$facet */ @Test public void testFacetAggregates() { String ARTWORK_COLLECTION = "artwork"; // Facet中第一組分類(categorizedByTags)的兩個聚合操作unwind 和 sortByCount Bson unwindBsonForByTags = Aggregates.unwind("$tags"); Bson sortByCountBsonForByTags = Aggregates.sortByCount("$tags"); // 新建Facet對象,傳入第一組分類的接收名稱,以及在第一組分類中要做的聚合操作。 Facet categorizedByTags = new Facet("categorizedByTags", unwindBsonForByTags, sortByCountBsonForByTags); // Facet中第二組分類(categorizedByPrice)的聚合操作match 和 match Bson matchBsonForPrice = Aggregates.match(Filters.exists("price")); // 這裡面要新建BsonField構建 {"count": { $sum: 1 } 和 "titles": { $push: "$title" }} 作為第二組分類中$Bucket聚合操作中output值 BsonField countOutput = new BsonField("count", new Document("$sum", 1)); BsonField titleOutput = new BsonField("titles", new Document("$push", "$price")); // 上面2個操作傳入到BucketOption對象,最後傳到bucket操作 BucketOptions bucketOptions = new BucketOptions().defaultBucket("Other").output(countOutput).output(titleOutput); Bson bucketBsonForByPrice = Aggregates.bucket("$price", Arrays.asList(0, 150, 200, 300, 400), bucketOptions); Facet categorizedByPrice = new Facet("categorizedByPrice", matchBsonForPrice, bucketBsonForByPrice); // Facet中第三組分類 (categorizedByYears(Auto))的聚合操作,按年自動分成4個區間。 Bson bucketAutoBsonForByYears = Aggregates.bucketAuto("$year", 4); Facet categorizedByYears = new Facet("categorizedByYears", bucketAutoBsonForByYears); // 新建一個List<Facet>把每組分類的Facet對象傳進去。 List<Facet> facetList = new ArrayList<>(); facetList.add(categorizedByTags); facetList.add(categorizedByPrice); facetList.add(categorizedByYears); // 調用Aggregates的facet方法,傳入List<Facet>得到最終Bson對象,並添加到Bson集合中。 Bson facetBson = Aggregates.facet(facetList); List<Bson> bsonList = new ArrayList<>(); bsonList.add(facetBson); // 調用方法執行得到結果 MongoCollection<Document> collection = mongoTemplate.getCollection(ARTWORK_COLLECTION); AggregateIterable<Document> resultList = collection.aggregate(bsonList); for (Document document : resultList) { System.out.println("result is :" + document); } }
最終返回結果, 二者一樣。
result is :Document{{categorizedByTags=[Document{{_id=painting, count=6}}, Document{{_id=oil, count=4}}, Document{{_id=Expressionism, count=3}}, Document{{_id=Surrealism, count=2}}, Document{{_id=abstract, count=2}}, Document{{_id=woodblock, count=1}}, Document{{_id=ukiyo-e, count=1}}, Document{{_id=satire, count=1}}, Document{{_id=caricature, count=1}}, Document{{_id=woodcut, count=1}}], categorizedByPrice=[Document{{_id=0, titles=[76.04, 118.42]}}, Document{{_id=150, titles=[199.99, 167.30]}}, Document{{_id=200, titles=[280.00]}}, Document{{_id=300, titles=[385.00]}}, Document{{_id=Other, titles=[483.00]}}], categorizedByYears=[Document{{_id=Document{{min=null, max=1902.0}}, count=2}}, Document{{_id=Document{{min=1902.0, max=1918.0}}, count=2}}, Document{{_id=Document{{min=1918.0, max=1926.0}}, count=2}}, Document{{_id=Document{{min=1926.0, max=1931.0}}, count=2}}]}}
小結
整體來說,MonogoDB官方提供了很詳細的資料,但是對於Java 層面的操作,或者說SpringBoot層面的操作,文檔就比較簡單。
個人感覺而言,Aggregations提供的方法比較直接,更適合不太熟悉Springboot上操作Mongo的同學來使用,而Aggregates會更加靈活,但是需要你知道Document, BsonField, Bson之間的轉換和獲取。
希望這篇文章能幫到大家,有錯漏之處,歡迎指正。