Commit c3d5d79

feat(meta): implement distributed semaphore based on meta-service (#17651)
* docs: update meta-service change-log

* feat(meta): implement distributed semaphore based on meta-service

This commit introduces a distributed semaphore implementation that
provides reliable resource management across distributed systems.
The semaphore is built on top of meta-service and offers:

- Fair queueing mechanism using sequence numbers
- Automatic lease management with TTL
- Atomic operations for consistency
- Real-time updates via meta-service watch API

Usage:

```rust
let client = MetaGrpcClient::try_create(/*..*/);
let acquired_guard = Semaphore::new_acquired(
    client,
    "your/semaphore/name/in/meta/service",
    2,                      // capacity: 2 acquired at most
    "id11",                 // ID of this acquirer
    Duration::from_secs(3)  // lease time
).await?;

acquired_guard.await;
// Released
```

The implementation uses a queue-based structure with sequence numbers to
ensure fair ordering of resource requests. It handles distributed
coordination through meta-service, providing automatic lease extension
and cleanup of stale entries.

Key components:

- Semaphore: Main entry point for semaphore operations
- Acquirer: Handles the acquisition process
- AcquiredGuard: Manages semaphore lifecycle
- Queue-based storage with sequence numbers

Note: Current version sets capacity per client. Future versions will
store capacity in meta key for global consistency.

* chore: add example
* chore: fix fmt
* chore: fix type changes
* refactor: rename semaphore key|entry to permit key|entry
* chore: remove unused deps
* chore: fix lint
* chore: fix display ut for permit
* chore: fix lint
* chore: fix ut
* refactor: rename AcquiredGuard to Permit
* refactor: refine acquirer doc
* refactor: rename SemaphoreEvent to PermitEvent
* chore: rename
* chore: refine path
* update meta-service compat doc
1 parent 536187a commit c3d5d79

38 files changed (+2962, -19 lines changed)

Cargo.lock

Lines changed: 21 additions & 2 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -105,6 +105,7 @@ members = [
     "src/meta/ee",
     "src/meta/proto-conv",
     "src/meta/protos",
+    "src/meta/semaphore",
     "src/meta/service",
     "tests/sqllogictests",
     "src/tests/sqlsmith",
@@ -146,6 +147,7 @@ databend-common-meta-embedded = { path = "src/meta/embedded" }
 databend-common-meta-kvapi = { path = "src/meta/kvapi" }
 databend-common-meta-process = { path = "src/meta/process" }
 databend-common-meta-raft-store = { path = "src/meta/raft-store" }
+databend-common-meta-semaphore = { path = "src/meta/semaphore" }
 databend-common-meta-sled-store = { path = "src/meta/sled-store" }
 databend-common-meta-stoerr = { path = "src/meta/stoerr" }
 databend-common-meta-store = { path = "src/meta/store" }
@@ -274,6 +276,7 @@ chrono = { version = "0.4.31", features = ["serde"] }
 chrono-tz = { version = "0.8", features = ["serde"] }
 cidr = { version = "0.2.2" }
 clap = { version = "4.4.2", features = ["derive"] }
+codeq = { version = "0.5.2" }
 comfy-table = "7"
 convert_case = "0.6.0"
 cookie = "0.18.1"

src/meta/README.md

Lines changed: 18 additions & 11 deletions
@@ -77,22 +77,29 @@ To add a new feature(add new type or update an type), the developer should do:

 The following is an illustration of the latest query-meta compatibility:

-| `Meta\Query` | [0.9.41, 1.1.34) | [1.1.34, 1.2.287) | [1.2.287, 1.2.361) | [1.2.361, +∞) |
-|:-------------------|:-----------------|:---------------|:-----------|:-----------|
-| [0.8.30, 0.8.35) | ❌ | ❌ | ❌ | ❌ |
-| [0.8.35, 0.9.23) | ✅ | ❌ | ❌ | ❌ |
-| [0.9.23, 0.9.42) | ✅ | ❌ | ❌ | ❌ |
-| [0.9.42, 1.1.32) | ✅ | ❌ | ❌ | ❌ |
-| [1.1.32, 1.2.63) | ✅ | ✅ | ❌ | ❌ |
-| [1.2.63, 1.2.226) | ✅ | ✅ | ❌ | ❌ |
-| [1.2.226, 1.2.258) | ✅ | ✅ | ✅ | ❌ |
-| [1.2.258, 1.2.663) | ✅ | ✅ | ✅ | ✅ |
-| [1.2.663, +∞) | ❌ | ❌ | ✅ | ✅ |
+`[, a.b.c)` denotes the range of versions from previous version(on the left column)(inclusive)
+upto `a.b.c` (exclusive).
+
+TODO: xx is the version in which semaphore is added. update it when merged.
+
+| `Meta\Query` | 1.1.34) | [, 1.2.287) | [, 1.2.361) | [, xx) | [, +∞) |
+|:-------------------|:--------|:------------|:------------|:-------|:-----------------|
+| [0.8.30, 0.8.35) | | ❌ | ❌ | ❌ | ❌ |
+| [0.8.35, 0.9.23) | | ❌ | ❌ | ❌ | ❌ |
+| [0.9.23, 0.9.42) | | ❌ | ❌ | ❌ | ❌ |
+| [0.9.42, 1.1.32) | | ❌ | ❌ | ❌ | ❌ |
+| [1.1.32, 1.2.63) | | ✅ | ❌ | ❌ | ❌ |
+| [1.2.63, 1.2.226) | | ✅ | ❌ | ❌ | ❌ |
+| [1.2.226, 1.2.258) | | ✅ | ✅ | ❌ | ❌ |
+| [1.2.258, 1.2.663) | | ✅ | ✅ | ✅ | ✅(no semaphore) |
+| [1.2.663, 1.2.677) | | ❌ | ✅ | ✅ | ✅(no semaphore) |
+| [1.2.677, +∞) | | ❌ | ✅ | ✅ | ✅ |

 History versions that are not included in the above chart:

 - Query `[0.7.59, 0.8.80)` is compatible with Meta `[0.8.30, 0.9.23)`.
 - Query `[0.8.80, 0.9.41)` is compatible with Meta `[0.8.35, 0.9.42)`.
+- Query `[0.9.41, 1.1.34)` is compatible with Meta `[0.8.35, 1.2.663)`.


 ## Compatibility between databend-meta
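
Reading note on the abbreviated column headers in the new table: `[, a.b.c)` takes its lower bound from the column to its left. The sketch below expands such headers into explicit half-open ranges. `expand_ranges` is a hypothetical helper written for this note, not part of the commit; the first lower bound `0.9.41` is taken from the removed table header `[0.9.41, 1.1.34)`.

```rust
/// Hypothetical helper: expands abbreviated headers such as `[, 1.2.287)`
/// into explicit half-open ranges, given the first lower bound and the
/// upper bound of each column from left to right.
fn expand_ranges(first_lower: &str, uppers: &[&str]) -> Vec<String> {
    let mut lower = first_lower;
    let mut out = Vec::with_capacity(uppers.len());
    for upper in uppers {
        out.push(format!("[{}, {})", lower, upper));
        // The next column starts where this one ends.
        lower = *upper;
    }
    out
}
```

With `first_lower = "0.9.41"` and uppers `1.1.34`, `1.2.287`, `1.2.361`, the second column reads `[1.1.34, 1.2.287)`, which matches the header of the removed table.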

src/meta/client/src/grpc_client.rs

Lines changed: 21 additions & 0 deletions
@@ -58,6 +58,7 @@ use databend_common_meta_types::protobuf::WatchRequest;
 use databend_common_meta_types::protobuf::WatchResponse;
 use databend_common_meta_types::ConnectionError;
 use databend_common_meta_types::GrpcConfig;
+use databend_common_meta_types::InvalidArgument;
 use databend_common_meta_types::MetaClientError;
 use databend_common_meta_types::MetaError;
 use databend_common_meta_types::MetaHandshakeError;
@@ -986,6 +987,26 @@ impl MetaGrpcClient {
         debug!("{}: handle watch request: {:?}", self, watch_request);

         let mut client = self.get_established_client().await?;
+
+        if watch_request.initial_flush {
+            let server_version = client.server_protocol_version();
+            let least_server_version = 1002677;
+
+            if server_version < least_server_version {
+                let err = format!(
+                    "WatchRequest::initial_flush requires databend-meta is at least {}, but: {}",
+                    least_server_version, server_version
+                );
+
+                error!("{}", err);
+
+                return Err(InvalidArgument::new(
+                    AnyError::error("databend-meta version too low"),
+                    err,
+                )
+                .into());
+            }
+        }
         let res = client.watch(watch_request).await?;
         Ok(res.into_inner())
     }
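
Reading note on the magic number `1002677` in the guard above: it appears to be version `1.2.677` packed into a single integer. The sketch below assumes the layout `major * 1_000_000 + minor * 1_000 + patch`; that layout is an assumption made for illustration, and both `to_digit_ver` and `check_initial_flush` are hypothetical names that do not come from this diff.

```rust
/// Hypothetical helper: packs `major.minor.patch` into one integer,
/// assuming the layout major * 1_000_000 + minor * 1_000 + patch.
fn to_digit_ver(major: u64, minor: u64, patch: u64) -> u64 {
    major * 1_000_000 + minor * 1_000 + patch
}

/// Standalone model of the guard in the hunk above: a watch request with
/// `initial_flush` is rejected when the server is older than 1.2.677.
fn check_initial_flush(server_version: u64) -> Result<(), String> {
    let least_server_version = to_digit_ver(1, 2, 677);
    if server_version < least_server_version {
        return Err(format!(
            "WatchRequest::initial_flush requires databend-meta is at least {}, but: {}",
            least_server_version, server_version
        ));
    }
    Ok(())
}
```

Under this assumption, a named constant built from `(1, 2, 677)` would make the threshold easier to read than the bare literal.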

src/meta/client/src/lib.rs

Lines changed: 11 additions & 4 deletions
@@ -113,14 +113,21 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
 /// - 2024-11-23: since 1.2.663
 /// 👥 client: remove use of `Operation::AsIs`
 ///
-/// - 2024-12-1*: since 1.2.*
+/// - 2024-12-16: since 1.2.674
 /// 🖥 server: add `txn_condition::Target::KeysWithPrefix`,
 /// to support matching the key count by a prefix.
 ///
-/// - 2024-12-1*: since 1.2.*
-/// 🖥 server: add `TxnRequest::condition_tree`,
-/// to specify a complex bool expression.
+/// - 2024-12-20: since 1.2.676
+/// 🖥 server: add `TxnRequest::operations`,
+/// to specify a complex bool expression and corresponding operations
 ///
+/// - 2024-12-26: since 1.2.677
+/// 🖥 server: add `WatchRequest::initial_flush`,
+/// to let watch stream flush all keys in a range at the beginning.
+///
+/// - 2025-03-28: since TODO: add version when merged.
+/// 👥 client: semaphore(watch) requires `WatchRequest::initial_flush`(`1,2.677`),
+/// other RPC does not require `1.2.677`, requires only `1.2.259`.
 ///
 /// Server feature set:
 /// ```yaml

src/meta/semaphore/Cargo.toml

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+[package]
+name = "databend-common-meta-semaphore"
+description = "A semaphore impl based on distributed meta-service"
+version = { workspace = true }
+authors = { workspace = true }
+license = { workspace = true }
+publish = { workspace = true }
+edition = { workspace = true }
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[lib]
+doctest = false
+test = true
+
+[dependencies]
+codeq = { workspace = true }
+databend-common-base = { workspace = true }
+databend-common-meta-client = { workspace = true }
+databend-common-meta-kvapi = { workspace = true }
+databend-common-meta-types = { workspace = true }
+futures = { workspace = true }
+log = { workspace = true }
+thiserror = { workspace = true }
+tokio = { workspace = true }
+tonic = { workspace = true }
+
+[dev-dependencies]
+anyhow = { workspace = true }
+pretty_assertions = { workspace = true }
+
+[lints]
+workspace = true

src/meta/semaphore/README.md

Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
+# Databend Common Meta Semaphore
+
+A distributed semaphore implementation based on meta-service, providing reliable resource management across distributed systems.
+
+## Overview
+
+This crate implements a distributed semaphore using a meta-service as the underlying storage and coordination mechanism. It provides a robust way to manage and coordinate access to limited resources across distributed systems.
+
+## Features
+
+- **Distributed Coordination**: Uses meta-service for reliable distributed coordination
+- **Fair Queueing**: Implements a fair queueing mechanism based on sequence numbers
+- **Automatic Lease Management**: Handles TTL and lease extension automatically
+- **Atomic Operations**: Ensures consistency through atomic operations on meta-service
+- **Event-based Updates**: Real-time updates through meta-service watch API
+
+## Key Components
+
+### Semaphore Structure
+
+```text
+<prefix>/meta          -> {capacity: 10}     // Planned for future versions
+<prefix>/queue/<seq_1> -> {id: "<id_1>", value: 1}
+<prefix>/queue/<seq_2> -> {id: "<id_2>", value: 2}
+<prefix>/queue/<seq_3> -> {id: "<id_3>", value: 1}
+<prefix>/seq_generator -> {}
+```
+
+- `<prefix>`: User-defined string to identify a semaphore instance
+- `queue/*`: Contains semaphore entries with sequence numbers
+- `seq_generator`: Generates globally unique sequence numbers
+- Each entry contains:
+  - `id`: User-defined identifier
+  - `value`: Resource amount consumed
+
+### Main Types
+
+- `Semaphore`: The main entry point for semaphore operations
+- `Acquirer`: Handles the semaphore acquisition process
+- `AcquiredGuard`: Manages the lifecycle of an acquired semaphore
+- `SemaphoreEntry`: Represents a semaphore entry in the queue
+- `SemaphoreKey`: Defines the key structure for semaphore entries
+
+## Usage
+
+```rust
+let client = MetaGrpcClient::try_create(/*..*/);
+let acquired_guard = Semaphore::new_acquired(
+    client,
+    "your/semaphore/name/in/meta/service",
+    2,                      // capacity: 2 acquired at most
+    "id11",                 // ID of this acquirer
+    Duration::from_secs(3)  // lease time
+).await?;
+
+acquired_guard.await;
+// Released
+```
+
+## Implementation Details
+
+### Acquisition Process
+
+1. Client obtains a new sequence number from `seq_generator`
+2. Creates a `SemaphoreEntry` and inserts it at `queue/<seq>`
+3. Submits a watch request to monitor changes
+4. Maintains local collections of `acquired` and `waiting` entries
+5. Manages entries based on capacity and sequence order
+
+![](semaphore_diagram.svg)
+
+### Consistency Guarantees
+
+- Atomic verification of sequence numbers during insertion
+- Fair ordering based on sequence numbers
+- Automatic lease management to prevent stale entries
+- Real-time updates through meta-service watch API
+
+## Current Limitations
+
+- Capacity is set per client rather than globally (planned for future versions)
+- Different capacity values across clients may lead to starvation
+- Future versions will store capacity in the `meta` key for consistency
+
+## License
+
+This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
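
Reading note on the "Acquisition Process" steps in the README above: the following is a minimal in-memory sketch of how the `acquired` and `waiting` collections could be derived from the queue, assuming strict sequence order and head-of-line blocking (an entry that does not fit holds back every later entry). `LocalQueue`, `Entry`, and all method names are hypothetical and written for this note; the crate itself keeps the queue in meta-service and learns about changes through the watch stream, which is not modeled here.

```rust
use std::collections::BTreeMap;

/// One queue entry, mirroring `{id, value}` in the key layout of the README.
#[derive(Debug, Clone, PartialEq)]
struct Entry {
    id: String,
    value: u64,
}

/// Hypothetical in-memory stand-in for `<prefix>/queue/*` and `seq_generator`.
struct LocalQueue {
    capacity: u64,
    next_seq: u64,
    queue: BTreeMap<u64, Entry>,
}

impl LocalQueue {
    fn new(capacity: u64) -> Self {
        Self { capacity, next_seq: 1, queue: BTreeMap::new() }
    }

    /// Steps 1 and 2: take a fresh sequence number and insert the entry.
    fn enqueue(&mut self, id: &str, value: u64) -> u64 {
        let seq = self.next_seq;
        self.next_seq += 1;
        self.queue.insert(seq, Entry { id: id.to_string(), value });
        seq
    }

    /// Removing an entry models a release or an expired lease.
    fn remove(&mut self, seq: u64) {
        self.queue.remove(&seq);
    }

    /// Steps 4 and 5: walk entries in sequence order. An entry is acquired
    /// while the running total still fits in `capacity`; the first entry
    /// that does not fit, and everything after it, is waiting.
    fn split(&self) -> (Vec<u64>, Vec<u64>) {
        let mut used = 0;
        let mut acquired = Vec::new();
        let mut waiting = Vec::new();
        for (seq, entry) in &self.queue {
            if waiting.is_empty() && used + entry.value <= self.capacity {
                used += entry.value;
                acquired.push(*seq);
            } else {
                waiting.push(*seq);
            }
        }
        (acquired, waiting)
    }
}
```

With capacity 2 and three entries of value 1, 2, and 1, only the first entry is acquired. Removing it lets the second entry in, while the third keeps waiting even though it would have fit earlier. That is the fairness trade-off this sketch assumes: later requests cannot overtake an earlier, larger one.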
