程序员阿沛
发布于 2026-06-27 / 0 阅读
0
0

深入学习mongodb三mongdb聚合投射和管道操作符

深入学习mongodb(三) mongdb聚合、投射和管道操作符

如果你有数据存储在MongoDB中,你想做的可能就不仅仅是将数据提取出来那么简单,而是希望对数据进行分析并加以利用,因此本节介绍MongoDB提供的聚合工具。
一、聚合框架
使用聚合框架可以对集合中的文档进行变换和组合。基本上,可以用多个构件创建一个管道(pipeline),用于对一连串的文档进行处理。这些构件包括筛选(filtering)、投射(projecting)、分组(grouping)、排序(sorting)、限制(limiting)和跳过(skipping)。
下面是一个例子:
如果有一个test集合,里面有以下文档:

{  "_id": ObjectId("60445f35b6e04e8e7e360e2e"),  "name": "zbp",  "age": 24,  "job": "programme"} {  "_id": ObjectId("60445f48b6e04e8e7e360e2f"),  "name": "zbp1",  "age": 23,  "job": "job1"} {  "_id": ObjectId("60445f55b6e04e8e7e360e30"),  "name": "yf",  "age": 25,  "job": "programme"} {  "_id": ObjectId("60445f68b6e04e8e7e360e31"),  "name": "ww",  "age": 36,  "job": "manager"} {  "_id": ObjectId("60445f79b6e04e8e7e360e32"),  "name": "zbp",  "age": 24,  "job": "job3"}

现在我希望获取job字段统计,获取每种job的数量count并将结果按count倒序排序,只显示5条结果。

test.aggregate(  {"$project":{"job":1}},    // 投射  {"$group":{"_id":"$job", "count":{"$sum":1}}},  //分组  {"$sort":{"count":-1}},  {"$limit":5})

结果为:

{ "_id" : "programme", "count" : 2 }{ "_id" : "job3", "count" : 1 }{ "_id" : "manager", "count" : 1 }{ "_id" : "job1", "count" : 1 }

aggregate()就是聚合管道函数。
上面的操作其实很像是linux中的管道操作,这里使用了4个管道操作符($project、$group、$sort和$limit),每个操作符都会接受一连串的文档,对这些文档做一些处理和转换,最后将处理后的文档作为结果传递给下一个操作符。对于最后一个管道操作符,则是将结果返回给客户端。

二、管道操作符
$match
$match用于对文档集合进行筛选,之后就可以在筛选得到的文档子集上做聚合。相当于where和having条件查询,只不过$match是用在聚合中的条件筛选。
例如,上面的例子中,我希望统计年龄在25岁以下的job的情况,就可以使用:

test.aggregate(  {"$match":{"age":{"$lt":25}}},  {"$project":{"job":1}},   {"$group":{"_id":"$job", "count":{"$sum":1}}}, {"$sort":{"count":-1}}, {"$limit":5} );

在实际使用中应该尽可能将"$match"放在管道的前面位置。这样做有两个好处: 一是可以快速将不需要的文档过滤掉,以减少管道的工作量;
二是如果在投射和分组之前执行"$match"(相当于where,如果是在分组之后执行$match,那么就相当于having),查询可以使用到索引(也就是说如果在分组和投射之后的条件筛选是用不到索引的)。
$project $project操作(投射操作)是从文档中选择想要的字段。可以指定查询结果中显示或者不显示一个字段,还可以为字段起别名。
例如:

test.aggregate({"$project":{"job":1}});

结果为:

{  "_id": ObjectId("60445f35b6e04e8e7e360e2e"),  "job": "programme"} {  "_id": ObjectId("60445f48b6e04e8e7e360e2f"),  "job": "job1"} {  "_id": ObjectId("60445f55b6e04e8e7e360e30"),  "job": "programme"} {  "_id": ObjectId("60445f68b6e04e8e7e360e31"),  "job": "manager"} {  "_id": ObjectId("60445f79b6e04e8e7e360e32"),  "job": "job3"}

只显示了job字段,没有显示name和age字段。

_id字段是默认显示的,如果希望不显示_id可以这样:

test.aggregate({"$project":{"job":1,"_id":0}})

将投射的字段进行重命名:

test.aggregate({"$project":{"career":"$job","_id":0}});

这里将job字段重命名为career。 这里的"$fieldname"语法是为了在聚合框架中引用fieldname字段(上面的例子中是"job")的值。
例如,“$age”
会被替换为"age"字段的内容(可能是数值,也可能是字符串),“$tags.3"会被替换为tags数组中的第4个元素。所以,上面例子中的”$job"会被替换为进入管道的每个文档的"job"字段的值。
可以使用这种技术生成字段的多个副本,以便在之后的"$group"中使用。
如果在"job"字段上有一个索引,但是使用了career为job重命名后,聚合框架无法在下面的排序操作中使用这个索引。
所以,应该尽量在修改字段名称之前使用索引(例如将$sort放在$project前)。 除了上面的基本用法,$project还可以接表达式,例如:

db.employees.aggregate({  "$project"  :  {  "totalPay"  :{    "$add"  :  ["$salary",  "$bonus"]    }  }})

表示将salary和bonus字段的值相加得到totalPay字段。

数字表达式-加减乘除:

"$add" : [expr1[, expr2, , exprN]]"$subtract" : [expr1, expr2]"$multiply" : [expr1[, expr2, , exprN]]"$divide" : [expr1, expr2]

还能组合使用:

db.employees.aggregate(  {    "$project"  :  {    "totalPay"  :  {      "$subtract"  :  [{"$add"  :  ["$salary",  "$bonus"]},  "$wh"]    }    }  })
// salary+bonus-wh

日期表达式
聚合框架中包含了一些用于提取日期信息的表式:“$year”、“$month”、“$week”、“$dayOfMonth”、“$dayOfWeek”、“$dayOfYear”、“$hour”、“$minute"和”$second"
只能对日期类型(Date类型)的字段进行日期操作,不能对数值类型字段做日期操作。
例如: hireDate字段必须是日期类型,得到的hireIn字段是hireDate字段的月份部分。

db.employees.aggregate({  "$project"  :  {    "hiredIn"  :  {"$month"  :  "$hireDate"}  }})

又例如: 用当前时间的年减去hireDate的年。

db.employees.aggregate({  "$project"  :  {      "tenure"  :  {      "$subtract"  :  [{"$year"  :  new  Date()},  {"$year"  :  "$hireDate"}]    }  }})

字符串表达式

// 字符串截取(这里截取的是字节而不是字符)"$substr" : [expr, startOffset, numToReturn]// 连接多个字符串字段"$concat" : [expr1[, expr2, , exprN]]// 大小写"$toLower" : expr"$toUpper" : expr

例如:

db.employees.aggregate({  "$project"  :  {                   "email"  :  {    "$concat"  :  [      {"$substr"  :  ["$firstName",  0,  1]},      ".",      "$lastName",      "@example.com"    ]    }  }  });

$group
$group操作可以将文档依据特定字段的不同值进行分组。如果选定了需要进行分组的字段,就可以将选定的字段传递给"$group"函数的"_id"字段。
例如:

{  "$group": {    "_id": "$day"  }} // 对day字段分组{  "$group": {    "_id": "$grade"  }} // 对grade字段分组{  "$group": {    "_id": {      "state": "$state",      "city": "$city"    }  }} //对state和city字段分组

分组时我们一般会做一些统计的计算

"$sum"  // 求和"$average"  // 均值
// 最大最小值"$max" :expr"$min" :expr//  返回分组的第一个/最后一个值,忽略后面所有值。只有排序之后,明确知道数据顺序时这个操作才有意义。"$first" :expr"$last" :expr

例如:

db.sales.aggregate({  "$group"  :  {    "_id"  :  "$country",    "totalRevenue"  :  {"$sum"  :  "$revenue"}  }})

如果使用 “$sum”:1,相当于mysql中的count(*),即获取每个分类下的记录行数。
$max"和"$min"会查看每一个文档,以便得到极值。因此,如果数据是无序的,这两个操作符也可以有效工作;如果数据是有序的,这两个操作符就会有些浪费。假设有一个存有学生考试成绩的数据集,需要找到其中的最高分与最低分。

db.scores.aggregate({           "$group"  :  {    "_id"  :  "$grade",    "lowestScore"  :  {"$min"  :  "$score"},    "highestScore"  :  {"$max"  :  "$score"}    }})

另一方面,如果数据集是按照希望的字段排序过的,那么"$first"和"$last"操作符就会非常有用。下面的代码与上面的代码可以得到同样的结果:

db.scores.aggregate({  "$sort"  :  {"score"  :  1}},{  "$group"  :  {    "_id"  :  "$grade",    "lowestScore":{"$first"  :"$score"},              "highestScore"  :  {"$last"  :  "$score"}  }})

如果数据是排过序的,那么$first和$last会比$min和$max效率更高。如果不准备对数据进行排序,那么直接使用$min和$max会比先排序再使用$first和$last效率更高。
需要注意:
在分片的情况下"$group"会先在每个分片上执行,然后各个分片上的分组结果会被发送到mongos再进行最后的统一分组,剩余的管道工作(也就是$group之后的管道操作符操作)也都是在mongos(而不是在分片)上运行的。
$unwind
拆分(unwind)可以将数组中的每一个值拆分为单独的文档。例如,如果有一篇拥有多条评论的博客文章,可以使用$unwind将每条评论拆分为一个独立的文档:

> db.blog.findOne(){  "_id"  :  ObjectId("50eeffc4c82a5271290530be"),  "author"  :  "k",  "post"  :  "Hello,  world!",  "comments"  :  [                    {      "author"  :  "mark",      "date"  :  ISODate("2013-01-10T17:52:04.148Z"),      "text"  :  "Nice  post"    },    {      "author"  :  "bill",      "date"  :  ISODate("2013-01-10T17:52:04.148Z"),                              "text"  :  "I  agree"    }  ]}
>  db.blog.aggregate({"$unwind"  :  "$comments"}){  "results"  :    {      "_id"  :  ObjectId("50eeffc4c82a5271290530be"),                              "author"  :  "k",      "post"  :  "Hello,  world!",      "comments"  :  {        "author"  :  "mark",        "date"  :  ISODate("2013-01-10T17:52:04.148Z"),        "text"  :  "Nice  post"      }    },    {                              "_id"  :  ObjectId("50eeffc4c82a5271290530be"),      "author"  :  "k",      "post"  :  "Hello,  world!",      "comments"  :  {        "author"  :  "bill",        "date"  :  ISODate("2013-01-10T17:52:04.148Z"),        "text"  :  "I  agree"      }                    }  ],  "ok"  :  1}

如果希望在查询中得到特定的子文档,这个操作符就会非常有用:先使用"$unwind"得到所有子文档,再使用"$match"得到想要的文档。
例如,如果要得到特定用户的所有评论(只需要得到评论,不需要返回评论所属的文章),使用普通的查询是不可能做到的。但是,通过提取、拆分、匹配,就很容易了。

db.blog.aggregate({"$project"  :  {"comments"  :  "$comments"}},    // 提取comments字段{"$unwind"  :  "$comments"},      // 将comments数组中的每一个元素拆分为一行{"$match"  :  {"comments.author"  :  "Mark"}}    // 条件匹配所有评论的作者为Mark的评论)

如果经常需要查询特定作者的所有评论,建议对comments.author建立嵌套索引,然后在聚合的时候将$match放在最前面,这样可以用到索引:

db.blog.aggregate({"$match"  :  {"comments.author"  :  "Mark"}}        // 条件匹配所有评论的作者为Mark的评论{"$project"  :  {"comments"  :  "$comments"}},    // 提取comments字段{"$unwind"  :  "$comments"},            // 将comments数组中的每一个元素拆分为一行)

$sort
和$match一样,如果在投射$project和分组$group之前使用$sort排序,这时的排序操作可以使用索引,否则,排序过程就会比较慢,而且会占用大量内存。

db.employees.aggregate(  {    "$project"  :  {      "compensation"  :  {                                "$add"  :  ["$salary",  "$bonus"]      },      "name"  :  1    }  },  {    "$sort"  :  {"compensation"  :  -1,  "name"  :  1}  })

这个例子会对员工排序,最终的结果是按照报酬从高到低,姓名从A到Z的顺序排列。排序方向可以是1(升序)和-1(降序)。

与前面讲过的"$group"一样,“$sort"在分片环境下,先在各个分片上进行排序,然后将各个分片的排序结果发送到mongos做进一步处理。
$limit $limit会接受一个数字n,返回结果集中的前n个文档 $skip
$skip也是接受一个数字n,丢弃结果集中的前n个文档,将剩余文档作为结果返回。在“普通”查询中,如果需要跳过大量的数据,那么这个操作符的效率会很低。在聚合中也是如此,因为它必须要先匹配到所有需要跳过的文档,然后再将这些文档丢弃。
应该尽量在管道的开始阶段(执行”$project"、“$group"或者”$unwind"操作之前)就将尽可能多的文档和字段过滤掉($match)。因为管道如果不是直接从原先的集合中使用数据,那就无法在筛选和排序中使用索引。如果可能,聚合管道会尝试对操作进行排序,以便能够有效使用索引。
MongoDB不允许单一的聚合操作占用过多的系统内存:如果MongoDB发现某个聚合操作占用了20%以上的内存(比如由于聚合分组和排序而产生了临时集合,即类似于mysql中的临时表),这个操作就会直接输出错误。允许将输出结果利用管道放入一个集合中是为了方便以后使用(这样可以将所需的内存减至最小)。
如果能够通过"$match"操作迅速减小结果集的大小,就可以使用管道进行实时聚合。由于管道会不断包含更多的文档,会越来越复杂,所以几乎不可能实时得到管道的操作结果。
三、MapReduce 在用 MongoDB 查询时,若返回的数据量很大,或者做一些比较复杂的统计和聚合操作做花费的时间很长时,可以使用
MongoDB 中的 mapReduce 进行实现。mapReduce
是个灵活且强大的数据聚合工具,它的好处是可以把一个聚合任务分解为多个小的任务,分配到多个服务器上并行处理。

在 MongoDB 中我们可以使用 mapReduce 命令来执行 mapReduce 操作。
在 mapReduce 命令中要实现两个函数,分别是 map 函数和 reduce 函数,其中 map 函数调用 emit(key,
value),遍历集合中的所有记录,并将 key 与 value 传递给 reduce 函数进行处理,如下所示:

> db.collection_name.mapReduce(    function() {emit(key, value);},                  // map 函数    function(key, values) {return reduceFunction},   // reduce 函数    {        out: collection,        query: document,        sort: document,        limit: number    })

参数说明如下:

  • map 函数:一个 javascript 函数,它用一个键映射一个值并发出一个键值对;

  • reduce 函数:一个 javascript 函数,用于减少或分组具有相同键的所有文档;

  • out:指定 map-reduce 查询结果的位置;

  • query:指定用于选择文档的可选选择条件;

  • sort:指定可选的排序条件;

  • limit:指定要返回的最大文档数(可选)。

下面我们看一个MapReduce的使用例子。

假如有一个存储用户帖子信息的集合,其中存储了若干用户的用户名、状态等信息,例如下面这样:

{    "post_text": "编程帮(biancheng.net),一个在线学习编程的网站,专注于分享优质编程教程。",    "user_name": "bianchengbang",    "status":"active"}

接下来在上述的集合中使用 mapReduce 命令来选择集合中所有 "status":"active"
的文档,然后根据用户名对它们进行分组,最后统计每个用户的发帖数量。示例代码如下:

> db posts mapReduce(    function() { emit(this user_name,1); },    function(key, values) {return Array sum(values)},    {       query:{status:"active"},       out:"post_total"    })

上面 mapReduce 命令的输出结果如下所示

{  "result" : "post_total",  "timeMillis" : 48,  "counts" : {      "input" : 11,      "emit" : 11,      "reduce" : 2,      "output" : 2  },  "ok" : 1}

关于运行结果,有如下几点需要说明:

  • result:储存结果的 collection 的名字,这是个临时集合,mapReduce 的连接关闭后会被自动删除;

  • timeMillis:执行花费的时间,单位为毫秒;

  • input:满足条件被发送到 map 函数的文档个数;

  • emit:在 map 函数中 emit 被调用的次数,也就是所有集合中的数据总量;

  • ouput:结果集合中的文档个数(count 对调试非常有帮助);

  • ok:查询是否执行成功,成功为 1;

  • err:若执行失败,则会在这里显示失败原因。

通过运行结果可以发现,共有 11 个文档与查询匹配(状态为“active”),map 函数生成了 11 个具有键值对的文档,最后 reduce
函数将具有相同键值的映射文档分为 2 个组。

若要查看 mapReduce 查询的结果,可以使用 find 方法,如下所示:

> db.posts.mapReduce(     function() { emit(this.user_name,1); },    function(key, values) {return Array.sum(values)},    {       query:{status:"active"},       out:"post_total"    }).find()

上面的查询语句,执行结果如下:

{ "_id" : "biancheng", "value" : 5 }{ "_id" : "bianchengbang", "value" : 6 }

以类似的方式,mapReduce 查询可用于构造大型复杂的聚合查询,自定义 JavaScript 函数的使用使得 mapReduce 非常灵活和强大。
MapReduce能够表达任意复杂的逻辑。 然而,这种强大是有代价的: MapReduce非常慢,不应该用在实时的数据分析中,而是应用在 离线数据分析
或 统计数据后缓存起来,用户直接查询缓存数据。
对MapReduce更详细的介绍,想了解的朋友可以参考官方文档。


评论