Managing references in a MongoDB database can be challenging, especially when dealing with related documents and ensuring atomic updates. In this post, we'll explore how I built a function that processes references in a document using Mongoose, handling both updates and inserts (upserts) efficiently.
We'll be using Mongoose's bulkWrite operation to perform multiple updates or upserts in a single database call, which keeps round trips to a minimum.
The Problem
With complex schemas, we often need to update or insert related documents based on unique fields, such as an email address, while maintaining the integrity of our data. This can be particularly challenging when the dataset grows or when handling arrays of related documents.
The Solution
Here's a function that processes the reference fields in a given data object, updating existing documents or creating new ones if they don't exist, using Mongoose's bulkWrite. We'll handle cases where the field is an array or a single object, and rely on a unique identifier, such as an email address, to decide between an update and an insert.
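To make the idea concrete before looking at the full function, here is a minimal sketch of how an array or a single object could be turned into upsert operations keyed by email. The names `ContactInput`, `UpsertOp`, and `toUpsertOps` are hypothetical and only serve this illustration; the operation shape mirrors the `updateOne` entries that bulkWrite accepts.

```typescript
// Hypothetical shape of a related document; only `email` matters for matching.
interface ContactInput {
  email: string;
  [key: string]: unknown;
}

// Shape of a single `updateOne` upsert entry as passed to bulkWrite.
interface UpsertOp {
  updateOne: {
    filter: { email: string };
    update: { $set: ContactInput };
    upsert: true;
  };
}

// Accept either one object or an array and always return an array of upsert operations.
function toUpsertOps(value: ContactInput | ContactInput[]): UpsertOp[] {
  const items = Array.isArray(value) ? value : [value];
  return items.map((item): UpsertOp => ({
    updateOne: {
      filter: { email: item.email }, // match on the unique field
      update: { $set: item },        // overwrite the provided fields
      upsert: true,                  // insert when no document matches
    },
  }));
}

const ops = toUpsertOps([
  { email: 'a@example.com', name: 'Ada' },
  { email: 'b@example.com', name: 'Bob' },
]);
console.log(ops.length);
```

Normalizing to an array first means the same code path can serve both the array case and the single-object case.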
1. Defining Mongoose Schema
First, make sure the Mongoose schema supports the operations we need. Here's an example of a ContactDetail schema:
import { Schema, model } from 'mongoose';

const contactDetailMongooseSchema = new Schema(
  {
    name: { type: String, required: true },
    // unique: true creates a unique index, which the email-based upserts rely on
    email: { type: String, required: true, unique: true },
    phoneNumber: { type: String, required: true },
    createdBy: { type: Schema.Types.ObjectId, ref: 'User' },
    updatedBy: { type: Schema.Types.ObjectId, ref: 'User' },
  },
  {
    // Adds and maintains createdAt and updatedAt automatically
    timestamps: true,
  }
);

const ContactDetail = model('ContactDetail', contactDetailMongooseSchema);
2. Create the Processing Function
The following functions find the reference fields in a schema and process them, using bulkWrite for efficiency:
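import { Model, Schema, SchemaType, Types } from 'mongoose';

// NOTE: isRefField is not shown in the original post. This is an assumed
// implementation that treats a path as a reference when its options carry a
// `ref`, either directly or on the first element of an array definition.
function isRefField(type: SchemaType): boolean {
  const options: any = type.options ?? {};
  const arrayItem = Array.isArray(options.type) ? options.type[0] : undefined;
  return Boolean(options.ref || arrayItem?.ref);
}

// Collect the paths of all schema fields that reference another model.
export function getReferenceFields(schema: Schema): string[] {
  const refFields: string[] = [];
  schema.eachPath((path, type) => {
    if (isRefField(type)) {
      refFields.push(path);
    }
  });
  return refFields;
}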
export async function processReferences(
  data: any,
  schema: Schema,
  models: { [key: string]: Model<any> }
): Promise<any> {
  const referenceFields = getReferenceFields(schema);
  console.log('Reference fields:', referenceFields);

  for (const field of referenceFields) {
    console.log('Processing field:', field);
    // Models are looked up by field name, so `models` must be keyed by schema path
    const model = models[field];

    // Skip processing if no model is found
    if (!model) {
      console.error(`No model found for field: ${field}`);
      continue;
    }

    if (Array.isArray(data[field])) {
      console.log('Processing array field:', field);

      // Prepare bulk operations, filter by email
      const bulkOps = data[field].map((item: any) => ({
        updateOne: {
          filter: { email: item.email }, // Use email to filter
          update: { $set: item },
          upsert: true,
        },
      }));

      // Ensure the model has the bulkWrite function
      if (typeof model.bulkWrite !== 'function') {
        console.error(`bulkWrite is not a function on model for field: ${field}`);
        continue;
      }

      console.log('Executing bulk operations...');
      // Execute bulk operations
      try {
        const result = await model.bulkWrite(bulkOps);
        console.log('Bulk write result:', result);

        // upsertedIds is an object keyed by operation index, not an array
        const upsertedIds = Object.values(result.upsertedIds ?? {});
        console.log('Upserted IDs:', upsertedIds);

        // Fetch updated or upserted documents
        const updatedDocs = await model.find({
          email: { $in: data[field].map((item: any) => item.email) },
        });
        console.log('Updated documents:', updatedDocs);

        // Update the data with the IDs of the processed documents
        data[field] = updatedDocs.map((doc: any) => doc._id);
        // Log success only when nothing above has thrown
        console.log('Bulk operations executed successfully.');
      } catch (error) {
        console.error('Error in bulk operation', error);
      }
    } else if (typeof data[field] === 'object' && data[field] !== null) {
      console.log('Processing object field:', field);

      // Ensure the model has the methods
      if (typeof model.findOneAndUpdate !== 'function') {
        console.error(`findOneAndUpdate is not a function on model for field: ${field}`);
        continue;
      }

      // Update or create the document
      try {
        const doc = await model.findOneAndUpdate(
          // A missing _id gets a fresh ObjectId, so the upsert inserts a new document
          { _id: data[field]._id || new Types.ObjectId() },
          data[field],
          { upsert: true, new: true }
        );
        console.log('Document processed:', doc);

        // Update the data with the ID of the processed document
        data[field] = doc._id;
      } catch (error) {
        console.error('Error processing document:', error);
      }
    }
  }

  return data;
}
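Two details of the array branch are worth a closer look. The `upsertedIds` value on a bulkWrite result is an object keyed by operation index rather than an array, and a `find` with `$in` does not promise to return documents in the order of the input list. The sketch below shows one way to handle both; `DocLike`, `BulkResultLike`, `collectUpsertedIds`, and `orderIdsByEmail` are hypothetical stand-ins written for this example, not part of Mongoose.

```typescript
// Minimal stand-ins for the parts of a document and a bulkWrite result used here.
interface DocLike {
  _id: string;
  email: string;
}
interface BulkResultLike {
  upsertedIds?: Record<string, string>; // keyed by operation index
}

// Read the upserted IDs as a plain array by taking the values of the index-keyed object.
function collectUpsertedIds(result: BulkResultLike): string[] {
  return Object.values(result.upsertedIds ?? {});
}

// Return IDs in the same order as the input emails, skipping emails with no matching document.
function orderIdsByEmail(emails: string[], docs: DocLike[]): string[] {
  const idByEmail = new Map(docs.map((doc) => [doc.email, doc._id] as const));
  return emails
    .map((email) => idByEmail.get(email))
    .filter((id): id is string => id !== undefined);
}

const orderedIds = orderIdsByEmail(
  ['b@example.com', 'a@example.com'],
  [
    { _id: 'id-a', email: 'a@example.com' },
    { _id: 'id-b', email: 'b@example.com' },
  ]
);
console.log(orderedIds);
```

If the order of the reference array matters to the parent document, mapping through the input emails like this is one option for keeping it stable.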
3. Handling Updates and Inserts
- Filter by Email: We use the email address as the unique identifier for each ContactDetail to decide whether an entry should be updated or inserted. (I plan to make this key generic in a later version.)
- Use bulkWrite: This lets us perform multiple operations in one go, reducing the number of database calls and improving performance.
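As one possible direction for making the unique key generic, the sketch below builds the same kind of upsert operations for any field name instead of hard-coding email. `UpsertOp` and `buildUpsertOps` are hypothetical names used for illustration only, and skipping items that lack the key is an assumption of this sketch rather than behavior from the function above.

```typescript
// Generic shape of one bulkWrite `updateOne` upsert entry for items of type T.
interface UpsertOp<T> {
  updateOne: {
    filter: Partial<T>;
    update: { $set: T };
    upsert: true;
  };
}

// Build upsert operations keyed by any field of T.
// Items without a value for that field are skipped (an assumption of this sketch).
function buildUpsertOps<T extends object, K extends keyof T>(
  items: T[],
  key: K
): UpsertOp<T>[] {
  return items
    .filter((item) => item[key] !== undefined && item[key] !== null)
    .map((item): UpsertOp<T> => ({
      updateOne: {
        filter: { [key]: item[key] } as unknown as Partial<T>,
        update: { $set: item },
        upsert: true,
      },
    }));
}

const byPhone = buildUpsertOps(
  [
    { email: 'a@example.com', phoneNumber: '111' },
    { email: 'b@example.com', phoneNumber: '222' },
  ],
  'phoneNumber'
);
console.log(byPhone.length);
```

Whichever field is chosen should be backed by a unique index, as email is in the schema above, so that matching stays unambiguous.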
Conclusion
By using Mongoose's bulkWrite operation and a unique field like an email, we can efficiently manage updates and upserts for related documents. Because each batch goes to the database in a single call, the approach scales well as the number of related documents grows, and the unique index on email keeps duplicate contacts out.
Do post a comment and let me know what you thought of it.