Skip to content

Commit 4ae0eff

Browse files
committed
all: Make write::EntityMod and EntityModification the same
1 parent fa0ed65 commit 4ae0eff

File tree

9 files changed

+128
-62
lines changed

9 files changed

+128
-62
lines changed

core/src/subgraph/runner.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ where
403403
evict_stats,
404404
} = block_state
405405
.entity_cache
406-
.as_modifications()
406+
.as_modifications(block.number())
407407
.map_err(|e| BlockProcessingError::Unknown(e.into()))?;
408408
section.end();
409409

@@ -738,7 +738,12 @@ where
738738
return Err(anyhow!("{}", err.to_string()));
739739
}
740740

741-
mods.extend(block_state.entity_cache.as_modifications()?.modifications);
741+
mods.extend(
742+
block_state
743+
.entity_cache
744+
.as_modifications(block.number())?
745+
.modifications,
746+
);
742747
processed_data_sources.extend(block_state.processed_data_sources);
743748
}
744749

graph/src/components/store/entity_cache.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::prelude::ENV_VARS;
1111
use crate::schema::InputSchema;
1212
use crate::util::lfu_cache::{EvictStats, LfuCache};
1313

14-
use super::{DerivedEntityQuery, EntityType, LoadRelatedRequest, StoreError};
14+
use super::{BlockNumber, DerivedEntityQuery, EntityType, LoadRelatedRequest, StoreError};
1515

1616
/// The scope in which the `EntityCache` should perform a `get` operation
1717
pub enum GetScope {
@@ -251,7 +251,10 @@ impl EntityCache {
251251
/// to the current state is actually needed.
252252
///
253253
/// Also returns the updated `LfuCache`.
254-
pub fn as_modifications(mut self) -> Result<ModificationsAndCache, StoreError> {
254+
pub fn as_modifications(
255+
mut self,
256+
block: BlockNumber,
257+
) -> Result<ModificationsAndCache, StoreError> {
255258
assert!(!self.in_handler);
256259

257260
// The first step is to make sure all entities being set are in `self.current`.
@@ -285,7 +288,12 @@ impl EntityCache {
285288
| (None, EntityOp::Overwrite(mut updates)) => {
286289
updates.remove_null_fields();
287290
self.current.insert(key.clone(), Some(updates.clone()));
288-
Some(Insert { key, data: updates })
291+
Some(Insert {
292+
key,
293+
data: updates,
294+
block,
295+
end: None,
296+
})
289297
}
290298
// Entity may have been changed
291299
(Some(current), EntityOp::Update(updates)) => {
@@ -294,7 +302,12 @@ impl EntityCache {
294302
.map_err(|e| key.unknown_attribute(e))?;
295303
self.current.insert(key.clone(), Some(data.clone()));
296304
if current != data {
297-
Some(Overwrite { key, data })
305+
Some(Overwrite {
306+
key,
307+
data,
308+
block,
309+
end: None,
310+
})
298311
} else {
299312
None
300313
}
@@ -303,15 +316,20 @@ impl EntityCache {
303316
(Some(current), EntityOp::Overwrite(data)) => {
304317
self.current.insert(key.clone(), Some(data.clone()));
305318
if current != data {
306-
Some(Overwrite { key, data })
319+
Some(Overwrite {
320+
key,
321+
data,
322+
block,
323+
end: None,
324+
})
307325
} else {
308326
None
309327
}
310328
}
311329
// Existing entity was deleted
312330
(Some(_), EntityOp::Remove) => {
313331
self.current.insert(key.clone(), None);
314-
Some(Remove { key })
332+
Some(Remove { key, block })
315333
}
316334
// Entity was deleted, but it doesn't exist in the store
317335
(None, EntityOp::Remove) => None,

graph/src/components/store/mod.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,7 @@ impl StoreEvent {
725725
.map(|op| {
726726
use self::EntityModification::*;
727727
match op {
728-
Insert { key, .. } | Overwrite { key, .. } | Remove { key } => {
728+
Insert { key, .. } | Overwrite { key, .. } | Remove { key, .. } => {
729729
EntityChange::for_data(subgraph_id.clone(), key.clone())
730730
}
731731
}
@@ -1008,18 +1008,50 @@ pub type PoolWaitStats = Arc<RwLock<MovingStats>>;
10081008
#[derive(Clone, Debug, PartialEq, Eq)]
10091009
pub enum EntityModification {
10101010
/// Insert the entity
1011-
Insert { key: EntityKey, data: Entity },
1011+
Insert {
1012+
key: EntityKey,
1013+
data: Entity,
1014+
block: BlockNumber,
1015+
end: Option<BlockNumber>,
1016+
},
10121017
/// Update the entity by overwriting it
1013-
Overwrite { key: EntityKey, data: Entity },
1018+
Overwrite {
1019+
key: EntityKey,
1020+
data: Entity,
1021+
block: BlockNumber,
1022+
end: Option<BlockNumber>,
1023+
},
10141024
/// Remove the entity
1015-
Remove { key: EntityKey },
1025+
Remove { key: EntityKey, block: BlockNumber },
10161026
}
10171027

10181028
impl EntityModification {
1029+
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
1030+
EntityModification::Insert {
1031+
key,
1032+
data,
1033+
block,
1034+
end: None,
1035+
}
1036+
}
1037+
1038+
pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
1039+
EntityModification::Overwrite {
1040+
key,
1041+
data,
1042+
block,
1043+
end: None,
1044+
}
1045+
}
1046+
1047+
pub fn remove(key: EntityKey, block: BlockNumber) -> Self {
1048+
EntityModification::Remove { key, block }
1049+
}
1050+
10191051
pub fn entity_ref(&self) -> &EntityKey {
10201052
use EntityModification::*;
10211053
match self {
1022-
Insert { key, .. } | Overwrite { key, .. } | Remove { key } => key,
1054+
Insert { key, .. } | Overwrite { key, .. } | Remove { key, .. } => key,
10231055
}
10241056
}
10251057

graph/src/components/store/write.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,27 +101,37 @@ impl<'a> TryFrom<&'a EntityMod> for EntityWrite<'a> {
101101
}
102102

103103
impl EntityMod {
104-
fn new(m: EntityModification, block: BlockNumber) -> Self {
104+
fn new(m: EntityModification) -> Self {
105105
match m {
106-
EntityModification::Insert { key, data } => Self::Insert {
106+
EntityModification::Insert {
107107
key,
108108
data,
109109
block,
110-
end: None,
110+
end,
111+
} => Self::Insert {
112+
key,
113+
data,
114+
block,
115+
end,
111116
},
112-
EntityModification::Overwrite { key, data } => Self::Overwrite {
117+
EntityModification::Overwrite {
113118
key,
114119
data,
115120
block,
116-
end: None,
121+
end,
122+
} => Self::Overwrite {
123+
key,
124+
data,
125+
block,
126+
end,
117127
},
118-
EntityModification::Remove { key } => Self::Remove { key, block },
128+
EntityModification::Remove { key, block } => Self::Remove { key, block },
119129
}
120130
}
121131

122132
#[cfg(debug_assertions)]
123-
pub fn new_test(m: EntityModification, block: BlockNumber) -> Self {
124-
Self::new(m, block)
133+
pub fn new_test(m: EntityModification) -> Self {
134+
Self::new(m)
125135
}
126136

127137
pub fn id(&self) -> &Word {
@@ -321,7 +331,7 @@ impl RowGroup {
321331
));
322332
}
323333

324-
let row = EntityMod::new(emod, block);
334+
let row = EntityMod::new(emod);
325335
self.append_row(row)
326336
}
327337

runtime/test/src/test.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ fn make_thing(id: &str, value: &str) -> (String, EntityModification) {
441441
let key = EntityKey::data("Thing".to_string(), id);
442442
(
443443
format!("{{ \"id\": \"{}\", \"value\": \"{}\"}}", id, value),
444-
EntityModification::Insert { key, data },
444+
EntityModification::insert(key, data, 0),
445445
)
446446
}
447447

@@ -485,7 +485,7 @@ async fn run_ipfs_map(
485485
.ctx
486486
.state
487487
.entity_cache
488-
.as_modifications()?
488+
.as_modifications(0)?
489489
.modifications;
490490

491491
// Bring the modifications into a predictable order (by entity_id)
@@ -1017,7 +1017,7 @@ async fn test_entity_store(api_version: Version) {
10171017
&mut module.instance_ctx_mut().ctx.state.entity_cache,
10181018
EntityCache::new(Arc::new(writable.clone())),
10191019
);
1020-
let mut mods = cache.as_modifications().unwrap().modifications;
1020+
let mut mods = cache.as_modifications(0).unwrap().modifications;
10211021
assert_eq!(1, mods.len());
10221022
match mods.pop().unwrap() {
10231023
EntityModification::Overwrite { data, .. } => {
@@ -1038,7 +1038,7 @@ async fn test_entity_store(api_version: Version) {
10381038
.ctx
10391039
.state
10401040
.entity_cache
1041-
.as_modifications()
1041+
.as_modifications(0)
10421042
.unwrap()
10431043
.modifications;
10441044
assert_eq!(1, mods.len());

store/test-store/src/store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ pub async fn transact_entities_and_dynamic_data_sources(
290290
let mut entity_cache = EntityCache::new(Arc::new(store.clone()));
291291
entity_cache.append(ops);
292292
let mods = entity_cache
293-
.as_modifications()
293+
.as_modifications(block_ptr_to.number)
294294
.expect("failed to convert to modifications")
295295
.modifications;
296296
let metrics_registry = Arc::new(MetricsRegistry::mock());

store/test-store/tests/graph/entity_cache.rs

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ fn sort_by_entity_key(mut mods: Vec<EntityModification>) -> Vec<EntityModificati
191191
async fn empty_cache_modifications() {
192192
let store = Arc::new(MockStore::new(BTreeMap::new()));
193193
let cache = EntityCache::new(store);
194-
let result = cache.as_modifications();
194+
let result = cache.as_modifications(0);
195195
assert_eq!(result.unwrap().modifications, vec![]);
196196
}
197197

@@ -214,18 +214,12 @@ fn insert_modifications() {
214214
.set(sigurros_key.clone(), sigurros_data.clone())
215215
.unwrap();
216216

217-
let result = cache.as_modifications();
217+
let result = cache.as_modifications(0);
218218
assert_eq!(
219219
sort_by_entity_key(result.unwrap().modifications),
220220
sort_by_entity_key(vec![
221-
EntityModification::Insert {
222-
key: mogwai_key,
223-
data: mogwai_data,
224-
},
225-
EntityModification::Insert {
226-
key: sigurros_key,
227-
data: sigurros_data,
228-
}
221+
EntityModification::insert(mogwai_key, mogwai_data, 0),
222+
EntityModification::insert(sigurros_key, sigurros_data, 0)
229223
])
230224
);
231225
}
@@ -268,18 +262,12 @@ fn overwrite_modifications() {
268262
.set(sigurros_key.clone(), sigurros_data.clone())
269263
.unwrap();
270264

271-
let result = cache.as_modifications();
265+
let result = cache.as_modifications(0);
272266
assert_eq!(
273267
sort_by_entity_key(result.unwrap().modifications),
274268
sort_by_entity_key(vec![
275-
EntityModification::Overwrite {
276-
key: mogwai_key,
277-
data: mogwai_data,
278-
},
279-
EntityModification::Overwrite {
280-
key: sigurros_key,
281-
data: sigurros_data,
282-
}
269+
EntityModification::overwrite(mogwai_key, mogwai_data, 0),
270+
EntityModification::overwrite(sigurros_key, sigurros_data, 0)
283271
])
284272
);
285273
}
@@ -311,13 +299,14 @@ fn consecutive_modifications() {
311299

312300
// We expect a single overwrite modification for the above that leaves "id"
313301
// and "name" untouched, sets "founded" and removes the "label" field.
314-
let result = cache.as_modifications();
302+
let result = cache.as_modifications(0);
315303
assert_eq!(
316304
sort_by_entity_key(result.unwrap().modifications),
317-
sort_by_entity_key(vec![EntityModification::Overwrite {
318-
key: update_key,
319-
data: entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 }
320-
}])
305+
sort_by_entity_key(vec![EntityModification::overwrite(
306+
update_key,
307+
entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 },
308+
0
309+
)])
321310
);
322311
}
323312

store/test-store/tests/postgres/relational_bytes.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ pub fn row_group_update(
8787
let mut group = RowGroup::new(entity_type.clone());
8888
for (key, data) in data {
8989
group
90-
.push(EntityModification::Overwrite { key, data }, block)
90+
.push(EntityModification::overwrite(key, data, block), block)
9191
.unwrap();
9292
}
9393
group
@@ -101,7 +101,7 @@ pub fn row_group_insert(
101101
let mut group = RowGroup::new(entity_type.clone());
102102
for (key, data) in data {
103103
group
104-
.push(EntityModification::Insert { key, data }, block)
104+
.push(EntityModification::insert(key, data, block), block)
105105
.unwrap();
106106
}
107107
group
@@ -115,7 +115,7 @@ pub fn row_group_delete(
115115
let mut group = RowGroup::new(entity_type.clone());
116116
for key in data {
117117
group
118-
.push(EntityModification::Remove { key }, block)
118+
.push(EntityModification::remove(key, block), block)
119119
.unwrap();
120120
}
121121
group

0 commit comments

Comments
 (0)