Skip to content

[rush] unassigned operations can ignore weighting constraints #4821

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@microsoft/rush",
"comment": "Fixes an issue with cobuilds where weighted operations could ignore the scheduler's weighting requirements.",
"type": "none"
}
],
"packageName": "@microsoft/rush"
}
1 change: 0 additions & 1 deletion common/reviews/api/rush-lib.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,6 @@ export enum OperationStatus {
NoOp = "NO OP",
Queued = "QUEUED",
Ready = "READY",
RemoteExecuting = "REMOTE EXECUTING",
Skipped = "SKIPPED",
Success = "SUCCESS",
SuccessWithWarning = "SUCCESS WITH WARNINGS",
Expand Down
54 changes: 6 additions & 48 deletions libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,6 @@ import type { OperationExecutionRecord } from './OperationExecutionRecord';
import { OperationStatus } from './OperationStatus';
import { RushConstants } from '../RushConstants';

/**
* When the queue returns an unassigned operation, it means there is at least one remote executing operation,
* at this time, the caller has a chance to make a decision:
* 1. Manually invoke `tryGetRemoteExecutingOperation()` to get the remote executing operation.
* 2. If there is no remote executing operation available, wait for some time and return in callback, which
* internally invoke `assignOperations()` to assign new operations.
* NOTE: the caller must wait for some time to avoid busy loop and burn CPU cycles.
*/
export const UNASSIGNED_OPERATION: 'UNASSIGNED_OPERATION' = 'UNASSIGNED_OPERATION';

export type IOperationIteratorResult =
| OperationExecutionRecord
| { weight: 1; status: typeof UNASSIGNED_OPERATION };

/**
* Implementation of the async iteration protocol for a collection of IOperation objects.
* The async iterator will wait for an operation to be ready for execution, or terminate if there are no more operations.
Expand All @@ -29,10 +15,10 @@ export type IOperationIteratorResult =
* stall until another operation completes.
*/
export class AsyncOperationQueue
implements AsyncIterable<IOperationIteratorResult>, AsyncIterator<IOperationIteratorResult>
implements AsyncIterable<OperationExecutionRecord>, AsyncIterator<OperationExecutionRecord>
{
private readonly _queue: OperationExecutionRecord[];
private readonly _pendingIterators: ((result: IteratorResult<IOperationIteratorResult>) => void)[];
private readonly _pendingIterators: ((result: IteratorResult<OperationExecutionRecord>) => void)[];
private readonly _totalOperations: number;
private readonly _completedOperations: Set<OperationExecutionRecord>;

Expand All @@ -57,11 +43,11 @@ export class AsyncOperationQueue
* For use with `for await (const operation of taskQueue)`
* @see {AsyncIterator}
*/
public next(): Promise<IteratorResult<IOperationIteratorResult>> {
public next(): Promise<IteratorResult<OperationExecutionRecord>> {
const { _pendingIterators: waitingIterators } = this;

const promise: Promise<IteratorResult<IOperationIteratorResult>> = new Promise(
(resolve: (result: IteratorResult<IOperationIteratorResult>) => void) => {
const promise: Promise<IteratorResult<OperationExecutionRecord>> = new Promise(
(resolve: (result: IteratorResult<OperationExecutionRecord>) => void) => {
waitingIterators.push(resolve);
}
);
Expand Down Expand Up @@ -129,10 +115,6 @@ export class AsyncOperationQueue
// This operation is not yet ready to be executed
// next one plz :)
continue;
} else if (record.status === OperationStatus.RemoteExecuting) {
// This operation is not ready to execute yet, but it may become ready later
// next one plz :)
continue;
} else if (record.status !== OperationStatus.Ready) {
// Sanity check
throw new Error(`Unexpected status "${record.status}" for queued operation: ${record.name}`);
Expand Down Expand Up @@ -162,37 +144,13 @@ export class AsyncOperationQueue
}
return;
}

if (waitingIterators.length > 0) {
// returns an unassigned operation to let caller decide when there is at least one
// remote executing operation which is not ready to process.
if (queue.some((operation) => operation.status === OperationStatus.RemoteExecuting)) {
waitingIterators.shift()!({
value: { weight: 1, status: UNASSIGNED_OPERATION },
done: false
});
}
}
}

public tryGetRemoteExecutingOperation(): OperationExecutionRecord | undefined {
const { _queue: queue } = this;
// cycle through the queue to find the next operation that is executed remotely
for (let i: number = queue.length - 1; i >= 0; i--) {
const operation: OperationExecutionRecord = queue[i];

if (operation.status === OperationStatus.RemoteExecuting) {
return operation;
}
}
return undefined;
}

/**
* Returns this queue as an async iterator, such that multiple functions iterating this object concurrently
* receive distinct iteration results.
*/
public [Symbol.asyncIterator](): AsyncIterator<IOperationIteratorResult> {
public [Symbol.asyncIterator](): AsyncIterator<OperationExecutionRecord> {
return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin {
}

const record: OperationExecutionRecord = runnerContext as OperationExecutionRecord;

const {
associatedProject: project,
associatedPhase: phase,
Expand Down Expand Up @@ -393,8 +394,10 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin {
});
periodicCallback.start();
} else {
// failed to acquire the lock, mark current operation to remote executing
return OperationStatus.RemoteExecuting;
setTimeout(() => {
record.status = OperationStatus.Ready;
}, 500);
return OperationStatus.Executing;
}
}
};
Expand Down Expand Up @@ -429,14 +432,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin {
}

// No need to run for the following operation status
switch (record.status) {
case OperationStatus.NoOp:
case OperationStatus.RemoteExecuting: {
return;
}
default: {
break;
}
if (!record.isTerminal || record.status === OperationStatus.NoOp) {
return;
}

const { cobuildLock, projectBuildCache, isCacheWriteAllowed, buildCacheTerminal, cacheRestored } =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ const TIMELINE_CHART_SYMBOLS: Record<OperationStatus, string> = {
[OperationStatus.Ready]: '?',
[OperationStatus.Queued]: '?',
[OperationStatus.Executing]: '?',
[OperationStatus.RemoteExecuting]: '?',
[OperationStatus.Success]: '#',
[OperationStatus.SuccessWithWarning]: '!',
[OperationStatus.Failure]: '!',
Expand All @@ -104,7 +103,6 @@ const TIMELINE_CHART_COLORIZER: Record<OperationStatus, (string: string) => stri
[OperationStatus.Ready]: Colorize.yellow,
[OperationStatus.Queued]: Colorize.yellow,
[OperationStatus.Executing]: Colorize.yellow,
[OperationStatus.RemoteExecuting]: Colorize.yellow,
[OperationStatus.Success]: Colorize.green,
[OperationStatus.SuccessWithWarning]: Colorize.yellow,
[OperationStatus.Failure]: Colorize.red,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@ import {
import { StreamCollator, type CollatedTerminal, type CollatedWriter } from '@rushstack/stream-collator';
import { NewlineKind, Async, InternalError, AlreadyReportedError } from '@rushstack/node-core-library';

import {
AsyncOperationQueue,
type IOperationIteratorResult,
type IOperationSortFunction,
UNASSIGNED_OPERATION
} from './AsyncOperationQueue';
import { AsyncOperationQueue, type IOperationSortFunction } from './AsyncOperationQueue';
import type { Operation } from './Operation';
import { OperationStatus } from './OperationStatus';
import { type IOperationExecutionRecordContext, OperationExecutionRecord } from './OperationExecutionRecord';
Expand Down Expand Up @@ -103,7 +98,12 @@ export class OperationExecutionManager {
this._beforeExecuteOperation = beforeExecuteOperation;
this._afterExecuteOperation = afterExecuteOperation;
this._beforeExecuteOperations = beforeExecuteOperations;
this._onOperationStatusChanged = onOperationStatusChanged;
this._onOperationStatusChanged = (record: OperationExecutionRecord) => {
if (record.status === OperationStatus.Ready) {
this._executionQueue.assignOperations();
}
onOperationStatusChanged?.(record);
};

// TERMINAL PIPELINE:
//
Expand All @@ -124,7 +124,7 @@ export class OperationExecutionManager {
// Convert the developer graph to the mutable execution graph
const executionRecordContext: IOperationExecutionRecordContext = {
streamCollator: this._streamCollator,
onOperationStatusChanged,
onOperationStatusChanged: this._onOperationStatusChanged,
debugMode,
quietMode
};
Expand Down Expand Up @@ -252,30 +252,11 @@ export class OperationExecutionManager {

await Async.forEachAsync(
this._executionQueue,
async (operation: IOperationIteratorResult) => {
let record: OperationExecutionRecord | undefined;
/**
* If the operation is UNASSIGNED_OPERATION, it means that the queue is not able to assign a operation.
* This happens when some operations run remotely. So, we should try to get a remote executing operation
* from the queue manually here.
*/
if (operation.status === UNASSIGNED_OPERATION) {
// Pause for a few time
await Async.sleepAsync(5000);
record = this._executionQueue.tryGetRemoteExecutingOperation();
} else {
record = operation;
}

if (!record) {
// Fail to assign a operation, start over again
return;
} else {
await record.executeAsync({
onStart: onOperationStartAsync,
onResult: onOperationCompleteAsync
});
}
async (operation: OperationExecutionRecord) => {
await operation.executeAsync({
onStart: onOperationStartAsync,
onResult: onOperationCompleteAsync
});
},
{
concurrency: maxParallelism,
Expand Down Expand Up @@ -421,11 +402,10 @@ export class OperationExecutionManager {
}
}

if (record.status !== OperationStatus.RemoteExecuting) {
if (record.isTerminal) {
// If the operation has reached a terminal status, notify the queue that it is complete
this._executionQueue.complete(record);
} else {
// The operation has not reached a terminal status, so ask the queue to assign operations again
this._executionQueue.assignOperations();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
import { InternalError, NewlineKind } from '@rushstack/node-core-library';
import { CollatedTerminal, type CollatedWriter, type StreamCollator } from '@rushstack/stream-collator';

import { OperationStatus } from './OperationStatus';
import { OperationStatus, TERMINAL_STATUSES } from './OperationStatus';
import type { IOperationRunner, IOperationRunnerContext } from './IOperationRunner';
import type { Operation } from './Operation';
import { Stopwatch } from '../../utilities/Stopwatch';
Expand Down Expand Up @@ -166,6 +166,10 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
return this._operationMetadataManager?.stateFile.state?.cobuildRunnerId;
}

/**
 * Indicates whether this record's current `status` is a member of `TERMINAL_STATUSES`.
 */
public get isTerminal(): boolean {
  const { status: currentStatus } = this;
  return TERMINAL_STATUSES.has(currentStatus);
}

/**
* The current execution status of an operation. Operations start in the 'ready' state,
* but can be 'blocked' if an upstream operation failed. It is 'executing' when
Expand Down Expand Up @@ -277,7 +281,7 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
onStart: (record: OperationExecutionRecord) => Promise<OperationStatus | undefined>;
onResult: (record: OperationExecutionRecord) => Promise<void>;
}): Promise<void> {
if (this.status === OperationStatus.RemoteExecuting) {
if (!this.isTerminal) {
this.stopwatch.reset();
}
this.stopwatch.start();
Expand All @@ -299,7 +303,7 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
// Delegate global state reporting
await onResult(this);
} finally {
if (this.status !== OperationStatus.RemoteExecuting) {
if (this.isTerminal) {
this._collatedWriter?.close();
this.stdioSummarizer.close();
this.stopwatch.stop();
Expand Down
18 changes: 14 additions & 4 deletions libraries/rush-lib/src/logic/operations/OperationStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ export enum OperationStatus {
* The Operation is currently executing
*/
Executing = 'EXECUTING',
/**
* The Operation is currently executing by a remote process
*/
RemoteExecuting = 'REMOTE EXECUTING',
/**
* The Operation completed successfully and did not write to standard output
*/
Expand Down Expand Up @@ -55,3 +51,17 @@ export enum OperationStatus {
*/
NoOp = 'NO OP'
}

/**
 * Operation statuses that are treated as terminal.
 *
 * Statuses absent from this set, such as `Ready`, `Queued` and `Executing`, are not terminal.
 * @alpha
 */
export const TERMINAL_STATUSES: Set<OperationStatus> = new Set<OperationStatus>([
  OperationStatus.Success,
  OperationStatus.SuccessWithWarning,
  OperationStatus.Skipped,
  OperationStatus.Blocked,
  OperationStatus.FromCache,
  OperationStatus.Failure,
  OperationStatus.NoOp
]);
Loading
Loading