
Commit 403e291

cabljac and pr-Mais authored

Release: firestore-bigquery-export (#2184)

* fix: log data and old data on fail to enqueue (#2183)
* Update firestore-bigquery-export/extension.yaml
  Co-authored-by: Mais Alheraki <mais.alheraki@gmail.com>
* Update firestore-bigquery-export/extension.yaml
* chore: update wording of new param (#2185)

---------

Co-authored-by: Mais Alheraki <mais.alheraki@gmail.com>

1 parent 3b22f83 · commit 403e291

File tree

7 files changed: +113 −53 lines changed

firestore-bigquery-export/CHANGELOG.md

Lines changed: 4 additions & 0 deletions

@@ -1,3 +1,7 @@
+## Version 0.1.55
+
+feat - log failed queued tasks
+
 ## Version 0.1.54
 
 fixed - bump changetracker and fix more vulnerabilities

firestore-bigquery-export/README.md

Lines changed: 2 additions & 0 deletions

@@ -126,6 +126,8 @@ To install an extension, your project must be on the [Blaze (pay as you go) plan
 
 * Collection path: What is the path of the collection that you would like to export? You may use `{wildcard}` notation to match a subcollection of all documents in a collection (for example: `chatrooms/{chatid}/posts`). Parent Firestore Document IDs from `{wildcards}` can be returned in `path_params` as a JSON formatted string.
 
+* Enable logging failed exports: If enabled, the extension will log event exports that failed to enqueue to Cloud Logging, to mitigate data loss.
+
 * Enable Wildcard Column field with Parent Firestore Document IDs: If enabled, creates a column containing a JSON object of all wildcard ids from a documents path.
 
 * Dataset ID: What ID would you like to use for your BigQuery dataset? This extension will create the dataset, if it doesn't already exist.

firestore-bigquery-export/extension.yaml

Lines changed: 14 additions & 1 deletion

@@ -13,7 +13,7 @@
 # limitations under the License.
 
 name: firestore-bigquery-export
-version: 0.1.54
+version: 0.1.55
 specVersion: v1beta
 
 displayName: Stream Firestore to BigQuery
@@ -206,6 +206,19 @@ params:
     default: posts
     required: true
 
+  - param: LOG_FAILED_EXPORTS
+    label: Enable logging failed exports
+    description: >-
+      If enabled, the extension will log event exports that failed to enqueue to
+      Cloud Logging, to mitigate data loss.
+    type: select
+    options:
+      - label: Yes
+        value: yes
+      - label: No
+        value: no
+    required: true
+
   - param: WILDCARD_IDS
     label: Enable Wildcard Column field with Parent Firestore Document IDs
     description: >-
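
Extension parameters are handed to the functions runtime as string environment variables, so the yes/no select above arrives as the literal string "yes" or "no". A minimal sketch of how that string becomes a boolean, mirroring the config.ts change further down in this commit:

// Sketch: the select value is compared strictly against "yes", so "no",
// an empty string, or an unset variable all resolve to false.
const logFailedExportData: boolean = process.env.LOG_FAILED_EXPORTS === "yes";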

firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap

Lines changed: 2 additions & 1 deletion

@@ -20,6 +20,7 @@ Object {
   "instanceId": undefined,
   "kmsKeyName": "test",
   "location": "us-central1",
+  "logFailedExportData": false,
   "maxDispatchesPerSecond": 10,
   "tableId": "my_table",
   "timePartitioning": null,
@@ -74,4 +75,4 @@ Object {
 ",
 "validationRegex": "^[a-zA-Z0-9_]+$",
 }
-`;
+`;

firestore-bigquery-export/functions/src/config.ts

Lines changed: 1 addition & 0 deletions

@@ -32,6 +32,7 @@ export function clustering(clusters: string | undefined) {
 }
 
 export default {
+  logFailedExportData: process.env.LOG_FAILED_EXPORTS === "yes",
   bqProjectId: process.env.BIGQUERY_PROJECT_ID,
   databaseId: "(default)",
   collectionPath: process.env.COLLECTION_PATH,

firestore-bigquery-export/functions/src/index.ts

Lines changed: 72 additions & 49 deletions

@@ -32,25 +32,27 @@ import * as logs from "./logs";
 import * as events from "./events";
 import { getChangeType, getDocumentId, resolveWildcardIds } from "./util";
 
+const eventTrackerConfig = {
+  tableId: config.tableId,
+  datasetId: config.datasetId,
+  datasetLocation: config.datasetLocation,
+  backupTableId: config.backupCollectionId,
+  transformFunction: config.transformFunction,
+  timePartitioning: config.timePartitioning,
+  timePartitioningField: config.timePartitioningField,
+  timePartitioningFieldType: config.timePartitioningFieldType,
+  timePartitioningFirestoreField: config.timePartitioningFirestoreField,
+  databaseId: config.databaseId,
+  clustering: config.clustering,
+  wildcardIds: config.wildcardIds,
+  bqProjectId: config.bqProjectId,
+  useNewSnapshotQuerySyntax: config.useNewSnapshotQuerySyntax,
+  skipInit: true,
+  kmsKeyName: config.kmsKeyName,
+};
+
 const eventTracker: FirestoreEventHistoryTracker =
-  new FirestoreBigQueryEventHistoryTracker({
-    tableId: config.tableId,
-    datasetId: config.datasetId,
-    datasetLocation: config.datasetLocation,
-    backupTableId: config.backupCollectionId,
-    transformFunction: config.transformFunction,
-    timePartitioning: config.timePartitioning,
-    timePartitioningField: config.timePartitioningField,
-    timePartitioningFieldType: config.timePartitioningFieldType,
-    timePartitioningFirestoreField: config.timePartitioningFirestoreField,
-    databaseId: config.databaseId,
-    clustering: config.clustering,
-    wildcardIds: config.wildcardIds,
-    bqProjectId: config.bqProjectId,
-    useNewSnapshotQuerySyntax: config.useNewSnapshotQuerySyntax,
-    skipInit: true,
-    kmsKeyName: config.kmsKeyName,
-  });
+  new FirestoreBigQueryEventHistoryTracker(eventTrackerConfig);
 
 logs.init();
 
@@ -97,60 +99,81 @@ export const fsexportbigquery = functions
   .document(config.collectionPath)
   .onWrite(async (change, context) => {
     logs.start();
-    try {
-      const changeType = getChangeType(change);
-      const documentId = getDocumentId(change);
+    const changeType = getChangeType(change);
+    const documentId = getDocumentId(change);
+
+    const isCreated = changeType === ChangeType.CREATE;
+    const isDeleted = changeType === ChangeType.DELETE;
 
-      const isCreated = changeType === ChangeType.CREATE;
-      const isDeleted = changeType === ChangeType.DELETE;
+    const data = isDeleted ? undefined : change.after?.data();
+    const oldData =
+      isCreated || config.excludeOldData ? undefined : change.before?.data();
 
-      const data = isDeleted ? undefined : change.after.data();
-      const oldData =
-        isCreated || config.excludeOldData ? undefined : change.before.data();
+    /**
+     * Serialize early before queueing in cloud task
+     * Cloud tasks currently have a limit of 1mb, this also ensures payloads are kept to a minimum
+     */
+    let serializedData: any;
+    let serializedOldData: any;
 
+    try {
+      serializedData = eventTracker.serializeData(data);
+      serializedOldData = eventTracker.serializeData(oldData);
+    } catch (err) {
+      logs.error(false, "Failed to serialize data", err, null, null);
+      throw err;
+    }
+
+    try {
       await events.recordStartEvent({
         documentId,
         changeType,
-        before: {
-          data: change.before.data(),
-        },
-        after: {
-          data: change.after.data(),
-        },
+        before: { data: change.before.data() },
+        after: { data: change.after.data() },
         context: context.resource,
       });
+    } catch (err) {
+      logs.error(false, "Failed to record start event", err, null, null);
+      throw err;
+    }
 
+    try {
       const queue = getFunctions().taskQueue(
         `locations/${config.location}/functions/syncBigQuery`,
         config.instanceId
       );
 
-      /**
-       * enqueue data cannot currently handle documentdata
-       * Serialize early before queueing in clopud task
-       * Cloud tasks currently have a limit of 1mb, this also ensures payloads are kept to a minimum
-       */
-      const seializedData = eventTracker.serializeData(data);
-      const serializedOldData = eventTracker.serializeData(oldData);
-
       await queue.enqueue({
         context,
        changeType,
         documentId,
-        data: seializedData,
+        data: serializedData,
         oldData: serializedOldData,
       });
     } catch (err) {
-      await events.recordErrorEvent(err as Error);
-      logs.error(err);
-      const eventAgeMs = Date.now() - Date.parse(context.timestamp);
-      const eventMaxAgeMs = 10000;
+      const event = {
+        timestamp: context.timestamp, // This is a Cloud Firestore commit timestamp with microsecond precision.
+        operation: changeType,
+        documentName: context.resource.name,
+        documentId: documentId,
+        pathParams: config.wildcardIds ? context.params : null,
+        eventId: context.eventId,
+        data: serializedData,
+        oldData: serializedOldData,
+      };
 
-      if (eventAgeMs > eventMaxAgeMs) {
-        return;
+      await events.recordErrorEvent(err as Error);
+      // Only log the error once here
+      if (!err.logged) {
+        logs.error(
+          config.logFailedExportData,
+          "Failed to enqueue task to syncBigQuery",
          err,
          event,
          eventTrackerConfig
        );
       }
-
-      throw err;
+      return;
     }
 
     logs.complete();
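
Combined with the logs.ts change below, an enqueue failure with LOG_FAILED_EXPORTS set to yes should produce roughly the following structured log entry. This is an illustrative sketch only: the exact envelope depends on how the firebase-functions logger maps the metadata object into Cloud Logging, and every value shown is a placeholder.

// Hypothetical shape of the logged payload (placeholder values).
const exampleLoggedPayload = {
  message:
    "Error when mirroring data to BigQuery: Failed to enqueue task to syncBigQuery",
  error: new Error("enqueue failed"),   // the error thrown by queue.enqueue
  event: {
    timestamp: "2024-01-01T00:00:00.000000Z", // Firestore commit timestamp
    operation: "UPDATE",                // the ChangeType of the write
    documentName: "projects/<project>/databases/(default)/documents/posts/<id>",
    documentId: "<id>",
    pathParams: null,                   // wildcard ids when WILDCARD_IDS is enabled
    eventId: "<event-id>",
    data: { /* serialized new document data */ },
    oldData: { /* serialized previous data, unless excluded */ },
  },
  eventTrackerConfig: { /* the tracker settings defined at the top of index.ts */ },
};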

firestore-bigquery-export/functions/src/logs.ts

Lines changed: 18 additions & 2 deletions

@@ -149,8 +149,24 @@ export const dataTypeInvalid = (
   );
 };
 
-export const error = (err: Error) => {
-  logger.error("Error when mirroring data to BigQuery", err);
+export const error = (
+  includeEvent: boolean,
+  message: string,
+  err: Error,
+  event: any,
+  eventTrackerConfig: any
+) => {
+  if (includeEvent) {
+    logger.error(`Error when mirroring data to BigQuery: ${message}`, {
+      error: err,
+      event,
+      eventTrackerConfig,
+    });
+  } else {
+    logger.error(`Error when mirroring data to BigQuery: ${message}`, {
+      error: err,
+    });
+  }
 };
 
 export const init = () => {
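
As used in index.ts above, the widened signature serves two cases: internal failures (serialization, recording the start event) pass false so only the error is logged, while the enqueue failure passes the operator-controlled flag so the document payload is attached only when the new parameter is enabled. For illustration, the two call patterns from this commit:

// Compact entry: just the error, no document data attached.
logs.error(false, "Failed to serialize data", err, null, null);

// Verbose entry: includes the reconstructed event and the tracker config,
// but only when LOG_FAILED_EXPORTS is set to "yes".
logs.error(
  config.logFailedExportData,
  "Failed to enqueue task to syncBigQuery",
  err,
  event,
  eventTrackerConfig
);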
