Skip to content

[rush] unassigned operations can ignore weighting constraints #4821

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@microsoft/rush",
"comment": "Fixes an issue with cobuilds where weighted operations could ignore the scheduler's weighting requirements.",
"type": "none"
}
],
"packageName": "@microsoft/rush"
}
51 changes: 28 additions & 23 deletions libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,6 @@ import type { OperationExecutionRecord } from './OperationExecutionRecord';
import { OperationStatus } from './OperationStatus';
import { RushConstants } from '../RushConstants';

/**
* When the queue returns an unassigned operation, it means there is at least one remote executing operation,
* at this time, the caller has a chance to make a decision:
* 1. Manually invoke `tryGetRemoteExecutingOperation()` to get the remote executing operation.
* 2. If there is no remote executing operation available, wait for some time and return in callback, which
* internally invoke `assignOperations()` to assign new operations.
* NOTE: the caller must wait for some time to avoid busy loop and burn CPU cycles.
*/
export const UNASSIGNED_OPERATION: 'UNASSIGNED_OPERATION' = 'UNASSIGNED_OPERATION';

export type IOperationIteratorResult =
| OperationExecutionRecord
| { weight: 1; status: typeof UNASSIGNED_OPERATION };

/**
* Implementation of the async iteration protocol for a collection of IOperation objects.
* The async iterator will wait for an operation to be ready for execution, or terminate if there are no more operations.
Expand All @@ -29,12 +15,13 @@ export type IOperationIteratorResult =
* stall until another operations completes.
*/
export class AsyncOperationQueue
implements AsyncIterable<IOperationIteratorResult>, AsyncIterator<IOperationIteratorResult>
implements AsyncIterable<OperationExecutionRecord>, AsyncIterator<OperationExecutionRecord>
{
private readonly _queue: OperationExecutionRecord[];
private readonly _pendingIterators: ((result: IteratorResult<IOperationIteratorResult>) => void)[];
private readonly _pendingIterators: ((result: IteratorResult<OperationExecutionRecord>) => void)[];
private readonly _totalOperations: number;
private readonly _completedOperations: Set<OperationExecutionRecord>;
private _intervalId: NodeJS.Timeout | undefined;

private _isDone: boolean;

Expand All @@ -51,17 +38,22 @@ export class AsyncOperationQueue
this._totalOperations = this._queue.length;
this._isDone = false;
this._completedOperations = new Set<OperationExecutionRecord>();
this._intervalId = setInterval(() => {
this.assignOperations();
}, 1000);
// Unref the interval so that it doesn't keep the process alive
this._intervalId.unref();
}

/**
* For use with `for await (const operation of taskQueue)`
* @see {AsyncIterator}
*/
public next(): Promise<IteratorResult<IOperationIteratorResult>> {
public next(): Promise<IteratorResult<OperationExecutionRecord>> {
const { _pendingIterators: waitingIterators } = this;

const promise: Promise<IteratorResult<IOperationIteratorResult>> = new Promise(
(resolve: (result: IteratorResult<IOperationIteratorResult>) => void) => {
const promise: Promise<IteratorResult<OperationExecutionRecord>> = new Promise(
(resolve: (result: IteratorResult<OperationExecutionRecord>) => void) => {
waitingIterators.push(resolve);
}
);
Expand Down Expand Up @@ -154,6 +146,15 @@ export class AsyncOperationQueue
}

if (this._isDone) {
if (this._intervalId) {
// Attempt to clear the interval after the queue completes.
try {
clearInterval(this._intervalId);
this._intervalId = undefined;
} catch (_error) {
// Ignore errors
}
}
for (const resolveAsyncIterator of waitingIterators.splice(0)) {
resolveAsyncIterator({
value: undefined,
Expand All @@ -166,9 +167,11 @@ export class AsyncOperationQueue
if (waitingIterators.length > 0) {
// returns an unassigned operation to let caller decide when there is at least one
// remote executing operation which is not ready to process.
if (queue.some((operation) => operation.status === OperationStatus.RemoteExecuting)) {
const remoteExecutingOperation: OperationExecutionRecord | undefined =
this.tryGetRemoteExecutingOperation();
if (remoteExecutingOperation) {
waitingIterators.shift()!({
value: { weight: 1, status: UNASSIGNED_OPERATION },
value: remoteExecutingOperation,
done: false
});
}
Expand All @@ -177,11 +180,13 @@ export class AsyncOperationQueue

public tryGetRemoteExecutingOperation(): OperationExecutionRecord | undefined {
const { _queue: queue } = this;

// cycle through the queue to find the next operation that is executed remotely
for (let i: number = queue.length - 1; i >= 0; i--) {
const operation: OperationExecutionRecord = queue[i];

if (operation.status === OperationStatus.RemoteExecuting) {
const currentTime: number = new Date().getTime();
if (operation.status === OperationStatus.RemoteExecuting && operation.checkAfter < currentTime) {
return operation;
}
}
Expand All @@ -192,7 +197,7 @@ export class AsyncOperationQueue
* Returns this queue as an async iterator, such that multiple functions iterating this object concurrently
* receive distinct iteration results.
*/
public [Symbol.asyncIterator](): AsyncIterator<IOperationIteratorResult> {
public [Symbol.asyncIterator](): AsyncIterator<OperationExecutionRecord> {
return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,11 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin {
periodicCallback.start();
} else {
// failed to acquire the lock, mark current operation to remote executing
const currentTime: number = new Date().getTime();
// eslint-disable-next-line require-atomic-updates -- this should be safe
record.lastCheckedAt = currentTime;
// eslint-disable-next-line require-atomic-updates -- this should also be safe
record.checkAfter = currentTime + 500;
return OperationStatus.RemoteExecuting;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@ import {
import { StreamCollator, type CollatedTerminal, type CollatedWriter } from '@rushstack/stream-collator';
import { NewlineKind, Async, InternalError, AlreadyReportedError } from '@rushstack/node-core-library';

import {
AsyncOperationQueue,
type IOperationIteratorResult,
type IOperationSortFunction,
UNASSIGNED_OPERATION
} from './AsyncOperationQueue';
import { AsyncOperationQueue, type IOperationSortFunction } from './AsyncOperationQueue';
import type { Operation } from './Operation';
import { OperationStatus } from './OperationStatus';
import { type IOperationExecutionRecordContext, OperationExecutionRecord } from './OperationExecutionRecord';
Expand Down Expand Up @@ -252,30 +247,11 @@ export class OperationExecutionManager {

await Async.forEachAsync(
this._executionQueue,
async (operation: IOperationIteratorResult) => {
let record: OperationExecutionRecord | undefined;
/**
* If the operation is UNASSIGNED_OPERATION, it means that the queue is not able to assign a operation.
* This happens when some operations run remotely. So, we should try to get a remote executing operation
* from the queue manually here.
*/
if (operation.status === UNASSIGNED_OPERATION) {
// Pause for a few time
await Async.sleepAsync(5000);
record = this._executionQueue.tryGetRemoteExecutingOperation();
} else {
record = operation;
}

if (!record) {
// Fail to assign a operation, start over again
return;
} else {
await record.executeAsync({
onStart: onOperationStartAsync,
onResult: onOperationCompleteAsync
});
}
async (operation: OperationExecutionRecord) => {
await operation.executeAsync({
onStart: onOperationStartAsync,
onResult: onOperationCompleteAsync
});
},
{
concurrency: maxParallelism,
Expand Down Expand Up @@ -424,9 +400,6 @@ export class OperationExecutionManager {
if (record.status !== OperationStatus.RemoteExecuting) {
// If the operation was not remote, then we can notify queue that it is complete
this._executionQueue.complete(record);
} else {
// Attempt to requeue other operations if the operation was remote
this._executionQueue.assignOperations();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
preventAutoclose: true
});

/**
* Used to check for remote executing operations. When this is set, the operation will only be queued after the timestamp.
*/
public checkAfter: number = Number.MAX_SAFE_INTEGER;

/**
* Used with checkAfter to determine when to check for remote executing operations again. This may be useful if you want
* to customize the check interval.
*/
public lastCheckedAt: number = 0;

public readonly runner: IOperationRunner;
public readonly associatedPhase: IPhase | undefined;
public readonly associatedProject: RushConfigurationProject | undefined;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@
import { Operation } from '../Operation';
import { type IOperationExecutionRecordContext, OperationExecutionRecord } from '../OperationExecutionRecord';
import { MockOperationRunner } from './MockOperationRunner';
import {
AsyncOperationQueue,
type IOperationIteratorResult,
type IOperationSortFunction,
UNASSIGNED_OPERATION
} from '../AsyncOperationQueue';
import { AsyncOperationQueue, type IOperationSortFunction } from '../AsyncOperationQueue';
import { OperationStatus } from '../OperationStatus';
import { Async } from '@rushstack/node-core-library';

Expand Down Expand Up @@ -43,21 +38,14 @@ describe(AsyncOperationQueue.name, () => {

const expectedOrder = [operations[2], operations[0], operations[1], operations[3]];
const actualOrder = [];
// Nothing sets the RemoteExecuting status, this should be a error if it happens
let hasUnassignedOperation: boolean = false;
const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, nullSort);
for await (const operation of queue) {
actualOrder.push(operation);
if (operation.status === UNASSIGNED_OPERATION) {
hasUnassignedOperation = true;
continue;
}
operation.status = OperationStatus.Success;
queue.complete(operation);
}

expect(actualOrder).toEqual(expectedOrder);
expect(hasUnassignedOperation).toEqual(false);
});

it('respects the sort predicate', async () => {
Expand All @@ -71,23 +59,15 @@ describe(AsyncOperationQueue.name, () => {
): number => {
return expectedOrder.indexOf(b) - expectedOrder.indexOf(a);
};
// Nothing sets the RemoteExecuting status, this should be a error if it happens
let hasUnassignedOperation: boolean = false;

const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, customSort);
for await (const operation of queue) {
actualOrder.push(operation);
if (operation.status === UNASSIGNED_OPERATION) {
hasUnassignedOperation = true;
continue;
}
operation.status = OperationStatus.Success;
queue.complete(operation);
}

expect(actualOrder).toEqual(expectedOrder);

expect(hasUnassignedOperation).toEqual(false);
});

it('detects cycles', async () => {
Expand Down Expand Up @@ -129,17 +109,11 @@ describe(AsyncOperationQueue.name, () => {
const actualConcurrency: Map<OperationExecutionRecord, number> = new Map();
const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, nullSort);
let concurrency: number = 0;
// Nothing sets the RemoteExecuting status, this should be a error if it happens
let hasUnassignedOperation: boolean = false;

// Use 3 concurrent iterators to verify that it handles having more than the operation concurrency
await Promise.all(
Array.from({ length: 3 }, async () => {
for await (const operation of queue) {
if (operation.status === UNASSIGNED_OPERATION) {
hasUnassignedOperation = true;
continue;
}
++concurrency;
await Promise.resolve();

Expand All @@ -157,8 +131,6 @@ describe(AsyncOperationQueue.name, () => {
for (const [operation, operationConcurrency] of expectedConcurrency) {
expect(actualConcurrency.get(operation)).toEqual(operationConcurrency);
}

expect(hasUnassignedOperation).toEqual(false);
});

it('handles remote executed operations', async () => {
Expand All @@ -185,7 +157,7 @@ describe(AsyncOperationQueue.name, () => {
let remoteExecuted: boolean = false;
for await (const operation of queue) {
let record: OperationExecutionRecord | undefined;
if (operation.status === UNASSIGNED_OPERATION) {
if (operation.status === OperationStatus.RemoteExecuting) {
await Async.sleepAsync(100);
record = queue.tryGetRemoteExecutingOperation();
} else {
Expand All @@ -202,6 +174,9 @@ describe(AsyncOperationQueue.name, () => {
operations[1].status = OperationStatus.RemoteExecuting;
// remote executed operation is finished later
remoteExecuted = true;
const currentTime: number = new Date().getTime();
record.lastCheckedAt = currentTime;
record.checkAfter = currentTime + 100;
continue;
}
}
Expand All @@ -216,8 +191,8 @@ describe(AsyncOperationQueue.name, () => {
const operations: OperationExecutionRecord[] = [];

const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, nullSort);
const iterator: AsyncIterator<IOperationIteratorResult> = queue[Symbol.asyncIterator]();
const result: IteratorResult<IOperationIteratorResult> = await iterator.next();
const iterator: AsyncIterator<OperationExecutionRecord> = queue[Symbol.asyncIterator]();
const result: IteratorResult<OperationExecutionRecord> = await iterator.next();
expect(result.done).toEqual(true);
});
});
Loading