"use strict" ;
Object . defineProperty ( exports , "__esModule" , { value : true } ) ;
exports . executeOperation = void 0 ;
const error _1 = require ( "../error" ) ;
const read _preference _1 = require ( "../read_preference" ) ;
const server _selection _1 = require ( "../sdam/server_selection" ) ;
const utils _1 = require ( "../utils" ) ;
const operation _1 = require ( "./operation" ) ;
const MMAPv1 _RETRY _WRITES _ERROR _CODE = error _1 . MONGODB _ERROR _CODES . IllegalOperation ;
const MMAPv1 _RETRY _WRITES _ERROR _MESSAGE = 'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.' ;
function executeOperation ( client , operation , callback ) {
return ( 0 , utils _1 . maybeCallback ) ( ( ) => executeOperationAsync ( client , operation ) , callback ) ;
}
exports . executeOperation = executeOperation ;
async function executeOperationAsync ( client , operation ) {
if ( ! ( operation instanceof operation _1 . AbstractOperation ) ) {
// TODO(NODE-3483): Extend MongoRuntimeError
throw new error _1 . MongoRuntimeError ( 'This method requires a valid operation instance' ) ;
}
if ( client . topology == null ) {
// Auto connect on operation
if ( client . s . hasBeenClosed ) {
throw new error _1 . MongoNotConnectedError ( 'Client must be connected before running operations' ) ;
}
client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] = true ;
try {
await client . connect ( ) ;
}
finally {
delete client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] ;
}
}
const { topology } = client ;
if ( topology == null ) {
throw new error _1 . MongoRuntimeError ( 'client.connect did not create a topology but also did not throw' ) ;
}
if ( topology . shouldCheckForSessionSupport ( ) ) {
await topology . selectServerAsync ( read _preference _1 . ReadPreference . primaryPreferred , { } ) ;
}
// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session = operation . session ;
let owner ;
if ( topology . hasSessionSupport ( ) ) {
if ( session == null ) {
owner = Symbol ( ) ;
session = client . startSession ( { owner , explicit : false } ) ;
}
else if ( session . hasEnded ) {
throw new error _1 . MongoExpiredSessionError ( 'Use of expired sessions is not permitted' ) ;
}
else if ( session . snapshotEnabled && ! topology . capabilities . supportsSnapshotReads ) {
throw new error _1 . MongoCompatibilityError ( 'Snapshot reads require MongoDB 5.0 or later' ) ;
}
}
else {
// no session support
if ( session && session . explicit ) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
throw new error _1 . MongoCompatibilityError ( 'Current topology does not support sessions' ) ;
}
else if ( session && ! session . explicit ) {
// We do not have to worry about ending the session because the server session has not been acquired yet
delete operation . options . session ;
operation . clearSession ( ) ;
session = undefined ;
}
}
const readPreference = operation . readPreference ? ? read _preference _1 . ReadPreference . primary ;
const inTransaction = ! ! session ? . inTransaction ( ) ;
if ( inTransaction && ! readPreference . equals ( read _preference _1 . ReadPreference . primary ) ) {
throw new error _1 . MongoTransactionError ( ` Read preference in a transaction must be primary, not: ${ readPreference . mode } ` ) ;
}
if ( session ? . isPinned && session . transaction . isCommitted && ! operation . bypassPinningCheck ) {
session . unpin ( ) ;
}
let selector ;
if ( operation . hasAspect ( operation _1 . Aspect . MUST _SELECT _SAME _SERVER ) ) {
// GetMore and KillCursor operations must always select the same server, but run through
// server selection to potentially force monitor checks if the server is
// in an unknown state.
selector = ( 0 , server _selection _1 . sameServerSelector ) ( operation . server ? . description ) ;
}
else if ( operation . trySecondaryWrite ) {
// If operation should try to write to secondary use the custom server selector
// otherwise provide the read preference.
selector = ( 0 , server _selection _1 . secondaryWritableServerSelector ) ( topology . commonWireVersion , readPreference ) ;
}
else {
selector = readPreference ;
}
const server = await topology . selectServerAsync ( selector , { session } ) ;
if ( session == null ) {
// No session also means it is not retryable, early exit
return operation . executeAsync ( server , undefined ) ;
}
if ( ! operation . hasAspect ( operation _1 . Aspect . RETRYABLE ) ) {
// non-retryable operation, early exit
try {
return await operation . executeAsync ( server , session ) ;
}
finally {
if ( session ? . owner != null && session . owner === owner ) {
await session . endSession ( ) . catch ( ( ) => null ) ;
}
}
}
const willRetryRead = topology . s . options . retryReads && ! inTransaction && operation . canRetryRead ;
const willRetryWrite = topology . s . options . retryWrites &&
! inTransaction &&
( 0 , utils _1 . supportsRetryableWrites ) ( server ) &&
operation . canRetryWrite ;
const hasReadAspect = operation . hasAspect ( operation _1 . Aspect . READ _OPERATION ) ;
const hasWriteAspect = operation . hasAspect ( operation _1 . Aspect . WRITE _OPERATION ) ;
const willRetry = ( hasReadAspect && willRetryRead ) || ( hasWriteAspect && willRetryWrite ) ;
if ( hasWriteAspect && willRetryWrite ) {
operation . options . willRetryWrite = true ;
session . incrementTransactionNumber ( ) ;
}
try {
return await operation . executeAsync ( server , session ) ;
}
catch ( operationError ) {
if ( willRetry && operationError instanceof error _1 . MongoError ) {
return await retryOperation ( operation , operationError , {
session ,
topology ,
selector
} ) ;
}
throw operationError ;
}
finally {
if ( session ? . owner != null && session . owner === owner ) {
await session . endSession ( ) . catch ( ( ) => null ) ;
}
}
}
async function retryOperation ( operation , originalError , { session , topology , selector } ) {
const isWriteOperation = operation . hasAspect ( operation _1 . Aspect . WRITE _OPERATION ) ;
const isReadOperation = operation . hasAspect ( operation _1 . Aspect . READ _OPERATION ) ;
if ( isWriteOperation && originalError . code === MMAPv1 _RETRY _WRITES _ERROR _CODE ) {
throw new error _1 . MongoServerError ( {
message : MMAPv1 _RETRY _WRITES _ERROR _MESSAGE ,
errmsg : MMAPv1 _RETRY _WRITES _ERROR _MESSAGE ,
originalError
} ) ;
}
if ( isWriteOperation && ! ( 0 , error _1 . isRetryableWriteError ) ( originalError ) ) {
throw originalError ;
}
if ( isReadOperation && ! ( 0 , error _1 . isRetryableReadError ) ( originalError ) ) {
throw originalError ;
}
if ( originalError instanceof error _1 . MongoNetworkError &&
session . isPinned &&
! session . inTransaction ( ) &&
operation . hasAspect ( operation _1 . Aspect . CURSOR _CREATING ) ) {
// If we have a cursor and the initial command fails with a network error,
// we can retry it on another connection. So we need to check it back in, clear the
// pool for the service id, and retry again.
session . unpin ( { force : true , forceClear : true } ) ;
}
// select a new server, and attempt to retry the operation
const server = await topology . selectServerAsync ( selector , { session } ) ;
if ( isWriteOperation && ! ( 0 , utils _1 . supportsRetryableWrites ) ( server ) ) {
throw new error _1 . MongoUnexpectedServerResponseError ( 'Selected server does not support retryable writes' ) ;
}
try {
return await operation . executeAsync ( server , session ) ;
}
catch ( retryError ) {
if ( retryError instanceof error _1 . MongoError &&
retryError . hasErrorLabel ( error _1 . MongoErrorLabel . NoWritesPerformed ) ) {
throw originalError ;
}
throw retryError ;
}
}
//# sourceMappingURL=execute_operation.js.map