Skip to content

fix(fcm): Wrap HTTP/2 session errors in promise #2868

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 60 additions & 35 deletions src/messaging/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import { App } from '../app';
import { deepCopy } from '../utils/deep-copy';
import { ErrorInfo, MessagingClientErrorCode, FirebaseMessagingError } from '../utils/error';
import {
ErrorInfo, MessagingClientErrorCode, FirebaseMessagingError, FirebaseMessagingSessionError
} from '../utils/error';
import * as utils from '../utils';
import * as validator from '../utils/validator';
import { validateMessage } from './messaging-internal';
Expand Down Expand Up @@ -206,48 +208,71 @@ export class Messaging {
MessagingClientErrorCode.INVALID_ARGUMENT, 'dryRun must be a boolean');
}

const http2SessionHandler = this.useLegacyTransport ? undefined : new Http2SessionHandler(`https://${FCM_SEND_HOST}`)
const http2SessionHandler = this.useLegacyTransport ? undefined : new Http2SessionHandler(`https://${FCM_SEND_HOST}`);

return this.getUrlPath()
.then((urlPath) => {
const requests: Promise<SendResponse>[] = copy.map(async (message) => {
validateMessage(message);
const request: { message: Message; validate_only?: boolean } = { message };
if (dryRun) {
request.validate_only = true;
}

if (http2SessionHandler){
return this.messagingRequestHandler.invokeHttp2RequestHandlerForSendResponse(
FCM_SEND_HOST, urlPath, request, http2SessionHandler);
}
return this.messagingRequestHandler.invokeHttpRequestHandlerForSendResponse(FCM_SEND_HOST, urlPath, request);
});
return Promise.allSettled(requests);
})
.then((results) => {
const responses: SendResponse[] = [];
results.forEach(result => {
if (result.status === 'fulfilled') {
responses.push(result.value);
} else { // rejected
responses.push({ success: false, error: result.reason })
}
})
const successCount: number = responses.filter((resp) => resp.success).length;
return {
responses,
successCount,
failureCount: responses.length - successCount,
};
if (http2SessionHandler) {
let sendResponsePromise: Promise<PromiseSettledResult<SendResponse>[]>;
return new Promise((resolve: (result: PromiseSettledResult<SendResponse>[]) => void, reject) => {
// Start session listeners
http2SessionHandler.invoke().catch((error) => {
const pendingBatchResponse =
sendResponsePromise ? sendResponsePromise.then(this.parseSendResponses) : undefined;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud, if a session error occurs before any of the requests have gone through would pendingBatchResponse be undefined?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this would be undefined.

reject(new FirebaseMessagingSessionError(error, undefined, pendingBatchResponse));
});

// Start making requests
const requests: Promise<SendResponse>[] = copy.map(async (message) => {
validateMessage(message);
const request: { message: Message; validate_only?: boolean; } = { message };
if (dryRun) {
request.validate_only = true;
}
return this.messagingRequestHandler.invokeHttp2RequestHandlerForSendResponse(
FCM_SEND_HOST, urlPath, request, http2SessionHandler);
});

// Resolve once all requests have completed
sendResponsePromise = Promise.allSettled(requests);
sendResponsePromise.then(resolve);
});
} else {
const requests: Promise<SendResponse>[] = copy.map(async (message) => {
validateMessage(message);
const request: { message: Message; validate_only?: boolean; } = { message };
if (dryRun) {
request.validate_only = true;
}
return this.messagingRequestHandler.invokeHttpRequestHandlerForSendResponse(
FCM_SEND_HOST, urlPath, request);
});
return Promise.allSettled(requests);
}
})
.then(this.parseSendResponses)
.finally(() => {
if (http2SessionHandler){
http2SessionHandler.close()
}
http2SessionHandler?.close();
});
}

private parseSendResponses(results: PromiseSettledResult<SendResponse>[]): BatchResponse {
const responses: SendResponse[] = [];
results.forEach(result => {
if (result.status === 'fulfilled') {
responses.push(result.value);
} else { // rejected
responses.push({ success: false, error: result.reason });
}
});
const successCount: number = responses.filter((resp) => resp.success).length;
return {
responses,
successCount,
failureCount: responses.length - successCount,
};
}

/**
* Sends the given multicast message to all the FCM registration tokens
* specified in it.
Expand Down
31 changes: 24 additions & 7 deletions src/utils/api-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,7 @@ class Http2RequestConfigImpl extends BaseRequestConfigImpl implements Http2Reque

public buildRequestOptions(): https.RequestOptions {
const parsed = this.buildUrl();
// TODO(b/401051826)
const protocol = parsed.protocol;

return {
Expand Down Expand Up @@ -1315,9 +1316,16 @@ export class ExponentialBackoffPoller<T> extends EventEmitter {
export class Http2SessionHandler {

private http2Session: http2.ClientHttp2Session
protected promise: Promise<void>
protected resolve: () => void;
protected reject: (_: any) => void;

constructor(url: string){
this.http2Session = this.createSession(url)
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
this.http2Session = this.createSession(url)
});
}

public createSession(url: string): http2.ClientHttp2Session {
Expand All @@ -1330,23 +1338,32 @@ export class Http2SessionHandler {
const http2Session = http2.connect(url, opts)

http2Session.on('goaway', (errorCode, _, opaqueData) => {
throw new FirebaseAppError(
this.reject(new FirebaseAppError(
AppErrorCodes.NETWORK_ERROR,
`Error while making requests: GOAWAY - ${opaqueData.toString()}, Error code: ${errorCode}`
);
`Error while making requests: GOAWAY - ${opaqueData?.toString()}, Error code: ${errorCode}`
));
})

http2Session.on('error', (error) => {
throw new FirebaseAppError(
this.reject(new FirebaseAppError(
AppErrorCodes.NETWORK_ERROR,
`Error while making requests: ${error}`
);
`Session error while making requests: ${error}`
));
})

http2Session.on('close', () => {
// Resolve current promise
this.resolve()
});
return http2Session
}
return this.http2Session
}

public invoke(): Promise<void> {
return this.promise
}

get session(): http2.ClientHttp2Session {
return this.http2Session
}
Expand Down
33 changes: 33 additions & 0 deletions src/utils/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

import { FirebaseError as FirebaseErrorInterface } from '../app';
import { BatchResponse } from '../messaging/messaging-api';
import { deepCopy } from '../utils/deep-copy';

/**
Expand Down Expand Up @@ -344,6 +345,38 @@ export class FirebaseMessagingError extends PrefixedFirebaseError {
}
}

export class FirebaseMessagingSessionError extends FirebaseMessagingError {
public pendingBatchResponse?: Promise<BatchResponse>;
/**
*
* @param info - The error code info.
* @param message - The error message. This will override the default message if provided.
* @param pendingBatchResponse - BatchResponse for pending messages when session error occured.
* @constructor
* @internal
*/
constructor(info: ErrorInfo, message?: string, pendingBatchResponse?: Promise<BatchResponse>) {
// Override default message if custom message provided.
super(info, message || info.message);
this.pendingBatchResponse = pendingBatchResponse;

/* tslint:disable:max-line-length */
// Set the prototype explicitly. See the following link for more details:
// https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work
/* tslint:enable:max-line-length */
(this as any).__proto__ = FirebaseMessagingSessionError.prototype;
}

/** @returns The object representation of the error. */
public toJSON(): object {
return {
code: this.code,
message: this.message,
pendingBatchResponse: this.pendingBatchResponse,
};
}
}

/**
* Firebase project management error code structure. This extends PrefixedFirebaseError.
*/
Expand Down
18 changes: 11 additions & 7 deletions test/resources/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,11 @@ export interface MockHttp2Request {
}

export interface MockHttp2Response {
headers: http2.IncomingHttpHeaders & http2.IncomingHttpStatusHeader,
data: Buffer,
headers?: http2.IncomingHttpHeaders & http2.IncomingHttpStatusHeader,
data?: Buffer,
delay?: number,
error?: any
sessionError?: any
streamError?: any,
}

export class Http2Mocker {
Expand All @@ -340,12 +341,12 @@ export class Http2Mocker {
this.connectStub = sinon.stub(http2, 'connect');
this.connectStub.callsFake((_target: any, options: any) => {
const session = this.originalConnect('https://www.example.com', options);
session.request = this.createMockRequest()
session.request = this.createMockRequest(session)
return session;
})
}

private createMockRequest() {
private createMockRequest(session:http2.ClientHttp2Session) {
return (requestHeaders: http2.OutgoingHttpHeaders) => {
// Create a mock ClientHttp2Stream to return
const mockStream = new stream.Readable({
Expand All @@ -365,8 +366,11 @@ export class Http2Mocker {
const mockRes = this.mockResponses.shift();
if (mockRes) {
this.timeouts.push(setTimeout(() => {
if (mockRes.error) {
mockStream.emit('error', mockRes.error)
if (mockRes.sessionError) {
session.emit('error', mockRes.sessionError)
}
if (mockRes.streamError) {
mockStream.emit('error', mockRes.streamError)
}
else {
mockStream.emit('response', mockRes.headers);
Expand Down
31 changes: 31 additions & 0 deletions test/unit/messaging/messaging.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
import { HttpClient } from '../../../src/utils/api-request';
import { getMetricsHeader, getSdkVersion } from '../../../src/utils/index';
import * as utils from '../utils';
import { FirebaseMessagingSessionError } from '../../../src/utils/error';

chai.should();
chai.use(sinonChai);
Expand Down Expand Up @@ -121,6 +122,12 @@ function mockHttp2SendRequestError(
} as mocks.MockHttp2Response
}

function mockHttp2Error(streamError?: any, sessionError?:any): mocks.MockHttp2Response {
return {
streamError: streamError,
sessionError: sessionError
} as mocks.MockHttp2Response
}

function mockErrorResponse(
path: string,
Expand Down Expand Up @@ -906,6 +913,30 @@ describe('Messaging', () => {
});
});

it('should throw error with BatchResponse promise on session error event using HTTP/2', () => {
mockedHttp2Responses.push(mockHttp2SendRequestResponse('projects/projec_id/messages/1'))
const sessionError = 'MOCK_SESSION_ERROR'
mockedHttp2Responses.push(mockHttp2Error(
new Error(`MOCK_STREAM_ERROR caused by ${sessionError}`),
new Error(sessionError)
));
http2Mocker.http2Stub(mockedHttp2Responses)

return messaging.sendEach(
[validMessage, validMessage], true
).catch(async (error: FirebaseMessagingSessionError) => {
expect(error.code).to.equal('messaging/app/network-error');
expect(error.pendingBatchResponse).to.not.be.undefined;
await error.pendingBatchResponse?.then((response: BatchResponse) => {
expect(http2Mocker.requests.length).to.equal(2);
expect(response.failureCount).to.equal(1);
const responses = response.responses;
checkSendResponseSuccess(responses[0], 'projects/projec_id/messages/1');
checkSendResponseFailure(responses[1], 'app/network-error');
})
});
})

// This test was added to also verify https://github.com/firebase/firebase-admin-node/issues/1146
it('should be fulfilled when called with different message types using HTTP/2', () => {
const messageIds = [
Expand Down
44 changes: 42 additions & 2 deletions test/unit/utils/api-request.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,14 @@ function mockHttp2SendRequestError(
} as mocks.MockHttp2Response
}

function mockHttp2Error(err: any): mocks.MockHttp2Response {
function mockHttp2Error(streamError?: any, sessionError?:any): mocks.MockHttp2Response {
return {
error: err
streamError: streamError,
sessionError: sessionError
} as mocks.MockHttp2Response
}


/**
* Returns a new RetryConfig instance for testing. This is same as the default
* RetryConfig, with the backOffFactor set to 0 to avoid delays.
Expand Down Expand Up @@ -2500,6 +2502,44 @@ describe('Http2Client', () => {
http2SessionHandler: http2SessionHandler
}).should.eventually.be.rejectedWith(err).and.have.property('code', 'app/network-error');
});

it('should fail on session and stream errors', async () => {
const reqData = { request: 'data' };
const streamError = 'Error while making request: test stream error. Error code: AWFUL_STREAM_ERROR';
const sessionError = 'Session error while making requests: Error: AWFUL_SESSION_ERROR'
mockedHttp2Responses.push(mockHttp2Error(
{ message: 'test stream error', code: 'AWFUL_STREAM_ERROR' },
new Error('AWFUL_SESSION_ERROR')
));
http2Mocker.http2Stub(mockedHttp2Responses);

const client = new Http2Client();
http2SessionHandler = new Http2SessionHandler(mockHostUrl)

await client.send({
method: 'POST',
url: mockUrl,
headers: {
'authorization': 'Bearer token',
'My-Custom-Header': 'CustomValue',
},
data: reqData,
http2SessionHandler: http2SessionHandler,
}).should.eventually.be.rejectedWith(streamError).and.have.property('code', 'app/network-error')
.then(() => {
expect(http2Mocker.requests.length).to.equal(1);
expect(http2Mocker.requests[0].headers[':method']).to.equal('POST');
expect(http2Mocker.requests[0].headers[':scheme']).to.equal('https:');
expect(http2Mocker.requests[0].headers[':path']).to.equal(mockPath);
expect(JSON.parse(http2Mocker.requests[0].data)).to.deep.equal(reqData);
expect(http2Mocker.requests[0].headers.authorization).to.equal('Bearer token');
expect(http2Mocker.requests[0].headers['content-type']).to.contain('application/json');
expect(http2Mocker.requests[0].headers['My-Custom-Header']).to.equal('CustomValue');
});

await http2SessionHandler.invoke().should.eventually.be.rejectedWith(sessionError)
.and.have.property('code', 'app/network-error')
});
});

describe('AuthorizedHttpClient', () => {
Expand Down