
深入学习mongodb(三) mongdb聚合、投射和管道操作符
如果你有数据存储在MongoDB中,你想做的可能就不仅仅是将数据提取出来那么简单,而是希望对数据进行分析并加以利用,因此本节介绍MongoDB提供的聚合工具。
一、聚合框架
使用聚合框架可以对集合中的文档进行变换和组合。基本上,可以用多个构件创建一个管道(pipeline),用于对一连串的文档进行处理。这些构件包括筛选(filtering)、投射(projecting)、分组(grouping)、排序(sorting)、限制(limiting)和跳过(skipping)。
下面是一个例子:
如果有一个test集合,里面有以下文档:
{ "_id": ObjectId("60445f35b6e04e8e7e360e2e"), "name": "zbp", "age": 24, "job": "programme"} { "_id": ObjectId("60445f48b6e04e8e7e360e2f"), "name": "zbp1", "age": 23, "job": "job1"} { "_id": ObjectId("60445f55b6e04e8e7e360e30"), "name": "yf", "age": 25, "job": "programme"} { "_id": ObjectId("60445f68b6e04e8e7e360e31"), "name": "ww", "age": 36, "job": "manager"} { "_id": ObjectId("60445f79b6e04e8e7e360e32"), "name": "zbp", "age": 24, "job": "job3"}
现在我希望获取job字段统计,获取每种job的数量count并将结果按count倒序排序,只显示5条结果。
test.aggregate( {"$project":{"job":1}}, // 投射 {"$group":{"_id":"$job", "count":{"$sum":1}}}, //分组 {"$sort":{"count":-1}}, {"$limit":5})
结果为:
{ "_id" : "programme", "count" : 2 }{ "_id" : "job3", "count" : 1 }{ "_id" : "manager", "count" : 1 }{ "_id" : "job1", "count" : 1 }
aggregate()就是聚合管道函数。
上面的操作其实很像是linux中的管道操作,这里使用了4个管道操作符($project、$group、$sort和$limit),每个操作符都会接受一连串的文档,对这些文档做一些处理和转换,最后将处理后的文档作为结果传递给下一个操作符。对于最后一个管道操作符,则是将结果返回给客户端。
二、管道操作符
$match
$match用于对文档集合进行筛选,之后就可以在筛选得到的文档子集上做聚合。相当于where和having条件查询,只不过$match是用在聚合中的条件筛选。
例如,上面的例子中,我希望统计年龄在25岁以下的job的情况,就可以使用:
test.aggregate( {"$match":{"age":{"$lt":25}}}, {"$project":{"job":1}}, {"$group":{"_id":"$job", "count":{"$sum":1}}}, {"$sort":{"count":-1}}, {"$limit":5} );
在实际使用中应该尽可能将"$match"放在管道的前面位置。这样做有两个好处: 一是可以快速将不需要的文档过滤掉,以减少管道的工作量;
二是如果在投射和分组之前执行"$match"(相当于where,如果是在分组之后执行$match,那么就相当于having),查询可以使用到索引(也就是说如果在分组和投射之后的条件筛选是用不到索引的)。
$project $project操作(投射操作)是从文档中选择想要的字段。可以指定查询结果中显示或者不显示一个字段,还可以为字段起别名。
例如:
test.aggregate({"$project":{"job":1}});
结果为:
{ "_id": ObjectId("60445f35b6e04e8e7e360e2e"), "job": "programme"} { "_id": ObjectId("60445f48b6e04e8e7e360e2f"), "job": "job1"} { "_id": ObjectId("60445f55b6e04e8e7e360e30"), "job": "programme"} { "_id": ObjectId("60445f68b6e04e8e7e360e31"), "job": "manager"} { "_id": ObjectId("60445f79b6e04e8e7e360e32"), "job": "job3"}
只显示了job字段,没有显示name和age字段。
_id字段是默认显示的,如果希望不显示_id可以这样:
test.aggregate({"$project":{"job":1,"_id":0}})
将投射的字段进行重命名:
test.aggregate({"$project":{"career":"$job","_id":0}});
这里将job字段重命名为career。 这里的"$fieldname"语法是为了在聚合框架中引用fieldname字段(上面的例子中是"job")的值。
例如,“$age”
会被替换为"age"字段的内容(可能是数值,也可能是字符串),“$tags.3"会被替换为tags数组中的第4个元素。所以,上面例子中的”$job"会被替换为进入管道的每个文档的"job"字段的值。
可以使用这种技术生成字段的多个副本,以便在之后的"$group"中使用。
如果在"job"字段上有一个索引,但是使用了career为job重命名后,聚合框架无法在下面的排序操作中使用这个索引。
所以,应该尽量在修改字段名称之前使用索引(例如将$sort放在$project前)。 除了上面的基本用法,$project还可以接表达式,例如:
db.employees.aggregate({ "$project" : { "totalPay" :{ "$add" : ["$salary", "$bonus"] } }})
表示将salary和bonus字段的值相加得到totalPay字段。
数字表达式-加减乘除:
"$add" : [expr1[, expr2, , exprN]]"$subtract" : [expr1, expr2]"$multiply" : [expr1[, expr2, , exprN]]"$divide" : [expr1, expr2]
还能组合使用:
db.employees.aggregate( { "$project" : { "totalPay" : { "$subtract" : [{"$add" : ["$salary", "$bonus"]}, "$wh"] } } })
// salary+bonus-wh
日期表达式
聚合框架中包含了一些用于提取日期信息的表式:“$year”、“$month”、“$week”、“$dayOfMonth”、“$dayOfWeek”、“$dayOfYear”、“$hour”、“$minute"和”$second"
只能对日期类型(Date类型)的字段进行日期操作,不能对数值类型字段做日期操作。
例如: hireDate字段必须是日期类型,得到的hireIn字段是hireDate字段的月份部分。
db.employees.aggregate({ "$project" : { "hiredIn" : {"$month" : "$hireDate"} }})
又例如: 用当前时间的年减去hireDate的年。
db.employees.aggregate({ "$project" : { "tenure" : { "$subtract" : [{"$year" : new Date()}, {"$year" : "$hireDate"}] } }})
字符串表达式
// 字符串截取(这里截取的是字节而不是字符)"$substr" : [expr, startOffset, numToReturn]// 连接多个字符串字段"$concat" : [expr1[, expr2, , exprN]]// 大小写"$toLower" : expr"$toUpper" : expr
例如:
db.employees.aggregate({ "$project" : { "email" : { "$concat" : [ {"$substr" : ["$firstName", 0, 1]}, ".", "$lastName", "@example.com" ] } } });
$group
$group操作可以将文档依据特定字段的不同值进行分组。如果选定了需要进行分组的字段,就可以将选定的字段传递给"$group"函数的"_id"字段。
例如:
{ "$group": { "_id": "$day" }} // 对day字段分组{ "$group": { "_id": "$grade" }} // 对grade字段分组{ "$group": { "_id": { "state": "$state", "city": "$city" } }} //对state和city字段分组
分组时我们一般会做一些统计的计算
"$sum" // 求和"$average" // 均值
// 最大最小值"$max" :expr"$min" :expr// 返回分组的第一个/最后一个值,忽略后面所有值。只有排序之后,明确知道数据顺序时这个操作才有意义。"$first" :expr"$last" :expr
例如:
db.sales.aggregate({ "$group" : { "_id" : "$country", "totalRevenue" : {"$sum" : "$revenue"} }})
如果使用 “$sum”:1,相当于mysql中的count(*),即获取每个分类下的记录行数。
$max"和"$min"会查看每一个文档,以便得到极值。因此,如果数据是无序的,这两个操作符也可以有效工作;如果数据是有序的,这两个操作符就会有些浪费。假设有一个存有学生考试成绩的数据集,需要找到其中的最高分与最低分。
db.scores.aggregate({ "$group" : { "_id" : "$grade", "lowestScore" : {"$min" : "$score"}, "highestScore" : {"$max" : "$score"} }})
另一方面,如果数据集是按照希望的字段排序过的,那么"$first"和"$last"操作符就会非常有用。下面的代码与上面的代码可以得到同样的结果:
db.scores.aggregate({ "$sort" : {"score" : 1}},{ "$group" : { "_id" : "$grade", "lowestScore":{"$first" :"$score"}, "highestScore" : {"$last" : "$score"} }})
如果数据是排过序的,那么$first和$last会比$min和$max效率更高。如果不准备对数据进行排序,那么直接使用$min和$max会比先排序再使用$first和$last效率更高。
需要注意:
在分片的情况下"$group"会先在每个分片上执行,然后各个分片上的分组结果会被发送到mongos再进行最后的统一分组,剩余的管道工作(也就是$group之后的管道操作符操作)也都是在mongos(而不是在分片)上运行的。
$unwind
拆分(unwind)可以将数组中的每一个值拆分为单独的文档。例如,如果有一篇拥有多条评论的博客文章,可以使用$unwind将每条评论拆分为一个独立的文档:
> db.blog.findOne(){ "_id" : ObjectId("50eeffc4c82a5271290530be"), "author" : "k", "post" : "Hello, world!", "comments" : [ { "author" : "mark", "date" : ISODate("2013-01-10T17:52:04.148Z"), "text" : "Nice post" }, { "author" : "bill", "date" : ISODate("2013-01-10T17:52:04.148Z"), "text" : "I agree" } ]}
> db.blog.aggregate({"$unwind" : "$comments"}){ "results" : { "_id" : ObjectId("50eeffc4c82a5271290530be"), "author" : "k", "post" : "Hello, world!", "comments" : { "author" : "mark", "date" : ISODate("2013-01-10T17:52:04.148Z"), "text" : "Nice post" } }, { "_id" : ObjectId("50eeffc4c82a5271290530be"), "author" : "k", "post" : "Hello, world!", "comments" : { "author" : "bill", "date" : ISODate("2013-01-10T17:52:04.148Z"), "text" : "I agree" } } ], "ok" : 1}
如果希望在查询中得到特定的子文档,这个操作符就会非常有用:先使用"$unwind"得到所有子文档,再使用"$match"得到想要的文档。
例如,如果要得到特定用户的所有评论(只需要得到评论,不需要返回评论所属的文章),使用普通的查询是不可能做到的。但是,通过提取、拆分、匹配,就很容易了。
db.blog.aggregate({"$project" : {"comments" : "$comments"}}, // 提取comments字段{"$unwind" : "$comments"}, // 将comments数组中的每一个元素拆分为一行{"$match" : {"comments.author" : "Mark"}} // 条件匹配所有评论的作者为Mark的评论)
如果经常需要查询特定作者的所有评论,建议对comments.author建立嵌套索引,然后在聚合的时候将$match放在最前面,这样可以用到索引:
db.blog.aggregate({"$match" : {"comments.author" : "Mark"}} // 条件匹配所有评论的作者为Mark的评论{"$project" : {"comments" : "$comments"}}, // 提取comments字段{"$unwind" : "$comments"}, // 将comments数组中的每一个元素拆分为一行)
$sort
和$match一样,如果在投射$project和分组$group之前使用$sort排序,这时的排序操作可以使用索引,否则,排序过程就会比较慢,而且会占用大量内存。
db.employees.aggregate( { "$project" : { "compensation" : { "$add" : ["$salary", "$bonus"] }, "name" : 1 } }, { "$sort" : {"compensation" : -1, "name" : 1} })
这个例子会对员工排序,最终的结果是按照报酬从高到低,姓名从A到Z的顺序排列。排序方向可以是1(升序)和-1(降序)。
与前面讲过的"$group"一样,“$sort"在分片环境下,先在各个分片上进行排序,然后将各个分片的排序结果发送到mongos做进一步处理。
$limit $limit会接受一个数字n,返回结果集中的前n个文档 $skip
$skip也是接受一个数字n,丢弃结果集中的前n个文档,将剩余文档作为结果返回。在“普通”查询中,如果需要跳过大量的数据,那么这个操作符的效率会很低。在聚合中也是如此,因为它必须要先匹配到所有需要跳过的文档,然后再将这些文档丢弃。
应该尽量在管道的开始阶段(执行”$project"、“$group"或者”$unwind"操作之前)就将尽可能多的文档和字段过滤掉($match)。因为管道如果不是直接从原先的集合中使用数据,那就无法在筛选和排序中使用索引。如果可能,聚合管道会尝试对操作进行排序,以便能够有效使用索引。
MongoDB不允许单一的聚合操作占用过多的系统内存:如果MongoDB发现某个聚合操作占用了20%以上的内存(比如由于聚合分组和排序而产生了临时集合,即类似于mysql中的临时表),这个操作就会直接输出错误。允许将输出结果利用管道放入一个集合中是为了方便以后使用(这样可以将所需的内存减至最小)。
如果能够通过"$match"操作迅速减小结果集的大小,就可以使用管道进行实时聚合。由于管道会不断包含更多的文档,会越来越复杂,所以几乎不可能实时得到管道的操作结果。
三、MapReduce 在用 MongoDB 查询时,若返回的数据量很大,或者做一些比较复杂的统计和聚合操作做花费的时间很长时,可以使用
MongoDB 中的 mapReduce 进行实现。mapReduce
是个灵活且强大的数据聚合工具,它的好处是可以把一个聚合任务分解为多个小的任务,分配到多个服务器上并行处理。
在 MongoDB 中我们可以使用 mapReduce 命令来执行 mapReduce 操作。
在 mapReduce 命令中要实现两个函数,分别是 map 函数和 reduce 函数,其中 map 函数调用 emit(key,
value),遍历集合中的所有记录,并将 key 与 value 传递给 reduce 函数进行处理,如下所示:
> db.collection_name.mapReduce( function() {emit(key, value);}, // map 函数 function(key, values) {return reduceFunction}, // reduce 函数 { out: collection, query: document, sort: document, limit: number })
参数说明如下:
-
map 函数:一个 javascript 函数,它用一个键映射一个值并发出一个键值对;
-
reduce 函数:一个 javascript 函数,用于减少或分组具有相同键的所有文档;
-
out:指定 map-reduce 查询结果的位置;
-
query:指定用于选择文档的可选选择条件;
-
sort:指定可选的排序条件;
-
limit:指定要返回的最大文档数(可选)。
下面我们看一个MapReduce的使用例子。
假如有一个存储用户帖子信息的集合,其中存储了若干用户的用户名、状态等信息,例如下面这样:
{ "post_text": "编程帮(biancheng.net),一个在线学习编程的网站,专注于分享优质编程教程。", "user_name": "bianchengbang", "status":"active"}
接下来在上述的集合中使用 mapReduce 命令来选择集合中所有 "status":"active"
的文档,然后根据用户名对它们进行分组,最后统计每个用户的发帖数量。示例代码如下:
> db posts mapReduce( function() { emit(this user_name,1); }, function(key, values) {return Array sum(values)}, { query:{status:"active"}, out:"post_total" })
上面 mapReduce 命令的输出结果如下所示
{ "result" : "post_total", "timeMillis" : 48, "counts" : { "input" : 11, "emit" : 11, "reduce" : 2, "output" : 2 }, "ok" : 1}
关于运行结果,有如下几点需要说明:
-
result:储存结果的 collection 的名字,这是个临时集合,mapReduce 的连接关闭后会被自动删除;
-
timeMillis:执行花费的时间,单位为毫秒;
-
input:满足条件被发送到 map 函数的文档个数;
-
emit:在 map 函数中 emit 被调用的次数,也就是所有集合中的数据总量;
-
ouput:结果集合中的文档个数(count 对调试非常有帮助);
-
ok:查询是否执行成功,成功为 1;
-
err:若执行失败,则会在这里显示失败原因。
通过运行结果可以发现,共有 11 个文档与查询匹配(状态为“active”),map 函数生成了 11 个具有键值对的文档,最后 reduce
函数将具有相同键值的映射文档分为 2 个组。
若要查看 mapReduce 查询的结果,可以使用 find 方法,如下所示:
> db.posts.mapReduce( function() { emit(this.user_name,1); }, function(key, values) {return Array.sum(values)}, { query:{status:"active"}, out:"post_total" }).find()
上面的查询语句,执行结果如下:
{ "_id" : "biancheng", "value" : 5 }{ "_id" : "bianchengbang", "value" : 6 }
以类似的方式,mapReduce 查询可用于构造大型复杂的聚合查询,自定义 JavaScript 函数的使用使得 mapReduce 非常灵活和强大。
MapReduce能够表达任意复杂的逻辑。 然而,这种强大是有代价的: MapReduce非常慢,不应该用在实时的数据分析中,而是应用在 离线数据分析
或 统计数据后缓存起来,用户直接查询缓存数据。
对MapReduce更详细的介绍,想了解的朋友可以参考官方文档。