Skip to content

Commit ed70303

Browse files
authored
Add input object notify in the writeback cache (#21995)
## Description This PR adds an object notify in the writeback cache. It will allow us to replace transaction manager. A transaction to be scheduled will call the notify wait to wait for input object to become available. The notify wait function is very similar to `multi_input_objects_available` in execution cache. ## Test plan Added tests --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK:
1 parent a73e93f commit ed70303

File tree

3 files changed

+462
-11
lines changed

3 files changed

+462
-11
lines changed

crates/sui-core/src/execution_cache.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ pub trait ObjectCacheRead: Send + Sync {
261261
) {
262262
assert!(
263263
input_key.version().is_none() || input_key.version().unwrap().is_valid(),
264-
"Shared objects in cancelled transaction should always be available immediately,
264+
"Shared objects in cancelled transaction should always be available immediately,
265265
but it appears that transaction manager is waiting for {:?} to become available",
266266
input_key
267267
);
@@ -397,7 +397,7 @@ pub trait ObjectCacheRead: Send + Sync {
397397
version: SequenceNumber,
398398
epoch_id: EpochId,
399399
) -> bool {
400-
let full_id = FullObjectID::Fastpath(object_id); // function explicilty assumes "fastpath"
400+
let full_id = FullObjectID::Fastpath(object_id); // function explicitly assumes "fastpath"
401401
matches!(
402402
self.get_latest_marker(full_id, epoch_id),
403403
Some((marker_version, MarkerValue::FastpathStreamEnded)) if marker_version >= version
@@ -406,6 +406,20 @@ pub trait ObjectCacheRead: Send + Sync {
406406

407407
/// Return the watermark for the highest checkpoint for which we've pruned objects.
408408
fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber>;
409+
410+
/// Given a list of input and receiving objects for a transaction,
411+
/// wait until all of them become available, so that the transaction
412+
/// can start execution.
413+
/// `input_and_receiving_keys` contains both input objects and receiving
414+
/// input objects, including canceled objects.
415+
/// TODO: Eventually this can return the objects read results,
416+
/// so that execution does not need to load them again.
417+
fn notify_read_input_objects<'a>(
418+
&'a self,
419+
input_and_receiving_keys: &'a [InputKey],
420+
receiving_keys: &'a HashSet<InputKey>,
421+
epoch: &'a EpochId,
422+
) -> BoxFuture<'a, Vec<()>>;
409423
}
410424

411425
pub trait TransactionCacheRead: Send + Sync {
Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
// Copyright (c) Mysten Labs, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use crate::authority::authority_store_tables::AuthorityPerpetualTables;
5+
6+
use super::*;
7+
use futures::FutureExt;
8+
use std::path::Path;
9+
use std::time::Duration;
10+
use sui_framework::BuiltInFramework;
11+
use sui_move_build::BuildConfig;
12+
use sui_swarm_config::network_config_builder::ConfigBuilder;
13+
use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress};
14+
use sui_types::object::{Object, Owner};
15+
use sui_types::storage::InputKey;
16+
use sui_types::SUI_FRAMEWORK_PACKAGE_ID;
17+
use tempfile::tempdir;
18+
use tokio::time::timeout;
19+
20+
async fn create_writeback_cache() -> Arc<WritebackCache> {
21+
let path = tempdir().unwrap();
22+
let tables = Arc::new(AuthorityPerpetualTables::open(path.path(), None));
23+
let config = ConfigBuilder::new_with_temp_dir().build();
24+
let store = AuthorityStore::open_with_committee_for_testing(
25+
tables,
26+
config.committee_with_network().committee(),
27+
&config.genesis,
28+
)
29+
.await
30+
.unwrap();
31+
Arc::new(WritebackCache::new_for_tests(store))
32+
}
33+
34+
#[tokio::test]
35+
async fn test_immediate_return_canceled_shared() {
36+
let cache = create_writeback_cache().await;
37+
38+
let canceled_key = InputKey::VersionedObject {
39+
id: FullObjectID::new(ObjectID::random(), Some(SequenceNumber::from(1))),
40+
version: SequenceNumber::CANCELLED_READ,
41+
};
42+
let receiving_keys = HashSet::new();
43+
let epoch = &0;
44+
45+
// Should return immediately since canceled shared objects are always available
46+
let result = cache
47+
.notify_read_input_objects(&[canceled_key], &receiving_keys, epoch)
48+
.now_or_never()
49+
.unwrap();
50+
assert_eq!(result.len(), 1);
51+
52+
let congested_key = InputKey::VersionedObject {
53+
id: FullObjectID::new(ObjectID::random(), Some(SequenceNumber::from(1))),
54+
version: SequenceNumber::CONGESTED,
55+
};
56+
57+
let result = cache
58+
.notify_read_input_objects(&[congested_key], &receiving_keys, epoch)
59+
.now_or_never()
60+
.unwrap();
61+
assert_eq!(result.len(), 1);
62+
63+
let randomness_unavailable_key = InputKey::VersionedObject {
64+
id: FullObjectID::new(ObjectID::random(), Some(SequenceNumber::from(1))),
65+
version: SequenceNumber::RANDOMNESS_UNAVAILABLE,
66+
};
67+
68+
let result = cache
69+
.notify_read_input_objects(&[randomness_unavailable_key], &receiving_keys, epoch)
70+
.now_or_never()
71+
.unwrap();
72+
assert_eq!(result.len(), 1);
73+
}
74+
75+
#[tokio::test]
76+
async fn test_immediate_return_cached_object() {
77+
let cache = create_writeback_cache().await;
78+
79+
let object_id = ObjectID::random();
80+
let version = SequenceNumber::from(1);
81+
let object = Object::with_id_owner_version_for_testing(object_id, version, Owner::Immutable);
82+
83+
cache.write_object_entry(&object_id, version, ObjectEntry::Object(object));
84+
85+
let input_keys = vec![InputKey::VersionedObject {
86+
id: FullObjectID::new(object_id, None),
87+
version,
88+
}];
89+
let receiving_keys = HashSet::new();
90+
let epoch = &0;
91+
92+
// Should return immediately since object is in cache
93+
let result = cache
94+
.notify_read_input_objects(&input_keys, &receiving_keys, epoch)
95+
.now_or_never()
96+
.unwrap();
97+
98+
assert_eq!(result.len(), 1);
99+
}
100+
101+
#[tokio::test]
102+
async fn test_immediate_return_cached_package() {
103+
let cache = create_writeback_cache().await;
104+
105+
let input_keys = vec![InputKey::Package {
106+
id: SUI_FRAMEWORK_PACKAGE_ID,
107+
}];
108+
let receiving_keys = HashSet::new();
109+
let epoch = &0;
110+
111+
// Should return immediately since system package is available by default.
112+
let result = cache
113+
.notify_read_input_objects(&input_keys, &receiving_keys, epoch)
114+
.now_or_never()
115+
.unwrap();
116+
117+
assert_eq!(result.len(), 1);
118+
}
119+
120+
#[tokio::test]
121+
async fn test_immediate_return_consensus_stream_ended() {
122+
let cache = create_writeback_cache().await;
123+
124+
let object_id = ObjectID::random();
125+
let version = SequenceNumber::from(1);
126+
let epoch = 0;
127+
128+
// Write consensus stream ended marker
129+
cache.write_marker_value(
130+
epoch,
131+
FullObjectKey::new(FullObjectID::new(object_id, Some(version)), version),
132+
MarkerValue::ConsensusStreamEnded(TransactionDigest::random()),
133+
);
134+
135+
let input_keys = vec![InputKey::VersionedObject {
136+
id: FullObjectID::new(object_id, Some(version)),
137+
version,
138+
}];
139+
let receiving_keys = HashSet::new();
140+
141+
// Should return immediately since object is marked as consensus stream ended
142+
let result = cache
143+
.notify_read_input_objects(&input_keys, &receiving_keys, &epoch)
144+
.now_or_never()
145+
.unwrap();
146+
147+
assert_eq!(result.len(), 1);
148+
}
149+
150+
#[tokio::test]
151+
async fn test_wait_for_object() {
152+
let cache = create_writeback_cache().await;
153+
154+
let object_id = ObjectID::random();
155+
let version = SequenceNumber::from(1);
156+
157+
let input_keys = vec![InputKey::VersionedObject {
158+
id: FullObjectID::new(object_id, Some(version)),
159+
version,
160+
}];
161+
let receiving_keys = HashSet::new();
162+
let epoch = &0;
163+
164+
let result = timeout(
165+
Duration::from_secs(3),
166+
cache.notify_read_input_objects(&input_keys, &receiving_keys, epoch),
167+
)
168+
.await;
169+
assert!(result.is_err());
170+
171+
// Write an older version of the object.
172+
tokio::spawn({
173+
let cache = cache.clone();
174+
async move {
175+
tokio::time::sleep(Duration::from_millis(100)).await;
176+
let object = Object::with_id_owner_version_for_testing(
177+
object_id,
178+
SequenceNumber::from(0),
179+
Owner::Shared {
180+
initial_shared_version: version,
181+
},
182+
);
183+
cache.write_object_entry(&object_id, version, ObjectEntry::Object(object));
184+
}
185+
});
186+
let result = timeout(
187+
Duration::from_secs(3),
188+
cache.notify_read_input_objects(&input_keys, &receiving_keys, epoch),
189+
)
190+
.await;
191+
assert!(result.is_err());
192+
193+
// Write the correct version of the object.
194+
tokio::spawn({
195+
let cache = cache.clone();
196+
async move {
197+
tokio::time::sleep(Duration::from_millis(100)).await;
198+
let object = Object::with_id_owner_version_for_testing(
199+
object_id,
200+
version,
201+
Owner::Shared {
202+
initial_shared_version: version,
203+
},
204+
);
205+
cache.write_object_entry(&object_id, version, ObjectEntry::Object(object));
206+
}
207+
});
208+
let result = timeout(
209+
Duration::from_secs(3),
210+
cache.notify_read_input_objects(&input_keys, &receiving_keys, epoch),
211+
)
212+
.await
213+
.unwrap();
214+
assert_eq!(result.len(), 1);
215+
}
216+
217+
#[tokio::test]
218+
async fn test_wait_for_package() {
219+
let cache = create_writeback_cache().await;
220+
221+
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../../examples/move/basics");
222+
let compiled_modules = BuildConfig::new_for_testing()
223+
.build(&path)
224+
.unwrap()
225+
.into_modules();
226+
let package = Object::new_package_for_testing(
227+
&compiled_modules,
228+
TransactionDigest::genesis_marker(),
229+
BuiltInFramework::genesis_move_packages(),
230+
)
231+
.unwrap();
232+
let package_id = package.id();
233+
let version = package.version();
234+
235+
let input_keys = vec![InputKey::Package { id: package_id }];
236+
let receiving_keys = HashSet::new();
237+
let epoch = &0;
238+
239+
// Start notification future
240+
let notification = cache.notify_read_input_objects(&input_keys, &receiving_keys, epoch);
241+
242+
// Write package after small delay
243+
tokio::spawn({
244+
let cache = cache.clone();
245+
async move {
246+
tokio::time::sleep(Duration::from_millis(100)).await;
247+
cache.write_object_entry(&package_id, version, ObjectEntry::Object(package));
248+
}
249+
});
250+
251+
// Should complete once package is written
252+
let result = timeout(Duration::from_secs(1), notification).await.unwrap();
253+
254+
assert_eq!(result.len(), 1);
255+
}
256+
257+
#[tokio::test]
258+
async fn test_wait_for_consensus_stream_end() {
259+
let cache = create_writeback_cache().await;
260+
261+
let object_id = ObjectID::random();
262+
let version = SequenceNumber::from(1);
263+
let epoch = &0;
264+
265+
let input_keys = vec![InputKey::VersionedObject {
266+
id: FullObjectID::new(object_id, Some(version)),
267+
version,
268+
}];
269+
let receiving_keys = HashSet::new();
270+
271+
// Start notification future
272+
let notification = cache.notify_read_input_objects(&input_keys, &receiving_keys, epoch);
273+
274+
// Write consensus stream ended marker after small delay
275+
tokio::spawn({
276+
let cache = cache.clone();
277+
async move {
278+
tokio::time::sleep(Duration::from_millis(100)).await;
279+
cache.write_marker_value(
280+
*epoch,
281+
FullObjectKey::new(FullObjectID::new(object_id, Some(version)), version),
282+
MarkerValue::ConsensusStreamEnded(TransactionDigest::random()),
283+
);
284+
}
285+
});
286+
287+
// Should complete once marker is written
288+
let result = timeout(Duration::from_secs(1), notification).await.unwrap();
289+
290+
assert_eq!(result.len(), 1);
291+
}
292+
293+
#[tokio::test]
294+
async fn test_receiving_object_higher_version() {
295+
let cache = create_writeback_cache().await;
296+
297+
let object_id = ObjectID::random();
298+
let requested_version = SequenceNumber::from(1);
299+
let higher_version = SequenceNumber::from(2);
300+
let object = Object::with_id_owner_version_for_testing(
301+
object_id,
302+
higher_version,
303+
Owner::AddressOwner(SuiAddress::default()),
304+
);
305+
306+
// Write higher version to cache
307+
cache.write_object_entry(&object_id, higher_version, ObjectEntry::Object(object));
308+
309+
let input_keys = vec![InputKey::VersionedObject {
310+
id: FullObjectID::new(object_id, None),
311+
version: requested_version,
312+
}];
313+
let mut receiving_keys = HashSet::new();
314+
receiving_keys.insert(input_keys[0]);
315+
let epoch = &0;
316+
317+
// Should return immediately since a higher version exists for receiving object
318+
let result = cache
319+
.notify_read_input_objects(&input_keys, &receiving_keys, epoch)
320+
.now_or_never()
321+
.unwrap();
322+
323+
assert_eq!(result.len(), 1);
324+
}

0 commit comments

Comments
 (0)