npx migrate-mongo up period-schema
Migration 2024.0915.1409:0004 failed: MongoBulkWriteError: Transaction with { txnNumber: 2 } has been aborted.
at resultHandler (C:\Users\AnishKumar\Videos\code\SH3\BE\NEWSH\node_modules\.pnpm\mongodb@6.9.0\node_modules\mongodb\lib\bulk\common.js:294:29)
at C:\Users\AnishKumar\Videos\code\SH3\BE\NEWSH\node_modules\.pnpm\mongodb@6.9.0\node_modules\mongodb\lib\bulk\common.js:344:159
at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
errorResponse: {
errorLabels: [ 'TransientTransactionError' ],
ok: 0,
errmsg: 'Transaction with { txnNumber: 2 } has been aborted.',
code: 251,
codeName: 'NoSuchTransaction',
'$clusterTime': {
clusterTime: new Timestamp({ t: 1726399290, i: 22 }),
signature: [Object]
},
operationTime: new Timestamp({ t: 1726399290, i: 22 })
},
ok: 0,
code: 251,
codeName: 'NoSuchTransaction',
'$clusterTime': {
clusterTime: new Timestamp({ t: 1726399290, i: 22 }),
signature: {
hash: Binary.createFromBase64('ZM40xQmIbPODKBV/S8dd7Dkl77Y=', 0),
keyId: new Long('7377470525544595465')
}
},
operationTime: new Timestamp({ t: 1726399290, i: 22 }),
writeErrors: [],
result: BulkWriteResult {
insertedCount: 0,
matchedCount: 0,
modifiedCount: 0,
deletedCount: 0,
upsertedCount: 0,
upsertedIds: {},
insertedIds: {
'0': new ObjectId('66e6c33a2a76b92603f2219e'),
'1': new ObjectId('66e6c33a2a76b92603f2219f'),
'2': new ObjectId('66e6c33a2a76b92603f221a0'),
'3': new ObjectId('66e6c33a2a76b92603f221a1'),
'4': new ObjectId('66e6c33a2a76b92603f221a2'),
'5': new ObjectId('66e6c33a2a76b92603f221a3'),
'6': new ObjectId('66e6c33a2a76b92603f221a4'),
'7': new ObjectId('66e6c33a2a76b92603f221a5'),
'8': new ObjectId('66e6c33a2a76b92603f221a6'),
'9': new ObjectId('66e6c33a2a76b92603f221a7'),
'10': new ObjectId('66e6c33a2a76b92603f221a8'),
'11': new ObjectId('66e6c33a2a76b92603f221a9'),
'12': new ObjectId('66e6c33a2a76b92603f221aa'),
'13': new ObjectId('66e6c33a2a76b92603f221ab'),
'14': new ObjectId('66e6c33a2a76b92603f221ac'),
'15': new ObjectId('66e6c33a2a76b92603f221ad'),
'16': new ObjectId('66e6c33a2a76b92603f221ae'),
'17': new ObjectId('66e6c33a2a76b92603f221af'),
'18': new ObjectId('66e6c33a2a76b92603f221b0'),
'19': new ObjectId('66e6c33a2a76b92603f221b1'),
'20': new ObjectId('66e6c33a2a76b92603f221b2'),
'21': new ObjectId('66e6c33a2a76b92603f221b3'),
'22': new ObjectId('66e6c33a2a76b92603f221b4'),
'23': new ObjectId('66e6c33a2a76b92603f221b5'),
'24': new ObjectId('66e6c33a2a76b92603f221b6'),
'25': new ObjectId('66e6c33a2a76b92603f221b7'),
'26': new ObjectId('66e6c33a2a76b92603f221b8'),
'27': new ObjectId('66e6c33a2a76b92603f221b9'),
'28': new ObjectId('66e6c33a2a76b92603f221ba'),
'29': new ObjectId('66e6c33a2a76b92603f221bb'),
'30': new ObjectId('66e6c33a2a76b92603f221bc'),
'31': new ObjectId('66e6c33a2a76b92603f221bd'),
'32': new ObjectId('66e6c33a2a76b92603f221be'),
'33': new ObjectId('66e6c33a2a76b92603f221bf'),
'34': new ObjectId('66e6c33a2a76b92603f221c0'),
'35': new ObjectId('66e6c33a2a76b92603f221c1'),
'36': new ObjectId('66e6c33a2a76b92603f221c2'),
'37': new ObjectId('66e6c33a2a76b92603f221c3'),
'38': new ObjectId('66e6c33a2a76b92603f221c4'),
'39': new ObjectId('66e6c33a2a76b92603f221c5'),
'40': new ObjectId('66e6c33a2a76b92603f221c6'),
'41': new ObjectId('66e6c33a2a76b92603f221c7'),
'42': new ObjectId('66e6c33a2a76b92603f221c8'),
'43': new ObjectId('66e6c33a2a76b92603f221c9'),
'44': new ObjectId('66e6c33a2a76b92603f221ca'),
'45': new ObjectId('66e6c33a2a76b92603f221cb'),
'46': new ObjectId('66e6c33a2a76b92603f221cc'),
'47': new ObjectId('66e6c33a2a76b92603f221cd'),
'48': new ObjectId('66e6c33a2a76b92603f221ce'),
'49': new ObjectId('66e6c33a2a76b92603f221cf')
}
},
[Symbol(errorLabels)]: Set(1) { 'TransientTransactionError' }
}
ERROR: Could not migrate up 20240915073415-trail-schema.js: Transaction with { txnNumber: 2 } has been aborted. MongoBulkWriteError: Transaction with { txnNumber: 2 } has been aborted.
at resultHandler (C:\Users\AnishKumar\Videos\code\SH3\BE\NEWSH\node_modules\.pnpm\mongodb@6.9.0\node_modules\mongodb\lib\bulk\common.js:294:29)
at C:\Users\AnishKumar\Videos\code\SH3\BE\NEWSH\node_modules\.pnpm\mongodb@6.9.0\node_modules\mongodb\lib\bulk\common.js:344:159
at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
'use strict';

/** Version string stored with the history record and with every audit document. */
const migrationVersion = '2024.0915.1409:0005';

/** Maximum number of documents fetched and patched per batch. */
const batch_size = 1000;

/** Collection whose documents receive the missing fields. */
const collection_name_which_need_to_migrate = 'periods';

/** Collection holding one status record per migration version. */
const migration_history_collection_name = 'MigrationHistory';

/** Collection holding one audit document per patched document. */
const migration_changes_collection_name = 'MigrationChanges';

/**
 * Field name -> default value. A document is patched with the default for
 * every key listed here that it does not already carry.
 */
const fields_which_need_to_migrate = {
  Period: '',
  Created: 0,
  isLocked: false,
  isRequest: false,
  whoLocked: [],
  whoRequest: [],
};
module.exports = {
async up(db, client) {
// const session = client.startSession();
const session = client.startSession({
// transactionLifetimeLimitSeconds: 300000000000,
// defaultTransactionOptions: { maxCommitTimeMS: 30000000 }
causalConsistency: true,
defaultTransactionOptions: {
readConcern: { level: 'majority' },
writeConcern: { w: 'majority', wtimeout: 3600 },
readPreference: 'primary',
maxCommitTimeMS: 10000000000
},
snapshot: false,
transactionLifetimeLimitSeconds: 3600
});
try {
await session.withTransaction(async () => {
const BATCH_SIZE = batch_size;
let skip = 0;
let batch;
const missingFieldsFilter = {
$or: Object.keys(fields_which_need_to_migrate).map((field) => ({
[field]: { $exists: false }
}))
};
// Check if the MigrationHistory and MigrationChanges collections exist
const collections = await db.listCollections().toArray();
const existingCollections = collections.map((col) => col.name);
// Create collections only if they don't exist
if (!existingCollections.includes(migration_history_collection_name)) {
await db.createCollection(migration_history_collection_name, { session });
await db
.collection(migration_history_collection_name)
.createIndex({ version: 1 }, { unique: true, session });
}
if (!existingCollections.includes(migration_changes_collection_name)) {
await db.createCollection(migration_changes_collection_name, { session });
await db
.collection(migration_changes_collection_name)
.createIndex({ migrationVersion: 1 }, { session });
}
// Ensure migration version is unique
const existingMigration = await db
.collection(migration_history_collection_name)
.findOne({ version: migrationVersion }, { session });
if (existingMigration) {
throw new Error(`Migration version ${migrationVersion} already exists.`);
}
const migrationDoc = {
version: migrationVersion,
appliedAt: new Date(),
status: 'in_progress'
};
const migrationId = (
await db.collection(migration_history_collection_name).insertOne(migrationDoc, { session })
).insertedId;
do {
batch = await db
.collection(collection_name_which_need_to_migrate)
.find(missingFieldsFilter)
.skip(skip)
.limit(BATCH_SIZE)
.toArray();
if (batch.length > 0) {
const bulkOperations = batch.map((doc) => {
const updateFields = {};
const originalValues = {};
const updatedFields = {};
for (const [field, defaultValue] of Object.entries(fields_which_need_to_migrate)) {
if (!(field in doc)) {
updateFields[field] = defaultValue;
updatedFields[field] = defaultValue;
} else {
originalValues[field] = doc[field];
}
}
if (Object.keys(updateFields).length > 0) {
// Save each change as a separate document in MigrationChanges
const changeDoc = {
migrationVersion: migrationVersion,
migrationId: migrationId,
collection: collection_name_which_need_to_migrate,
documentId: doc._id,
updatedAt: new Date(),
fieldsUpdated: updatedFields,
originalValues
};
return [
{
insertOne: {
document: changeDoc
}
},
{
updateOne: {
filter: { _id: doc._id },
update: { $set: updateFields }
}
}
];
}
return [];
});
const operations = bulkOperations.flat();
if (operations.length > 0) {
// await db.collection(migration_changes_collection_name).bulkWrite(
// operations.filter((op) => op.insertOne),
// { session }
// );
// await db.collection(collection_name_which_need_to_migrate).bulkWrite(
// operations.filter((op) => op.updateOne),
// { session }
// );
await Promise.all([
await db.collection(migration_changes_collection_name).bulkWrite(
operations.filter((op) => op.insertOne),
{ session }
).execute(),
await db.collection(collection_name_which_need_to_migrate).bulkWrite(
operations.filter((op) => op.updateOne),
{ session }
).execute()
]);
}
skip += BATCH_SIZE;
}
} while (batch.length === BATCH_SIZE);
// Update the main migration document status to success
await db
.collection(migration_history_collection_name)
.updateOne(
{ _id: migrationId },
{ $set: { status: 'success', completedAt: new Date() } },
{ session }
);
console.log(`Migration ${migrationVersion} applied successfully.`);
});
} catch (error) {
await session.abortTransaction()
//This one line fixed our problem
await session.endSession()
console.error(`Migration ${migrationVersion} failed:`, error);
await db
.collection(migration_history_collection_name)
.updateOne(
{ version: migrationVersion },
{ $set: { status: 'failure', reason: error.message, completedAt: new Date() } }
);
throw error;
} finally {
await session.endSession();
}
},