Skip to content

Commit 32c0d35

Browse files
committed
[ReplicatedLoglet] Remote append
Summary: Implements a remote loglet append calls to leader sequencer
1 parent 89d0aa5 commit 32c0d35

File tree

6 files changed

+663
-11
lines changed

6 files changed

+663
-11
lines changed

crates/bifrost/src/loglet/error.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
use std::fmt::Debug;
1212
use std::sync::Arc;
1313

14-
use restate_core::ShutdownError;
14+
use restate_core::{network::NetworkError, ShutdownError};
1515
use restate_types::errors::{IntoMaybeRetryable, MaybeRetryableError};
1616

1717
#[derive(Debug, Clone, thiserror::Error)]
@@ -68,3 +68,13 @@ impl From<OperationError> for AppendError {
6868
}
6969
}
7070
}
71+
72+
impl From<NetworkError> for OperationError {
73+
fn from(value: NetworkError) -> Self {
74+
match value {
75+
NetworkError::Shutdown(err) => OperationError::Shutdown(err),
76+
// todo(azmy): are all network errors retryable?
77+
_ => OperationError::retryable(value),
78+
}
79+
}
80+
}

crates/bifrost/src/loglet/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ impl LogletCommitResolver {
168168
pub fn offset(self, offset: LogletOffset) {
169169
let _ = self.tx.send(Ok(offset));
170170
}
171+
172+
pub fn error(self, err: AppendError) {
173+
let _ = self.tx.send(Err(err));
174+
}
171175
}
172176

173177
pub struct LogletCommit {

crates/bifrost/src/providers/replicated_loglet/loglet.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,22 @@ impl<T: TransportConnect> ReplicatedLoglet<T> {
110110
log_server_manager,
111111
})
112112
}
113+
114+
pub fn networking(&self) -> &Networking<T> {
115+
&self.networking
116+
}
117+
118+
pub fn params(&self) -> &ReplicatedLogletParams {
119+
&self.my_params
120+
}
121+
122+
pub fn log_id(&self) -> LogId {
123+
self.log_id
124+
}
125+
126+
pub fn segment_index(&self) -> SegmentIndex {
127+
self.segment_index
128+
}
113129
}
114130

115131
#[derive(derive_more::Debug, derive_more::IsVariant)]
@@ -143,8 +159,8 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
143159
async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
144160
match self.sequencer {
145161
SequencerAccess::Local { ref handle } => handle.enqueue_batch(payloads).await,
146-
SequencerAccess::Remote { .. } => {
147-
todo!("Access to remote sequencers is not implemented yet")
162+
SequencerAccess::Remote { ref sequencers_rpc } => {
163+
sequencers_rpc.append(self, payloads).await
148164
}
149165
}
150166
}

0 commit comments

Comments
 (0)