Skip to content

Commit 285bc6b

Browse files
committed
docs
1 parent d5a5a03 commit 285bc6b

File tree

2 files changed

+180
-5
lines changed
  • firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery

2 files changed

+180
-5
lines changed
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright 2019 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import * as bigquery from "@google-cloud/bigquery";
18+
import {
19+
firestoreToBQTable,
20+
} from "./schema";
21+
import { latestConsistentSnapshotView } from "./snapshot";
22+
23+
import { ChangeType, FirestoreEventHistoryTracker, FirestoreDocumentChangeEvent } from "../tracker";
24+
import * as logs from "../logs";
25+
import { BigQuery } from "@google-cloud/bigquery";
26+
27+
export interface FirestoreBigQueryEventHistoryTrackerConfig {
28+
tableId: string;
29+
datasetId: string;
30+
}
31+
32+
/**
33+
* An FirestoreEventHistoryTracker that exports data to BigQuery.
34+
*
35+
* When the first event is received, it creates necessary BigQuery resources:
36+
* - Dataset named {@link FirestoreBigQueryEventHistoryTrackerConfig#datasetId}.
37+
* - Raw change log BigQuery table named {@link FirestoreBigQueryEventHistoryTracker#rawChangeLogTableName}.
38+
* - Latest view named {@link FirestoreBigQueryEventHistoryTracker#rawChangeLogTableName}.
39+
* the raw changelog table
40+
* - Initializing the latest view over the raw changelog.
41+
*
42+
* - Streaming writes into the raw changelog table.
43+
*/
44+
export class FirestoreBigQueryEventHistoryTracker implements FirestoreEventHistoryTracker {
45+
bq: bigquery.BigQuery;
46+
initialized: boolean = false;
47+
48+
constructor(public config: FirestoreBigQueryEventHistoryTrackerConfig) {
49+
this.bq = new bigquery.BigQuery();
50+
}
51+
52+
async record(events: FirestoreDocumentChangeEvent[]) {
53+
await this.initialize();
54+
55+
const rows = events.map(event => {
56+
// This must match firestoreToBQTable().
57+
return {
58+
timestamp: event.timestamp,
59+
event_id: event.eventId,
60+
document_name: event.documentName,
61+
operation: ChangeType[event.operation],
62+
data: JSON.stringify(event.data),
63+
};
64+
});
65+
await this.insertData(rows);
66+
}
67+
68+
/**
69+
* Inserts rows of data into the BigQuery raw change log table.
70+
*/
71+
async insertData(rows: bigquery.RowMetadata[]) {
72+
try {
73+
const dataset = this.bq.dataset(this.config.datasetId);
74+
const table = dataset.table(this.rawChangeLogTableName());
75+
logs.dataInserting(rows.length);
76+
await table.insert(rows);
77+
logs.dataInserted(rows.length);
78+
} catch (e) {
79+
// Reinitializing in case the destintation table is modified.
80+
this.initialized = false;
81+
throw e;
82+
}
83+
}
84+
85+
/**
86+
* Creates the BigQuery resources with the expected schema for {@link FirestoreEventHistoryTracker}.
87+
* After the first invokation, it skips initialization assuming these resources are still there.
88+
*/
89+
async initialize() {
90+
if (this.initialized) {
91+
return;
92+
}
93+
await this.initializeDataset();
94+
await this.initializeRawChangeLogTable();
95+
await this.initializeLatestView();
96+
this.initialized = true;
97+
}
98+
99+
/**
100+
* Creates the specified dataset if it doesn't already exists.
101+
*/
102+
async initializeDataset() {
103+
const dataset = this.bq.dataset(this.config.datasetId);
104+
const [datasetExists] = await dataset.exists();
105+
if (datasetExists) {
106+
logs.bigQueryDatasetExists(this.config.datasetId);
107+
} else {
108+
logs.bigQueryDatasetCreating(this.config.datasetId);
109+
await dataset.create();
110+
logs.bigQueryDatasetCreated(this.config.datasetId);
111+
}
112+
return dataset;
113+
}
114+
115+
/**
116+
* Creates the raw change log table if it it doesn't exist already.
117+
* TODO: Validate that the BigQuery schema is correct if the table does exist,
118+
*/
119+
async initializeRawChangeLogTable() {
120+
const changelogName = this.rawChangeLogTableName();
121+
const dataset = this.bq.dataset(this.config.datasetId);
122+
const table = dataset.table(changelogName);
123+
const [tableExists] = await table.exists();
124+
125+
if (tableExists) {
126+
logs.bigQueryTableAlreadyExists(table.id, dataset.id);
127+
} else {
128+
logs.bigQueryTableCreating(changelogName);
129+
const options = {
130+
// `friendlyName` needs to be here to satisfy TypeScript
131+
friendlyName: changelogName,
132+
schema: firestoreToBQTable(),
133+
};
134+
await table.create(options);
135+
logs.bigQueryTableCreated(changelogName);
136+
}
137+
return table;
138+
};
139+
140+
/**
141+
* Creates a view over the raw change log table that returns only latest operation of
142+
* all existing documents in the exported collection.
143+
*/
144+
async initializeLatestView() {
145+
const dataset = this.bq.dataset(this.config.datasetId);
146+
const view = dataset.table(this.rawLatestView());
147+
const [viewExists] = await view.exists();
148+
149+
if (viewExists) {
150+
logs.bigQueryViewAlreadyExists(view.id, dataset.id);
151+
} else {
152+
const latestSnapshot = latestConsistentSnapshotView(this.config.datasetId, this.rawChangeLogTableName());
153+
logs.bigQueryViewCreating(this.rawLatestView(), latestSnapshot.query);
154+
const options = {
155+
friendlyName: this.rawLatestView(),
156+
view: latestSnapshot,
157+
};
158+
await view.create(options);
159+
logs.bigQueryViewCreated(this.rawLatestView());
160+
}
161+
return view;
162+
}
163+
164+
private rawChangeLogTableName(): string {
165+
return `${this.config.tableId}_raw_changelog`;
166+
}
167+
168+
private rawLatestView(): string {
169+
return `${this.config.tableId}_raw_latest`;
170+
}
171+
}
172+

firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/index.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,18 @@ import * as logs from "../logs";
2525
import { BigQuery } from "@google-cloud/bigquery";
2626

2727
export interface FirestoreBigQueryEventHistoryTrackerConfig {
28-
tableId: string;
2928
datasetId: string;
29+
tableId: string;
3030
}
3131

3232
/**
33-
* An interface to BigQuery which handles:
34-
* - Iniitializing the raw changelog table when the first event gets recorded.
35-
* - Initializing the latest view over the raw changelog.
36-
* - Streaming writes into the raw changelog table.
33+
* An FirestoreEventHistoryTracker that exports data to BigQuery.
34+
*
35+
* When the first event is received, it creates necessary BigQuery resources:
36+
* - Dataset: {@link FirestoreBigQueryEventHistoryTrackerConfig#datasetId}.
37+
* - Raw change log BigQuery table: {@link FirestoreBigQueryEventHistoryTracker#rawChangeLogTableName}.
38+
* - Latest view: {@link FirestoreBigQueryEventHistoryTracker#rawLatestView}.
39+
* If any subsequent data export fail, it will attempt to reinitialize.
3740
*/
3841
export class FirestoreBigQueryEventHistoryTracker implements FirestoreEventHistoryTracker {
3942
bq: bigquery.BigQuery;

0 commit comments

Comments
 (0)