-
Notifications
You must be signed in to change notification settings - Fork 11.6k
Client-side changes for event streams #20804
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
3 Skipped Deployments
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just some high level thoughts and simplifications! We need to figure out a good way to get your authentication code unblocked as well.
// If [e1, e2, e3] are events, digest = H(H(e3), H(H(e2), H(H(e1)))), most_recent_event = H(e3) | ||
public struct StreamHead has key, store { | ||
id: UID, | ||
stream_id: String, | ||
digest: vector<u8>, | ||
most_recent_event_digest: vector<u8>, | ||
count: u64, | ||
} | ||
|
||
// Creates a stream given a stream identifier. | ||
public fun create_stream(ctx: &mut TxContext, stream_id: String): StreamHead { | ||
StreamHead { | ||
id: object::new(ctx), | ||
stream_id: stream_id, | ||
digest: vector::empty(), | ||
most_recent_event_digest: vector::empty(), | ||
count: 0, | ||
} | ||
} | ||
|
||
public fun hash_two(lhs: vector<u8>, rhs: vector<u8>): vector<u8> { | ||
let mut inputs = lhs; | ||
inputs.append(rhs); | ||
hash::sha3_256(inputs) | ||
} | ||
|
||
public fun add_to_stream<T: copy + drop>( | ||
event: T, | ||
stream_head: &mut StreamHead | ||
) { | ||
// This check effectively acts as an access control because the events defined in a module | ||
// can only be instantiated by that module. | ||
// Note that we could support dynamic or runtime-defined streams by instead doing access control with a capability object. | ||
assert!(stream_head.stream_id == type_name::into_string(type_name::get<T>()), ENoAccess); | ||
|
||
stream_head.count = stream_head.count + 1; | ||
stream_head.most_recent_event_digest = hash::sha3_256(bcs::to_bytes(&event)); | ||
stream_head.digest = hash_two(stream_head.most_recent_event_digest, stream_head.digest); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could have StreamHead
accept a phantom
type parameter to leverage the type system to do the stream_id
check for you.
// If [e1, e2, e3] are events, digest = H(H(e3), H(H(e2), H(H(e1)))), most_recent_event = H(e3) | |
public struct StreamHead has key, store { | |
id: UID, | |
stream_id: String, | |
digest: vector<u8>, | |
most_recent_event_digest: vector<u8>, | |
count: u64, | |
} | |
// Creates a stream given a stream identifier. | |
public fun create_stream(ctx: &mut TxContext, stream_id: String): StreamHead { | |
StreamHead { | |
id: object::new(ctx), | |
stream_id: stream_id, | |
digest: vector::empty(), | |
most_recent_event_digest: vector::empty(), | |
count: 0, | |
} | |
} | |
public fun hash_two(lhs: vector<u8>, rhs: vector<u8>): vector<u8> { | |
let mut inputs = lhs; | |
inputs.append(rhs); | |
hash::sha3_256(inputs) | |
} | |
public fun add_to_stream<T: copy + drop>( | |
event: T, | |
stream_head: &mut StreamHead | |
) { | |
// This check effectively acts as an access control because the events defined in a module | |
// can only be instantiated by that module. | |
// Note that we could support dynamic or runtime-defined streams by instead doing access control with a capability object. | |
assert!(stream_head.stream_id == type_name::into_string(type_name::get<T>()), ENoAccess); | |
stream_head.count = stream_head.count + 1; | |
stream_head.most_recent_event_digest = hash::sha3_256(bcs::to_bytes(&event)); | |
stream_head.digest = hash_two(stream_head.most_recent_event_digest, stream_head.digest); | |
} | |
// If [e1, e2, e3] are events, digest = H(H(e3), H(H(e2), H(H(e1)))), most_recent_event = H(e3) | |
public struct StreamHead<phantom T: copy + drop> has key, store { | |
id: UID, | |
digest: vector<u8>, | |
most_recent_event_digest: vector<u8>, | |
count: u64, | |
} | |
// Creates a stream given a stream identifier. | |
public fun create_stream<T: copy + drop>(ctx: &mut TxContext): StreamHead<T> { | |
StreamHead { | |
id: object::new(ctx), | |
digest: vector::empty(), | |
most_recent_event_digest: vector::empty(), | |
count: 0, | |
} | |
} | |
public fun hash_two(lhs: vector<u8>, rhs: vector<u8>): vector<u8> { | |
let mut inputs = lhs; | |
inputs.append(rhs); | |
hash::sha3_256(inputs) | |
} | |
public fun add_to_stream<T: copy + drop>( | |
event: T, | |
stream_head: &mut StreamHead<T> | |
) { |
let checkpoint_number = v["data"]["epoch"]["checkpoints"]["nodes"][0]["sequenceNumber"] | ||
.as_u64() | ||
.unwrap(); | ||
.ok_or(anyhow!("Cannot parse checkpoint number"))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.ok_or(anyhow!("Cannot parse checkpoint number"))?; | |
.ok_or_else(|| anyhow!("Cannot parse checkpoint number"))?; |
cd1ad1f
to
e73eac4
Compare
Description
This PR prototypes the client-side of the event streams design. The high-level logic is as follows.
Every X seconds, do:
Thoughts
Some design decisions that can help simplify the design (things I noticed as I prototyped this):
- ADS choice: Using hash chains or similar data structures avoids it (as opposed to counters)
- Object inclusion (verifying the latest state of the stream head object). The below prototype downloads the full checkpoint to verify the object's latest state (the only part of the prototype I don't like). To avoid this, we'd have to include a new field in the summary that contains a hash of all the object references (or a Merkle tree if we want to future-proof it).
StreamHead
contains a:- digest (e.g. hash chain head)
- a pointer to the last emitted event (to help figure out where to start looking)
- counter (to help determine how many events to request)