Skip to content

Commit bcd5e65

Browse files
committed
store,graph,node : Add methods to update non_fatal_errors field in db, and wired them in runner
1 parent 7f136f6 commit bcd5e65

File tree

9 files changed

+49
-2
lines changed

9 files changed

+49
-2
lines changed

core/src/subgraph/runner.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,7 @@ where
478478
persisted_data_sources,
479479
deterministic_errors,
480480
processed_data_sources,
481+
is_non_fatal_errors_active,
481482
)
482483
.await
483484
.context("Failed to transact block operations")?;

graph/src/components/store/traits.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ pub trait WritableStore: ReadStore + DeploymentCursorTracker {
309309
data_sources: Vec<StoredDynamicDataSource>,
310310
deterministic_errors: Vec<SubgraphError>,
311311
offchain_to_remove: Vec<StoredDynamicDataSource>,
312+
is_non_fatal_errors_active: bool,
312313
) -> Result<(), StoreError>;
313314

314315
/// The deployment `id` finished syncing, mark it as synced in the database

graph/src/components/store/write.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,7 @@ pub struct Batch {
628628
pub deterministic_errors: Vec<SubgraphError>,
629629
pub offchain_to_remove: DataSources,
630630
pub error: Option<StoreError>,
631+
pub is_non_fatal_errors_active: bool,
631632
}
632633

633634
impl Batch {
@@ -639,6 +640,7 @@ impl Batch {
639640
data_sources: Vec<StoredDynamicDataSource>,
640641
deterministic_errors: Vec<SubgraphError>,
641642
offchain_to_remove: Vec<StoredDynamicDataSource>,
643+
is_non_fatal_errors_active: bool,
642644
) -> Result<Self, StoreError> {
643645
let block = block_ptr.number;
644646

@@ -670,6 +672,7 @@ impl Batch {
670672
deterministic_errors,
671673
offchain_to_remove,
672674
error: None,
675+
is_non_fatal_errors_active,
673676
})
674677
}
675678

store/postgres/src/deployment.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -775,7 +775,29 @@ pub fn fail(
775775
) -> Result<(), StoreError> {
776776
let error_id = insert_subgraph_error(conn, error)?;
777777

778-
update_deployment_status(conn, id, SubgraphHealth::Failed, Some(error_id))?;
778+
update_deployment_status(conn, id, SubgraphHealth::Failed, Some(error_id), None)?;
779+
780+
Ok(())
781+
}
782+
783+
pub fn update_non_fatal_errors(
784+
conn: &PgConnection,
785+
deployment_id: &DeploymentHash,
786+
health: SubgraphHealth,
787+
non_fatal_errors: Option<&[SubgraphError]>,
788+
) -> Result<(), StoreError> {
789+
let error_ids = non_fatal_errors.map(|errors| {
790+
errors
791+
.iter()
792+
.map(|error| {
793+
hex::encode(stable_hash_legacy::utils::stable_hash::<SetHasher, _>(
794+
error,
795+
))
796+
})
797+
.collect::<Vec<_>>()
798+
});
799+
800+
update_deployment_status(conn, deployment_id, health, None, error_ids)?;
779801

780802
Ok(())
781803
}
@@ -802,6 +824,7 @@ pub fn update_deployment_status(
802824
deployment_id: &DeploymentHash,
803825
health: SubgraphHealth,
804826
fatal_error: Option<String>,
827+
non_fatal_errors: Option<Vec<String>>,
805828
) -> Result<(), StoreError> {
806829
use subgraph_deployment as d;
807830

@@ -810,6 +833,7 @@ pub fn update_deployment_status(
810833
d::failed.eq(health.is_failed()),
811834
d::health.eq(health),
812835
d::fatal_error.eq::<Option<String>>(fatal_error),
836+
d::non_fatal_errors.eq::<Vec<String>>(non_fatal_errors.unwrap_or(vec![])),
813837
))
814838
.execute(conn)
815839
.map(|_| ())

store/postgres/src/deployment_store.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1116,6 +1116,15 @@ impl DeploymentStore {
11161116
&batch.deterministic_errors,
11171117
batch.block_ptr.number,
11181118
)?;
1119+
1120+
if batch.is_non_fatal_errors_active {
1121+
deployment::update_non_fatal_errors(
1122+
&conn,
1123+
&site.deployment,
1124+
deployment::SubgraphHealth::Unhealthy,
1125+
Some(&batch.deterministic_errors),
1126+
)?;
1127+
}
11191128
}
11201129

11211130
let earliest_block = deployment::transact_block(
@@ -1631,7 +1640,7 @@ impl DeploymentStore {
16311640
let _ = self.revert_block_operations(site.clone(), parent_ptr.clone(), &FirehoseCursor::None)?;
16321641

16331642
// Unfail the deployment.
1634-
deployment::update_deployment_status(conn, deployment_id, prev_health, None)?;
1643+
deployment::update_deployment_status(conn, deployment_id, prev_health, None,None)?;
16351644

16361645
Ok(UnfailOutcome::Unfailed)
16371646
}
@@ -1714,6 +1723,7 @@ impl DeploymentStore {
17141723
deployment_id,
17151724
deployment::SubgraphHealth::Healthy,
17161725
None,
1726+
None,
17171727
)?;
17181728

17191729
// Delete the fatal error.

store/postgres/src/writable.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,6 +1495,7 @@ impl WritableStoreTrait for WritableStore {
14951495
data_sources: Vec<StoredDynamicDataSource>,
14961496
deterministic_errors: Vec<SubgraphError>,
14971497
processed_data_sources: Vec<StoredDynamicDataSource>,
1498+
is_non_fatal_errors_active: bool,
14981499
) -> Result<(), StoreError> {
14991500
let batch = Batch::new(
15001501
self.store.input_schema.cheap_clone(),
@@ -1504,6 +1505,7 @@ impl WritableStoreTrait for WritableStore {
15041505
data_sources,
15051506
deterministic_errors,
15061507
processed_data_sources,
1508+
is_non_fatal_errors_active,
15071509
)?;
15081510
self.writer.write(batch, stopwatch).await?;
15091511

store/test-store/src/store.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ pub async fn transact_errors(
232232
Vec::new(),
233233
errs,
234234
Vec::new(),
235+
false,
235236
)
236237
.await?;
237238
flush(deployment).await
@@ -287,6 +288,7 @@ pub async fn transact_entities_and_dynamic_data_sources(
287288
deployment.id,
288289
Arc::new(manifest_idx_and_name),
289290
))?;
291+
290292
let mut entity_cache = EntityCache::new(Arc::new(store.clone()));
291293
entity_cache.append(ops);
292294
let mods = entity_cache
@@ -309,6 +311,7 @@ pub async fn transact_entities_and_dynamic_data_sources(
309311
data_sources,
310312
Vec::new(),
311313
Vec::new(),
314+
false,
312315
)
313316
.await
314317
}

store/test-store/tests/graph/entity_cache.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ impl WritableStore for MockStore {
130130
_: Vec<StoredDynamicDataSource>,
131131
_: Vec<SubgraphError>,
132132
_: Vec<StoredDynamicDataSource>,
133+
_: bool,
133134
) -> Result<(), StoreError> {
134135
unimplemented!()
135136
}

store/test-store/tests/postgres/store.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1536,6 +1536,7 @@ fn handle_large_string_with_index() {
15361536
Vec::new(),
15371537
Vec::new(),
15381538
Vec::new(),
1539+
false,
15391540
)
15401541
.await
15411542
.expect("Failed to insert large text");
@@ -1635,6 +1636,7 @@ fn handle_large_bytea_with_index() {
16351636
Vec::new(),
16361637
Vec::new(),
16371638
Vec::new(),
1639+
false,
16381640
)
16391641
.await
16401642
.expect("Failed to insert large text");

0 commit comments

Comments
 (0)