Skip to content

Commit 399b732

Browse files
authored
substreams: Add support for overriding module params (#4759)
If params is provided on the substreams-based subgraph manifest then replace all the existing params on the output module with the provided list.
1 parent f88c0ff commit 399b732

File tree

1 file changed

+136
-6
lines changed

1 file changed

+136
-6
lines changed

chain/substreams/src/data_source.rs

Lines changed: 136 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ use graph::{
77
components::link_resolver::LinkResolver,
88
prelude::{async_trait, BlockNumber, DataSourceTemplateInfo, Link},
99
slog::Logger,
10+
substreams::{
11+
module::input::{Input, Params},
12+
Module,
13+
},
1014
};
1115

1216
use prost::Message;
@@ -152,6 +156,35 @@ pub struct UnresolvedDataSource {
152156
pub mapping: UnresolvedMapping,
153157
}
154158

159+
/// Replace all the existing params with the provided ones.
160+
fn patch_module_params(params: Option<Vec<String>>, module: &mut Module) {
161+
let params = match params {
162+
Some(params) => params,
163+
None => return,
164+
};
165+
166+
let mut inputs: Vec<graph::substreams::module::Input> = module
167+
.inputs
168+
.iter()
169+
.flat_map(|input| match input.input {
170+
None => None,
171+
Some(Input::Params(_)) => None,
172+
Some(_) => Some(input.clone()),
173+
})
174+
.collect();
175+
176+
inputs.append(
177+
&mut params
178+
.into_iter()
179+
.map(|value| graph::substreams::module::Input {
180+
input: Some(Input::Params(Params { value })),
181+
})
182+
.collect(),
183+
);
184+
185+
module.inputs = inputs;
186+
}
187+
155188
#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)]
156189
#[serde(rename_all = "camelCase")]
157190
/// Text api_version, before parsing and validation.
@@ -170,13 +203,17 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
170203
) -> Result<DataSource, Error> {
171204
let content = resolver.cat(logger, &self.source.package.file).await?;
172205

173-
let package = graph::substreams::Package::decode(content.as_ref())?;
206+
let mut package = graph::substreams::Package::decode(content.as_ref())?;
174207

175-
let module = match package.modules {
176-
Some(ref modules) => modules
208+
let module = match package.modules.as_mut() {
209+
Some(modules) => modules
177210
.modules
178-
.iter()
179-
.find(|module| module.name == self.source.package.module_name),
211+
.iter_mut()
212+
.find(|module| module.name == self.source.package.module_name)
213+
.map(|module| {
214+
patch_module_params(self.source.package.params, module);
215+
module
216+
}),
180217
None => None,
181218
};
182219

@@ -233,6 +270,7 @@ pub struct UnresolvedSource {
233270
pub struct UnresolvedPackage {
234271
pub module_name: String,
235272
pub file: Link,
273+
pub params: Option<Vec<String>>,
236274
}
237275

238276
#[derive(Debug, Clone, Default, Deserialize)]
@@ -285,7 +323,10 @@ mod test {
285323
prelude::{async_trait, serde_yaml, JsonValueStream, Link},
286324
slog::{o, Discard, Logger},
287325
substreams::module::{Kind, KindMap, KindStore},
288-
substreams::{Module, Modules, Package},
326+
substreams::{
327+
module::input::{Input, Params},
328+
Module, Modules, Package,
329+
},
289330
};
290331
use prost::Message;
291332

@@ -304,6 +345,32 @@ mod test {
304345
file: Link {
305346
link: "/ipfs/QmbHnhUFZa6qqqRyubUYhXntox1TCBxqryaBM1iNGqVJzT".into(),
306347
},
348+
params: None,
349+
},
350+
},
351+
mapping: UnresolvedMapping {
352+
api_version: "0.0.7".into(),
353+
kind: "substreams/graph-entities".into(),
354+
},
355+
};
356+
assert_eq!(ds, expected);
357+
}
358+
359+
#[test]
360+
fn parse_data_source_with_params() {
361+
let ds: UnresolvedDataSource =
362+
serde_yaml::from_str(TEMPLATE_DATA_SOURCE_WITH_PARAMS).unwrap();
363+
let expected = UnresolvedDataSource {
364+
kind: SUBSTREAMS_KIND.into(),
365+
network: Some("mainnet".into()),
366+
name: "Uniswap".into(),
367+
source: crate::UnresolvedSource {
368+
package: crate::UnresolvedPackage {
369+
module_name: "output".into(),
370+
file: Link {
371+
link: "/ipfs/QmbHnhUFZa6qqqRyubUYhXntox1TCBxqryaBM1iNGqVJzT".into(),
372+
},
373+
params: Some(vec!["x", "y", "123"].into_iter().map(Into::into).collect()),
307374
},
308375
},
309376
mapping: UnresolvedMapping {
@@ -338,6 +405,50 @@ mod test {
338405
assert_eq!(ds, expected);
339406
}
340407

408+
#[tokio::test]
409+
async fn data_source_conversion_override_params() {
410+
let mut package = gen_package();
411+
let mut modules = package.modules.unwrap();
412+
modules.modules.get_mut(0).map(|module| {
413+
module.inputs = vec![
414+
graph::substreams::module::Input {
415+
input: Some(Input::Params(Params { value: "x".into() })),
416+
},
417+
graph::substreams::module::Input {
418+
input: Some(Input::Params(Params { value: "y".into() })),
419+
},
420+
graph::substreams::module::Input {
421+
input: Some(Input::Params(Params {
422+
value: "123".into(),
423+
})),
424+
},
425+
]
426+
});
427+
package.modules = Some(modules);
428+
429+
let ds: UnresolvedDataSource =
430+
serde_yaml::from_str(TEMPLATE_DATA_SOURCE_WITH_PARAMS).unwrap();
431+
let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {});
432+
let logger = Logger::root(Discard, o!());
433+
let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap();
434+
let expected = DataSource {
435+
kind: SUBSTREAMS_KIND.into(),
436+
network: Some("mainnet".into()),
437+
name: "Uniswap".into(),
438+
source: crate::Source {
439+
module_name: "output".into(),
440+
package,
441+
},
442+
mapping: Mapping {
443+
api_version: semver::Version::from_str("0.0.7").unwrap(),
444+
kind: "substreams/graph-entities".into(),
445+
},
446+
context: Arc::new(None),
447+
initial_block: Some(123),
448+
};
449+
assert_eq!(ds, expected);
450+
}
451+
341452
#[test]
342453
fn data_source_validation() {
343454
let mut ds = gen_data_source();
@@ -444,6 +555,25 @@ mod test {
444555
apiVersion: 0.0.7
445556
"#;
446557

558+
const TEMPLATE_DATA_SOURCE_WITH_PARAMS: &str = r#"
559+
kind: substreams
560+
name: Uniswap
561+
network: mainnet
562+
source:
563+
package:
564+
moduleName: output
565+
file:
566+
/: /ipfs/QmbHnhUFZa6qqqRyubUYhXntox1TCBxqryaBM1iNGqVJzT
567+
# This IPFs path would be generated from a local path at deploy time
568+
params:
569+
- x
570+
- y
571+
- 123
572+
mapping:
573+
kind: substreams/graph-entities
574+
apiVersion: 0.0.7
575+
"#;
576+
447577
#[derive(Debug)]
448578
struct NoopLinkResolver {}
449579

0 commit comments

Comments
 (0)