Tag Archives: balancer

Sharded MongoDB config in Nutanix (3) : Backup & DR

Backing up sharded NoSQL databases can often require some additional consideration.  For example, any backup of a sharded MongoDB config needs to capture a backup for each shard and a single member of the configuration database quorum. The configuration database (configdb) holds the cluster metadata and so supports the ability to shard.  In a production environment you will need three config databases and they will all contain the same (meta)data. In this post I intend to cover the steps I recently used to backup a sharded MongoDB deployment using the snapshot technology available on my Nutanix platform.

First step prior to any backup should always be to stop the balancer. The balancer is responsible for migrating/balancing data “chunks” between the various shards. If such a migration is running while backing up then the resultant backup is potentially invalidated.

mongos> use config
switched to db config
mongos> sh.stopBalancer()
Waiting for active hosts...
Waiting for the balancer lock...
Waiting again for active hosts after balancer is off...
mongos>

At which point we can proceed to lock one of the secondary replicas in each shard. I outlined how to do this in my post relating to backing up replica sets. The command sequence is repeated below, note that this needs to be done on one secondary for each shard (and should only be done if running MMAPv1 storage engine on the replica):

rs01:SECONDARY> db.fsyncLock()
{
 "info" : "now locked against writes, use db.fsyncUnlock() to unlock",
 "seeAlso" : "http://dochub.mongodb.org/core/fsynccommand",
 "ok" : 1

Having locked the secondaries for writes, the next step is to create a virtual machine (VM) snapshot of a configdb and of a secondary belonging to each shard (replica set). Using the Nutanix Acropolis App Mobility Fabric as follows :

<acropolis> vm.snapshot_create mongo-configdb01,mongodb03,mongowt03 snapshot_name_list=mongoconfigdb01-bk,mongodb02-bk,mongowt03-bk
SnapshotCreate: complete

The above snapshots have all been created at once within a single consistency group. The next step will be to create clones from them…

<acropolis> vm.clone configdb01-clone clone_from_snapshot=mongoconfigdb01-bk
configdb01-clone: complete
<acropolis> vm.clone mongodb03-clone clone_from_snapshot=mongodb03-bk
mongodb03-clone: complete
<acropolis> vm.clone mongowt03-clone clone_from_snapshot=mongowt03-bk
mongowt03-clone: complete

At this point we can unlock each of the secondaries :

rs01:SECONDARY> db.fsyncUnlock()
{ "ok" : 1, "info" : "unlock completed" }
rs01:SECONDARY>

and re-enable the balancer:

mongos> use config
switched to db config
mongos> sh.setBalancerState(true)
mongos>

As of now I merely have the “bare bones” of a MongoDB cluster encapsulated in the three VM clones just created. The thing to bear in mind is that each clone generated from the replica snapshots contains only a subset of any sharded collection. Hopefully, ~50% each, if our shard key selection is any good! That means we can’t just proceed as in previous posts and bring up each clone as a standalone MongoDB instance. The simplest way to make use of the current clones might be to just rsync any data to new hosts in a freshly sharded deployment. So essentially, we would just transfer the data to the required volumes on the newly set up VMs. In any case, there would still be some work to do around the replica set memberships and associated config.

Alternatively, to have access to any sharded collection held in my newly created clones above. I could begin by reconfiguring each replica clone as the new primary in the replica set and create additional configdb VMs that can be registered with a new mongos VM. Recall that mongos is stateless, and gets its info from the configdbs. At which stage we can re-register the replica shards within the configdb service. For example, here’s the state of the replica sets after they have been cloned:

> rs.status()
{
 "state" : 10,
 "stateStr" : "REMOVED",
 "uptime" : 97,
 "optime" : Timestamp(1443441939, 1),
 "optimeDate" : ISODate("2015-09-28T12:05:39Z"),
 "ok" : 0,
 "errmsg" : "Our replica set config is invalid or we are not a member of it",
 "code" : 93
}
> rs.conf()
{
 "_id" : "rs01",
 "version" : 7,
 "members" : [
 {
 "_id" : 0,
 "host" : "10.68.64.111:27017",
 "arbiterOnly" : false,
 "buildIndexes" : true,
 "hidden" : false,
 "priority" : 1,
 "tags" : {

 },
 "slaveDelay" : 0,
 "votes" : 1
 },
 {
 "_id" : 1,
 "host" : "10.68.64.131:27017",
 "arbiterOnly" : false,
 "buildIndexes" : true,
 "hidden" : false,
 "priority" : 1,
 "tags" : {

 },
 "slaveDelay" : 0,
 "votes" : 1
 },
 {
 "_id" : 2,
 "host" : "10.68.64.144:27017",
 "arbiterOnly" : false,
 "buildIndexes" : true,
 "hidden" : false,
 "priority" : 1,
 "tags" : {

 },
 "slaveDelay" : 0,
 "votes" : 1
 }
 ],
 "settings" : {
 "chainingAllowed" : true,
 "heartbeatTimeoutSecs" : 10,
 "getLastErrorModes" : {

 },
 "getLastErrorDefaults" : {
 "w" : 1,
 "wtimeout" : 0
 }
 }
}

So first off we need to set each cloned replica VM as the new replica set primary and remove the no longer required (or available) hosts from the set membership :

> cfg=rs.conf()
> printjson(cfg) 
> cfg.members = [cfg.members[0]]
[
 {
 "_id" : 0,
 "host" : "10.68.64.111:27017",
 "arbiterOnly" : false,
 "buildIndexes" : true,
 "hidden" : false,
 "priority" : 1,
 "tags" : {
 },
 "slaveDelay" : 0,
 "votes" : 1
 }
]
 
> cfg.members[0].host="10.68.64.153:27017"
10.68.64.153:27017

> rs.reconfig(cfg, {force : true})
{ "ok" : 1 }

rs01:PRIMARY> rs.status()
{
 "set" : "rs01",
 "date" : ISODate("2015-10-06T14:02:23.263Z"),
 "myState" : 1,
 "members" : [
 {
 "_id" : 0,
 "name" : "10.68.64.152:27017",
 "health" : 1,
 "state" : 1,
 "stateStr" : "PRIMARY",
 "uptime" : 396,
 "optime" : Timestamp(1443441939, 1),
 "optimeDate" : ISODate("2015-09-28T12:05:39Z"),
 "electionTime" : Timestamp(1444140137, 1),
 "electionDate" : ISODate("2015-10-06T14:02:17Z"),
 "configVersion" : 97194,
 "self" : true
 }
 ],
 "ok" : 1
}

Once you have done this for all the required replica sets (these are your shards dont forget), the next step is to set up the configdb clone and create additional identical VMs that will contain the cluster metadata. The configdbs can be verified for correctness as follows :

configsvr> db.runCommand("dbhash")
{
 "numCollections" : 14,
 "host" : "localhost.localdomain:27019",
 "collections" : {
 "actionlog" : "bd8d8c2425e669fbc55114af1fa4df97",
 "changelog" : "fcb8ee4ce763a620ac93c5e6b7562eda",
 "chunks" : "bd7a2c0f62805fa176c6668f12999277",
 "collections" : "f8b0074495fc68b64c385bf444e4cc90",
 "databases" : "c9ee555dde6fc84a7bbdb64b74ef19bd",
 "lockpings" : "ba67ca64d12fd36f8b35a54e167649a8",
 "locks" : "c226b1a2601cf3e61ba45aeab146663d",
 "mongos" : "690326c2edcb410eeeb9212ad7c6c269",
 "settings" : "ce32ef7c2b99ca137c5a20ea477062f7",
 "shards" : "77d49755ba04fe38639c5c18ee5be78d",
 "tags" : "d41d8cd98f00b204e9800998ecf8427e",
 "version" : "14e1d35ba0d32a5ff393ddc7f16125a1"
 },
 "md5" : "61bde8ac240aead03080f4dde3ec2932",
 "timeMillis" : 43,
 "fromCache" : [ ],
 "ok" : 1
}

The above hashes in bold need to agree across the configdb membership. They are key to having all configdb servers in agreement. Once you have the configdbs enabled, then register them with a newly created mongos VM. Below, I am just using a single configdb to test for correctness. A production setup should always have three per cluster:

 mongos --configdb 10.68.64.151:27019

The next issue will be to correct the configdb shard info.  So as you can see from the mongos session below, the replica info in the configdb is still referring to the previous deployment:

mongos> db.adminCommand( { listShards: 1 } )
{
 "shards" : [
 {
 "_id" : "rs01",
 "host" : "rs01/10.68.64.111:27017,10.68.64.131:27017,10.68.64.144:27017"
 },
 {
 "_id" : "rs02",
 "host" : "rs02/10.68.64.110:27017,10.68.64.114:27017,10.68.64.137:27017"
 }
 ],
 "ok" : 1
}

We can correct the above setup to reflect our newly cloned shard/replica VMs. In a mongo shell session on the configdb server VM.  :

use config
configsvr> db.shards.update({_id: "rs01"} , {$set: {"host" : "10.68.64.152:27017"}})
configsvr> db.shards.update({_id: "rs02"} , {$set: {"host" : "10.68.64.153:27017"}})

You will have to restart the mongos server so that it picks up the new info from the configdb server.

mongos> db.adminCommand( { listShards: 1 } )
{
 "shards" : [
 {
 "_id" : "rs01",
 "host" : "10.68.64.152:27017"
 },
 {
 "_id" : "rs02",
 "host" : "10.68.64.153:27017"
 }
 ],
 "ok" : 1

And that, as they say, is how babies get made. At this stage you have a MongoDB cluster consisting of a configdb, registered with a mongos server, that can access both shards, formed of a replica set, formed of a single primary member. To flesh this out to production standards you could increase the configdb count (to 3) and add secondaries to the replica sets for higher availability. With some additional work perhaps (ie : renaming replica sets ?) this could form the basis of a Dev/QA system, containing a potential production workload.

Sharded MongoDB config on Nutanix (1) : Deployment

So far I have posted on MongoDB deployments either as standalone or as part of a replica set. This is fine when you can size your VM memory to hold the entire database working set. However, if your VM’s RAM will not accommodate the working set in memory, you will need to shard to aggregate RAM from multiple replica sets and form a MongoDB cluster.

Having already discussed using clones of gold image VMs to create members for a replica set, then the most basic of MongoDB clusters requires at least two replica sets. On top of which we need a number of MongoDB “infrastructure” VMs that make MongoDB cluster operation possible. These entail a minimum of three (3) Configuration Databases (mongod –configsvr) per cluster and around one (1) Query Router (mongos) for every two shards. Here is the layout of a cluster deployment on my lab system:

2shard-system

In the above lab deployment, for availability considerations, I avoid co-locating any primary replica VM on the same physical host, and likewise any of the Query Router or ConfigDB VMs. One thing to bear in mind is that sharding is done on a per collection basis. Simply put, the idea behind sharding is that you split the collections across the replica sets and then by connecting to a mongos process you are routed to the appropriate shard holding the part of the collection that can serve your query. The following commands show the syntax to create one of the three required configdb’s (ran on three separate VMs, and need to be started first), and a Query Router, or mongos process (where we add the IP addresses of each configdb server VM) :

Config DB Servers – each ran as:
mongod --configsvr --dbpath /data/configdb --port 27019

Query Router - ran as:
mongos --configdb 10.68.64.142:27019,10.68.64.143:27019,10.68.64.145:27019

- the above IP addresses in mongos command line are the addresses of each config DB.

This brings up an issue if you are not cloning replica VMs from “blank” gold VMs. By cloning a new replica set from a current working replica set, ie: so that you essentially have each replica set holding a full copy of all your databases and their collections. Then when you come to add such a replica set as a shard, you generate the error condition shown below.

Here’s the example of what can happen when you attempt to shard and your new replica set (rs02)  is simply cloned off a current running replica set (rs01):

mongos> sh.addShard("rs02/192.168.1.52")
{s
 "ok" : 0,
 "errmsg" : "can't add shard rs02/192.168.1.52:27017 because a local database 'ycsb' 
exists in another rs01:rs01/192.168.1.27:27017,192.168.1.32:27017,192.168.1.65:27017"
}

This is the successful workflow adding both shards (the primary of each replica set) via the mongos router VM:

$ mongo --host localhost --port 27017
MongoDB shell version: 3.0.3
connecting to: localhost:27017/test
mongos>
 
mongos> sh.addShard("rs01/10.68.64.111")
{ "shardAdded" : "rs01", "ok" : 1 }
mongos> sh.addShard("rs02/10.68.64.110")
{ "shardAdded" : "rs02", "ok" : 1 }

We next need to enable sharding on the database and subsequently shard on the collection we want to distribute across the replica sets available. The choice of shard key is crucial to future MongoDB cluster performance. Issues such as read and write scaling, cardinality etc are covered here. For my test cluster I am using the _id field for demonstration purposes.

mongos> sh.enableSharding("ycsb")
{ "ok" : 1 }

mongos> sh.shardCollection("ycsb.usertable", { "_id": 1})
{ "collectionsharded" : "ycsb.usertable", "ok" : 1 }

The balancer process will run for the period of time needed to migrate data between the available shards. This can take anywhere from a number of hours to a number of days depending on the size of the collection, the number of shards, the current workload etc. Once complete however, this results in the following sharding status output. Notice  the “chunks” of the usertable collection held in the ycsb database are now shared across both shards (522 chunks in each shard) :

 mongos> sh.status()
--- Sharding Status ---
 sharding version: {
 "_id" : 1,
 "minCompatibleVersion" : 5,
 "currentVersion" : 6,
 "clusterId" : ObjectId("55f96e6c5dfc4a5c6490bea3")
}
 shards:
 { "_id" : "rs01", "host" : "rs01/10.68.64.111:27017,10.68.64.131:27017,10.68.64.144:27017" }
 { "_id" : "rs02", "host" : "rs02/10.68.64.110:27017,10.68.64.114:27017,10.68.64.137:27017" }
 balancer:
 Currently enabled: yes
 Currently running: no
 Failed balancer rounds in last 5 attempts: 0
 Migration Results for the last 24 hours:
 No recent migrations
 databases:
 { "_id" : "admin", "partitioned" : false, "primary" : "config" }
 { "_id" : "enron_mail", "partitioned" : false, "primary" : "rs01" }
 { "_id" : "mydocs", "partitioned" : false, "primary" : "rs01" }
 { "_id" : "sbtest", "partitioned" : false, "primary" : "rs01" }
 { "_id" : "ycsb", "partitioned" : true, "primary" : "rs01" }
 ycsb.usertable
 shard key: { "_id" : 1 }
 chunks:
 rs01 522
 rs02 522
 too many chunks to print, use verbose if you want to force print
 { "_id" : "test", "partitioned" : false, "primary" : "rs02" }

Additional Links: