
MongoDB BulkWrite Java API

MongoDB 3.2 introduced bulk write operations (bulkWrite). In RDBMS terms, this is similar to SQL batch updates, where statements are grouped into chunks and each chunk is submitted to the database in a single call.

Here are some important points about the MongoDB bulk write operation:


  1. Useful when you have a large amount of data to update or insert.
  2. The operations are automatically split into batches and executed in an ordered or unordered manner. The batch limit is 1,000 operations on MongoDB 3.2 and 3.4, and 100,000 from MongoDB 3.6 onward.
  3. This drastically reduces round trips to the database. Say there are 50,000 records to update: instead of 50,000 round trips from your app server, a bulk write with batches of 1,000 needs just 50.
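
The round-trip arithmetic in point 3 can be sketched in plain Java. This is only an illustration: BulkBatchMath, roundTrips and partition are hypothetical names, not part of the MongoDB driver, and the batch size of 1,000 is an assumption that matches the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of the MongoDB driver.
class BulkBatchMath {

    // Round trips needed to send `operations` writes in batches of `batchSize`.
    static int roundTrips(int operations, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return (operations + batchSize - 1) / batchSize; // ceiling division
    }

    // Splits a list into consecutive batches of at most `batchSize` elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(roundTrips(50_000, 1_000)); // prints 50
        System.out.println(roundTrips(50_001, 1_000)); // prints 51
        System.out.println(partition(Arrays.asList(1, 2, 3, 4, 5), 2)); // prints [[1, 2], [3, 4], [5]]
    }
}
```

With bulkWrite you do not have to partition the list yourself; the sketch only shows where the "50 instead of 50,000" figure comes from.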

Let's see an example below:


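import java.util.ArrayList;
import java.util.List;

import org.bson.Document;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.bulk.BulkWriteError;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;

//entityIDs is a List<Long> of _id values; mongoDB is an existing MongoDatabase
List<WriteModel<Document>> updateDocuments = new ArrayList<WriteModel<Document>>();
for (Long entityId : entityIDs) {

    //Finder doc
    Document filterDocument = new Document();
    filterDocument.append("_id", entityId);

    //Update doc
    Document updateDocument = new Document();
    Document setDocument = new Document();
    setDocument.append("name", "xyz");
    setDocument.append("role", "abc");

    updateDocument.append("$set", setDocument);

    //Update option
    UpdateOptions updateOptions = new UpdateOptions();
    updateOptions.upsert(true); //if true, will create a new doc in case of unmatched find
    updateOptions.bypassDocumentValidation(true); //set true/false

    //Prepare list of Updates
    updateDocuments.add(
            new UpdateOneModel<Document>(
                    filterDocument,
                    updateDocument,
                    updateOptions));

}

//Bulk write options
BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
bulkWriteOptions.ordered(false); //false: no ordering guarantee, remaining writes continue after a failure
bulkWriteOptions.bypassDocumentValidation(true);

MongoCollection<Document> mongoCollection = mongoDB.getCollection("myCollection");

BulkWriteResult bulkWriteResult;
try {
    //Perform bulk update
    bulkWriteResult = mongoCollection.bulkWrite(updateDocuments,
            bulkWriteOptions);
} catch (MongoBulkWriteException e) {
    //MongoCollection.bulkWrite reports failed writes via MongoBulkWriteException
    List<BulkWriteError> bulkWriteErrors = e.getWriteErrors();
    for (BulkWriteError bulkWriteError : bulkWriteErrors) {
        //Index of the failed model in updateDocuments, which mirrors entityIDs
        int failedIndex = bulkWriteError.getIndex();
        Long failedEntityId = entityIDs.get(failedIndex);
        System.out.println("Failed record: " + failedEntityId);
        //handle rollback
    }
    //Partial result for the writes that did succeed (avoids a null result below)
    bulkWriteResult = e.getWriteResult();
}

//Counts modified documents only; upserted ones are reported via getUpserts()
int rowsUpdated = bulkWriteResult.getModifiedCount();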
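
The catch block above relies on each error index lining up with a position in entityIDs, which holds because exactly one model is added per ID, in the same order. Here is a minimal plain-Java sketch of that lookup, assuming the failed indexes have already been collected from getIndex(). FailedWriteLookup and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the MongoDB driver.
class FailedWriteLookup {

    // Maps failed operation indexes back to the entity IDs they were built from.
    static List<Long> failedIds(List<Long> entityIds, List<Integer> failedIndexes) {
        List<Long> failed = new ArrayList<>();
        for (int index : failedIndexes) {
            failed.add(entityIds.get(index));
        }
        return failed;
    }

    // IDs whose operation did not fail. Only meaningful for unordered execution,
    // where every operation is attempted regardless of earlier failures.
    static List<Long> succeededIds(List<Long> entityIds, List<Integer> failedIndexes) {
        Set<Integer> failed = new HashSet<>(failedIndexes);
        List<Long> succeeded = new ArrayList<>();
        for (int i = 0; i < entityIds.size(); i++) {
            if (!failed.contains(i)) {
                succeeded.add(entityIds.get(i));
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        List<Long> entityIds = Arrays.asList(101L, 102L, 103L, 104L);
        List<Integer> failedIndexes = Arrays.asList(1, 3);
        System.out.println(failedIds(entityIds, failedIndexes));    // prints [102, 104]
        System.out.println(succeededIds(entityIds, failedIndexes)); // prints [101, 103]
    }
}
```

The list of failed IDs is a natural input for a retry or a compensating rollback.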


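Now let's walk through the pieces:

entityIDs: list of _id values to update

filterDocument: query filter, equivalent to a SQL WHERE clause

setDocument: values to update, equivalent to a SQL SET clause

updateOptions: how each update should behave, e.g. upsert(true) inserts a new document when the filter matches nothing

bulkWriteOptions: write operation preferences. If the entityIDs are independent of each other, go for unordered execution: the server may then apply the writes in any order (possibly in parallel) and keeps going after an individual write fails. With ordered execution, processing stops at the first error.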


bulkWriteErrors: errors raised during the update, if any. Each one carries the index of the failed operation in the submitted list.
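
To make the ordered/unordered choice in bulkWriteOptions more concrete, here is a small plain-Java simulation of which operations end up applied when some of them fail. It is a simplified model, not driver code: OrderedVsUnordered and appliedIndexes are hypothetical names, and the loop runs sequentially even though a real unordered bulk write gives no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical simulation for illustration only; not part of the MongoDB driver.
class OrderedVsUnordered {

    // Returns the indexes of the operations that get applied, given the set of
    // indexes that fail. Ordered: stop at the first failure.
    // Unordered: skip the failed operation and keep going.
    static List<Integer> appliedIndexes(int operationCount,
                                        Set<Integer> failingIndexes,
                                        boolean ordered) {
        List<Integer> applied = new ArrayList<>();
        for (int i = 0; i < operationCount; i++) {
            if (failingIndexes.contains(i)) {
                if (ordered) {
                    break; // remaining operations are never attempted
                }
                continue; // this one is reported as an error, the rest still run
            }
            applied.add(i);
        }
        return applied;
    }

    public static void main(String[] args) {
        Set<Integer> failing = Collections.singleton(2); // third operation fails
        System.out.println(appliedIndexes(5, failing, true));  // prints [0, 1]
        System.out.println(appliedIndexes(5, failing, false)); // prints [0, 1, 3, 4]
    }
}
```

In the unordered case only the failing index would show up in bulkWriteErrors, while in the ordered case the operations after it are simply left unexecuted.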


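For bulk deletion, we just need to prepare DeleteOneModel instances instead of UpdateOneModel ones; the rest stays the same, and the number of removed documents is available from bulkWriteResult.getDeletedCount().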

List<WriteModel<Document>> deleteDocuments = new ArrayList<WriteModel<Document>>();
for (Long entityId : entityIDs) {

    //Finder doc
    Document filterDocument = new Document();
    filterDocument.append("_id", entityId);

    //Delete model (a DeleteOneModel is a WriteModel, not a Document)
    WriteModel<Document> deleteModel = new DeleteOneModel<Document>(filterDocument);
    //Prepare list of Deletes
    deleteDocuments.add(deleteModel);
}

