@@ -25,136 +25,102 @@ import * as logs from "../logs";
25
25
import { BigQuery } from "@google-cloud/bigquery" ;
26
26
27
27
export interface FirestoreBigQueryEventHistoryTrackerConfig {
28
- collectionPath : string ;
29
28
datasetId : string ;
30
- initialized : boolean ;
31
- suppressWarnings : boolean ;
29
+ tableId : string ;
32
30
}
33
31
34
32
/**
35
- * An interface to BigQuery which handles:
36
- * - Iniitializing the raw changelog table when the first event gets recorded.
37
- * - Initializing the latest view over the raw changelog.
38
- * - Streaming writes into the raw changelog table.
33
+ * An FirestoreEventHistoryTracker that exports data to BigQuery.
34
+ *
35
+ * When the first event is received, it creates necessary BigQuery resources:
36
+ * - Dataset: {@link FirestoreBigQueryEventHistoryTrackerConfig#datasetId}.
37
+ * - Table: Raw change log table {@link FirestoreBigQueryEventHistoryTracker#rawChangeLogTableName}.
38
+ * - View: Latest view {@link FirestoreBigQueryEventHistoryTracker#rawLatestView}.
39
+ * If any subsequent data export fails, it will attempt to reinitialize.
39
40
*/
40
41
export class FirestoreBigQueryEventHistoryTracker implements FirestoreEventHistoryTracker {
41
42
bq : bigquery . BigQuery ;
42
- tableName : string ;
43
- initialized : boolean ;
44
- suppressWarnings : boolean ;
43
+ initialized : boolean = false ;
45
44
46
45
constructor ( public config : FirestoreBigQueryEventHistoryTrackerConfig ) {
47
46
this . bq = new bigquery . BigQuery ( ) ;
48
- this . initialized = config . initialized ;
49
- this . tableName = config . collectionPath . replace ( / \/ / g, "_" ) ;
50
- this . suppressWarnings = config . suppressWarnings ;
51
47
}
52
48
53
49
async record ( events : FirestoreDocumentChangeEvent [ ] ) {
54
- if ( ! this . config . initialized ) {
55
- try {
56
- await this . initialize ( this . config . datasetId , this . tableName ) ;
57
- this . initialized = true ;
58
- } catch ( e ) {
59
- logs . bigQueryErrorRecordingDocumentChange ( e ) ;
60
- }
61
- }
50
+ await this . initialize ( ) ;
51
+
62
52
const rows = events . map ( event => {
63
- return this . buildDataRow (
64
- // Use the function's event ID to protect against duplicate executions
65
- event . eventId ,
66
- event . operation ,
67
- event . timestamp ,
68
- event . documentName ,
69
- event . data ) ;
53
+ // This must match firestoreToBQTable().
54
+ return {
55
+ timestamp : event . timestamp ,
56
+ event_id : event . eventId ,
57
+ document_name : event . documentName ,
58
+ operation : ChangeType [ event . operation ] ,
59
+ data : JSON . stringify ( event . data ) ,
60
+ } ;
70
61
} ) ;
71
- await this . insertData ( this . config . datasetId , this . tableName , rows ) ;
62
+ await this . insertData ( rows ) ;
72
63
}
73
64
74
65
/**
75
- * Ensure that the defined Firestore schema exists within BigQuery and
76
- * contains the correct information. This is invoked for the first time when
77
- * the first document change event is recorded.
66
+ * Inserts rows of data into the BigQuery raw change log table.
78
67
*/
79
- async initialize ( datasetId : string , tableName : string ) {
80
- const rawTable = raw ( tableName ) ;
81
-
82
- await this . initializeDataset ( datasetId ) ;
83
- await this . initializeChangelog ( datasetId , rawTable ) ;
84
- await this . initializeLatestView ( datasetId , rawTable ) ;
85
- } ;
86
-
87
- buildDataRow (
88
- eventId : string ,
89
- changeType : ChangeType ,
90
- timestamp : string ,
91
- document_name : string ,
92
- data ?: Object
93
- ) : bigquery . RowMetadata {
94
- // This must match firestoreToBQTable().
95
- return {
96
- timestamp : timestamp ,
97
- event_id : eventId ,
98
- document_name : document_name ,
99
- operation : ChangeType [ changeType ] ,
100
- data : JSON . stringify ( data ) ,
101
- } ;
102
- } ;
68
+ private async insertData ( rows : bigquery . RowMetadata [ ] ) {
69
+ try {
70
+ const dataset = this . bq . dataset ( this . config . datasetId ) ;
71
+ const table = dataset . table ( this . rawChangeLogTableName ( ) ) ;
72
+ logs . dataInserting ( rows . length ) ;
73
+ await table . insert ( rows ) ;
74
+ logs . dataInserted ( rows . length ) ;
75
+ } catch ( e ) {
76
+ // Reinitializing in case the destintation table is modified.
77
+ this . initialized = false ;
78
+ throw e ;
79
+ }
80
+ }
103
81
104
82
/**
105
- * Insert a row of data into the BigQuery `raw` data table
83
+ * Creates the BigQuery resources with the expected schema for {@link FirestoreEventHistoryTracker}.
84
+ * After the first invokation, it skips initialization assuming these resources are still there.
106
85
*/
107
- async insertData (
108
- datasetId : string ,
109
- collectionTableName : string ,
110
- rows : bigquery . RowMetadata [ ]
111
- ) {
112
- const name = changeLog ( raw ( collectionTableName ) ) ;
113
- const dataset = this . bq . dataset ( datasetId ) ;
114
- const table = dataset . table ( name ) ;
115
- const rowCount = rows . length ;
116
-
117
- logs . dataInserting ( rowCount ) ;
118
- await table . insert ( rows ) ;
119
- logs . dataInserted ( rowCount ) ;
120
- } ;
86
+ private async initialize ( ) {
87
+ if ( this . initialized ) {
88
+ return ;
89
+ }
90
+ await this . initializeDataset ( ) ;
91
+ await this . initializeRawChangeLogTable ( ) ;
92
+ await this . initializeLatestView ( ) ;
93
+ this . initialized = true ;
94
+ }
121
95
122
96
/**
123
- * Check that the specified dataset exists, and create it if it doesn't.
97
+ * Creates the specified dataset if it doesn't already exists .
124
98
*/
125
- async initializeDataset ( datasetId : string ) : Promise < bigquery . Dataset > {
126
- const dataset = this . bq . dataset ( datasetId ) ;
99
+ private async initializeDataset ( ) {
100
+ const dataset = this . bq . dataset ( this . config . datasetId ) ;
127
101
const [ datasetExists ] = await dataset . exists ( ) ;
128
102
if ( datasetExists ) {
129
- if ( ! this . suppressWarnings ) {
130
- logs . bigQueryDatasetExists ( datasetId ) ;
131
- }
103
+ logs . bigQueryDatasetExists ( this . config . datasetId ) ;
132
104
} else {
133
- logs . bigQueryDatasetCreating ( datasetId ) ;
105
+ logs . bigQueryDatasetCreating ( this . config . datasetId ) ;
134
106
await dataset . create ( ) ;
135
- logs . bigQueryDatasetCreated ( datasetId ) ;
107
+ logs . bigQueryDatasetCreated ( this . config . datasetId ) ;
136
108
}
137
109
return dataset ;
138
- } ;
110
+ }
139
111
140
112
/**
141
- * Check that the table exists within the specified dataset, and create it
142
- * if it doesn't. If the table does exist, validate that the BigQuery schema
143
- * is correct and add any missing fields.
113
+ * Creates the raw change log table if it doesn't already exist.
114
+ * TODO: Validate that the BigQuery schema is correct if the table does exist,
144
115
*/
145
- async initializeChangelog (
146
- datasetId : string ,
147
- tableName : string ,
148
- ) : Promise < bigquery . Table > {
149
- const changelogName = changeLog ( tableName ) ;
150
- const dataset = this . bq . dataset ( datasetId ) ;
151
- let table = dataset . table ( changelogName ) ;
116
+ private async initializeRawChangeLogTable ( ) {
117
+ const changelogName = this . rawChangeLogTableName ( ) ;
118
+ const dataset = this . bq . dataset ( this . config . datasetId ) ;
119
+ const table = dataset . table ( changelogName ) ;
152
120
const [ tableExists ] = await table . exists ( ) ;
153
121
154
122
if ( tableExists ) {
155
- if ( ! this . suppressWarnings ) {
156
- logs . bigQueryTableAlreadyExists ( table . id , dataset . id ) ;
157
- }
123
+ logs . bigQueryTableAlreadyExists ( table . id , dataset . id ) ;
158
124
} else {
159
125
logs . bigQueryTableCreating ( changelogName ) ;
160
126
const options = {
@@ -169,37 +135,35 @@ export class FirestoreBigQueryEventHistoryTracker implements FirestoreEventHisto
169
135
} ;
170
136
171
137
/**
172
- * Create a view over a table storing a change log of Firestore documents
173
- * which contains only latest version of all live documents in the mirrored
174
- * collection.
138
+ * Creates the latest snapshot view, which returns only latest operations
139
+ * of all existing documents over the raw change log table.
175
140
*/
176
- async initializeLatestView (
177
- datasetId : string ,
178
- tableName : string
179
- ) : Promise < bigquery . Table > {
180
- let viewName = latest ( tableName ) ;
181
- const dataset = this . bq . dataset ( datasetId ) ;
182
- let view = dataset . table ( viewName ) ;
141
+ private async initializeLatestView ( ) {
142
+ const dataset = this . bq . dataset ( this . config . datasetId ) ;
143
+ const view = dataset . table ( this . rawLatestView ( ) ) ;
183
144
const [ viewExists ] = await view . exists ( ) ;
184
145
185
146
if ( viewExists ) {
186
- if ( ! this . suppressWarnings ) {
187
- logs . bigQueryViewAlreadyExists ( view . id , dataset . id ) ;
188
- }
147
+ logs . bigQueryViewAlreadyExists ( view . id , dataset . id ) ;
189
148
} else {
190
- const latestSnapshot = latestConsistentSnapshotView ( datasetId , changeLog ( tableName ) ) ;
191
- logs . bigQueryViewCreating ( viewName , latestSnapshot . query ) ;
149
+ const latestSnapshot = latestConsistentSnapshotView ( this . config . datasetId , this . rawChangeLogTableName ( ) ) ;
150
+ logs . bigQueryViewCreating ( this . rawLatestView ( ) , latestSnapshot . query ) ;
192
151
const options = {
193
- friendlyName : viewName ,
152
+ friendlyName : this . rawLatestView ( ) ,
194
153
view : latestSnapshot ,
195
154
} ;
196
155
await view . create ( options ) ;
197
- logs . bigQueryViewCreated ( viewName ) ;
156
+ logs . bigQueryViewCreated ( this . rawLatestView ( ) ) ;
198
157
}
199
158
return view ;
200
- } ;
159
+ }
160
+
161
+ private rawChangeLogTableName ( ) : string {
162
+ return `${ this . config . tableId } _raw_changelog` ;
163
+ }
164
+
165
+ private rawLatestView ( ) : string {
166
+ return `${ this . config . tableId } _raw_latest` ;
167
+ }
201
168
}
202
169
203
- export function raw ( tableName : string ) : string { return `${ tableName } _raw` ; } ;
204
- export function changeLog ( tableName : string ) : string { return `${ tableName } _changelog` ; }
205
- export function latest ( tableName : string ) : string { return `${ tableName } _latest` ; } ;
0 commit comments