Skip to content

Commit faddb58

Browse files
authored
Merge pull request MaterializeInc#11720 from danhhz/persist_cleanup
persist: small cleanups
2 parents e598539 + 811dfe4 commit faddb58

File tree

5 files changed

+107
-57
lines changed

5 files changed

+107
-57
lines changed

src/persist-client/src/impl/machine.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::error::{InvalidUsage, NoOp};
2626
use crate::r#impl::state::{ReadCapability, State, WriteCapability};
2727
use crate::read::ReaderId;
2828
use crate::write::WriterId;
29-
use crate::Id;
29+
use crate::ShardId;
3030

3131
#[derive(Debug)]
3232
pub struct Machine<K, V, T, D> {
@@ -54,16 +54,16 @@ where
5454
T: Timestamp + Lattice + Codec64,
5555
D: Semigroup + Codec64,
5656
{
57-
pub fn new(id: Id, consensus: Arc<dyn Consensus + Send + Sync>) -> Self {
57+
pub fn new(shard_id: ShardId, consensus: Arc<dyn Consensus + Send + Sync>) -> Self {
5858
Machine {
5959
consensus,
6060
seqno: None,
61-
state: State::new(id),
61+
state: State::new(shard_id),
6262
}
6363
}
6464

65-
pub fn id(&self) -> Id {
66-
self.state.id()
65+
pub fn shard_id(&self) -> ShardId {
66+
self.state.shard_id()
6767
}
6868

6969
pub async fn register(
@@ -240,7 +240,7 @@ where
240240
mut work_fn: WorkFn,
241241
) -> Result<Result<(SeqNo, R), E>, ExternalError> {
242242
loop {
243-
let id = self.state.id();
243+
let shard_id = self.state.shard_id();
244244

245245
let new_seqno = self.seqno.unwrap_or_default().next();
246246
let mut new_state = self.state.clone();
@@ -254,7 +254,7 @@ where
254254
let cas_res = self
255255
.consensus
256256
.compare_and_set(
257-
&id.to_string(),
257+
&shard_id.to_string(),
258258
deadline,
259259
self.seqno,
260260
VersionedData {
@@ -289,8 +289,8 @@ where
289289
}
290290

291291
async fn fetch_and_update_state(&mut self, deadline: Instant) -> Result<(), ExternalError> {
292-
let id = self.id();
293-
let current = self.consensus.head(&id.to_string(), deadline).await?;
292+
let shard_id = self.shard_id();
293+
let current = self.consensus.head(&shard_id.to_string(), deadline).await?;
294294
self.update_state(current).await
295295
}
296296

@@ -299,7 +299,7 @@ where
299299
Some(x) => x,
300300
None => {
301301
self.seqno = None;
302-
self.state = State::new(self.state.id());
302+
self.state = State::new(self.state.shard_id());
303303
return Ok(());
304304
}
305305
};

src/persist-client/src/impl/state.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use timely::PartialOrder;
2222
use crate::error::{InvalidUsage, NoOp};
2323
use crate::read::ReaderId;
2424
use crate::write::WriterId;
25-
use crate::Id;
25+
use crate::ShardId;
2626

2727
#[derive(Clone, Debug, PartialEq)]
2828
pub struct ReadCapability<T> {
@@ -38,7 +38,7 @@ pub struct WriteCapability<T> {
3838
// TODO: Document invariants.
3939
#[derive(Debug)]
4040
pub struct State<K, V, T, D> {
41-
id: Id,
41+
shard_id: ShardId,
4242

4343
writers: HashMap<WriterId, WriteCapability<T>>,
4444
readers: HashMap<ReaderId, ReadCapability<T>>,
@@ -53,7 +53,7 @@ pub struct State<K, V, T, D> {
5353
impl<K, V, T: Clone, D> Clone for State<K, V, T, D> {
5454
fn clone(&self) -> Self {
5555
Self {
56-
id: self.id.clone(),
56+
shard_id: self.shard_id.clone(),
5757
writers: self.writers.clone(),
5858
readers: self.readers.clone(),
5959
since: self.since.clone(),
@@ -70,9 +70,9 @@ where
7070
T: Timestamp + Lattice + Codec64,
7171
D: Codec64,
7272
{
73-
pub fn new(id: Id) -> Self {
73+
pub fn new(shard_id: ShardId) -> Self {
7474
State {
75-
id,
75+
shard_id,
7676
writers: HashMap::new(),
7777
readers: HashMap::new(),
7878
since: Antichain::from_elem(T::minimum()),
@@ -81,8 +81,8 @@ where
8181
}
8282
}
8383

84-
pub fn id(&self) -> Id {
85-
self.id
84+
pub fn shard_id(&self) -> ShardId {
85+
self.shard_id
8686
}
8787

8888
pub fn register(
@@ -348,7 +348,7 @@ impl<T: Timestamp + Codec64> From<&DescriptionMeta> for Description<T> {
348348

349349
#[derive(Debug, Serialize, Deserialize)]
350350
struct StateRollupMeta {
351-
id: Id,
351+
shard_id: ShardId,
352352
key_codec: String,
353353
val_codec: String,
354354
ts_codec: String,
@@ -369,7 +369,7 @@ where
369369
{
370370
fn from(x: &State<K, V, T, D>) -> Self {
371371
StateRollupMeta {
372-
id: x.id,
372+
shard_id: x.shard_id,
373373
key_codec: K::codec_name(),
374374
val_codec: V::codec_name(),
375375
ts_codec: T::codec_name(),
@@ -433,7 +433,7 @@ where
433433
));
434434
}
435435
Ok(State {
436-
id: x.id,
436+
shard_id: x.shard_id,
437437
writers: x
438438
.writers
439439
.iter()

src/persist-client/src/lib.rs

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -112,20 +112,26 @@ impl Location {
112112
}
113113

114114
/// An opaque identifier for a persist durable TVC (aka shard).
115-
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)]
116-
pub struct Id([u8; 16]);
115+
#[derive(Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)]
116+
pub struct ShardId([u8; 16]);
117117

118-
impl std::fmt::Display for Id {
118+
impl std::fmt::Display for ShardId {
119119
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120-
std::fmt::Display::fmt(&Uuid::from_bytes(self.0), f)
120+
write!(f, "s{}", Uuid::from_bytes(self.0))
121121
}
122122
}
123123

124-
impl Id {
125-
/// Returns a random [Id] that is reasonably likely to have never been
124+
impl std::fmt::Debug for ShardId {
125+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126+
write!(f, "ShardId({})", Uuid::from_bytes(self.0))
127+
}
128+
}
129+
130+
impl ShardId {
131+
/// Returns a random [ShardId] that is reasonably likely to have never been
126132
/// generated before.
127133
pub fn new() -> Self {
128-
Id(Uuid::new_v4().as_bytes().to_owned())
134+
ShardId(Uuid::new_v4().as_bytes().to_owned())
129135
}
130136
}
131137

@@ -160,8 +166,8 @@ impl Client {
160166
Ok(Client { blob, consensus })
161167
}
162168

163-
/// Provides capabilities for the durable TVC identified by `id` at its
164-
/// current since and upper frontiers.
169+
/// Provides capabilities for the durable TVC identified by `shard_id` at
170+
/// its current since and upper frontiers.
165171
///
166172
/// This method is a best-effort attempt to regain control of the frontiers
167173
/// of a shard. Its most common uses are to recover capabilities that have
@@ -170,23 +176,23 @@ impl Client {
170176
/// released by all other parties, this call may result in capabilities with
171177
/// empty frontiers (which are useless).
172178
///
173-
/// If `id` has never been used before, initializes a new shard and returns
174-
/// handles with `since` and `upper` frontiers set to initial values of
175-
/// `Antichain::from_elem(T::minimum())`.
179+
/// If `shard_id` has never been used before, initializes a new shard and
180+
/// returns handles with `since` and `upper` frontiers set to initial values
181+
/// of `Antichain::from_elem(T::minimum())`.
176182
pub async fn open<K, V, T, D>(
177183
&self,
178184
timeout: Duration,
179-
id: Id,
185+
shard_id: ShardId,
180186
) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), ExternalError>
181187
where
182188
K: Debug + Codec,
183189
V: Debug + Codec,
184190
T: Timestamp + Lattice + Codec64,
185191
D: Semigroup + Codec64,
186192
{
187-
trace!("Client::open timeout={:?} id={:?}", timeout, id);
193+
trace!("Client::open timeout={:?} shard_id={:?}", timeout, shard_id);
188194
let deadline = Instant::now() + timeout;
189-
let mut machine = Machine::new(id, Arc::clone(&self.consensus));
195+
let mut machine = Machine::new(shard_id, Arc::clone(&self.consensus));
190196
let (writer_id, reader_id) = (WriterId::new(), ReaderId::new());
191197
let (write_cap, read_cap) = machine.register(deadline, &writer_id, &reader_id).await?;
192198
let writer = WriteHandle {
@@ -257,7 +263,7 @@ mod tests {
257263

258264
let (mut write, mut read) = new_test_client()
259265
.await?
260-
.open::<String, String, u64, i64>(NO_TIMEOUT, Id::new())
266+
.open::<String, String, u64, i64>(NO_TIMEOUT, ShardId::new())
261267
.await?;
262268
assert_eq!(write.upper(), &Antichain::from_elem(u64::minimum()));
263269
assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));
@@ -308,13 +314,13 @@ mod tests {
308314
let client = new_test_client().await?;
309315

310316
let (mut write1, read1) = client
311-
.open::<String, String, u64, i64>(NO_TIMEOUT, Id::new())
317+
.open::<String, String, u64, i64>(NO_TIMEOUT, ShardId::new())
312318
.await?;
313319

314320
// Different types, so that checks would fail in case we were not separating these
315321
// collections internally.
316322
let (mut write2, read2) = client
317-
.open::<String, (), u64, i64>(NO_TIMEOUT, Id::new())
323+
.open::<String, (), u64, i64>(NO_TIMEOUT, ShardId::new())
318324
.await?;
319325

320326
let res = write1
@@ -349,7 +355,7 @@ mod tests {
349355
let client = new_test_client().await?;
350356

351357
let (write, read) = client
352-
.open::<String, String, u64, i64>(NO_TIMEOUT, Id::new())
358+
.open::<String, String, u64, i64>(NO_TIMEOUT, ShardId::new())
353359
.await?;
354360

355361
assert!(is_send_sync(client));
@@ -369,7 +375,7 @@ mod tests {
369375
(("3".to_owned(), "three".to_owned()), 3, 1),
370376
];
371377

372-
let id = Id::new();
378+
let id = ShardId::new();
373379
let client = new_test_client().await?;
374380
let (mut write1, read) = client
375381
.open::<String, String, u64, i64>(NO_TIMEOUT, id)
@@ -414,4 +420,32 @@ mod tests {
414420

415421
Ok(())
416422
}
423+
424+
#[test]
425+
fn fmt_ids() {
426+
assert_eq!(
427+
format!("{}", ShardId([0u8; 16])),
428+
"s00000000-0000-0000-0000-000000000000"
429+
);
430+
assert_eq!(
431+
format!("{:?}", ShardId([0u8; 16])),
432+
"ShardId(00000000-0000-0000-0000-000000000000)"
433+
);
434+
assert_eq!(
435+
format!("{}", WriterId([0u8; 16])),
436+
"w00000000-0000-0000-0000-000000000000"
437+
);
438+
assert_eq!(
439+
format!("{:?}", WriterId([0u8; 16])),
440+
"WriterId(00000000-0000-0000-0000-000000000000)"
441+
);
442+
assert_eq!(
443+
format!("{}", ReaderId([0u8; 16])),
444+
"r00000000-0000-0000-0000-000000000000"
445+
);
446+
assert_eq!(
447+
format!("{:?}", ReaderId([0u8; 16])),
448+
"ReaderId(00000000-0000-0000-0000-000000000000)"
449+
);
450+
}
417451
}

src/persist-client/src/read.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,21 @@ use uuid::Uuid;
2929

3030
use crate::error::InvalidUsage;
3131
use crate::r#impl::machine::Machine;
32-
use crate::Id;
32+
use crate::ShardId;
3333

3434
/// An opaque identifier for a reader of a persist durable TVC (aka shard).
35-
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
35+
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
3636
pub struct ReaderId(pub(crate) [u8; 16]);
3737

3838
impl std::fmt::Display for ReaderId {
3939
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40-
std::fmt::Display::fmt(&Uuid::from_bytes(self.0), f)
40+
write!(f, "r{}", Uuid::from_bytes(self.0))
41+
}
42+
}
43+
44+
impl std::fmt::Debug for ReaderId {
45+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46+
write!(f, "ReaderId({})", Uuid::from_bytes(self.0))
4147
}
4248
}
4349

@@ -57,7 +63,7 @@ impl ReaderId {
5763
/// See [ReadHandle::snapshot] for details.
5864
#[derive(Debug, Serialize, Deserialize)]
5965
pub struct SnapshotSplit {
60-
id: Id,
66+
shard_id: ShardId,
6167
as_of: Vec<[u8; 8]>,
6268
batches: Vec<String>,
6369
}
@@ -352,6 +358,9 @@ where
352358
/// system. A `new_since` of the empty antichain "finishes" this shard,
353359
/// promising that no more data will ever be read by this handle.
354360
///
361+
/// It is possible to heartbeat a reader lease by calling this with
362+
/// `new_since` equal to `self.since()` (making the call a no-op).
363+
///
355364
/// The clunky two-level Result is to enable more obvious error handling in
356365
/// the caller. See <http://sled.rs/errors.html> for details.
357366
pub async fn downgrade_since(
@@ -456,7 +465,7 @@ where
456465
};
457466
let mut splits = (0..num_splits.get())
458467
.map(|_| SnapshotSplit {
459-
id: self.machine.id(),
468+
shard_id: self.machine.shard_id(),
460469
as_of: as_of.iter().map(|x| T::encode(x)).collect(),
461470
batches: Vec::new(),
462471
})
@@ -479,11 +488,11 @@ where
479488
timeout,
480489
split
481490
);
482-
if split.id != self.machine.id() {
491+
if split.shard_id != self.machine.shard_id() {
483492
return Ok(Err(InvalidUsage(anyhow!(
484493
"snapshot shard id {} doesn't match handle id {}",
485-
split.id,
486-
self.machine.id()
494+
split.shard_id,
495+
self.machine.shard_id()
487496
))));
488497
}
489498
let iter = SnapshotIter {

0 commit comments

Comments
 (0)