
Commit b94ba9d

core, graph, store: Restart writable when restarting subgraph
When the subgraph runner encounters an error, it needs to restart the writable store to clear any errors the store might have encountered.
1 parent d14c155 commit b94ba9d
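
The change has three coordinated parts, all visible in the diffs below: WritableStore gains a restart method that hands back a clean store, IndexingInputs::with_store rebuilds the runner's inputs around that store, and the runner drops its entity cache because cached entries may never have reached the database. Condensed from the core/src/subgraph/runner.rs hunk below (error handling as in the diff), the restart path looks like this:

    Action::Restart => {
        // Swap the poisoned writable for a freshly restarted one...
        let store = self.inputs.store.cheap_clone();
        let store = store.restart().await?;
        self.inputs = Arc::new(self.inputs.with_store(store));
        // ...and drop cached entities that may never have been written.
        self.state.entity_lfu_cache = LfuCache::new();
        break;
    }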

6 files changed (+136 −5)

core/src/subgraph/inputs.rs (+39 −0)

@@ -33,3 +33,42 @@ pub struct IndexingInputs<C: Blockchain> {
     /// possibly expensive and noisy, information
     pub instrument: bool,
 }
+
+impl<C: Blockchain> IndexingInputs<C> {
+    pub fn with_store(&self, store: Arc<dyn WritableStore>) -> Self {
+        let IndexingInputs {
+            deployment,
+            features,
+            start_blocks,
+            stop_block,
+            store: _,
+            debug_fork,
+            triggers_adapter,
+            chain,
+            templates,
+            unified_api_version,
+            static_filters,
+            poi_version,
+            network,
+            manifest_idx_and_name,
+            instrument,
+        } = self;
+        IndexingInputs {
+            deployment: deployment.clone(),
+            features: features.clone(),
+            start_blocks: start_blocks.clone(),
+            stop_block: stop_block.clone(),
+            store,
+            debug_fork: debug_fork.clone(),
+            triggers_adapter: triggers_adapter.clone(),
+            chain: chain.clone(),
+            templates: templates.clone(),
+            unified_api_version: unified_api_version.clone(),
+            static_filters: *static_filters,
+            poi_version: *poi_version,
+            network: network.clone(),
+            manifest_idx_and_name: manifest_idx_and_name.clone(),
+            instrument: *instrument,
+        }
+    }
+}
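
A note on the shape of with_store: it destructures every field of IndexingInputs by name instead of using the `..` rest pattern, so adding a field to the struct turns this method into a compile error until the new field is handled, and no state can be dropped silently. A self-contained sketch of the same pattern (Inputs and with_b are illustrative, not graph-node code):

    struct Inputs {
        a: String,
        b: u32,
    }

    impl Inputs {
        fn with_b(&self, b: u32) -> Self {
            // Exhaustive destructuring: adding a field to `Inputs`
            // makes this pattern non-exhaustive and the build fails
            // until the author decides how the new field carries over.
            let Inputs { a, b: _ } = self;
            Inputs { a: a.clone(), b }
        }
    }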

core/src/subgraph/runner.rs (+13 −1)

@@ -179,7 +179,19 @@ where
                     self.inputs.store.flush().await?;
                     return Ok(self);
                 }
-                Action::Restart => break,
+                Action::Restart => {
+                    // Restart the store to clear any errors that it
+                    // might have encountered and use that from now on
+                    let store = self.inputs.store.cheap_clone();
+                    let store = store.restart().await?;
+                    self.inputs = Arc::new(self.inputs.with_store(store));
+                    // Also clear the entity cache since we might have
+                    // entries in there that never made it to the
+                    // database
+                    self.state.entity_lfu_cache = LfuCache::new();
+                    self.state.synced = self.inputs.store.is_deployment_synced().await?;
+                    break;
+                }
             };
         }
     }

graph/src/components/store/traits.rs (+11 −0)

@@ -324,6 +324,17 @@ pub trait WritableStore: ReadStore + DeploymentCursorTracker {

     /// Wait for the background writer to finish processing its queue
     async fn flush(&self) -> Result<(), StoreError>;
+
+    /// Restart the `WritableStore`. This will clear any errors that have
+    /// been encountered. Code that calls this must not make any assumptions
+    /// about what has been written already, as the write queue might
+    /// contain unprocessed write requests that will be discarded by this
+    /// call.
+    ///
+    /// After this call, `self` should not be used anymore, as it will
+    /// continue to produce errors for any write requests, and instead, the
+    /// returned `WritableStore` should be used.
+    async fn restart(self: Arc<Self>) -> Result<Arc<dyn WritableStore>, StoreError>;
 }

 #[async_trait]
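
The doc comment carries the whole contract: the old handle is dead after restart, and nothing queued behind the error can be assumed written. A minimal caller-side sketch under that contract (the helper name and the particular re-reads are illustrative, not part of this diff; types come from the graph crate):

    // Replace the poisoned handle and re-derive state from the store
    // instead of trusting anything computed before the error.
    async fn recover(
        store: Arc<dyn WritableStore>,
    ) -> Result<Arc<dyn WritableStore>, StoreError> {
        let store = store.restart().await?;
        // Re-read rather than assume: unflushed writes were discarded.
        let _ptr = store.block_ptr();
        let _synced = store.is_deployment_synced().await?;
        Ok(store)
    }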

store/postgres/src/writable.rs (+14 −1)

@@ -13,7 +13,7 @@ use graph::prelude::{
     BLOCK_NUMBER_MAX,
 };
 use graph::schema::InputSchema;
-use graph::slog::info;
+use graph::slog::{info, warn};
 use graph::tokio::task::JoinHandle;
 use graph::util::bounded_queue::BoundedQueue;
 use graph::{
@@ -1313,4 +1313,17 @@ impl WritableStoreTrait for WritableStore {
     async fn flush(&self) -> Result<(), StoreError> {
         self.writer.flush().await
     }
+
+    async fn restart(self: Arc<Self>) -> Result<Arc<dyn WritableStoreTrait>, StoreError> {
+        if self.poisoned() {
+            let logger = self.store.logger.clone();
+            if let Err(e) = self.stop().await {
+                warn!(logger, "Writable had error when stopping, it is safe to ignore this error"; "error" => e.to_string());
+            }
+            let store = Arc::new(self.store.store.0.clone());
+            store.writable(logger, self.store.site.id.into()).await
+        } else {
+            Ok(self)
+        }
+    }
 }
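
restart only does work when the writable is poisoned, i.e. its background writer has recorded a write error; otherwise the same handle is returned unchanged. The diff doesn't include poisoned() itself, but the shape is a conventional poison flag. A self-contained sketch of that pattern, assumed rather than taken from graph-node:

    use std::sync::atomic::{AtomicBool, Ordering};

    /// Tracks whether the background writer has hit a hard error.
    struct PoisonFlag(AtomicBool);

    impl PoisonFlag {
        fn new() -> Self {
            PoisonFlag(AtomicBool::new(false))
        }

        /// Called by the writer when a queued write fails; from then
        /// on every request is rejected until the store is recreated.
        fn poison(&self) {
            self.0.store(true, Ordering::SeqCst);
        }

        fn poisoned(&self) -> bool {
            self.0.load(Ordering::SeqCst)
        }
    }

Once the flag is set, the only way back to a working store is to build a new writable, which is exactly what restart does via store.writable(...).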

store/test-store/tests/graph/entity_cache.rs (+4 −0)

@@ -169,6 +169,10 @@ impl WritableStore for MockStore {
     async fn causality_region_curr_val(&self) -> Result<Option<CausalityRegion>, StoreError> {
         unimplemented!()
     }
+
+    async fn restart(self: Arc<Self>) -> Result<Arc<dyn WritableStore>, StoreError> {
+        unimplemented!()
+    }
 }

 fn make_band_key(id: &'static str) -> EntityKey {

store/test-store/tests/postgres/writable.rs (+55 −3)

@@ -16,7 +16,7 @@ use web3::types::H256;
 const SCHEMA_GQL: &str = "
     type Counter @entity {
         id: ID!,
-        count: Int,
+        count: Int!,
     }
 ";

@@ -93,8 +93,7 @@ where

         // Run test and wait for the background writer to finish its work so
         // it won't conflict with the next test
-        test(store, writable.clone(), deployment).await;
-        writable.flush().await.unwrap();
+        test(store, writable, deployment).await;
     });
 }

@@ -161,5 +160,58 @@ fn tracker() {

         resume_writer(&deployment, 1).await;
         assert_eq!(2, read_count());
+
+        // There shouldn't be anything left to do, but make sure of that
+        writable.flush().await.unwrap();
+    })
+}
+
+#[test]
+fn restart() {
+    run_test(|store, writable, deployment| async move {
+        let subgraph_store = store.subgraph_store();
+
+        // Cause an error by leaving out the non-nullable `count` attribute
+        let entity_ops = vec![EntityOperation::Set {
+            key: count_key("1"),
+            data: entity! { id: "1" },
+        }];
+        transact_entity_operations(
+            &subgraph_store,
+            &deployment,
+            block_pointer(1),
+            entity_ops.clone(),
+        )
+        .await
+        .unwrap();
+        // flush checks for errors and therefore fails
+        writable
+            .flush()
+            .await
+            .expect_err("writing with missing non-nullable field should fail");
+
+        // We now have a poisoned store. Restarting it gives us a new store
+        // that works again
+        let writable = writable.restart().await.unwrap();
+        writable.flush().await.unwrap();
+
+        // Retry our write with correct data
+        let entity_ops = vec![EntityOperation::Set {
+            key: count_key("1"),
+            data: entity! { id: "1", count: 1 },
+        }];
+        // `SubgraphStore` caches the correct writable so that this call
+        // uses the restarted writable, and is equivalent to using
+        // `writable` directly
+        transact_entity_operations(
+            &subgraph_store,
+            &deployment,
+            block_pointer(1),
+            entity_ops.clone(),
+        )
+        .await
+        .unwrap();
+        // Look, no errors
+        writable.flush().await.unwrap();
     })
 }
