问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

如何将 MongoDB MapReduce 速度提升 20 倍

发布网友 发布时间:2022-04-07 21:39

我来回答

2个回答

懂视网 时间:2022-04-08 02:00

MongoDB基本用法(增删改高级查询、mapreduce)                

博客分类:
  • SQL/NOSQL

  • 分享一下我经常用到的自己写的mongo用法示例

    该示例基于当前最新的mongo驱动,版本为mongo-2.10.1.jar,用junit写的单元测试。

     

    TestCase.java

    package com.wujintao.mongo;
    
    import java.net.UnknownHostException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.regex.Pattern;
    
    import org.junit.Test;
    
    import com.mongodb.AggregationOutput;
    import com.mongodb.BasicDBList;
    import com.mongodb.BasicDBObject;
    import com.mongodb.BasicDBObjectBuilder;
    import com.mongodb.DB;
    import com.mongodb.DBCollection;
    import com.mongodb.DBCursor;
    import com.mongodb.DBObject;
    import com.mongodb.MapReduceCommand;
    import com.mongodb.MapReduceOutput;
    import com.mongodb.Mongo;
    import com.mongodb.QueryBuilder;
    import com.mongodb.WriteConcern;
    
    public class TestCase {
           //DBCursor cursor = coll.find(condition).addOption(Bytes.QUERYOPTION_NOTIMEOUT);//设置游标不要超时
    
    	@Test
    	/**
    	 * 获取所有数据库实例
    	 */
    	public void testGetDBS() {
    		List<String> dbnames = MongoUtil.getMong().getDatabaseNames();
    		for (String dbname : dbnames) {
    			System.out.println("dbname:" + dbname);
    		}
    	}
    
    	@Test
    	/**
    	 * 删除数据库
    	 */
    	public void dropDatabase() {
    		MongoUtil.getMong().dropDatabase("my_new_db");
    	}
    
    	@Test
    	/**
    	 * 查询所有表名
    	 */
    	public void getAllCollections() {
    		Set<String> colls = MongoUtil.getDB().getCollectionNames();
    		for (String s : colls) {
    			System.out.println(s);
    		}
    	}
    
    	@Test
    	public void dropCollection() {
    		MongoUtil.getColl("jellonwu").drop();
    	}
    
    	/**
    	 * 添加一条记录
    	 */
    	@Test
    	public void addData() {
    		DBCollection coll = MongoUtil.getColl("wujintao");
    		BasicDBObject doc = new BasicDBObject();
    		doc.put("name", "MongoDB");
    		doc.put("type", "database");
    		doc.put("count", 1);
    
    		BasicDBObject info = new BasicDBObject();
    		info.put("x", 203);
    		info.put("y", 102);
    		doc.put("info", info);
    		coll.insert(doc);
    		// 设定write concern,以便操作失败时得到提示
    		coll.setWriteConcern(WriteConcern.SAFE);
    	}
    
    	@Test
    	/**
    	 * 创建索引
    	 */
    	public void createIndex() {
    		MongoUtil.getColl("wujintao").createIndex(new BasicDBObject("i", 1));
    	}
    
    	@Test
    	/**
    	 * 获取索引信息
    	 */
    	public void getIndexInfo() {
    		List<DBObject> list = MongoUtil.getColl("hems_online").getIndexInfo();
    		for (DBObject o : list) {
    			System.out.println(o);
    		}
    	}
    
    	@Test
    	/**
    	 * 添加多条记录
    	 */
    	public void addMultiData() {
    		for (int i = 0; i < 100; i++) {
    			MongoUtil.getColl("wujintao").insert(
    					new BasicDBObject().append("i", i));
    		}
    
    		List<DBObject> docs = new ArrayList<DBObject>();
    		for (int i = 0; i < 50; i++) {
    			docs.add(new BasicDBObject().append("i", i));
    		}
    		MongoUtil.getColl("wujintao").insert(docs);
    		// 设定write concern,以便操作失败时得到提示
    		MongoUtil.getColl("wujintao").setWriteConcern(WriteConcern.SAFE);
    	}
    
    	@Test
    	/**
    	 * 查找第一条记录
    	 */
    	public void findOne() {
    		DBObject myDoc = MongoUtil.getColl("wujintao").findOne();
    		System.out.println(myDoc);
    	}
    
    	@Test
    	/**
    	 * 获取表中所有记录条数
    	 */
    	public void count() {
    		System.out.println(MongoUtil.getColl("wujintao").getCount());
    		System.out.println(MongoUtil.getColl("wujintao").count());
    	}
    
    	@Test
    	/**
    	 * 获取查询结果集的记录数
    	 */
    	public void getCount() {
    		DBObject query = new BasicDBObject("name", "a");
    		long count = MongoUtil.getColl("wujintao").count(query);
    		System.out.println(count);
    	}
    
    	@Test
    	/**
    	 * 查询所有结果
    	 */
    	public void getAllDocuments() {
    		DBCursor cursor = MongoUtil.getColl("wujintao").find();
    		try {
    			while (cursor.hasNext()) {
    				System.out.println(cursor.next());
    			}
    		} finally {
    			cursor.close();
    		}
    	}
    
    	@Test
    	/**
    	 * 按照一个条件查询
    	 */
    	public void queryByConditionOne() {
    		BasicDBObject query = new BasicDBObject();
    		query.put("name", "MongoDB");
    		DBCursor cursor = MongoUtil.getColl("wujintao").find(query);
    
    		try {
    			while (cursor.hasNext()) {
    				System.out.println(cursor.next());
    			}
    		} finally {
    			cursor.close();
    		}
    	}
    
    	@Test
    	/**
    	 * AND多条件查询,区间查询
    	 */
    	public void queryMulti() {
    		BasicDBObject query = new BasicDBObject();
    		// 查询j不等于3,k大于10的结果集
    		query.put("j", new BasicDBObject("$ne", 3));
    		query.put("k", new BasicDBObject("$gt", 10));
    		DBCursor cursor = MongoUtil.getColl("wujintao").find(query);
    		try {
    			while (cursor.hasNext()) {
    				System.out.println(cursor.next());
    			}
    		} finally {
    			cursor.close();
    		}
    	}
    
    	@Test
    	/**
    	 * 区间查询
    	 * select * from table where i >50
    	 */
    	public void queryMulti2() {
    		BasicDBObject query = new BasicDBObject();
    		query = new BasicDBObject();
    		query.put("i", new BasicDBObject("$gt", 50)); // e.g. find all where i >
    		DBCursor cursor = MongoUtil.getColl("wujintao").find(query);
    		try {
    			while (cursor.hasNext()) {
    				System.out.println(cursor.next());
    			}
    		} finally {
    			cursor.close();
    		}
    	}
    
    	@Test
    	/**
    	 * 区间查询
    	 * select * from table where 20 < i <= 30
    	    //比较符   
    	    //"$gt": 大于   
    	    //"$gte":大于等于   
    	    //"$lt": 小于   
    	    //"$lte":小于等于   
    	    //"$in": 包含   
    	 */
    	public void queryMulti3() {
    		BasicDBObject query = new BasicDBObject();
    		query = new BasicDBObject();
    
    		query.put("i", new BasicDBObject("$gt", 20).append("$lte", 30));
    		DBCursor cursor = MongoUtil.getColl("wujintao").find(query);
    		try {
    			while (cursor.hasNext()) {
    				System.out.println(cursor.next());
    			}
    		} finally {
    			cursor.close();
    		}
    	}
    
    	/**
    	 * 组合in和and select * from test_Table where (a=5 or b=6) and (c=5 or d = 6)
    	 */
    	public void queryMulti4() {
    		BasicDBObject query11 = new BasicDBObject();
    		query11.put("a", 1);
    		BasicDBObject query12 = new BasicDBObject();
    		query12.put("b", 2);
    		List<BasicDBObject> orQueryList1 = new ArrayList<BasicDBObject>();
    		orQueryList1.add(query11);
    		orQueryList1.add(query12);
    		BasicDBObject orQuery1 = new BasicDBObject("$or", orQueryList1);
    
    		BasicDBObject query21 = new BasicDBObject();
    		query21.put("c", 5);
    		BasicDBObject query22 = new BasicDBObject();
    		query22.put("d", 6);
    		List<BasicDBObject> orQueryList2 = new ArrayList<BasicDBObject>();
    		orQueryList2.add(query21);
    		orQueryList2.add(query22);
    		BasicDBObject orQuery2 = new BasicDBObject("$or", orQueryList2);
    
    		List<BasicDBObject> orQueryCombinationList = new ArrayList<BasicDBObject>();
    		orQueryCombinationList.add(orQuery1);
    		orQueryCombinationList.add(orQuery2);
    
    		BasicDBObject finalQuery = new BasicDBObject("$and",
    				orQueryCombinationList);
    		DBCursor cursor = MongoUtil.getColl("wujintao").find(finalQuery);
    	}
    
    	@Test
    	/**
    	 * IN查询
    	 * if i need to query name in (a,b); just use { name : { $in : [‘a‘, ‘b‘] } }
    	 * select * from things where name=‘a‘ or name=‘b‘
    	 * @param coll
    	 */
    	public void queryIn() {
    		BasicDBList values = new BasicDBList();
    		values.add("a");
    		values.add("b");
    		BasicDBObject in = new BasicDBObject("$in", values);
    		DBCursor cursor = MongoUtil.getColl("wujintao").find(
    				new BasicDBObject("name", in));
    		try {
    			while (cursor.hasNext()) {
    				System.out.println(cursor.next());
    			}
    		} finally {
    			cursor.close();
    		}
    	}
    
    	@Test
    	/**
    	 * 或查询
    	 * select * from table where name  = ‘12‘ or title = ‘p‘
    	 * @param coll
    	 */
    	public void queryOr() {
    		QueryBuilder query = new QueryBuilder();
    		query.or(new BasicDBObject("name", 12), new BasicDBObject("title", "p"));
    		DBCursor cursor = MongoUtil.getColl("wujintao").find(query.get()).addSpecial("$returnKey", "");
    		try {
    			while (cursor.hasNext()) {
    				System.out.println(cursor.next());
    			}
    		} finally {
    			cursor.close();
    		}
    	}
    	
    	@Test
    	public void customQueryField() throws UnknownHostException{
            Mongo mongo = new Mongo("localhost", 27017);
            DB db = mongo.getDB("zhongsou_ad");
            BasicDBObjectBuilder bulder = new BasicDBObjectBuilder();
            bulder.add("times",1);
            bulder.add("aid",1);
            DBCursor cusor =  db.getCollection("ad_union_ad_c_1").find(new BasicDBObject(),bulder.get());
            for (DBObject dbObject : cusor) {
                System.out.println(dbObject);
            }
    	}
    	
    	@Test
    	public void mapReduce() throws UnknownHostException{
            Mongo mongo = new Mongo("localhost", 27017);
            DB db = mongo.getDB("zhongsou_ad");
            /***
             *  book1 = {name : "Understanding JAVA", pages : 100}
             *  book2 = {name : "Understanding JSON", pages : 200}
             *  db.books.save(book1)
             *  db.books.save(book2)
             *  book = {name : "Understanding XML", pages : 300}
             *  db.books.save(book)
             *  book = {name : "Understanding Web Services", pages : 400}
             *  db.books.save(book)
             *  book = {name : "Understanding Axis2", pages : 150}
             *  db.books.save(book)  
             *  
            var map = function() {
                var category;
                if ( this.pages >= 250 )
                    category = ‘Big Books‘;
                else
                    category = "Small Books";
                emit(category, {name: this.name});
            };
            var reduce = function(key, values) {
                var sum = 0;
                values.forEach(function(doc) {
                    sum += 1;
                });
                return {books: sum};
            };       
            var count  = db.books.mapReduce(map, reduce, {out: "book_results"});
             */
            try {
    
                DBCollection books = db.getCollection("books");
    
                BasicDBObject book = new BasicDBObject();
                book.put("name", "Understanding JAVA");
                book.put("pages", 100);
                books.insert(book);
                
                book = new BasicDBObject();  
                book.put("name", "Understanding JSON");
                book.put("pages", 200);
                books.insert(book);
                
                book = new BasicDBObject();
                book.put("name", "Understanding XML");
                book.put("pages", 300);
                books.insert(book);
                
                book = new BasicDBObject();
                book.put("name", "Understanding Web Services");
                book.put("pages", 400);
                books.insert(book);
              
                book = new BasicDBObject();
                book.put("name", "Understanding Axis2");
                book.put("pages", 150);
                books.insert(book);
                
                String map = "function() { "+ 
                          "var category; " +  
                          "if ( this.pages >= 250 ) "+  
                          "category = ‘Big Books‘; " +
                          "else " +
                          "category = ‘Small Books‘; "+  
                          "emit(category, {name: this.name});}";
                
                String reduce = "function(key, values) { " +
                                         "var sum = 0; " +
                                         "values.forEach(function(doc) { " +
                                         "sum += 1; "+
                                         "}); " +
                                         "return {books: sum};} ";
                
                MapReduceCommand cmd = new MapReduceCommand(books, map, reduce,
                  null, MapReduceCommand.OutputType.INLINE, null);
    
                MapReduceOutput out = books.mapReduce(cmd);
    
                for (DBObject o : out.results()) {
                 System.out.println(o.toString());
                }
               } catch (Exception e) {
                 e.printStackTrace();
               }
    	}
    	
    	@Test
        public void GroupByManyField() throws UnknownHostException{
    	    //此方法没有运行成功
            Mongo mongo = new Mongo("localhost", 27017);
            DB db = mongo.getDB("libary");
            DBCollection books = db.getCollection("books");
            BasicDBObject groupKeys = new BasicDBObject();
            groupKeys.put("total", new BasicDBObject("$sum","pages"));
            
            BasicDBObject condition = new BasicDBObject();
            condition.append("pages", new BasicDBObject().put("$gt", 0));
       
            
            String reduce = "function(key, values) { " +
                    "var sum = 0; " +
                    "values.forEach(function(doc) { " +
                    "sum += 1; "+
                    "}); " +
                    "return {books: sum};} ";
            /**
             BasicDBList basicDBList = (BasicDBList)db.getCollection("mongodb中集合编码或者编码")
                       .group(DBObject key,   --分组字段,即group by的字段
                    DBObject cond,        --查询中where条件
                    DBObject initial,     --初始化各字段的值
                    String reduce,        --每个分组都需要执行的Function
                    String finial         --终结Funciton对结果进行最终的处理
             */
            DBObject obj = books.group(groupKeys, condition,  new BasicDBObject(), reduce);
            System.out.println(obj);
            
            AggregationOutput ouput = books.aggregate(new BasicDBObject("$group",groupKeys));
            System.out.println(ouput.getCommandResult());
            System.out.println(books.find(new BasicDBObject("$group",groupKeys)));
        }	
    	
    
    	@Test
    	/**
    	 * 分页查询   
    	 */
    	public void pageQuery() {
    		DBCursor cursor = MongoUtil.getColl("wujintao").find().skip(0)
    				.limit(10);
    		while (cursor.hasNext()) {
    			System.out.println(cursor.next());
    		}
    	}
    
    	/**
    	 * 模糊查询
    	 */
    	public void likeQuery() {
    		Pattern john = Pattern.compile("joh?n");
    		BasicDBObject query = new BasicDBObject("name", john);
    
    		// finds all people with "name" matching /joh?n/i
    		DBCursor cursor = MongoUtil.getColl("wujintao").find(query);
    	}
    
    	@Test
    	/**
    	 * 条件删除   
    	 */
    	public void delete() {
    		BasicDBObject query = new BasicDBObject();
    		query.put("name", "xxx");
    		// 找到并且删除,并返回删除的对象
    		DBObject removeObj = MongoUtil.getColl("wujintao").findAndRemove(query);
    		System.out.println(removeObj);
    	}
    
    	@Test
    	/**
    	 * 更新
    	 */
    	public void update() {
    		BasicDBObject query = new BasicDBObject();
    		query.put("name", "liu");
    		DBObject stuFound = MongoUtil.getColl("wujintao").findOne(query);
    		stuFound.put("name", stuFound.get("name") + "update_1");
    		MongoUtil.getColl("wujintao").update(query, stuFound);
    	}
    
    
    }

     

    这里面涉及到的MongoUtil.java如下:

    package com.wujintao.mongo;
    
    import java.net.UnknownHostException;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    import com.mongodb.DB;
    import com.mongodb.DBCollection;
    import com.mongodb.Mongo;
    
    
    /**
     * to see:http://www.mongodb.org/display/DOCS/Java+Driver+Concurrency
     * Mongo工具类:设计为单例模式,每当月份发生变化,数据库连接名称就会发生变化,这是业务规则
     * 因 MongoDB的Java驱动是线程安全的,对于一般的应用,只要一个Mongo实例即可,Mongo有个内置的连接池(池大小默认为10个)。
     * 对于有大量写和读的环境中,为了确保在一个Session中使用同一个DB时,我们可以用以下方式保证一致性:
     *	 DB mdb = mongo.getDB(‘dbname‘);
     *	 mdb.requestStart();
     *	 // 业务代码
     *	 mdb.requestDone();
     * DB和DBCollection是绝对线程安全的
     * @author wujintao
     */
    public class MongoUtil{
    	
    	private static Mongo mongo;
    	private static DBCollection coll;
    	private static Log log = LogFactory.getLog(MongoUtil.class);
    	private static DB db;
    	
    	static{
    		try {
    		      MongoOptions options = new MongoOptions();
                          options.autoConnectRetry = true;
                          options.connectionsPerHost = 1000;
                          options.maxWaitTime = 5000;
                          options.socketTimeout = 0;
                          options.connectTimeout = 15000;
                          options.threadsAllowedToBlockForConnectionMultiplier = 5000;
    			//事实上,Mongo实例代表了一个数据库连接池,即使在多线程的环境中,一个Mongo实例对我们来说已经足够了
    			mongo = new Mongo(new ServerAddress(DBMongoConfig.getHost(),DBMongoConfig.getPort()),options);
    			//mongo = new Mongo(DBMongoConfig.getHost(),DBMongoConfig.getPort());
    			// or, to connect to a replica set, supply a seed list of members
    			// Mongo m = new Mongo(Arrays.asList(new ServerAddress("localhost",
    			// 27017),
    			// new ServerAddress("localhost", 27018),
    			// new ServerAddress("localhost", 27019)));
    
    			// 注意Mongo已经实现了连接池,并且是线程安全的。
    			// 大部分用户使用mongodb都在安全内网下,但如果将mongodb设为安全验证模式,就需要在客户端提供用户名和密码:
    			// boolean auth = db.authenticate(myUserName, myPassword);
    		} catch (UnknownHostException e) {
    			log.info("get mongo instance failed");
    		}
    	}
    	
    	public static DB getDB(){
    		if(db==null){
    			db = mongo.getDB(DBMongoConfig.getDbname());
    		}
    		return db;
    	}
    	
    	
    	public static Mongo getMong(){
    		return mongo;
    	}
    	
    	public static DBCollection getColl(String collname){
    		return getDB().getCollection(collname);
    	}
    	
    }

     


    MongoDB基本用法(增删改高级查询、mapreduce)

    标签:

    热心网友 时间:2022-04-07 23:08

    分析在MongoDB中正成为越来越重要的话题,因为它在越来越多的大型项目中使用。人们厌倦了使用不同的软件来做分析(包括Hadoop),它们显然需要传输大量开销的数据。

    MongoDB提供了两种内置分析数据的方法:Map
    Rece和Aggregation框架。MR非常灵活,很容易部署。它通过分区工作良好,并允许大量输出。MR在MongoDB
    v2.4中,通过使用JavaScript引擎把Spider
    Monkey替换成V8,性能提升很多。老板抱怨它太慢了,尤其是和Agg框架(使用C++)相比。让我们看看能否从中榨出点果汁。

    练习

    让我们插入1千万条文档,每个文档包含一个从0到1000000的整数。这意味着平均有10个文档会具有相同的值。
    > for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });}
    > db.uniques.findOne()
    { "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 }
    > db.uniques.ensureIndex({dim0: 1})
    > db.uniques.stats()
    {
    "ns" : "test.uniques",
    "count" : 10000000,
    "size" : 360000052,
    "avgObjSize" : 36.0000052,
    "storageSize" : 582864896,
    "numExtents" : 18,
    "nindexes" : 2,
    "lastExtentSize" : 153874432,
    "paddingFactor" : 1,
    "systemFlags" : 1,
    "userFlags" : 0,
    "totalIndexSize" : 576040080,
    "indexSizes" : {
    "_id_" : 324456384,
    "dim0_1" : 251583696
    },
    "ok" : 1
    }

    从这其中,我们想要计算出现的不同值的个数。可以用下列MR任务轻松完成这个工作:
    > db.runCommand(
    { maprece: "uniques",
    map: function () { emit(this.dim0, 1); },
    rece: function (key, values) { return Array.sum(values); },
    out: "mrout" })
    {
    "result" : "mrout",
    "timeMillis" : 1161960,
    "counts" : {
    "input" : 10000000,
    "emit" : 10000000,
    "rece" : 1059138,
    "output" : 999961
    },
    "ok" : 1
    }

    正如你在输出内容中看到的,这耗费了大概1200秒(在EC2 M3实例上进行的测试)。有1千万个map,1百万个rece,输出了999961个文档。结果就像下面这样:
    > db.mrout.find()
    { "_id" : 1, "value" : 10 }
    { "_id" : 2, "value" : 5 }
    { "_id" : 3, "value" : 6 }
    { "_id" : 4, "value" : 10 }
    { "_id" : 5, "value" : 9 }
    { "_id" : 6, "value" : 12 }
    { "_id" : 7, "value" : 5 }
    { "_id" : 8, "value" : 16 }
    { "_id" : 9, "value" : 10 }
    { "_id" : 10, "value" : 13 }
    ...

    使用排序

    我在上一篇博文中
    提到了在MR中使用排序多么有益。这个特性很少被理解。在这个例子中,处理未排序的输入意味着MR引擎将得到随机顺序的值,在RAM中根本无法
    rece。相反,它将不得不把所有文章写入一个临时收集的磁盘,然后按顺序读取并rece。让我们看看使用排序是否有助:
    > db.runCommand(
    { maprece: "uniques",
    map: function () { emit(this.dim0, 1); },
    rece: function (key, values) { return Array.sum(values); },
    out: "mrout",
    sort: {dim0: 1} })
    {
    "result" : "mrout",
    "timeMillis" : 192589,
    "counts" : {
    "input" : 10000000,
    "emit" : 10000000,
    "rece" : 1000372,
    "output" : 999961
    },
    "ok" : 1
    }

    确实大有助益!我们下降到192秒,已经提升了6倍。rece的数量基本相同,但现在它们在写入磁盘前,可以在RAM内完成。

    使用多线程

    MongoDB对单独的MR作业并不使用多线程——它仅仅对多作业使用多线程。但通过多核CPU,在单个服务器使用Hadoop风格来并行作业非常
    有优势。我们需要做的是把输入分成几块,通过各个块来加速一个MR作业。也许数据集有简单的方法来分割,但其他使用splitVector命令(不明确)
    可以使你很快的找到分割点:
    > db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32000000})
    {
    "timeMillis" : 6006,
    "splitKeys" : [
    {
    "dim0" : 18171
    },
    {
    "dim0" : 36378
    },
    {
    "dim0" : 54528
    },
    {
    "dim0" : 72717
    },

    {
    "dim0" : 963598
    },
    {
    "dim0" : 981805
    }
    ],
    "ok" : 1
    }
    这个命令在超过1千万个文档中找到分割点仅仅需要花费5秒,很快!那么现在我们仅仅需要一个方法来创建多个MR作业。从一个应用服务器,使用多线程和为MR命令使用$gt/$It查询 相当简单。通过shell,你可以使用ScopedThread,使用方法如下:
    > var t = new ScopedThread(mapred, 963598, 981805)
    > t.start()
    > t.join()

    现在我们把一些快速运行的js代码放在一起,它们会产生4个线程(或者更多的线程),执行后呈现出下面的结果:
    > var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 })
    > var keys = res.splitKeys
    > keys.length
    39
    > var mapred = function(min, max) {
    return db.runCommand({ maprece: "uniques",
    map: function () { emit(this.dim0, 1); },
    rece: function (key, values) { return Array.sum(values); },
    out: "mrout" + min,
    sort: {dim0: 1},
    query: { dim0: { $gte: min, $lt: max } } }) }
    > var numThreads = 4
    > var inc = Math.floor(keys.length / numThreads) + 1
    > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
    min:0 max:274736
    min:274736 max:524997
    min:524997 max:775025
    min:775025 max:{ "$maxKey" : 1 }
    connecting to: test
    connecting to: test
    connecting to: test
    connecting to: test
    > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
    {
    "result" : "mrout0",
    "timeMillis" : 205790,
    "counts" : {
    "input" : 2750002,
    "emit" : 2750002,
    "rece" : 274828,
    "output" : 274723
    },
    "ok" : 1
    }
    {
    "result" : "mrout274736",
    "timeMillis" : 189868,
    "counts" : {
    "input" : 2500013,
    "emit" : 2500013,
    "rece" : 250364,
    "output" : 250255
    },
    "ok" : 1
    }
    {
    "result" : "mrout524997",
    "timeMillis" : 191449,
    "counts" : {
    "input" : 2500014,
    "emit" : 2500014,
    "rece" : 250120,
    "output" : 250019
    },
    "ok" : 1
    }
    {
    "result" : "mrout775025",
    "timeMillis" : 184945,
    "counts" : {
    "input" : 2249971,
    "emit" : 2249971,
    "rece" : 225057,
    "output" : 224964
    },
    "ok" : 1
    }
    "ok" : 1
    }
    {
    "result" : "mrout775025",
    "timeMillis" : 184945,
    "counts" : {
    "input" : 2249971,
    "emit" : 2249971,
    "rece" : 225057,
    "output" : 224964
    },
    "ok" : 1
    }

    第一个线程时间确实超过了其他的线程,但是平均每个线程仍然用了大约190s的时间.这意味着并没有一个线程快!这有点奇怪,自从用了‘top’,在某种程度上,你可以看到所有的内核运行情况。

    使用多数据库

    问题是在多线程之间会有很多锁竞争。在上锁时,MR并不是那么无私的(它每1000次读操作就会产生一次锁定),而且MR任务还会执行许多写操作,
    导致线程最终都会在等待另一个线程。由于每个MongoDB数据库都有私有锁,让我们尝试为每一个线程使用一个不同的输出数据库:
    > var mapred = function(min, max) {
    return db.runCommand({ maprece: "uniques",
    map: function () { emit(this.dim0, 1); },
    rece: function (key, values) { return Array.sum(values); },
    out: { replace: "mrout" + min, db: "mrdb" + min },
    sort: {dim0: 1},
    query: { dim0: { $gte: min, $lt: max } } }) }
    > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
    min:0 max:274736
    min:274736 max:524997
    min:524997 max:775025
    min:775025 max:{ "$maxKey" : 1 }
    connecting to: test
    connecting to: test
    connecting to: test
    connecting to: test
    > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
    ...
    {
    "result" : {
    "db" : "mrdb274736",
    "collection" : "mrout274736"
    },
    "timeMillis" : 105821,
    "counts" : {
    "input" : 2500013,
    "emit" : 2500013,
    "rece" : 250364,
    "output" : 250255
    },
    "ok" : 1
    }
    ...

    这才像话!我们现在降到了100秒,这意味着相比一个线程而言已经提升了2倍。还算差强人意吧。现在我们只有4个核所以只快了2倍,要是在8核CPU上将会快4倍,以此类推。

    使用纯JavaScript模式

    当把输入数据拆分到不同线程上去的时候,发生了一些有趣的事情:每个线程现在有大约250000个不同的值来输出,而不是1百万。这意味着我们可以
    使用“纯JS模式”,它可以通过使用jsMode:true来开启。开启后,MongoDB在处理时将不会把对象在JS和BSON之间来回翻译,相反,它
    使用一个限额500000个key的内部JS字典来化简所有对象。让我们看看这是否有用:
    > var mapred = function(min, max) {
    return db.runCommand({ maprece: "uniques",
    map: function () { emit(this.dim0, 1); },
    rece: function (key, values) { return Array.sum(values); },
    out: { replace: "mrout" + min, db: "mrdb" + min },
    sort: {dim0: 1},
    query: { dim0: { $gte: min, $lt: max } },
    jsMode: true }) }
    > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
    min:0 max:274736
    min:274736 max:524997
    min:524997 max:775025
    min:775025 max:{ "$maxKey" : 1 }
    connecting to: test
    connecting to: test
    connecting to: test
    connecting to: test
    > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
    ...
    {
    "result" : {
    "db" : "mrdb274736",
    "collection" : "mrout274736"
    },
    "timeMillis" : 70507,
    "counts" : {
    "input" : 2500013,
    "emit" : 2500013,
    "rece" : 250156,
    "output" : 250255
    },
    "ok" : 1
    }
    ...

    现在我们降到了70秒,就搞定了任务!jsMode真心有用,尤其是当对象有很多字段的时候。这里只有一个数字字段就已经下降了30%。

    MongoDB在2.6版本上的改进

    在很早的2.6版本中,在任何的js函数调用的时候,我们就通过一段代码设置一个可选参数”args“。这种做法并不标准,不在使用。但是它确有留下来的原因(查看 SERVER-4654)。让我们从Git资源库中导入MongoDB,编译并运行进行测试:
    ...
    {
    "result" : {
    "db" : "mrdb274736",
    "collection" : "mrout274736"
    },
    "timeMillis" : 62785,
    "counts" : {
    "input" : 2500013,
    "emit" : 2500013,
    "rece" : 250156,
    "output" : 250255
    },
    "ok" : 1
    }
    ...

    这是明显的提高了3倍的运行速度,时间降低到了60s,大约10-15%。这种变化也提高了整体JS引擎的堆消耗。
    声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
    备忘录在手机的哪里 刚性消费有哪些 中国经济快速增长的原因 什么是刚性消费 什么叫刚性增长 特别精辟的个性签名(非常经典的个性句子) 特别经典的个性签名(非常惊艳的个性句子) 文艺范十足的个性签名(温柔治愈的个性签名句子) wps文字怎么设置每页头和尾 27岁的女人需要补充哪些营养元素 哔哩哔哩的缓存视频如何定位到上次播放位置继续播放? 哔哩哔哩下载的视频,在文件夹中无法播放怎么办 别克和大众有什么区别 为什么说别克是一个真正成功的汽车品牌啊? 别克的汽车怎么样? 送钥匙扣代表这什么意思呢? 钥匙扣、龙凤柱 有什么寓意? 钥匙扣是什么 朋友送我一个钥匙扣什么意思? 女人送男人“钥匙扣”代表什么含义 在最重要的520情人节那天,突然收到对方寄过来的一个钥匙扣?钥匙扣代表什么意思呢? 钥匙扣有什么作用 送钥匙扣的含义是什么? 戴“算盘”小饰品有什么寓意? 抖音 粉丝 点赞 合拍 评论 页面如何切换 我想知道这张图片里所有动漫人物的名字,最好也有动漫人物的简介,和所在的动漫名字。 跪求PS1的游戏下载,模拟器我已经有了. 好像超级小黑咪吖那些游戏. 有谁知道在那下单机版的超级酷乐猫呀?就是和PS2上面的那个一样的~只不过是用电脑玩的而已 为什么steam战地4锁区? ,, 求超级酷乐猫/霹雳酷乐猫/超级小黑咪游戏和PS模拟器下载 姚策养母:错换人生28年的始末,这件事都有哪些细节值得斟酌? 双侧股总静脉中度返流是什么原因引起的,最终会导致什么结果? 什么是股静脉返流,是什么原因引起的股静脉返流?它和静脉血栓有什么关系. 请问大佬有两男变错身2011年上映的由瑞安·雷诺兹主演的高清视频在线观看资源吗 静脉返流是怎么回事 双侧股静脉瓣反流怎么得来的、怎么治疗,以后怎么预防 乏氏实验左侧股总静脉引流出8s返流信号是什么意思? 静脉回流是怎么回事 如果一个人没有及时纠正错误而变成坏习惯会很难更改,这个观点是对的吗? “焚琴煮鹤鞭名马,惟恐情多误美人”这句话什么意思 《女男变错身》女主角资料... 有关双侧股静脉瓣及股隐静脉瓣功能不全的问题 女男变错身的男主角是谁 双腿静脉曲张,腹壁有根血管曲张,想查病因【静脉曲张】 我想知道静脉回流是什么原因 左下肢大隐静脉曲张双侧股隐静脉瓣、左侧股静脉瓣功能不全 锌业股份会重组吗? 锌业股份能重组吗? 请专业律师给解答一下。关于锌业股份重整进展公告的解释问题。 锌业股份这次23号开始的停牌,大概会持续多久啊