MongoDB MapReduce with Hadoop

June 20th, 2013

This article is part two of two detailing my talk at MongoDB San Francisco 2013.
The video and slides for the entire talk can be found here. Part one can be found here.

Mongo offers the useful feature of MapReducing over a collection, making it possible to run operations that don’t necessarily fit Mongo’s normal query model. For example, the Merchant Data team uses a MapReduce job to join our collection of places against another data set. Unfortunately, this method can take many hours for large collections and negatively affects other Mongo queries. Since MapReduce is a batch operation, there is a mismatch when a single Mongo cluster maps over a 500GB collection and also serves latency-sensitive queries from an application. We use Hadoop for most batch jobs, and it excels at scheduling this sort of large workload. Additionally, using Hadoop MapReduce enables other useful capabilities from the Hadoop ecosystem, such as Hive queries.

One reason for Mongo’s poor MapReduce performance is that it runs in the embedded Javascript engine. Documents must be deserialized from BSON into Javascript objects before the engine can process them. In Mongo 2.4 the older SpiderMonkey engine was replaced by the V8 Javascript engine, which improves the situation, but still can’t compete with Hadoop in terms of performance. Hadoop has been designed to MapReduce over large data sets efficiently and allows tuning and monitoring of jobs, which makes it a natural choice for large batch operations.

There are two options for MapReducing over Mongo data in Hadoop.

  • Export the data as BSON or JSON, which involves executing a cursor over an entire collection, dumping the results to flat files, and then importing these into HDFS. We have several collections of more than 200GB and found this took much too long.
  • Execute one or more cursors within a Hadoop InputFormat. This is slightly more efficient and works by querying the collection as the job executes, so it has the advantage that the data is up to date. Unfortunately, for large collections that exceed the size of memory this process is also prohibitively expensive, and it creates no locality for the mappers, which must pull data over the network from the machines that store the Mongo collection.

To solve this problem we backup raw Mongo data files to our Hadoop Distributed File System (HDFS), then read them directly using an InputFormat. This approach has the drawback of not reading the most current Mongo data for each MapReduce, but it means we have a backup of our data in HDFS and can map over an entire collection faster because of the throughput of our Hadoop cluster. Moving data from a sharded Mongo cluster into HDFS, however, has challenges of its own.

Sharded Backups

Backing up a sharded Mongo cluster is more complicated than backing up a single machine, since data often moves between shards, which can break the consistency of the backup if it is not coordinated among the shards of the cluster.

These are the steps we use to backup our Mongo cluster:

  • First, stop the shard balancer.
    This will prevent data from moving between shards during the backup process and can be done in the shell with the command sh.stopBalancer().
  • In each shard find one member that is not the current replica set master. We do this programmatically with the listshards and ismaster commands.
  • Stop writes on this replica to prevent the data on disk from changing.
    You can accomplish this from the shell with the command db.fsyncLock(). Note that profiling should be turned off when this happens.
  • Load Mongo’s state files from disk into HDFS.
    This operation is efficient because it’s just a copy into a distributed file system.
  • Once the copy finishes, unlock each replica and start the shard balancer.

A member of each replica set is locked for writes while copying data into HDFS.
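
Condensed into mongo shell commands, the sequence looks roughly like the following sketch. In practice we drive it from a script, and the fsync commands are run against the chosen non-primary member of each shard:

sh.stopBalancer();                    // on a mongos: stop chunk migrations

db.adminCommand({ listShards: 1 });   // enumerate the shards
db.isMaster();                        // on a candidate member: confirm it is not the primary

db.fsyncLock();                       // on the chosen member: flush to disk and block writes

// ... copy that member's data files into HDFS ...

db.fsyncUnlock();                     // release each locked member once the copy finishes
sh.startBalancer();                   // on a mongos: resume chunk migrations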

If the write operations that occur in a replica set while a member is fsync-locked exceed the size of the oplog, then that member falls too far behind and must be resynced from scratch. In practice we have seen this only rarely, since the file copy is efficient and the oplog defaults to 5% of available disk space. A heavy write load during the backup makes this more likely because it creates more oplog entries.

We have scripted the backup process and run it regularly with a cron job. One member of each replica set will be unable to receive writes while it runs. Be sure to size your replica set appropriately to handle this. There are other methods for sharded backups that may be faster overall, such as snapshotting a filesystem in each shard, but this process has been fast enough for us and means our data is available in Hadoop.

Mongo’s Data Format

Once the backup process finishes, our Mongo data is loaded into HDFS and can be used to restore our cluster, but we have a problem if we want to read it directly: it is still in Mongo’s proprietary binary on-disk format.

If you look in the data directory of a Mongo instance, it probably looks similar to the following.
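For example (the number of data files and their sizes vary from instance to instance):

place_db.ns
place_db.0
place_db.1
place_db.2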


Where place_db is the name of a database. The first file, place_db.ns, is the namespace file, which is a hash table of records, each containing information about a collection in the database. The important parts of such a record are the collection name and pointers to the first and last extents of the collection. An extent is a chunk of Mongo records stored in one of the data files (which have numbers as their extensions).

The namespace file contains a hash table of records with collection information.

Extents are referenced by an object called a DiskLoc, which looks like:

struct DiskLoc {
  int fileNum;  // which data file, e.g. 1 means place_db.1
  int offset;   // byte offset within that file
};

The fileNum value refers to the data file number, so a fileNum of 1 means the extent is in place_db.1. The offset is the byte offset within this file. Data files double in size as new ones are added, but have a maximum size of 2GB, so an integer is sufficient for the offset value. DiskLoc values are used frequently in Mongo’s data format as pointers within the space of data files.

The DiskLoc values from the namespace file point to extent locations, and a Mongo collection is stored in one or more extents. Each extent has DiskLoc objects for the next and previous extents in the collection, creating a doubly linked list. An extent stores some metadata about itself and the locations of the first and last records it contains.

Each record on disk stores metadata followed by the BSON payload. BSON is a binary encoding of JSON-like documents created by 10gen, and each record’s payload is a single BSON document. A record’s metadata includes the locations of the next and previous records in the extent, which means the records within an extent also form a doubly linked list.
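
Putting this together, the structures described above can be sketched in the same C-like style as DiskLoc. The field names and layouts below are simplified for illustration; the real headers carry additional bookkeeping.

struct NamespaceRecord {    // one hash-table entry in place_db.ns
  char name[128];           // collection name, e.g. "place_db.places" (size approximate)
  DiskLoc firstExtent;      // first extent of the collection
  DiskLoc lastExtent;       // last extent of the collection
};

struct Extent {
  DiskLoc nextExtent;       // extents form a doubly linked list
  DiskLoc prevExtent;
  int length;               // size of this extent in bytes
  DiskLoc firstRecord;      // first record stored in this extent
  DiskLoc lastRecord;       // last record stored in this extent
};

struct Record {
  DiskLoc nextRecord;       // records within an extent also form a doubly linked list
  DiskLoc prevRecord;
  int length;               // total record length, including this header
  char bson[];              // the BSON document itself
};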

A collection contains a doubly linked list of extents, which each contain a doubly linked list of records.

Integrating with Hadoop

Using our knowledge of Mongo’s binary format, it is possible to read these files directly in a MapReduce job. Mongo’s extents align neatly with Hadoop’s InputSplit abstraction; InputSplits are intended to be groups of records with spatial locality. When a MapReduce job executes, InputSplits are handed one by one to the mappers, which read records out of the split and pass each one to the map function.

InputSplits exploit locality on two levels. First, each split groups records that are contiguous on disk, so iterating over them benefits from the CPU cache. Second, an InputSplit can be processed by a mapper on the machine that already stores the data file, so no network transfer is necessary for the map phase. The result is an efficient, parallel read over our binary Mongo data that exploits the disk throughput and memory of the Hadoop cluster without interfering with operations on the Mongo cluster.
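
As a concrete illustration, here is a minimal sketch of what an extent-backed InputSplit can look like. This is not the library’s actual class; it simply records which HDFS file holds the extent, where the extent starts, how long it is, and which hosts store that block so the scheduler can run the mapper locally.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

public class ExtentSplit extends InputSplit implements Writable {
    private String file;     // HDFS path of the Mongo data file, e.g. ".../place_db.1"
    private long offset;     // byte offset of the extent within that file
    private long length;     // extent length in bytes
    private String[] hosts;  // datanodes storing this block, used for mapper locality

    public ExtentSplit() {}  // Hadoop needs a no-arg constructor to deserialize splits

    public ExtentSplit(String file, long offset, long length, String[] hosts) {
        this.file = file;
        this.offset = offset;
        this.length = length;
        this.hosts = hosts;
    }

    @Override public long getLength() { return length; }        // split size, used for scheduling
    @Override public String[] getLocations() { return hosts; }  // hints for data-local mappers

    @Override public void write(DataOutput out) throws IOException {
        Text.writeString(out, file);
        out.writeLong(offset);
        out.writeLong(length);
    }

    @Override public void readFields(DataInput in) throws IOException {
        file = Text.readString(in);
        offset = in.readLong();
        length = in.readLong();
        hosts = new String[0];  // locations are only needed client-side when splits are created
    }
}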

Each extent is represented as an InputSplit, which allows read access for a MapReduce job.

We manage InputSplits in another Hadoop abstraction called an InputFormat. The InputFormat is used during MapReduce job configuration and creates the InputSplits. The InputFormat also uses a RecordReader which iterates over the records of an InputSplit. In this case, the RecordReader iterates over the BSON records in an Extent. We pass the InputFormat to the MapReduce Job object and configure it based on the data’s location and the collection over which to map.

With the MongoInputFormat, this looks a bit like:
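
(The configuration keys, HDFS paths, and driver class in this sketch are illustrative rather than the library's exact API.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// MongoInputFormat and WritableBSONObject come from the open-sourced library.

public class MongoIdJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Tell the InputFormat where the backed-up Mongo files live in HDFS and
        // which database and collection to read. These key names are illustrative;
        // check the library for its actual configuration calls.
        conf.set("mongo.input.directory", "/backups/mongo");
        conf.set("mongo.input.database", "place_db");
        conf.set("mongo.input.collection", "places");

        Job job = new Job(conf, "mongo-id-extract");
        job.setJarByClass(MongoIdJob.class);
        job.setInputFormatClass(MongoInputFormat.class);
        job.setMapperClass(Map.class);          // the static Map class shown below
        job.setNumReduceTasks(0);               // map-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path("/output/place-ids"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}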



We configure the InputFormat for the location of our Mongo files within HDFS, and tell it which database and collection we want to read. The InputFormat will take care of opening the Mongo files at this location and generating InputSplits.

The MongoInputFormat passes the map function the document’s _id field as a Text object and a WritableBSONObject which contains the entire document.

public static class Map extends Mapper<Text, WritableBSONObject, Text, Text> {
    public void map(Text key, WritableBSONObject value, Context context)
            throws IOException, InterruptedException {
        String id = (String) value.get("_id");  // read the document's _id field
        context.write(null, new Text(id));      // emit only the id; no key is needed
    }
}
This is a trivial mapper that reads the _id value from a Mongo document, and emits it as text.

This overall process is useful because we kill two birds with one stone: we back up our Mongo data to a safe location, and we can read it at high throughput with our existing Hadoop toolset. The disadvantage is that these MapReduce jobs operate on offline data, so they will not include changes made to the collection after the backup. If this fits your use case, we hope this library will be useful; we have open-sourced the MongoInputFormat code on GitHub.

5 thoughts on “MongoDB MapReduce with Hadoop”

  1. Peter, thank you for this post, this is very helpful. We also have a setup that requires data to be exchanged between Hadoop and MongoDB. However, we need to go the other way around: we generate large amounts of data in Hadoop and need to write that data to MongoDB. Currently we are writing the data over the network to a couple of mongos servers using a custom OutputFormat, similar to what you have in your library that you posted on Git. However, the process is very slow and MongoDB becomes unresponsive to clients that are reading data while the load is in progress. In addition, it ties up our Hadoop cluster for a long time. So I figured perhaps we can write the MongoDB native files in HDFS (we assume that the database is initially empty). Writing the namespace file might be challenging in MapReduce though. What do you think? Patrick

    by Patrick Salami on July 18, 2013 at 3:05 am
  2. Thanks for the question, Patrick. As you mentioned, there are several options which come to mind for writing Hadoop data back to Mongo:
     - Insert into Mongo in the OutputFormat. This is currently what we do since we don't currently have a requirement to write from Hadoop to Mongo very often. This is simple, but as you mentioned, not a quick process for large data sizes because every document needs to be inserted. This can probably be optimized a bit more by buffering in the RecordWriter and inserting batches of documents (so that they get sent in a single message), but you're still limited by how fast Mongo can process the inserts, and you're still affecting other queries.
     - Write to a JSON or BSON file in the OutputFormat and import into Mongo. This could be a little faster, but means you would have to move the file from HDFS to a location accessible from the mongoimport/mongorestore utilities. Depending on your requirements, this may not be fast enough.
     - Write to Mongo native files and open in MongoDB. This would also require copying files out of HDFS, but afterwards they would be immediately accessible in Mongo. I haven't tried this, but it's definitely possible and I'd love something like this in the library. As you mentioned, it would involve writing a namespace file and at least one data file, and writing your documents into Mongo's record and extent scheme. The RecordWriter would have to be a bit more complex because each record needs to have a pointer to the next and previous records. Perhaps more challenging, each extent also needs to know the locations of the previous and next extents. You would also need a method of grouping records into extents and data files.
     So basically, my feeling is that this would be difficult but very useful if it was working; you're definitely not the only one with this problem. I'd be curious to hear how you guys end up handling/optimizing this problem.

    by Peter Bakkum on July 18, 2013 at 12:23 pm
  3. Peter, thanks for your reply, it's very encouraging. I'm going through your code right now and I'm also looking at the MongoDB source code to get a better understanding of the native file format. I'll take a shot at this and see how far I get. Can I message you privately in case I have any questions or to share progress?

    by Patrick Salami on July 18, 2013 at 6:37 pm
  4. Sure, email me at

    by Peter Bakkum on July 22, 2013 at 10:33 am
  5. Hi Patrick, We are stuck exactly at the same point where you were. We want 300 million rows from hdfs to mongodb. Any suggestions??

    by on May 8, 2014 at 6:23 am
