Skip to content

Commit 148b191

Browse files
committed
[ReplicatedLoglet] Implement remote sequencer find tail
1 parent 845d464 commit 148b191

File tree

5 files changed

+110
-9
lines changed

5 files changed

+110
-9
lines changed

crates/bifrost/src/providers/replicated_loglet/loglet.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
158158
async fn find_tail(&self) -> Result<TailState<LogletOffset>, OperationError> {
159159
match self.sequencer {
160160
SequencerAccess::Local { .. } => Ok(*self.known_global_tail.get()),
161-
SequencerAccess::Remote { .. } => {
162-
todo!("find_tail() is not implemented yet")
163-
}
161+
SequencerAccess::Remote { ref handle } => handle.find_tail().await,
164162
}
165163
}
166164

crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,17 @@ use tokio::sync::{mpsc, Mutex, OwnedSemaphorePermit, Semaphore};
2020

2121
use restate_core::{
2222
network::{
23-
rpc_router::{RpcRouter, RpcToken},
23+
rpc_router::{RpcError, RpcRouter, RpcToken},
2424
NetworkError, NetworkSendError, Networking, Outgoing, TransportConnect, WeakConnection,
2525
},
2626
task_center, ShutdownError, TaskKind,
2727
};
2828
use restate_types::{
2929
config::Configuration,
30-
logs::{metadata::SegmentIndex, LogId, LogletOffset, Record},
31-
net::replicated_loglet::{Append, Appended, CommonRequestHeader, SequencerStatus},
30+
logs::{metadata::SegmentIndex, LogId, LogletOffset, Record, SequenceNumber, TailState},
31+
net::replicated_loglet::{
32+
Append, Appended, CommonRequestHeader, GetSequencerInfo, SequencerStatus,
33+
},
3234
replicated_loglet::ReplicatedLogletParams,
3335
GenerationalNodeId,
3436
};
@@ -205,6 +207,76 @@ where
205207

206208
Ok(connection)
207209
}
210+
211+
/// Attempts to find tail.
212+
///
213+
/// This first tries to find tail by synchronizing with sequencer. If this failed
214+
/// duo to sequencer not reachable, it will immediately try to find tail by querying
215+
/// fmajority of loglet servers
216+
pub async fn find_tail(&self) -> Result<TailState<LogletOffset>, OperationError> {
217+
// try to sync with sequencer
218+
if self.sync_sequencer_tail().await.is_ok() {
219+
return Ok(*self.known_global_tail.get());
220+
}
221+
222+
// otherwise we need to try to fetch this from the log servers.
223+
self.sync_log_servers_tail().await?;
224+
Ok(*self.known_global_tail.get())
225+
}
226+
227+
/// Synchronize known_global_tail with the sequencer
228+
async fn sync_sequencer_tail(&self) -> Result<(), NetworkError> {
229+
let result = self
230+
.sequencers_rpc
231+
.info
232+
.call(
233+
&self.networking,
234+
self.params.sequencer,
235+
GetSequencerInfo {
236+
header: CommonRequestHeader {
237+
log_id: self.log_id,
238+
loglet_id: self.params.loglet_id,
239+
segment_index: self.segment_index,
240+
},
241+
},
242+
)
243+
.await
244+
.map(|incoming| incoming.into_body());
245+
246+
let info = match result {
247+
Ok(info) => info,
248+
Err(RpcError::Shutdown(shutdown)) => return Err(NetworkError::Shutdown(shutdown)),
249+
Err(RpcError::SendError(err)) => return Err(err.source),
250+
};
251+
252+
match info.header.status {
253+
SequencerStatus::Ok => {
254+
// update header info
255+
if let Some(offset) = info.header.known_global_tail {
256+
self.known_global_tail.notify_offset_update(offset);
257+
}
258+
}
259+
SequencerStatus::Sealed => {
260+
self.known_global_tail.notify(
261+
true,
262+
info.header
263+
.known_global_tail
264+
.unwrap_or(LogletOffset::INVALID),
265+
);
266+
}
267+
_ => {
268+
unreachable!()
269+
}
270+
};
271+
272+
Ok(())
273+
}
274+
275+
/// A fallback mechanism in case sequencer is not available
276+
/// to try and sync known_global_tail with fmajority of LogServers
277+
async fn sync_log_servers_tail(&self) -> Result<(), OperationError> {
278+
todo!()
279+
}
208280
}
209281

210282
/// RemoteSequencerConnection represents a single open connection

crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
use restate_core::network::rpc_router::RpcRouter;
1515
use restate_core::network::MessageRouterBuilder;
1616
use restate_types::net::log_server::{GetLogletInfo, GetRecords, Release, Seal, Store, Trim};
17-
use restate_types::net::replicated_loglet::Append;
17+
use restate_types::net::replicated_loglet::{Append, GetSequencerInfo};
1818

1919
/// Used by replicated loglets to send requests and receive responses from log-servers
2020
/// Cloning this is cheap and all clones will share the same internal trackers.
@@ -56,14 +56,16 @@ impl LogServersRpc {
5656
#[derive(Clone)]
5757
pub struct SequencersRpc {
5858
pub append: RpcRouter<Append>,
59+
pub info: RpcRouter<GetSequencerInfo>,
5960
}
6061

6162
impl SequencersRpc {
6263
/// Registers all routers into the supplied message router. This ensures that
6364
/// responses are routed correctly.
6465
pub fn new(router_builder: &mut MessageRouterBuilder) -> Self {
6566
let append = RpcRouter::new(router_builder);
67+
let info = RpcRouter::new(router_builder);
6668

67-
Self { append }
69+
Self { append, info }
6870
}
6971
}

crates/types/protobuf/restate/common.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ enum TargetName {
6363
// ReplicatedLoglet
6464
REPLICATED_LOGLET_APPEND = 40;
6565
REPLICATED_LOGLET_APPENDED = 41;
66+
REPLICATED_LOGLET_GET_INFO = 42;
67+
REPLICATED_LOGLET_INFO = 43;
6668
}
6769

6870
enum NodeStatus {

crates/types/src/net/replicated_loglet.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ impl Append {
9191
self.payloads
9292
.iter()
9393
.map(|p| p.estimated_encode_size())
94-
.sum()
94+
.sum::<usize>()
95+
+ size_of::<CommonRequestHeader>()
9596
}
9697
}
9798

@@ -137,3 +138,29 @@ impl Appended {
137138
self
138139
}
139140
}
141+
142+
define_rpc! {
143+
@request = GetSequencerInfo,
144+
@response = SequencerInfo,
145+
@request_target = TargetName::ReplicatedLogletGetInfo,
146+
@response_target = TargetName::ReplicatedLogletInfo,
147+
}
148+
149+
// ** APPEND
150+
#[derive(Debug, Clone, Serialize, Deserialize)]
151+
pub struct GetSequencerInfo {
152+
#[serde(flatten)]
153+
pub header: CommonRequestHeader,
154+
}
155+
156+
impl GetSequencerInfo {
157+
pub fn estimated_encode_size(&self) -> usize {
158+
size_of::<CommonRequestHeader>()
159+
}
160+
}
161+
162+
#[derive(Debug, Clone, Serialize, Deserialize)]
163+
pub struct SequencerInfo {
164+
#[serde(flatten)]
165+
pub header: CommonResponseHeader,
166+
}

0 commit comments

Comments
 (0)