Skip to content

[rush] unassigned operations can ignore weighting constraints #4821

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@microsoft/rush",
"comment": "Fixes an issue with cobuilds where weighted operations could ignore the scheduler's weighting requirements.",
"type": "none"
}
],
"packageName": "@microsoft/rush"
}
28 changes: 25 additions & 3 deletions libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,17 @@ export class AsyncOperationQueue
return;
}

if (waitingIterators.length > 0) {
if (waitingIterators.length > 0 && this.hasRemoteExecutingOperations()) {
// returns an unassigned operation to let caller decide when there is at least one
// remote executing operation which is not ready to process.
if (queue.some((operation) => operation.status === OperationStatus.RemoteExecuting)) {
const remoteExecutingOperation: OperationExecutionRecord | undefined =
this.tryGetRemoteExecutingOperation();
if (remoteExecutingOperation) {
waitingIterators.shift()!({
value: remoteExecutingOperation,
done: false
});
} else {
waitingIterators.shift()!({
value: { weight: 1, status: UNASSIGNED_OPERATION },
done: false
Expand All @@ -175,19 +182,34 @@ export class AsyncOperationQueue
}
}

public hasRemoteExecutingOperations(): boolean {
for (const operation of this._queue) {
if (operation.status === OperationStatus.RemoteExecuting) {
return true;
}
}
return false;
}

public tryGetRemoteExecutingOperation(): OperationExecutionRecord | undefined {
const { _queue: queue } = this;

// cycle through the queue to find the next operation that is executed remotely
for (let i: number = queue.length - 1; i >= 0; i--) {
const operation: OperationExecutionRecord = queue[i];

if (operation.status === OperationStatus.RemoteExecuting) {
const currentTime: number = new Date().getTime();
if (operation.status === OperationStatus.RemoteExecuting && operation.checkAfter < currentTime) {
return operation;
}
}
return undefined;
}

public getRemoteExecutingSleepDuration(): number {
return 5000;
}

/**
* Returns this queue as an async iterator, such that multiple functions iterating this object concurrently
* receive distinct iteration results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,11 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin {
periodicCallback.start();
} else {
// failed to acquire the lock, mark current operation to remote executing
const currentTime: number = new Date().getTime();
// eslint-disable-next-line require-atomic-updates -- this should be safe
record.lastCheckedAt = currentTime;
// eslint-disable-next-line require-atomic-updates -- this should also be safe
record.checkAfter = currentTime + 500;
return OperationStatus.RemoteExecuting;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,7 @@ export class OperationExecutionManager {
*/
if (operation.status === UNASSIGNED_OPERATION) {
// Pause for a few time
await Async.sleepAsync(5000);
record = this._executionQueue.tryGetRemoteExecutingOperation();
await Async.sleepAsync(this._executionQueue.getRemoteExecutingSleepDuration());
} else {
record = operation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
preventAutoclose: true
});

/**
* Used to check for remote executing operations. When this is set, the operation will only be queued after the timestamp.
*/
public checkAfter: number = Number.MAX_SAFE_INTEGER;

/**
* Used with checkAfter to determine when to check for remote executing operations again. This may be useful if you want
* to customize the check interval.
*/
public lastCheckedAt: number = 0;

public readonly runner: IOperationRunner;
public readonly associatedPhase: IPhase | undefined;
public readonly associatedProject: RushConfigurationProject | undefined;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ describe(AsyncOperationQueue.name, () => {
operations[1].status = OperationStatus.RemoteExecuting;
// remote executed operation is finished later
remoteExecuted = true;
const currentTime: number = new Date().getTime();
record.lastCheckedAt = currentTime;
record.checkAfter = currentTime + 10;
continue;
}
}
Expand Down
Loading