diff --git a/common/changes/@microsoft/rush/sennyeya-unassigned-operation-weight_2024-06-28-14-34.json b/common/changes/@microsoft/rush/sennyeya-unassigned-operation-weight_2024-06-28-14-34.json new file mode 100644 index 00000000000..d9776231878 --- /dev/null +++ b/common/changes/@microsoft/rush/sennyeya-unassigned-operation-weight_2024-06-28-14-34.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@microsoft/rush", + "comment": "Adjusts how cobuilt operations are added and requeued to the operation graph. Removes the 'RemoteExecuting' status.", + "type": "none" + } + ], + "packageName": "@microsoft/rush" +} \ No newline at end of file diff --git a/common/reviews/api/rush-lib.api.md b/common/reviews/api/rush-lib.api.md index 1abcde33f6d..803bc42cd93 100644 --- a/common/reviews/api/rush-lib.api.md +++ b/common/reviews/api/rush-lib.api.md @@ -970,7 +970,6 @@ export enum OperationStatus { NoOp = "NO OP", Queued = "QUEUED", Ready = "READY", - RemoteExecuting = "REMOTE EXECUTING", Skipped = "SKIPPED", Success = "SUCCESS", SuccessWithWarning = "SUCCESS WITH WARNINGS", diff --git a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts index a653208f122..12ac10c7e83 100644 --- a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts +++ b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts @@ -5,20 +5,6 @@ import type { OperationExecutionRecord } from './OperationExecutionRecord'; import { OperationStatus } from './OperationStatus'; import { RushConstants } from '../RushConstants'; -/** - * When the queue returns an unassigned operation, it means there is at least one remote executing operation, - * at this time, the caller has a chance to make a decision: - * 1. Manually invoke `tryGetRemoteExecutingOperation()` to get the remote executing operation. - * 2. If there is no remote executing operation available, wait for some time and return in callback, which - * internally invoke `assignOperations()` to assign new operations. - * NOTE: the caller must wait for some time to avoid busy loop and burn CPU cycles. - */ -export const UNASSIGNED_OPERATION: 'UNASSIGNED_OPERATION' = 'UNASSIGNED_OPERATION'; - -export type IOperationIteratorResult = - | OperationExecutionRecord - | { weight: 1; status: typeof UNASSIGNED_OPERATION }; - /** * Implementation of the async iteration protocol for a collection of IOperation objects. * The async iterator will wait for an operation to be ready for execution, or terminate if there are no more operations. @@ -29,10 +15,10 @@ export type IOperationIteratorResult = * stall until another operations completes. */ export class AsyncOperationQueue - implements AsyncIterable, AsyncIterator + implements AsyncIterable, AsyncIterator { private readonly _queue: OperationExecutionRecord[]; - private readonly _pendingIterators: ((result: IteratorResult) => void)[]; + private readonly _pendingIterators: ((result: IteratorResult) => void)[]; private readonly _totalOperations: number; private readonly _completedOperations: Set; @@ -57,11 +43,11 @@ export class AsyncOperationQueue * For use with `for await (const operation of taskQueue)` * @see {AsyncIterator} */ - public next(): Promise> { + public next(): Promise> { const { _pendingIterators: waitingIterators } = this; - const promise: Promise> = new Promise( - (resolve: (result: IteratorResult) => void) => { + const promise: Promise> = new Promise( + (resolve: (result: IteratorResult) => void) => { waitingIterators.push(resolve); } ); @@ -129,10 +115,6 @@ export class AsyncOperationQueue // This operation is not yet ready to be executed // next one plz :) continue; - } else if (record.status === OperationStatus.RemoteExecuting) { - // This operation is not ready to execute yet, but it may become ready later - // next one plz :) - continue; } else if (record.status !== OperationStatus.Ready) { // Sanity check throw new Error(`Unexpected status "${record.status}" for queued operation: ${record.name}`); @@ -162,37 +144,13 @@ export class AsyncOperationQueue } return; } - - if (waitingIterators.length > 0) { - // returns an unassigned operation to let caller decide when there is at least one - // remote executing operation which is not ready to process. - if (queue.some((operation) => operation.status === OperationStatus.RemoteExecuting)) { - waitingIterators.shift()!({ - value: { weight: 1, status: UNASSIGNED_OPERATION }, - done: false - }); - } - } - } - - public tryGetRemoteExecutingOperation(): OperationExecutionRecord | undefined { - const { _queue: queue } = this; - // cycle through the queue to find the next operation that is executed remotely - for (let i: number = queue.length - 1; i >= 0; i--) { - const operation: OperationExecutionRecord = queue[i]; - - if (operation.status === OperationStatus.RemoteExecuting) { - return operation; - } - } - return undefined; } /** * Returns this queue as an async iterator, such that multiple functions iterating this object concurrently * receive distinct iteration results. */ - public [Symbol.asyncIterator](): AsyncIterator { + public [Symbol.asyncIterator](): AsyncIterator { return this; } } diff --git a/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts index df66382f86d..ae044c45bf2 100644 --- a/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts +++ b/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts @@ -212,6 +212,7 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { } const record: OperationExecutionRecord = runnerContext as OperationExecutionRecord; + const { associatedProject: project, associatedPhase: phase, @@ -393,8 +394,10 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { }); periodicCallback.start(); } else { - // failed to acquire the lock, mark current operation to remote executing - return OperationStatus.RemoteExecuting; + setTimeout(() => { + record.status = OperationStatus.Ready; + }, 500); + return OperationStatus.Executing; } } }; @@ -429,14 +432,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { } // No need to run for the following operation status - switch (record.status) { - case OperationStatus.NoOp: - case OperationStatus.RemoteExecuting: { - return; - } - default: { - break; - } + if (!record.isTerminal || record.status === OperationStatus.NoOp) { + return; } const { cobuildLock, projectBuildCache, isCacheWriteAllowed, buildCacheTerminal, cacheRestored } = diff --git a/libraries/rush-lib/src/logic/operations/ConsoleTimelinePlugin.ts b/libraries/rush-lib/src/logic/operations/ConsoleTimelinePlugin.ts index c600475ec4d..8ad9f7d0663 100644 --- a/libraries/rush-lib/src/logic/operations/ConsoleTimelinePlugin.ts +++ b/libraries/rush-lib/src/logic/operations/ConsoleTimelinePlugin.ts @@ -79,7 +79,6 @@ const TIMELINE_CHART_SYMBOLS: Record = { [OperationStatus.Ready]: '?', [OperationStatus.Queued]: '?', [OperationStatus.Executing]: '?', - [OperationStatus.RemoteExecuting]: '?', [OperationStatus.Success]: '#', [OperationStatus.SuccessWithWarning]: '!', [OperationStatus.Failure]: '!', @@ -104,7 +103,6 @@ const TIMELINE_CHART_COLORIZER: Record stri [OperationStatus.Ready]: Colorize.yellow, [OperationStatus.Queued]: Colorize.yellow, [OperationStatus.Executing]: Colorize.yellow, - [OperationStatus.RemoteExecuting]: Colorize.yellow, [OperationStatus.Success]: Colorize.green, [OperationStatus.SuccessWithWarning]: Colorize.yellow, [OperationStatus.Failure]: Colorize.red, diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts index 997a3bb6aae..f1e06dabf49 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts @@ -12,12 +12,7 @@ import { import { StreamCollator, type CollatedTerminal, type CollatedWriter } from '@rushstack/stream-collator'; import { NewlineKind, Async, InternalError, AlreadyReportedError } from '@rushstack/node-core-library'; -import { - AsyncOperationQueue, - type IOperationIteratorResult, - type IOperationSortFunction, - UNASSIGNED_OPERATION -} from './AsyncOperationQueue'; +import { AsyncOperationQueue, type IOperationSortFunction } from './AsyncOperationQueue'; import type { Operation } from './Operation'; import { OperationStatus } from './OperationStatus'; import { type IOperationExecutionRecordContext, OperationExecutionRecord } from './OperationExecutionRecord'; @@ -103,7 +98,12 @@ export class OperationExecutionManager { this._beforeExecuteOperation = beforeExecuteOperation; this._afterExecuteOperation = afterExecuteOperation; this._beforeExecuteOperations = beforeExecuteOperations; - this._onOperationStatusChanged = onOperationStatusChanged; + this._onOperationStatusChanged = (record: OperationExecutionRecord) => { + if (record.status === OperationStatus.Ready) { + this._executionQueue.assignOperations(); + } + onOperationStatusChanged?.(record); + }; // TERMINAL PIPELINE: // @@ -124,7 +124,7 @@ export class OperationExecutionManager { // Convert the developer graph to the mutable execution graph const executionRecordContext: IOperationExecutionRecordContext = { streamCollator: this._streamCollator, - onOperationStatusChanged, + onOperationStatusChanged: this._onOperationStatusChanged, debugMode, quietMode }; @@ -252,30 +252,11 @@ export class OperationExecutionManager { await Async.forEachAsync( this._executionQueue, - async (operation: IOperationIteratorResult) => { - let record: OperationExecutionRecord | undefined; - /** - * If the operation is UNASSIGNED_OPERATION, it means that the queue is not able to assign a operation. - * This happens when some operations run remotely. So, we should try to get a remote executing operation - * from the queue manually here. - */ - if (operation.status === UNASSIGNED_OPERATION) { - // Pause for a few time - await Async.sleepAsync(5000); - record = this._executionQueue.tryGetRemoteExecutingOperation(); - } else { - record = operation; - } - - if (!record) { - // Fail to assign a operation, start over again - return; - } else { - await record.executeAsync({ - onStart: onOperationStartAsync, - onResult: onOperationCompleteAsync - }); - } + async (operation: OperationExecutionRecord) => { + await operation.executeAsync({ + onStart: onOperationStartAsync, + onResult: onOperationCompleteAsync + }); }, { concurrency: maxParallelism, @@ -421,11 +402,10 @@ export class OperationExecutionManager { } } - if (record.status !== OperationStatus.RemoteExecuting) { + if (record.isTerminal) { // If the operation was not remote, then we can notify queue that it is complete this._executionQueue.complete(record); } else { - // Attempt to requeue other operations if the operation was remote this._executionQueue.assignOperations(); } } diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts index 7514c89c6ad..191907766ae 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts @@ -15,7 +15,7 @@ import { import { InternalError, NewlineKind } from '@rushstack/node-core-library'; import { CollatedTerminal, type CollatedWriter, type StreamCollator } from '@rushstack/stream-collator'; -import { OperationStatus } from './OperationStatus'; +import { OperationStatus, TERMINAL_STATUSES } from './OperationStatus'; import type { IOperationRunner, IOperationRunnerContext } from './IOperationRunner'; import type { Operation } from './Operation'; import { Stopwatch } from '../../utilities/Stopwatch'; @@ -166,6 +166,10 @@ export class OperationExecutionRecord implements IOperationRunnerContext { return this._operationMetadataManager?.stateFile.state?.cobuildRunnerId; } + public get isTerminal(): boolean { + return TERMINAL_STATUSES.has(this.status); + } + /** * The current execution status of an operation. Operations start in the 'ready' state, * but can be 'blocked' if an upstream operation failed. It is 'executing' when @@ -277,7 +281,7 @@ export class OperationExecutionRecord implements IOperationRunnerContext { onStart: (record: OperationExecutionRecord) => Promise; onResult: (record: OperationExecutionRecord) => Promise; }): Promise { - if (this.status === OperationStatus.RemoteExecuting) { + if (!this.isTerminal) { this.stopwatch.reset(); } this.stopwatch.start(); @@ -299,7 +303,7 @@ export class OperationExecutionRecord implements IOperationRunnerContext { // Delegate global state reporting await onResult(this); } finally { - if (this.status !== OperationStatus.RemoteExecuting) { + if (this.isTerminal) { this._collatedWriter?.close(); this.stdioSummarizer.close(); this.stopwatch.stop(); diff --git a/libraries/rush-lib/src/logic/operations/OperationStatus.ts b/libraries/rush-lib/src/logic/operations/OperationStatus.ts index 12532ce4398..4005de5227c 100644 --- a/libraries/rush-lib/src/logic/operations/OperationStatus.ts +++ b/libraries/rush-lib/src/logic/operations/OperationStatus.ts @@ -22,10 +22,6 @@ export enum OperationStatus { * The Operation is currently executing */ Executing = 'EXECUTING', - /** - * The Operation is currently executing by a remote process - */ - RemoteExecuting = 'REMOTE EXECUTING', /** * The Operation completed successfully and did not write to standard output */ @@ -55,3 +51,17 @@ export enum OperationStatus { */ NoOp = 'NO OP' } + +/** + * The set of statuses that are considered terminal. + * @alpha + */ +export const TERMINAL_STATUSES: Set = new Set([ + OperationStatus.Success, + OperationStatus.SuccessWithWarning, + OperationStatus.Skipped, + OperationStatus.Blocked, + OperationStatus.FromCache, + OperationStatus.Failure, + OperationStatus.NoOp +]); diff --git a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts index 06d76750a69..c12c388d3c3 100644 --- a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts +++ b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts @@ -4,14 +4,8 @@ import { Operation } from '../Operation'; import { type IOperationExecutionRecordContext, OperationExecutionRecord } from '../OperationExecutionRecord'; import { MockOperationRunner } from './MockOperationRunner'; -import { - AsyncOperationQueue, - type IOperationIteratorResult, - type IOperationSortFunction, - UNASSIGNED_OPERATION -} from '../AsyncOperationQueue'; +import { AsyncOperationQueue, type IOperationSortFunction } from '../AsyncOperationQueue'; import { OperationStatus } from '../OperationStatus'; -import { Async } from '@rushstack/node-core-library'; function addDependency(consumer: OperationExecutionRecord, dependency: OperationExecutionRecord): void { consumer.dependencies.add(dependency); @@ -43,21 +37,14 @@ describe(AsyncOperationQueue.name, () => { const expectedOrder = [operations[2], operations[0], operations[1], operations[3]]; const actualOrder = []; - // Nothing sets the RemoteExecuting status, this should be a error if it happens - let hasUnassignedOperation: boolean = false; const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, nullSort); for await (const operation of queue) { actualOrder.push(operation); - if (operation.status === UNASSIGNED_OPERATION) { - hasUnassignedOperation = true; - continue; - } operation.status = OperationStatus.Success; queue.complete(operation); } expect(actualOrder).toEqual(expectedOrder); - expect(hasUnassignedOperation).toEqual(false); }); it('respects the sort predicate', async () => { @@ -71,23 +58,15 @@ describe(AsyncOperationQueue.name, () => { ): number => { return expectedOrder.indexOf(b) - expectedOrder.indexOf(a); }; - // Nothing sets the RemoteExecuting status, this should be a error if it happens - let hasUnassignedOperation: boolean = false; const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, customSort); for await (const operation of queue) { actualOrder.push(operation); - if (operation.status === UNASSIGNED_OPERATION) { - hasUnassignedOperation = true; - continue; - } operation.status = OperationStatus.Success; queue.complete(operation); } expect(actualOrder).toEqual(expectedOrder); - - expect(hasUnassignedOperation).toEqual(false); }); it('detects cycles', async () => { @@ -129,17 +108,11 @@ describe(AsyncOperationQueue.name, () => { const actualConcurrency: Map = new Map(); const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, nullSort); let concurrency: number = 0; - // Nothing sets the RemoteExecuting status, this should be a error if it happens - let hasUnassignedOperation: boolean = false; // Use 3 concurrent iterators to verify that it handles having more than the operation concurrency await Promise.all( Array.from({ length: 3 }, async () => { for await (const operation of queue) { - if (operation.status === UNASSIGNED_OPERATION) { - hasUnassignedOperation = true; - continue; - } ++concurrency; await Promise.resolve(); @@ -157,67 +130,14 @@ describe(AsyncOperationQueue.name, () => { for (const [operation, operationConcurrency] of expectedConcurrency) { expect(actualConcurrency.get(operation)).toEqual(operationConcurrency); } - - expect(hasUnassignedOperation).toEqual(false); - }); - - it('handles remote executed operations', async () => { - const operations = [ - createRecord('a'), - createRecord('b'), - createRecord('c'), - createRecord('d'), - createRecord('e') - ]; - - addDependency(operations[2], operations[1]); - addDependency(operations[3], operations[1]); - addDependency(operations[4], operations[1]); - addDependency(operations[3], operations[2]); - addDependency(operations[4], operations[3]); - - // b remote executing -> a -> b (remote executed) -> c -> d -> e - const expectedOrder: string[] = ['b', 'a', 'b', 'c', 'd', 'e']; - - const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, nullSort); - - const actualOrder: string[] = []; - let remoteExecuted: boolean = false; - for await (const operation of queue) { - let record: OperationExecutionRecord | undefined; - if (operation.status === UNASSIGNED_OPERATION) { - await Async.sleepAsync(100); - record = queue.tryGetRemoteExecutingOperation(); - } else { - record = operation; - } - if (!record) { - continue; - } - - actualOrder.push(record.name); - - if (record === operations[1]) { - if (!remoteExecuted) { - operations[1].status = OperationStatus.RemoteExecuting; - // remote executed operation is finished later - remoteExecuted = true; - continue; - } - } - record.status = OperationStatus.Success; - queue.complete(record); - } - - expect(actualOrder).toEqual(expectedOrder); }); it('handles an empty queue', async () => { const operations: OperationExecutionRecord[] = []; const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, nullSort); - const iterator: AsyncIterator = queue[Symbol.asyncIterator](); - const result: IteratorResult = await iterator.next(); + const iterator: AsyncIterator = queue[Symbol.asyncIterator](); + const result: IteratorResult = await iterator.next(); expect(result.done).toEqual(true); }); }); diff --git a/rush-plugins/rush-serve-plugin/src/phasedCommandHandler.ts b/rush-plugins/rush-serve-plugin/src/phasedCommandHandler.ts index b7d66378ac3..e906350465f 100644 --- a/rush-plugins/rush-serve-plugin/src/phasedCommandHandler.ts +++ b/rush-plugins/rush-serve-plugin/src/phasedCommandHandler.ts @@ -265,7 +265,6 @@ function tryEnableBuildStatusWebSocketServer( [OperationStatus.Ready]: 'Ready', [OperationStatus.Queued]: 'Queued', [OperationStatus.Executing]: 'Executing', - [OperationStatus.RemoteExecuting]: 'RemoteExecuting', [OperationStatus.Success]: 'Success', [OperationStatus.SuccessWithWarning]: 'SuccessWithWarning', [OperationStatus.Skipped]: 'Skipped',