
Nutanix: Cloud-like DevOps powering NoSQL for BigData

The popularity of NoSQL has grown largely because developers want to use the same in-memory data structures in their applications and have them map directly onto a database persistence layer. For example, data stored in XML or JSON format is often hierarchical and does not lend itself easily to row-based tables, and it becomes more complicated still if the data also contains lists and nested objects. Not having to convert these in-memory structures into relational database structures is a major advantage in terms of time to value. Such considerations have become all the more acute with the rise of the web as a platform for services. There is also an economic aspect: the infrastructure costs of scaling up a traditional RDBMS to support high availability and similar requirements can be prohibitive. Compare this with Web-Scale, cloud-aware applications such as NoSQL databases, which expect to “just drop in” commodity hardware at the infrastructure layer and scale out horizontally on demand.
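
To make the mapping problem concrete, here is a minimal Python sketch. The order document, its field names and the to_rows() helper are all hypothetical, chosen only for illustration: a document database can persist the nested structure as-is, whereas a relational schema needs it split across two tables linked by a key.

```python
# A hypothetical order, as an application might hold it in memory.
# A document database can persist this structure unchanged.
order = {
    "order_id": 1001,
    "customer": {"name": "Alice", "email": "alice@example.com"},
    "items": [
        {"sku": "A-1", "qty": 2},
        {"sku": "B-7", "qty": 1},
    ],
}


def to_rows(doc):
    """Flatten the nested order into rows for two relational tables.

    The embedded customer object is spread into columns of the orders row,
    and the list of items becomes separate rows keyed by order_id.
    """
    order_row = {
        "order_id": doc["order_id"],
        "customer_name": doc["customer"]["name"],
        "customer_email": doc["customer"]["email"],
    }
    item_rows = [
        {"order_id": doc["order_id"], "sku": item["sku"], "qty": item["qty"]}
        for item in doc["items"]
    ]
    return order_row, item_rows


orders_row, order_items_rows = to_rows(order)
print(orders_row)
print(order_items_rows)
```

Every extra level of nesting adds another table and another join on the way back out, which is the conversion cost the paragraph above refers to.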

So consider the requirements for a modern hyper-converged infrastructure (HCI) that employs the same Web-Scale paradigms used by modern cloud-aware applications. To deploy apps such as a NoSQL database on it, the first thing I would want to do is virtualise. This means a right-sized, sandboxed environment (i.e. a virtual machine) to run each NoSQL instance. If there is a need to scale up, then it's a simple case of increasing RAM and CPU. As the application landscape grows over time and starts to scale out, more nodes/VMs are needed. Hence, any HCI platform needs cloud-like provisioning of nodes, providing faster time to deploy and time to value. The ability to auto-discover and add new nodes at the click of a button is quite compelling. In short, horizontal scale-out needs to be easy to undertake, even, say, in the middle of the production day while the month-end workload is running.

Intelligent, automated data tiering, locality and balancing via post-process techniques like MapReduce is another key requirement. Any database working set grows over time: more users mean more queries, new tables, indexes, aggregations and so on. So the ability to maintain a responsive I/O profile from SSD, even as some I/O periodically has to be served from disk, will be key. Better still if all VMs can get local access to their data on SSD from a global storage fabric. While we are here, consider how you would migrate to a new(er) hardware fleet with and without a distributed storage fabric. It is far easier to just drop in units of converged compute/storage and then migrate VMs to them. Compare that with how a migration would work across a large white box server estate spread over numerous racks in a DC. There is yet another economic aspect to all this: auto-tiering of the storage layer means the current “working set” data is held in the most performant (and, by comparison, more expensive) tier, while colder data sits on cheaper spinning disk.

Another advantage of a distributed storage fabric is its data service features. Take point-in-time (PIT) backups of sharded DBs, which can be complicated. A data service that supports VM-centric snapshots of key VMs in a consistency group can remove that potential pain point. Also, rapid cloning of preconfigured VMs improves deployment times and fits the DevOps workflows that many IT shops have increasingly adopted. Consider how easy it might be to create dev/QA environments with production-style data using such mechanisms. What about burst workloads? The ability to migrate VMs between public and private cloud would bring further benefits, both as a means of providing offsite backups and of moving VMs between geographies.

Bear in mind there isn’t 20+ years of ecosystem software (or even tribal knowledge, perhaps?) in the NoSQL community, unlike with traditional RDBMS. For this reason continual monitoring is a major requirement. The ability to provide a floor-to-ceiling overview of VMs, hypervisor and hardware platform in terms of performance, alerts and events is paramount. We mentioned briefly above how working set size and I/O throughput can affect end user experience. The ability to predict trends in such behaviour means timely decisions can be made about when to scale or shard an application. No discussion of DevOps processes is complete without REST API and/or PowerShell automation capabilities. Automation is key to workflow agility, allowing routine tasks to be performed repeatedly with a well understood outcome. Dev/QA environments can benefit greatly from the features already described. In addition, via the API, developers can build self-service portal software that lets them spin up new environments in a matter of minutes.
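
As a rough illustration of trend-based capacity planning, the sketch below fits a least-squares straight line to periodic working set measurements and estimates when that line reaches the RAM available to the VM. The sample figures, the 32 GB limit and the assumption of linear growth are all hypothetical; real workloads rarely grow in a straight line, so treat the result as a prompt for a scaling decision rather than a forecast.

```python
def fit_line(xs, ys):
    """Ordinary least-squares fit of y = slope * x + intercept."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


def days_until_limit(days, working_set_gb, ram_gb):
    """Estimate the day number on which the fitted working set reaches ram_gb.

    Returns None if the fitted trend is flat or shrinking.
    """
    slope, intercept = fit_line(days, working_set_gb)
    if slope <= 0:
        return None
    return (ram_gb - intercept) / slope


# Hypothetical weekly samples: day number and working set size in GB.
days = [0, 7, 14, 21, 28]
working_set = [20.0, 21.5, 23.0, 24.5, 26.0]
print(days_until_limit(days, working_set, ram_gb=32.0))
```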

In previous roles I worked with customers running UNIX-based failover clusters protecting traditional SQL RDBMS and ERP software. Think Solaris and Sun Cluster, underpinning Oracle and SAP installs. While running this kind of ‘Big Iron’ was considered ‘state of the art’, coming up fast on the inside was ‘Big Data’, and with it a complete rethink of how to achieve massive scale. Traditionally, systems had scaled vertically by adding more CPU and RAM to the host platform, and horizontally by adding system boards to a midframe chassis. This came at a price, and often with a staggering level of administrative complexity. While Web-Scale technologies may not have completely replaced this approach yet, in my opinion large-scale big iron systems will become increasingly niche as time goes on.

So, coming back to the beginning of this post: HCI is not only about scaling to support Big Data workloads, it’s also about lowering time to value and achieving radical ease-of-use synergies with the application that sits on top of the stack. Having an HCI platform designed from the ground up on the same underlying principles as modern Web-Scale applications means we can remove the operational delays and complexity that tend to act as drag anchors in today’s rapid deployment environments. IT departments are then free to focus on innovations that help the business succeed.

Getting started with MongoDB shell and pymongo

In my last blog article I described how to set up a MongoDB instance in a VM. In order to use that VM and run various diagnostic commands, we need some data to play with. The easiest way to get data is from a data science archive. I was able to find the Enron Mail Corpus in mongodump format (credit for this must go to Bryan Nehl), which makes it trivially easy to import the ~500,000 emails in the corpus as MongoDB documents. See below.

We uncompress and extract the downloaded tarfile to get the dump directory structure…

drwxr-xr-x. 3 mongod mongod 4096 Jan 18 2012 dump
-rw-r--r--. 1 mongod mongod 1459855360 Feb 2 2012 enron_mongo.tar

We then use the mongorestore utility to load the database. We don’t specify any additional CLI options, as by default mongorestore looks for the dump directory structure in the current directory:

$ mongorestore
2015-07-28T14:03:20.079+0100 using default 'dump' directory
2015-07-28T14:03:20.108+0100 building a list of dbs and collections to restore from dump dir
2015-07-28T14:03:20.131+0100 no metadata file; reading indexes from dump/enron_ma 
2015-07-28T14:03:20.140+0100 restoring enron_mail.messages from file dump/enron_m 
2015-07-28T14:03:23.124+0100 [##......................] enron_mail.messages 142.0 MB/1.4 GB (10.2%)
2015-07-28T14:03:26.124+0100 [#####...................] enron_mail.messages 337.5 MB/1.4 GB (24.2%)
2015-07-28T14:03:29.124+0100 [########................] enron_mail.messages 499.2 MB/1.4 GB (35.9%)
2015-07-28T14:03:32.124+0100 [###########.............] enron_mail.messages 645.9 MB/1.4 GB (46.4%)
2015-07-28T14:03:35.124+0100 [##############..........] enron_mail.messages 828.4 MB/1.4 GB (59.5%)
2015-07-28T14:03:38.124+0100 [#################.......] enron_mail.messages 1003.7 MB/1.4 GB 
2015-07-28T14:03:41.124+0100 [####################....] enron_mail.messages 1.1 GB/1.4 GB (83.5%)
2015-07-28T14:03:44.124+0100 [######################..] enron_mail.messages 1.3 GB/1.4 GB 
2015-07-28T14:03:45.326+0100 restoring indexes for collection enron_mail.messages from metadata
2015-07-28T14:03:45.372+0100 finished restoring enron_mail.messages
2015-07-28T14:03:45.372+0100 done

We can now see the database in a local mongo shell session :

> show dbs
enron_mail 1.435GB
local 0.000GB
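
From Python, the same information comes from the listDatabases admin command, whose sizeOnDisk values are reported in bytes. A minimal sketch, assuming a pymongo MongoClient is passed in (only its admin.command() method is used); database_sizes_gib is a hypothetical helper name and the connection string in the usage comment is an assumption:

```python
def database_sizes_gib(client):
    """Return {database name: size on disk in GiB} via listDatabases.

    `client` is expected to be a pymongo.MongoClient; only its
    admin.command() method is used.
    """
    result = client.admin.command("listDatabases")
    return {
        db["name"]: db["sizeOnDisk"] / 1024 ** 3
        for db in result["databases"]
    }


# Usage against a live instance (host and port are assumptions):
#   from pymongo import MongoClient
#   client = MongoClient("mongodb://localhost:27017")
#   for name, size in sorted(database_sizes_gib(client).items()):
#       print("%-12s %.3f GiB" % (name, size))
```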

If you need to remove the database, for example because you are running benchmarks that reload a test database each time, drop the current database before reloading it afresh.

From the mongo shell, using the sbtest database as an example:

> use sbtest
switched to db sbtest
> db.runCommand( { dropDatabase: 1 } )
{ "dropped" : "sbtest", "ok" : 1 }
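
In pymongo, MongoClient.drop_database() does the same job (the shell also accepts db.dropDatabase() as shorthand). A minimal sketch for a benchmark reset step, assuming pymongo 3.6 or later for list_database_names(); reset_database is a hypothetical helper name:

```python
def reset_database(client, name):
    """Drop database `name` so a benchmark can reload it from scratch.

    Returns True if the database existed before the drop. Dropping a
    database that does not exist is harmless.
    """
    existed = name in client.list_database_names()
    client.drop_database(name)
    return existed


# Usage against a live instance:
#   from pymongo import MongoClient
#   client = MongoClient("mongodb://localhost:27017")  # assumed address
#   reset_database(client, "sbtest")
```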

Configuration and sizing

The following commands can be used to size a database working set, which is useful for platform design and capacity planning. The db.serverStatus() command gives a great deal of information about the running instance; we will only concern ourselves with the memory component here. Note that for good performance it is imperative that the working set and associated indexes are always held in RAM. For MongoDB versions before 3.0, the working set section of the db.serverStatus() output reports the number of pages in memory:

"pagesInMemory" : 91521

Multiply the working set pages by the OS page size (PAGESIZE) to get the size in bytes:

# getconf PAGESIZE

The db.stats() command provides the size of the indexes in use (indexSize, in bytes):


So for our example this can be calculated as follows :

(91521 * 4096) + 7131826688 = 7506696704 bytes ~ 7GB (6.99 GiB)
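
The same sum as a small Python helper, using the pagesInMemory value shown above, a 4096-byte page and the index size from db.stats(). The function name is hypothetical; it simply encodes the rule of thumb described in this section, resident pages plus indexes.

```python
def working_set_bytes(pages_in_memory, page_size, index_size):
    """Rough pre-3.0 working set estimate: resident pages plus index size.

    pages_in_memory: pagesInMemory from the db.serverStatus() working set section
    page_size:       OS page size in bytes (getconf PAGESIZE)
    index_size:      indexSize in bytes from db.stats()
    """
    return pages_in_memory * page_size + index_size


total = working_set_bytes(91521, 4096, 7131826688)
print(total)               # bytes
print(total / 1024 ** 3)   # GiB
```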

As of MongoDB 3.0 the working set section is no longer available; instead, the mem section of db.serverStatus() returns:

> db.serverStatus().mem
{
 "bits" : 64,
 "resident" : 20466,
 "virtual" : 148248,
 "supported" : true,
 "mapped" : 73725,
 "mappedWithJournal" : 147450
}

The size figures above are in megabytes (MB). resident is the physical memory currently used by the mongod process, while virtual, mapped and mappedWithJournal correspond respectively to the virtual memory of the mongod process, the amount of mapped memory and the amount of mapped memory including the memory used for journaling. These numbers can be used to allocate sufficient RAM to your guest VM database host.
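
When collecting these figures from several VMs it can help to convert them in a script. A minimal sketch, assuming the mem section has already been fetched (in pymongo, client.admin.command("serverStatus")["mem"] returns the same document); the helper name is hypothetical and the sample values are copied from the shell output above:

```python
def mem_gib(mem):
    """Convert the MB figures in db.serverStatus().mem to GiB.

    Non-size fields such as "bits" and "supported" are skipped.
    """
    size_fields = ("resident", "virtual", "mapped", "mappedWithJournal")
    return {k: mem[k] / 1024.0 for k in size_fields if k in mem}


# Values copied from the shell output above.
sample = {
    "bits": 64,
    "resident": 20466,
    "virtual": 148248,
    "supported": True,
    "mapped": 73725,
    "mappedWithJournal": 147450,
}
for field, gib in sorted(mem_gib(sample).items()):
    print("%-18s %8.1f GiB" % (field, gib))
```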

The following db.hostInfo() command reveals, among other things, the instruction set supported by the VM, the various operating system limit settings and whether NUMA is disabled:

> db.hostInfo()
{
 "system" : {
  "currentTime" : ISODate("2015-07-28T13:45:36.093Z"),
  "hostname" : "mongowt01",
  "cpuAddrSize" : 64,
  "memSizeMB" : 64427,
  "numCores" : 8,
  "cpuArch" : "x86_64",
  "numaEnabled" : false
 },
 "os" : {
  "type" : "Linux",
  "name" : "CentOS release 6.6 (Final)",
  "version" : "Kernel 2.6.32-504.el6.x86_64"
 },
 "extra" : {
  "versionString" : "Linux version 2.6.32-504.el6.x86_64 (mockbuild@c6b9.bsys.dev.centos.org) (gcc version 4.4.7 20120313 (Red Hat 4.4.7-11) (GCC) ) #1 SMP Wed Oct 15 04:27:16 UTC 2014",
  "libcVersion" : "2.12",
  "kernelVersion" : "2.6.32-504.el6.x86_64",
  "cpuFrequencyMHz" : "2799.998",
  "cpuFeatures" : "fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss syscall nx pdpe1gb lm constant_tsc rep_good unfair_spinlock pni pclmulqdq ssse3 cx16 pcid sse4_1 sse4_2 x2apic popcnt tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm xsaveopt fsgsbase smep erms",
  "pageSize" : NumberLong(4096),
  "numPages" : 16493566,
  "maxOpenFiles" : 65536
 },
 "ok" : 1
}
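
To pull out just the fields that matter for sizing a database VM, a small helper can read the same document (client.admin.command("hostInfo") returns it in pymongo). This is a minimal sketch; the helper name and the wording of the NUMA note are hypothetical, though MongoDB's production notes do advise extra care when running mongod on NUMA hardware.

```python
def sizing_summary(host_info):
    """Pick out the hostInfo fields most relevant to sizing a database VM.

    `host_info` is the document returned by db.hostInfo() in the shell,
    or client.admin.command("hostInfo") in pymongo.
    """
    system = host_info["system"]
    extra = host_info.get("extra", {})
    summary = {
        "ram_gib": system["memSizeMB"] / 1024.0,
        "cores": system["numCores"],
        "numa": system.get("numaEnabled", False),
        "max_open_files": extra.get("maxOpenFiles"),
    }
    notes = []
    if summary["numa"]:
        # MongoDB's production notes cover NUMA hardware (for example,
        # starting mongod under numactl --interleave=all).
        notes.append("NUMA is enabled: review MongoDB's NUMA guidance")
    return summary, notes
```

With the output above this reports roughly 62.9 GiB of RAM, 8 cores, NUMA disabled and a 65536 open-file limit, with no notes.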


To take a consistent backup, run the following commands before and after the backup:

> db.fsyncLock()
{
 "info" : "now locked against writes, use db.fsyncUnlock() to unlock",
 "seeAlso" : "http://dochub.mongodb.org/core/fsynccommand",
 "ok" : 1
}

Perform a host-level OS backup or, better still, take a VM-centric snapshot, and then unlock:

> db.fsyncUnlock()
{ "ok" : 1, "info" : "unlock completed" }
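
When the snapshot step is scripted, the most important property is that the unlock always runs, even if the backup fails. A minimal pymongo sketch using a context manager, assuming MongoDB 3.2 or later (where fsyncUnlock is available as a database command; older servers unlock differently). fsync_locked is a hypothetical helper and take_vm_snapshot in the usage comment is a hypothetical stand-in for your snapshot tooling:

```python
from contextlib import contextmanager


@contextmanager
def fsync_locked(client):
    """Flush writes to disk and block new writes for the duration of a backup.

    Sends {fsync: 1, lock: true} on entry and fsyncUnlock on exit, even if
    the backup step raises. Assumes MongoDB 3.2+ for the fsyncUnlock command.
    """
    client.admin.command("fsync", lock=True)
    try:
        yield
    finally:
        client.admin.command("fsyncUnlock")


# Usage (take_vm_snapshot is a hypothetical stand-in):
#   with fsync_locked(client):
#       take_vm_snapshot("mongowt01")
```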

Delving into the database structure, show collections will list the document collections within a database (in this case, the previously loaded enron_mail db) and you can use that information to inspect individual documents:

> show collections

These next commands can be used to retrieve a document or set of documents. The document below has been edited to retain the privacy of the original sender.

> db.messages.findOne()
{
"_id" : ObjectId("4f16fc97d1e2d32371003e27"),
"body" : "the scrimmage is still up in the air...\n\n\nwebb said that they didnt want to scrimmage...\n\nthe aggies are scrimmaging each other... (the aggie teams practiced on \nSunday)\n\nwhen I called the aggie captains to see if we could use their field.... they \nsaid that it was tooo smalll for us to use...\n\n\nsounds like bullsh*t to me... but what can we do....\n\n\nanyway... we will have to do another practice Wed. night.... and I dont' \nknow where we can practice.... any suggestions...\n\n\nalso, we still need one more person...",
"subFolder" : "notes_inbox"
}
> db.messages.findOne({ _id: ObjectId("4f16fc97d1e2d32371003e27") })
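
The _id field holds an ObjectId rather than a string, so a lookup by _id must wrap the hex value in ObjectId(); a plain string will not match. An ObjectId is 12 bytes, and its first 4 bytes are a timestamp in seconds since the Unix epoch. pymongo exposes this as ObjectId.generation_time; the stand-alone sketch below (objectid_time is a hypothetical helper name) decodes it by hand so no driver is needed.

```python
from datetime import datetime, timedelta, timezone


def objectid_time(oid_hex):
    """Decode the creation time embedded in a 24-character ObjectId hex string.

    The first 4 bytes (8 hex digits) are seconds since the Unix epoch.
    """
    if len(oid_hex) != 24:
        raise ValueError("ObjectId hex strings are 24 characters long")
    seconds = int(oid_hex[:8], 16)
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=seconds)


print(objectid_time("4f16fc97d1e2d32371003e27"))
```

For the document above this decodes to 18 January 2012, the same date as the dump directory in the earlier listing.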

Just for the record, the database can be shut down manually using:

> use admin
> db.shutdownServer()

Performance issues

db.currentOp() reports the operations currently in progress on the instance, allowing admins to locate any queries or write operations that are running slowly.

> db.currentOp()
 "inprog" : [
 "desc" : "conn374",
 "threadId" : "0x16574c340
 "connectionId" : 374,
 "opid" : 1032339378,
 "active" : true,
 "secs_running" : 0,
 "microsecs_running" : NumberLong(105738),
 "op" : "insert",
 "ns" : "sbtest.sbtest6",
 "insert" : {

A badly behaving database operation can be killed using :

> db.killOp(1032339378)
{ "info" : "attempting to kill op" }
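
The same triage can be scripted: filter the inprog array for active operations that have been running longer than a threshold, then pass their opid values to killOp. A minimal sketch, assuming MongoDB 3.2 or later, where currentOp and killOp are available as database commands (client.admin.command("currentOp")["inprog"] and client.admin.command("killOp", op=opid) in pymongo). The helper names and any sample operations are hypothetical; review the list before killing anything.

```python
def long_running_ops(inprog, min_secs):
    """Return (opid, op, ns, secs_running) for active operations at or over min_secs.

    `inprog` is the "inprog" array from db.currentOp() (or from
    client.admin.command("currentOp") in pymongo, MongoDB 3.2+).
    """
    slow = []
    for op in inprog:
        secs = op.get("secs_running", 0)
        if op.get("active") and secs >= min_secs:
            slow.append((op["opid"], op.get("op"), op.get("ns"), secs))
    # Longest-running first, so the worst offender is easy to spot.
    slow.sort(key=lambda entry: entry[3], reverse=True)
    return slow


def kill_ops(client, opids):
    """Ask the server to kill each opid (killOp command, MongoDB 3.2+)."""
    for opid in opids:
        client.admin.command("killOp", op=opid)
```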

To record operations that take 100 milliseconds (the default threshold) or more, you can enable profiling and then query the system.profile collection; adding .sort({ ts: -1 }).limit(5) to the find() shows just the five most recent entries. In the example below, profiling level 2 records every operation (output shortened).

setProfilingLevel() takes 0 for no profiling, 1 for slow operations only, or 2 for all operations. An optional second argument changes the threshold for what counts as a slow operation; for example, db.setProfilingLevel(1, 10) reduces it to 10 ms.

> db.setProfilingLevel(2)
{ "was" : 0, "slowms" : 100, "ok" : 1 }

> db.system.profile.find()
{
 "op" : "insert",
 "ns" : "sbtest.sbtest6",
 "query" : {
  "_id" : 2628714,
  "k" : 4804469,
  "c" : "42025084972-52016328459-02616906732-06037924356-25803606931-90180435635-33434735556-64942463775-51942983544-69579223058",
  "pad" : "83483501744-16275794559-91512432879-42096600452-97899816846"
 },
 "ninserted" : 1,
 "keyUpdates" : 0,
 "writeConflicts" : 0,
 "numYield" : 0,
 "locks" : {
  "Global" : {
   "acquireCount" : {
    "w" : NumberLong(720)
   }
  },
  "Database" : {
   "acquireCount" : {
    "w" : NumberLong(720)
   }
  },
  "Collection" : {
   "acquireCount" : {
    "w" : NumberLong(720)
   }
  }
 },
 "millis" : 0,
 "execStats" : { },
 "ts" : ISODate("2015-07-28T16:37:01.959Z"),
 "client" : "",
 "allUsers" : [ ],
 "user" : ""
}
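
From pymongo, the profiler is controlled with the profile database command, and the captured entries are ordinary documents in system.profile. A minimal sketch: Database.command("profile", level, slowms=...) is the pymongo equivalent of the shell helper, while enable_profiling and slowest are hypothetical helper names, and the usage comment assumes the sbtest database from the example above.

```python
def enable_profiling(db, level, slowms=100):
    """Set the profiler level on a pymongo Database (0 off, 1 slow ops, 2 all).

    Equivalent to db.setProfilingLevel(level, slowms) in the mongo shell.
    """
    return db.command("profile", level, slowms=slowms)


def slowest(profile_docs, n=5):
    """Return (op, ns, millis) for the n profiled operations with the highest millis."""
    ranked = sorted(profile_docs, key=lambda d: d.get("millis", 0), reverse=True)
    return [(d.get("op"), d.get("ns"), d.get("millis", 0)) for d in ranked[:n]]


# Usage against a live instance:
#   db = client.sbtest
#   enable_profiling(db, 1, slowms=10)
#   recent = db.system.profile.find().sort("ts", -1).limit(100)
#   for op, ns, millis in slowest(recent):
#       print(op, ns, millis)
```

Sorting client-side keeps the sketch simple; for a large profile collection you would sort on millis in the query itself.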

So far we have simply been working with a previously created database. To generate a workload, we would need a well-known synthetic workload generator such as sysbench or YCSB (more on these in a future post). Another alternative is to use the pymongo driver to connect to a MongoDB instance and then call the MongoDB API using standard Python idioms. To install the pymongo driver, either install pip from the EPEL repo (for RHEL-based Linux) and use it to install pymongo, or clone the git repo and build the driver manually.

sudo yum -y install epel-release
sudo yum -y install python-pip
sudo pip install pymongo


git clone https://github.com/mongodb/mongo-python-driver.git
cd mongo-python-driver
sudo python setup.py install

The following python interpreter session shows the basics of connecting to a MongoDB instance and how to load documents into a collection. This could be extended to do various read and write based workloads depending on what you are looking to test or characterise.

create a database client connection :

>>> from pymongo import MongoClient
>>> uri = 'mongodb://'
>>> conn = MongoClient(uri)

create a document collection object:

>>> collection = conn.mydocs.docs

Inserting documents:

>>> doc1 = {'author': 'Ray Hassan', 'title': 'My first doc'}
>>> conn.mydocs.docs.insert_one(doc1)
<pymongo.results.InsertOneResult object at 0x11b9780>
>>> doc2 = {'author': 'Ray Hassan', 'title': 'My 2nd doc'}
>>> conn.mydocs.docs.insert_one(doc2)
<pymongo.results.InsertOneResult object at 0x11b90f0>

retrieving documents by iterating over the cursor returned by find():

>>> cursor = collection.find()
>>> for doc in cursor: print(doc)
{u'_id': ObjectId('55b8ec5bd7cf7a74c8bdd3bf'), u'author': u'Ray Hassan', u'title': u'My first doc'}
{u'_id': ObjectId('55b8ec69d7cf7a74c8bdd3c0'), u'author': u'Ray Hassan', u'title': u'My 2nd doc'}
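
Extending this into a simple read/write workload only needs a loop over insert_one() and find_one(). A minimal sketch, assuming an empty, throwaway pymongo collection (it uses integer _id values from 0 upwards, so re-running it against the same collection would cause duplicate key errors); run_workload, the 80/20 read ratio and the 100-byte payload are hypothetical choices, and a real benchmark such as YCSB or sysbench controls far more variables:

```python
import random
import time


def run_workload(collection, n_ops, read_ratio=0.8, seed=42):
    """Run a simple mixed read/write workload against a pymongo collection.

    Each operation is either a find_one() on a previously inserted _id (a read)
    or an insert_one() of a small document (a write). Returns operation counts
    and elapsed wall-clock seconds.
    """
    rng = random.Random(seed)
    next_id = 0
    counts = {"reads": 0, "writes": 0}
    start = time.perf_counter()
    for _ in range(n_ops):
        # The first operation is always a write, so reads have something to hit.
        if next_id and rng.random() < read_ratio:
            collection.find_one({"_id": rng.randrange(next_id)})
            counts["reads"] += 1
        else:
            collection.insert_one({"_id": next_id, "payload": "x" * 100})
            next_id += 1
            counts["writes"] += 1
    counts["seconds"] = time.perf_counter() - start
    return counts


# Usage against the session above (collection name is an assumption):
#   stats = run_workload(conn.mydocs.workload, 10000)
#   print(stats, (stats["reads"] + stats["writes"]) / stats["seconds"], "ops/s")
```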

If we wanted to improve the performance of a particular query, we could use the explain() command. First, let’s look at the explain() output for a query on a field that has no index:

>>> collection.find({'author': 'Ray Hassan'}).explain()
{u'executionStats': {u'executionTimeMillis': 0, u'nReturned': 4, u'totalKeysExamined': 0, u'allPlansExecution': [], u'executionSuccess': True, u'executionStages': {u'docsExamined': 4, u'restoreState': 0, u'direction': u'forward', u'saveState': 0, u'isEOF': 1, u'needFetch': 0, u'nReturned': 4, u'needTime': 1, u'filter': {u'author': {u'$eq': u'Ray Hassan'}}, u'executionTimeMillisEstimate': 0, u'invalidates': 0, u'works': 6, u'advanced': 4, u'stage': u'COLLSCAN'}, u'totalDocsExamined': 4}, u'queryPlanner': {u'parsedQuery': {u'author': {u'$eq': u'Ray Hassan'}}, u'rejectedPlans': [], u'namespace': u'mydocs.docs', u'winningPlan': {u'filter': {u'author': {u'$eq': u'Ray Hassan'}}, u'direction': u'forward', u'stage': u'COLLSCAN'}, u'indexFilterSet': False, u'plannerVersion': 1}, u'serverInfo': {u'host': u'mongowt01', u'version': u'3.0.3', u'port': 27017, u'gitVersion': u'b40106b36eecd1b4407eb1ad1af6bc60593c6105 modules: enterprise'}}

Without an index, the query above has to perform a full scan of the collection (COLLSCAN), examining 4 documents (totalDocsExamined) to return 4 (nReturned). To improve the performance of the query, we could add an index on the field being queried.

>>> from pymongo import ASCENDING, DESCENDING
>>> collection.create_index([('author', ASCENDING)])

>>> collection.find({'author': 'Ray Hassan'}).explain()
{u'executionStats': {u'executionTimeMillis': 0, u'nReturned': 4, u'totalKeysExamined': 4, u'allPlansExecution': [], u'executionSuccess': True, u'executionStages': {u'restoreState': 0, u'docsExamined': 4, u'saveState': 0, u'isEOF': 1, u'inputStage': {u'matchTested': 0, u'restoreState': 0, u'direction': u'forward', u'saveState': 0, u'indexName': 
u'author_1', u'dupsTested': 0, u'isEOF': 1, u'needFetch': 0, u'nReturned': 4, u'needTime': 0, u'seenInvalidated': 0, u'dupsDropped': 0, u'keysExamined': 4, u'indexBounds': {u'author': [u'["Ray Hassan", "Ray Hassan"]']}, u'executionTimeMillisEstimate': 0, u'isMultiKey': False, u'keyPattern': {u'author': 1}, u'invalidates': 0, u'works': 4, u'advanced': 4, u'stage': u'IXSCAN'}, u'needFetch': 0, u'nReturned': 4, u'needTime': 0, 
u'executionTimeMillisEstimate': 0, u'alreadyHasObj': 0, u'invalidates': 0, u'works': 5, u'advanced': 4, u'stage': u'FETCH'}, u'totalDocsExamined': 4}, u'queryPlanner': {u'parsedQuery': {u'author': {u'$eq': u'Ray Hassan'}}, u'rejectedPlans': [], u'namespace': u'mydocs.docs', u'winningPlan': {u'inputStage': {u'direction': u'forward', u'indexName': u'author_1', u'indexBounds': {u'author': [u'["Ray Hassan", "Ray Hassan"]']}, u'isMultiKey': False, u'stage': u'IXSCAN', u'keyPattern': {u'author': 1}}, u'stage': u'FETCH'}, u'indexFilterSet': False, u'plannerVersion': 1}, 
u'serverInfo': {u'host': u'mongowt01', u'version': u'3.0.3', u'port': 27017, u'gitVersion': u'b40106b36eecd1b4407eb1ad1af6bc60593c6105 modules: enterprise'}}

In the above output we can see that, having added the index, the query now performs an index scan (IXSCAN) followed by a FETCH of the matching documents. An appropriately chosen index reduces the number of documents the server has to examine (totalDocsExamined), not the number returned. In our very trivial example every document matches, so 4 documents are still examined; with a larger collection and a more selective query, the index would tend to be considerably more performant.
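
Rather than reading the raw explain() dictionaries by eye, the key counters can be pulled out with a few lines of Python. A minimal sketch for the MongoDB 3.0 explain format shown in this post (plan_summary is a hypothetical helper name); it follows the single inputStage chain of the winning plan, so plans with several children, such as OR stages with an inputStages array, would need extra handling.

```python
def plan_summary(explain):
    """Summarise a find().explain() result (MongoDB 3.0 format).

    Returns the winning plan's stage names from the top down, plus the
    executionStats counters that show how much work the query did.
    """
    stages = []
    node = explain["queryPlanner"]["winningPlan"]
    while node:
        stages.append(node["stage"])
        node = node.get("inputStage")
    stats = explain["executionStats"]
    return {
        "stages": stages,
        "nReturned": stats["nReturned"],
        "keysExamined": stats["totalKeysExamined"],
        "docsExamined": stats["totalDocsExamined"],
    }


# Usage in the session above:
#   plan_summary(collection.find({'author': 'Ray Hassan'}).explain())
```

A docsExamined figure much larger than nReturned is the usual sign that a query would benefit from a better index; in the example above the two are equal both before and after indexing, because every document matches.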

The above merely touches on what can be done, depending on your NoSQL workload testing requirements. I do hope, however, that you find it a good place to start.