Skip to main content

MongoDB BulkWrite Java API

Since version 3.2, MongoDB has introduced Bulk Update methods. In context of RDBMS, it's like SQL Batch Jobs, where SQL Statements are prepared in different chunks and a batch of statements are submitted to DB for update/insert.

Here are some important points about MongoDB Bulk Write operation..


  1. Useful in case you've huge data to update/insert.
  2. Mongo automatically prepares batches (of 1000 default) and start execution in an ordered/unordered manner.
  3. This drastically reduce DB trip time. Let's say there are 50 thousand records to update, now instead of 50k round trips to DB from your app server, using Bulk Update it would be reduced to just 50 round trips.

Let's see an example below:


List<WriteModel<Document>> updateDocuments = new ArrayList<WriteModel<Document>>();
for (Long entityId : entityIDs) {

    //Finder doc
    Document filterDocument = new Document();
    filterDocument.append("_id", entityId);

    //Update doc
    Document updateDocument = new Document();
    Document setDocument = new Document();
    setDocument.append("name", "xyz");
    setDocument.append("role", "abc");

    updateDocument.append("$set", setDocument);

    //Update option
    UpdateOptions updateOptions = new UpdateOptions();
    updateOptions.upsert(true); //if true, will create a new doc in case of unmatched find
    updateOptions.bypassDocumentValidation(true); //set true/false

    //Prepare list of Updates
    updateDocuments.add(
            new UpdateOneModel<Document>(
                    filterDocument,
                    updateDocument,
                    updateOptions));

}

//Bulk write options
BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
bulkWriteOptions.ordered(false); //False to allow parallel execution
bulkWriteOptions.bypassDocumentValidation(true);

MongoCollection<Document> mongoCollection = mongoDB.getCollection("myCollection");

BulkWriteResult bulkWriteResult = null;
try {
    //Perform bulk update
    bulkWriteResult = mongoCollection.bulkWrite(updateDocuments,
            bulkWriteOptions);
} catch (BulkWriteException e) {
    //Handle bulkwrite exception
    List<BulkWriteError> bulkWriteErrors = e.getWriteErrors();
    for (BulkWriteError bulkWriteError : bulkWriteErrors) {
        int failedIndex = bulkWriteError.getIndex();
        Long failedEntityId = entityIDs.get(failedIndex);
        System.out.println("Failed record: " + failedEntityId);
        //handle rollback
    }
}

int rowsUpdated = bulkWriteResult.getModifiedCount();


Now let's understand the process..

entityIDs: List of _id s to update

filterDocument: query filter. equivalent to SQL where clause

setDocuments: values to update. equivalent to SQL set statement

updateOptions: manner in which update should happen. 

bulkWriteOptions: write operation preferences. If entityIDs are independent of each other, you should go for un-ordered execution, simply like parallel threads.


bulkWriteErrors: Errors if any during update process 


for Bulk deletion, we just need to prepare DeleteOneModel instead of UpdateOneModel documents, rest would be same

List<WriteModel<Document>> deleteDocuments = new ArrayList<WriteModel<Document>>();
for (Long entityId : entityIDs) {

    //Finder doc
    Document filterDocument = new Document();
    filterDocument.append("_id", entityId);

    //Delete doc
    Document deleteDocument = new DeleteOneModel<Document>(filterDocument);
    //Prepare list of Deletes
    deleteDocuments.add(deleteDocument);
}


Comments

Post a Comment

Popular posts from this blog

MongoDB Aggregation using Java API

A very common problem scenario in programming is to get the records or record count by certain fields. For developers familiar with RDBMS, it's like creating a SQL with combination of count function and group by attributes. For MongoDB too, it's very similar. Let's look at the example below fetching no of employees group by department Ids. public Map < Long , Integer > getEmployeeCountMapByDeptId () { Map < Long , Integer > empCountMap = new HashMap <>(); AggregateIterable < Document > iterable = getMongoCollection (). aggregate ( Arrays . asList ( new Document ( "$match" , new Document ( "active" , Boolean . TRUE ) . append ( "region" , "India" )), new Document ( "$group" , new Document ( "_id" , "$" + "deptId" ). append ( "count...

How to create users in MongoDB

Assuming you've already completed the basic setup process mentioned in this blog . In this example, we'll see how we can create admin user and enable authorization. Connect to admin database On a cmd window, connect to MongoDB by hitting command " mongo ". By default, it connects to " test " database. Once connection is successful, switch to admin db #mongo #use admin Create Role After switching to admin db, create executiveFunction role db . runCommand ({ createRole : "executeFunctions" , privileges : [ { resource : { anyResource : true }, actions : [ "anyAction" ] } ], roles : [], writeConcern : { w : "majority" , wtimeout : 5000 } }); Create admin user In this step, we'll create admin user mongoadmin with password mongo123 db . runCommand ( { "createUser" : "mongoadmin" , "pwd" : "mongo123...