Skip to content

Conversation

mskd12
Copy link
Contributor

@mskd12 mskd12 commented Jan 7, 2025

Description

This PR prototypes the client-side of the event streams design. The high-level logic is as follows.

Every X seconds, do:

  • fetch the latest stream head object & verify it with the committee keys (if we had object subscriptions, we wouldn't have to poll this way)
  • if it got updated since we last checked, fetch the newly emitted events and verify them against the new stream head.

Thoughts

Some design decisions that can help simplify the design (things I noticed as I prototyped this):

  • Deterministic derivation of the stream head's object ID from the event type
  • Whether the client needs to download the full checkpoint data or not: Would be nice to avoid if possible. A few places this crops up:
    - ADS choice: Using hash chains or similar data structures avoids it (as opposed to counters)
    - Object inclusion (verifying the latest state of the stream head object). The below prototype downloads the full checkpoint to verify the object's latest state (the only part of the prototype I don't like). To avoid this, we'd have to include a new field in the summary that contains a hash of all the object references (or a Merkle tree if we want to future-proof it).
  • In the below design, the StreamHead contains a:
    - digest (e.g. hash chain head)
    - a pointer to the last emitted event (to help figure out where to start looking)
    - counter (to help determine how many events to request)

Copy link

vercel bot commented Jan 7, 2025

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Comments Updated (UTC)
sui-docs ✅ Ready (Inspect) Visit Preview 💬 Add feedback Jan 17, 2025 6:04am
3 Skipped Deployments
Name Status Preview Comments Updated (UTC)
multisig-toolkit ⬜️ Ignored (Inspect) Visit Preview Jan 17, 2025 6:04am
sui-kiosk ⬜️ Ignored (Inspect) Visit Preview Jan 17, 2025 6:04am
sui-typescript-docs ⬜️ Ignored (Inspect) Jan 17, 2025 6:04am

@mskd12 mskd12 temporarily deployed to sui-typescript-aws-kms-test-env January 7, 2025 11:58 — with GitHub Actions Inactive
Copy link
Contributor

@amnn amnn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some high level thoughts and simplifications! We need to figure out a good way to get your authentication code unblocked as well.

Comment on lines +12 to +50
// If [e1, e2, e3] are events, digest = H(H(e3), H(H(e2), H(H(e1)))), most_recent_event = H(e3)
public struct StreamHead has key, store {
id: UID,
stream_id: String,
digest: vector<u8>,
most_recent_event_digest: vector<u8>,
count: u64,
}

// Creates a stream given a stream identifier.
public fun create_stream(ctx: &mut TxContext, stream_id: String): StreamHead {
StreamHead {
id: object::new(ctx),
stream_id: stream_id,
digest: vector::empty(),
most_recent_event_digest: vector::empty(),
count: 0,
}
}

public fun hash_two(lhs: vector<u8>, rhs: vector<u8>): vector<u8> {
let mut inputs = lhs;
inputs.append(rhs);
hash::sha3_256(inputs)
}

public fun add_to_stream<T: copy + drop>(
event: T,
stream_head: &mut StreamHead
) {
// This check effectively acts as an access control because the events defined in a module
// can only be instantiated by that module.
// Note that we could support dynamic or runtime-defined streams by instead doing access control with a capability object.
assert!(stream_head.stream_id == type_name::into_string(type_name::get<T>()), ENoAccess);

stream_head.count = stream_head.count + 1;
stream_head.most_recent_event_digest = hash::sha3_256(bcs::to_bytes(&event));
stream_head.digest = hash_two(stream_head.most_recent_event_digest, stream_head.digest);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could have StreamHead accept a phantom type parameter to leverage the type system to do the stream_id check for you.

Suggested change
// If [e1, e2, e3] are events, digest = H(H(e3), H(H(e2), H(H(e1)))), most_recent_event = H(e3)
public struct StreamHead has key, store {
id: UID,
stream_id: String,
digest: vector<u8>,
most_recent_event_digest: vector<u8>,
count: u64,
}
// Creates a stream given a stream identifier.
public fun create_stream(ctx: &mut TxContext, stream_id: String): StreamHead {
StreamHead {
id: object::new(ctx),
stream_id: stream_id,
digest: vector::empty(),
most_recent_event_digest: vector::empty(),
count: 0,
}
}
public fun hash_two(lhs: vector<u8>, rhs: vector<u8>): vector<u8> {
let mut inputs = lhs;
inputs.append(rhs);
hash::sha3_256(inputs)
}
public fun add_to_stream<T: copy + drop>(
event: T,
stream_head: &mut StreamHead
) {
// This check effectively acts as an access control because the events defined in a module
// can only be instantiated by that module.
// Note that we could support dynamic or runtime-defined streams by instead doing access control with a capability object.
assert!(stream_head.stream_id == type_name::into_string(type_name::get<T>()), ENoAccess);
stream_head.count = stream_head.count + 1;
stream_head.most_recent_event_digest = hash::sha3_256(bcs::to_bytes(&event));
stream_head.digest = hash_two(stream_head.most_recent_event_digest, stream_head.digest);
}
// If [e1, e2, e3] are events, digest = H(H(e3), H(H(e2), H(H(e1)))), most_recent_event = H(e3)
public struct StreamHead<phantom T: copy + drop> has key, store {
id: UID,
digest: vector<u8>,
most_recent_event_digest: vector<u8>,
count: u64,
}
// Creates a stream given a stream identifier.
public fun create_stream<T: copy + drop>(ctx: &mut TxContext): StreamHead<T> {
StreamHead {
id: object::new(ctx),
digest: vector::empty(),
most_recent_event_digest: vector::empty(),
count: 0,
}
}
public fun hash_two(lhs: vector<u8>, rhs: vector<u8>): vector<u8> {
let mut inputs = lhs;
inputs.append(rhs);
hash::sha3_256(inputs)
}
public fun add_to_stream<T: copy + drop>(
event: T,
stream_head: &mut StreamHead<T>
) {

let checkpoint_number = v["data"]["epoch"]["checkpoints"]["nodes"][0]["sequenceNumber"]
.as_u64()
.unwrap();
.ok_or(anyhow!("Cannot parse checkpoint number"))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.ok_or(anyhow!("Cannot parse checkpoint number"))?;
.ok_or_else(|| anyhow!("Cannot parse checkpoint number"))?;

@mskd12 mskd12 force-pushed the deepak/light-client-minor-refactor branch from cd1ad1f to e73eac4 Compare March 20, 2025 19:41
Base automatically changed from deepak/light-client-minor-refactor to main March 20, 2025 20:24
@mskd12 mskd12 closed this Jun 17, 2025
@mskd12 mskd12 deleted the deepak/event-streams-client-side branch June 18, 2025 16:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants