@@ -25,10 +25,8 @@ import * as logs from "../logs";
25
25
import { BigQuery } from "@google-cloud/bigquery" ;
26
26
27
27
export interface FirestoreBigQueryEventHistoryTrackerConfig {
28
- collectionPath : string ;
28
+ tableId : string ;
29
29
datasetId : string ;
30
- initialized : boolean ;
31
- suppressWarnings : boolean ;
32
30
}
33
31
34
32
/**
@@ -39,122 +37,87 @@ export interface FirestoreBigQueryEventHistoryTrackerConfig {
39
37
*/
40
38
export class FirestoreBigQueryEventHistoryTracker implements FirestoreEventHistoryTracker {
41
39
bq : bigquery . BigQuery ;
42
- tableName : string ;
43
- initialized : boolean ;
44
- suppressWarnings : boolean ;
40
+ initialized : boolean = false ;
45
41
46
42
constructor ( public config : FirestoreBigQueryEventHistoryTrackerConfig ) {
47
43
this . bq = new bigquery . BigQuery ( ) ;
48
- this . initialized = config . initialized ;
49
- this . tableName = config . collectionPath . replace ( / \/ / g, "_" ) ;
50
- this . suppressWarnings = config . suppressWarnings ;
51
44
}
52
45
53
46
async record ( events : FirestoreDocumentChangeEvent [ ] ) {
54
- if ( ! this . config . initialized ) {
55
- try {
56
- await this . initialize ( this . config . datasetId , this . tableName ) ;
57
- this . initialized = true ;
58
- } catch ( e ) {
59
- logs . bigQueryErrorRecordingDocumentChange ( e ) ;
60
- }
61
- }
47
+ await this . initialize ( ) ;
48
+
62
49
const rows = events . map ( event => {
63
- return this . buildDataRow (
64
- // Use the function's event ID to protect against duplicate executions
65
- event . eventId ,
66
- event . operation ,
67
- event . timestamp ,
68
- event . documentName ,
69
- event . data ) ;
50
+ // This must match firestoreToBQTable().
51
+ return {
52
+ timestamp : event . timestamp ,
53
+ event_id : event . eventId ,
54
+ document_name : event . documentName ,
55
+ operation : ChangeType [ event . operation ] ,
56
+ data : JSON . stringify ( event . data ) ,
57
+ } ;
70
58
} ) ;
71
- await this . insertData ( this . config . datasetId , this . tableName , rows ) ;
59
+ await this . insertData ( rows ) ;
72
60
}
73
61
74
62
/**
75
- * Ensure that the defined Firestore schema exists within BigQuery and
76
- * contains the correct information. This is invoked for the first time when
77
- * the first document change event is recorded.
63
+ * Inserts rows of data into the BigQuery raw change log table.
78
64
*/
79
- async initialize ( datasetId : string , tableName : string ) {
80
- const rawTable = raw ( tableName ) ;
81
-
82
- await this . initializeDataset ( datasetId ) ;
83
- await this . initializeChangelog ( datasetId , rawTable ) ;
84
- await this . initializeLatestView ( datasetId , rawTable ) ;
85
- } ;
86
-
87
- buildDataRow (
88
- eventId : string ,
89
- changeType : ChangeType ,
90
- timestamp : string ,
91
- document_name : string ,
92
- data ?: Object
93
- ) : bigquery . RowMetadata {
94
- // This must match firestoreToBQTable().
95
- return {
96
- timestamp : timestamp ,
97
- event_id : eventId ,
98
- document_name : document_name ,
99
- operation : ChangeType [ changeType ] ,
100
- data : JSON . stringify ( data ) ,
101
- } ;
102
- } ;
65
+ async insertData ( rows : bigquery . RowMetadata [ ] ) {
66
+ try {
67
+ const dataset = this . bq . dataset ( this . config . datasetId ) ;
68
+ const table = dataset . table ( this . rawChangeLogTableName ( ) ) ;
69
+ logs . dataInserting ( rows . length ) ;
70
+ await table . insert ( rows ) ;
71
+ logs . dataInserted ( rows . length ) ;
72
+ } catch ( e ) {
73
+ // Reinitializing in case the destination table is modified.
74
+ this . initialized = false ;
75
+ throw e ;
76
+ }
77
+ }
103
78
104
79
/**
105
- * Insert a row of data into the BigQuery `raw` data table
80
+ * Creates the BigQuery resources with the expected schema for {@link FirestoreEventHistoryTracker}.
81
+ * After the first invocation, it skips initialization assuming these resources are still there.
106
82
*/
107
- async insertData (
108
- datasetId : string ,
109
- collectionTableName : string ,
110
- rows : bigquery . RowMetadata [ ]
111
- ) {
112
- const name = changeLog ( raw ( collectionTableName ) ) ;
113
- const dataset = this . bq . dataset ( datasetId ) ;
114
- const table = dataset . table ( name ) ;
115
- const rowCount = rows . length ;
116
-
117
- logs . dataInserting ( rowCount ) ;
118
- await table . insert ( rows ) ;
119
- logs . dataInserted ( rowCount ) ;
120
- } ;
83
+ async initialize ( ) {
84
+ if ( this . initialized ) {
85
+ return ;
86
+ }
87
+ await this . initializeDataset ( ) ;
88
+ await this . initializeRawChangeLogTable ( ) ;
89
+ await this . initializeLatestView ( ) ;
90
+ this . initialized = true ;
91
+ }
121
92
122
93
/**
123
- * Check that the specified dataset exists, and create it if it doesn't.
94
+ * Creates the specified dataset if it doesn't already exist.
124
95
*/
125
- async initializeDataset ( datasetId : string ) : Promise < bigquery . Dataset > {
126
- const dataset = this . bq . dataset ( datasetId ) ;
96
+ async initializeDataset ( ) {
97
+ const dataset = this . bq . dataset ( this . config . datasetId ) ;
127
98
const [ datasetExists ] = await dataset . exists ( ) ;
128
99
if ( datasetExists ) {
129
- if ( ! this . suppressWarnings ) {
130
- logs . bigQueryDatasetExists ( datasetId ) ;
131
- }
100
+ logs . bigQueryDatasetExists ( this . config . datasetId ) ;
132
101
} else {
133
- logs . bigQueryDatasetCreating ( datasetId ) ;
102
+ logs . bigQueryDatasetCreating ( this . config . datasetId ) ;
134
103
await dataset . create ( ) ;
135
- logs . bigQueryDatasetCreated ( datasetId ) ;
104
+ logs . bigQueryDatasetCreated ( this . config . datasetId ) ;
136
105
}
137
106
return dataset ;
138
- } ;
107
+ }
139
108
140
109
/**
141
- * Check that the table exists within the specified dataset, and create it
142
- * if it doesn't. If the table does exist, validate that the BigQuery schema
143
- * is correct and add any missing fields.
110
+ * Creates the raw change log table if it doesn't exist already.
111
+ * TODO: Validate that the BigQuery schema is correct if the table does exist.
144
112
*/
145
- async initializeChangelog (
146
- datasetId : string ,
147
- tableName : string ,
148
- ) : Promise < bigquery . Table > {
149
- const changelogName = changeLog ( tableName ) ;
150
- const dataset = this . bq . dataset ( datasetId ) ;
151
- let table = dataset . table ( changelogName ) ;
113
+ async initializeRawChangeLogTable ( ) {
114
+ const changelogName = this . rawChangeLogTableName ( ) ;
115
+ const dataset = this . bq . dataset ( this . config . datasetId ) ;
116
+ const table = dataset . table ( changelogName ) ;
152
117
const [ tableExists ] = await table . exists ( ) ;
153
118
154
119
if ( tableExists ) {
155
- if ( ! this . suppressWarnings ) {
156
- logs . bigQueryTableAlreadyExists ( table . id , dataset . id ) ;
157
- }
120
+ logs . bigQueryTableAlreadyExists ( table . id , dataset . id ) ;
158
121
} else {
159
122
logs . bigQueryTableCreating ( changelogName ) ;
160
123
const options = {
@@ -169,37 +132,35 @@ export class FirestoreBigQueryEventHistoryTracker implements FirestoreEventHisto
169
132
} ;
170
133
171
134
/**
172
- * Create a view over a table storing a change log of Firestore documents
173
- * which contains only latest version of all live documents in the mirrored
174
- * collection.
135
+ * Creates a view over the raw change log table that returns only the latest operation of
136
+ * all existing documents in the exported collection.
175
137
*/
176
- async initializeLatestView (
177
- datasetId : string ,
178
- tableName : string
179
- ) : Promise < bigquery . Table > {
180
- let viewName = latest ( tableName ) ;
181
- const dataset = this . bq . dataset ( datasetId ) ;
182
- let view = dataset . table ( viewName ) ;
138
+ async initializeLatestView ( ) {
139
+ const dataset = this . bq . dataset ( this . config . datasetId ) ;
140
+ const view = dataset . table ( this . rawLatestView ( ) ) ;
183
141
const [ viewExists ] = await view . exists ( ) ;
184
142
185
143
if ( viewExists ) {
186
- if ( ! this . suppressWarnings ) {
187
- logs . bigQueryViewAlreadyExists ( view . id , dataset . id ) ;
188
- }
144
+ logs . bigQueryViewAlreadyExists ( view . id , dataset . id ) ;
189
145
} else {
190
- const latestSnapshot = latestConsistentSnapshotView ( datasetId , changeLog ( tableName ) ) ;
191
- logs . bigQueryViewCreating ( viewName , latestSnapshot . query ) ;
146
+ const latestSnapshot = latestConsistentSnapshotView ( this . config . datasetId , this . rawChangeLogTableName ( ) ) ;
147
+ logs . bigQueryViewCreating ( this . rawLatestView ( ) , latestSnapshot . query ) ;
192
148
const options = {
193
- friendlyName : viewName ,
149
+ friendlyName : this . rawLatestView ( ) ,
194
150
view : latestSnapshot ,
195
151
} ;
196
152
await view . create ( options ) ;
197
- logs . bigQueryViewCreated ( viewName ) ;
153
+ logs . bigQueryViewCreated ( this . rawLatestView ( ) ) ;
198
154
}
199
155
return view ;
200
- } ;
156
+ }
157
+
158
+ private rawChangeLogTableName ( ) : string {
159
+ return `${ this . config . tableId } _raw_changelog` ;
160
+ }
161
+
162
+ private rawLatestView ( ) : string {
163
+ return `${ this . config . tableId } _raw_latest` ;
164
+ }
201
165
}
202
166
203
- export function raw ( tableName : string ) : string { return `${ tableName } _raw` ; } ;
204
- export function changeLog ( tableName : string ) : string { return `${ tableName } _changelog` ; }
205
- export function latest ( tableName : string ) : string { return `${ tableName } _latest` ; } ;
0 commit comments