npx migrate-mongo up period-schema
Migration 2024.0915.1409:0004 failed: MongoBulkWriteError: Transaction with { txnNumber: 2 } has been aborted.
    at resultHandler (C:\Users\AnishKumar\Videos\code\SH3\BE\NEWSH\node_modules\.pnpm\mongodb@6.9.0\node_modules\mongodb\lib\bulk\common.js:294:29)
    at C:\Users\AnishKumar\Videos\code\SH3\BE\NEWSH\node_modules\.pnpm\mongodb@6.9.0\node_modules\mongodb\lib\bulk\common.js:344:159
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  errorResponse: {
    errorLabels: [ 'TransientTransactionError' ],
    ok: 0,
    errmsg: 'Transaction with { txnNumber: 2 } has been aborted.',
    code: 251,
    codeName: 'NoSuchTransaction',
    '$clusterTime': {
      clusterTime: new Timestamp({ t: 1726399290, i: 22 }),
      signature: [Object]
    },
    operationTime: new Timestamp({ t: 1726399290, i: 22 })
  },
  ok: 0,
  code: 251,
  codeName: 'NoSuchTransaction',
  '$clusterTime': {
    clusterTime: new Timestamp({ t: 1726399290, i: 22 }),
    signature: {
      hash: Binary.createFromBase64('ZM40xQmIbPODKBV/S8dd7Dkl77Y=', 0),
      keyId: new Long('7377470525544595465')
    }
  },
  operationTime: new Timestamp({ t: 1726399290, i: 22 }),
  writeErrors: [],
  result: BulkWriteResult {
    insertedCount: 0,
    matchedCount: 0,
    modifiedCount: 0,
    deletedCount: 0,
    upsertedCount: 0,
    upsertedIds: {},
    insertedIds: {
      '0': new ObjectId('66e6c33a2a76b92603f2219e'),
      '1': new ObjectId('66e6c33a2a76b92603f2219f'),
      '2': new ObjectId('66e6c33a2a76b92603f221a0'),
      '3': new ObjectId('66e6c33a2a76b92603f221a1'),
      '4': new ObjectId('66e6c33a2a76b92603f221a2'),
      '5': new ObjectId('66e6c33a2a76b92603f221a3'),
      '6': new ObjectId('66e6c33a2a76b92603f221a4'),
      '7': new ObjectId('66e6c33a2a76b92603f221a5'),
      '8': new ObjectId('66e6c33a2a76b92603f221a6'),
      '9': new ObjectId('66e6c33a2a76b92603f221a7'),
      '10': new ObjectId('66e6c33a2a76b92603f221a8'),
      '11': new ObjectId('66e6c33a2a76b92603f221a9'),
      '12': new ObjectId('66e6c33a2a76b92603f221aa'),
      '13': new ObjectId('66e6c33a2a76b92603f221ab'),
      '14': new ObjectId('66e6c33a2a76b92603f221ac'),
      '15': new ObjectId('66e6c33a2a76b92603f221ad'),
      '16': new ObjectId('66e6c33a2a76b92603f221ae'),
      '17': new ObjectId('66e6c33a2a76b92603f221af'),
      '18': new ObjectId('66e6c33a2a76b92603f221b0'),
      '19': new ObjectId('66e6c33a2a76b92603f221b1'),
      '20': new ObjectId('66e6c33a2a76b92603f221b2'),
      '21': new ObjectId('66e6c33a2a76b92603f221b3'),
      '22': new ObjectId('66e6c33a2a76b92603f221b4'),
      '23': new ObjectId('66e6c33a2a76b92603f221b5'),
      '24': new ObjectId('66e6c33a2a76b92603f221b6'),
      '25': new ObjectId('66e6c33a2a76b92603f221b7'),
      '26': new ObjectId('66e6c33a2a76b92603f221b8'),
      '27': new ObjectId('66e6c33a2a76b92603f221b9'),
      '28': new ObjectId('66e6c33a2a76b92603f221ba'),
      '29': new ObjectId('66e6c33a2a76b92603f221bb'),
      '30': new ObjectId('66e6c33a2a76b92603f221bc'),
      '31': new ObjectId('66e6c33a2a76b92603f221bd'),
      '32': new ObjectId('66e6c33a2a76b92603f221be'),
      '33': new ObjectId('66e6c33a2a76b92603f221bf'),
      '34': new ObjectId('66e6c33a2a76b92603f221c0'),
      '35': new ObjectId('66e6c33a2a76b92603f221c1'),
      '36': new ObjectId('66e6c33a2a76b92603f221c2'),
      '37': new ObjectId('66e6c33a2a76b92603f221c3'),
      '38': new ObjectId('66e6c33a2a76b92603f221c4'),
      '39': new ObjectId('66e6c33a2a76b92603f221c5'),
      '40': new ObjectId('66e6c33a2a76b92603f221c6'),
      '41': new ObjectId('66e6c33a2a76b92603f221c7'),
      '42': new ObjectId('66e6c33a2a76b92603f221c8'),
      '43': new ObjectId('66e6c33a2a76b92603f221c9'),
      '44': new ObjectId('66e6c33a2a76b92603f221ca'),
      '45': new ObjectId('66e6c33a2a76b92603f221cb'),
      '46': new ObjectId('66e6c33a2a76b92603f221cc'),
      '47': new ObjectId('66e6c33a2a76b92603f221cd'),
      '48': new ObjectId('66e6c33a2a76b92603f221ce'),
      '49': new ObjectId('66e6c33a2a76b92603f221cf')
    }
  },
  [Symbol(errorLabels)]: Set(1) { 'TransientTransactionError' }
}
ERROR: Could not migrate up 20240915073415-trail-schema.js: Transaction with { txnNumber: 2 } has been aborted. MongoBulkWriteError: Transaction with { txnNumber: 2 } has been aborted.
    at resultHandler (C:\Users\AnishKumar\Videos\code\SH3\BE\NEWSH\node_modules\.pnpm\mongodb@6.9.0\node_modules\mongodb\lib\bulk\common.js:294:29)
    at C:\Users\AnishKumar\Videos\code\SH3\BE\NEWSH\node_modules\.pnpm\mongodb@6.9.0\node_modules\mongodb\lib\bulk\common.js:344:159
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)



'use strict';
const migrationVersion = '2024.0915.1409:0005';
const collection_name_which_need_to_migrate = 'periods';
const migration_history_collection_name = 'MigrationHistory';
const migration_changes_collection_name = 'MigrationChanges';
const batch_size = 1000;
const fields_which_need_to_migrate = {
	Period: '',
	Created: 0,
	isLocked: false,
	isRequest: false,
	whoLocked: [],
	whoRequest: []
};

module.exports = {
	async up(db, client) {
		// const session = client.startSession();

		const session = client.startSession({
			// transactionLifetimeLimitSeconds: 300000000000,
			// defaultTransactionOptions: { maxCommitTimeMS: 30000000 }
			causalConsistency: true,
			defaultTransactionOptions: {
				readConcern: { level: 'majority' },
				writeConcern: { w: 'majority', wtimeout: 3600 },
				readPreference: 'primary',
				maxCommitTimeMS: 10000000000
			},
			snapshot: false,
			transactionLifetimeLimitSeconds: 3600
		});
		try {
			await session.withTransaction(async () => {
				const BATCH_SIZE = batch_size;
				let skip = 0;
				let batch;

				const missingFieldsFilter = {
					$or: Object.keys(fields_which_need_to_migrate).map((field) => ({
						[field]: { $exists: false }
					}))
				};

				// Check if the MigrationHistory and MigrationChanges collections exist
				const collections = await db.listCollections().toArray();
				const existingCollections = collections.map((col) => col.name);

				// Create collections only if they don't exist
				if (!existingCollections.includes(migration_history_collection_name)) {
					await db.createCollection(migration_history_collection_name, { session });
					await db
						.collection(migration_history_collection_name)
						.createIndex({ version: 1 }, { unique: true, session });
				}

				if (!existingCollections.includes(migration_changes_collection_name)) {
					await db.createCollection(migration_changes_collection_name, { session });
					await db
						.collection(migration_changes_collection_name)
						.createIndex({ migrationVersion: 1 }, { session });
				}

				// Ensure migration version is unique
				const existingMigration = await db
					.collection(migration_history_collection_name)
					.findOne({ version: migrationVersion }, { session });
				if (existingMigration) {
					throw new Error(`Migration version ${migrationVersion} already exists.`);
				}

				const migrationDoc = {
					version: migrationVersion,
					appliedAt: new Date(),
					status: 'in_progress'
				};

				const migrationId = (
					await db.collection(migration_history_collection_name).insertOne(migrationDoc, { session })
				).insertedId;

				do {
					batch = await db
						.collection(collection_name_which_need_to_migrate)
						.find(missingFieldsFilter)
						.skip(skip)
						.limit(BATCH_SIZE)
						.toArray();

					if (batch.length > 0) {
						const bulkOperations = batch.map((doc) => {
							const updateFields = {};
							const originalValues = {};
							const updatedFields = {};

							for (const [field, defaultValue] of Object.entries(fields_which_need_to_migrate)) {
								if (!(field in doc)) {
									updateFields[field] = defaultValue;
									updatedFields[field] = defaultValue;
								} else {
									originalValues[field] = doc[field];
								}
							}

							if (Object.keys(updateFields).length > 0) {
								// Save each change as a separate document in MigrationChanges
								const changeDoc = {
									migrationVersion: migrationVersion,
									migrationId: migrationId,
									collection: collection_name_which_need_to_migrate,
									documentId: doc._id,
									updatedAt: new Date(),
									fieldsUpdated: updatedFields,
									originalValues
								};

								return [
									{
										insertOne: {
											document: changeDoc
										}
									},
									{
										updateOne: {
											filter: { _id: doc._id },
											update: { $set: updateFields }
										}
									}
								];
							}
							return [];
						});

						const operations = bulkOperations.flat();

						if (operations.length > 0) {
							// await db.collection(migration_changes_collection_name).bulkWrite(
							// 	operations.filter((op) => op.insertOne),
							// 	{ session }
							// );
							// await db.collection(collection_name_which_need_to_migrate).bulkWrite(
							// 	operations.filter((op) => op.updateOne),
							// 	{ session }
							// );
							await Promise.all([
								await db.collection(migration_changes_collection_name).bulkWrite(
									operations.filter((op) => op.insertOne),
									{ session }
								).execute(),
								await db.collection(collection_name_which_need_to_migrate).bulkWrite(
									operations.filter((op) => op.updateOne),
									{ session }
								).execute()
							]);
						}

						skip += BATCH_SIZE;
					}
				} while (batch.length === BATCH_SIZE);

				// Update the main migration document status to success
				await db
					.collection(migration_history_collection_name)
					.updateOne(
						{ _id: migrationId },
						{ $set: { status: 'success', completedAt: new Date() } },
						{ session }
					);

				console.log(`Migration ${migrationVersion} applied successfully.`);
			});
		} catch (error) {
			await session.abortTransaction()
			//This one line fixed our problem
			await session.endSession()
			console.error(`Migration ${migrationVersion} failed:`, error);
			await db
				.collection(migration_history_collection_name)
				.updateOne(
					{ version: migrationVersion },
					{ $set: { status: 'failure', reason: error.message, completedAt: new Date() } }
				);
			throw error;
		} finally {
			await session.endSession();
		}
	},