-
Notifications
You must be signed in to change notification settings - Fork 4
Initial implementation of both sns-executor and sns-worker #277
Changes from all commits
b593196
6a69313
462be8c
3704e7b
0784bc9
48074ec
49643ba
2de5225
0088b60
9556481
551dd29
5a7a07a
cfca6c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
-- Extend the ciphertexts table with columns used by the SnS executor.
-- Idempotent: IF NOT EXISTS makes the migration safe to re-run.
--   large_ct   : ciphertext produced by the SnS computation (nullable until computed)
--   is_sent    : NOTE(review): presumably marks rows whose large_ct was already
--                sent/notified -- confirm against the executor code
--   is_allowed : marks handles allowed for SnS processing
ALTER TABLE ciphertexts
ADD COLUMN IF NOT EXISTS large_ct BYTEA,
ADD COLUMN IF NOT EXISTS is_sent BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS is_allowed BOOLEAN NOT NULL DEFAULT FALSE;
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
-- Extend the ciphertexts table with columns used by the SnS executor.
-- (Duplicate of the sibling migration; both copies must stay in sync.)
-- Idempotent: IF NOT EXISTS makes the migration safe to re-run.
--   large_ct   : ciphertext produced by the SnS computation (nullable until computed)
--   is_sent    : NOTE(review): presumably marks rows whose large_ct was already
--                sent/notified -- confirm against the executor code
--   is_allowed : marks handles allowed for SnS processing
ALTER TABLE ciphertexts
ADD COLUMN IF NOT EXISTS large_ct BYTEA,
ADD COLUMN IF NOT EXISTS is_sent BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS is_allowed BOOLEAN NOT NULL DEFAULT FALSE;
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
[package] | ||
name = "sns-executor" | ||
version = "0.1.0" | ||
authors.workspace = true | ||
edition.workspace = true | ||
license.workspace = true | ||
|
||
[dependencies] | ||
# workspace dependencies | ||
bincode = { workspace = true } | ||
clap = { workspace = true } | ||
prometheus = { workspace = true } | ||
prost = { workspace = true } | ||
rayon = { workspace = true } | ||
sha3 = { workspace = true } | ||
tokio = { workspace = true } | ||
tonic = { workspace = true } | ||
tracing = { workspace = true } | ||
tracing-subscriber = { workspace = true } | ||
anyhow = { workspace = true } | ||
serde = { workspace = true } | ||
hex = "0.4" | ||
|
||
aligned-vec = "0.5.0" | ||
num-traits = "=0.2.19" | ||
sqlx = { version = "0.7", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid"] } | ||
|
||
serde_json = "=1.0" | ||
Comment on lines
+22
to
+28
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Could combine these under "crates.io" dependencies. Also, we could use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed, I'll move it in the workspace with the next PR. |
||
|
||
# local dependencies | ||
fhevm-engine-common = { path = "../fhevm-engine-common" } | ||
|
||
# arch-specific dependencies | ||
[target.'cfg(target_arch = "x86_64")'.dependencies] | ||
tfhe = { workspace = true, features = ["x86_64-unix"] } | ||
[target.'cfg(target_arch = "aarch64")'.dependencies] | ||
tfhe = { workspace = true, features = ["aarch64-unix"] } | ||
|
||
[[bin]] | ||
name = "sns_worker" | ||
path = "src/bin/sns_worker.rs" | ||
|
||
|
||
[features] | ||
decrypt_128 = [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
# SnS executor | ||
|
||
## Description | ||
|
||
### Library crate (sns-executor) | ||
|
||
Executes a loop that: | ||
- Retrieves `(handle, compressed_ct)` pairs from the PG table `ciphertexts` that are marked as `allowed`. | ||
- Computes `large_ct` using the SnS algorithm. | ||
- Updates the `large_ct` column corresponding to the specified handle. | ||
- Sends a signal indicating the availability of newly computed `large_ct`. | ||
|
||
#### Features | ||
**decrypt_128** - Decrypt each `large_ct` and print it as a plaintext (for testing purposes only). | ||
|
||
### Binary (sns-worker) | ||
|
||
Runs sns-executor. See also `src/bin/utils/daemon_cli.rs` | ||
|
||
|
||
## How to run a sns-worker | ||
|
||
``` | ||
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/coprocessor \ | ||
cargo run --release -- \ | ||
--pg-listen-channel "allowed_handles" \ | ||
--pg-notify-channel "computed_handles" \ | ||
--keys-file-path "./default_keys.bin" | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
use serde::{de::DeserializeOwned, Serialize}; | ||
use sns_executor::DBConfig; | ||
use std::fs; | ||
use tokio::{signal::unix, sync::broadcast}; | ||
|
||
mod utils; | ||
|
||
fn read_element<T: DeserializeOwned + Serialize>(file_path: String) -> anyhow::Result<T> { | ||
let read_element = fs::read(file_path.clone())?; | ||
Ok(bincode::deserialize_from(read_element.as_slice())?) | ||
} | ||
|
||
fn handle_sigint(cancel_tx: broadcast::Sender<()>) { | ||
tokio::spawn(async move { | ||
let mut signal = unix::signal(unix::SignalKind::interrupt()).unwrap(); | ||
signal.recv().await; | ||
cancel_tx.send(()).unwrap(); | ||
}); | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
let args = utils::daemon_cli::parse_args(); | ||
|
||
// Read keys from the file path, if specified | ||
let mut keys = None; | ||
if let Some(path) = args.keys_file_path { | ||
keys = Some(read_element(path).expect("Failed to read keys.")); | ||
} | ||
|
||
let db_url = args | ||
.database_url | ||
.clone() | ||
.unwrap_or_else(|| std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined")); | ||
|
||
tracing_subscriber::fmt().json().with_level(true).init(); | ||
|
||
let conf = sns_executor::Config { | ||
db: DBConfig { | ||
url: db_url, | ||
listen_channel: args.pg_listen_channel, | ||
notify_channel: args.pg_notify_channel, | ||
batch_limit: args.work_items_batch_size, | ||
polling_interval: args.pg_polling_interval, | ||
max_connections: args.pg_pool_connections, | ||
}, | ||
}; | ||
|
||
// Handle SIGINIT signals | ||
let (cancel_tx, cancel_rx) = broadcast::channel(1); | ||
handle_sigint(cancel_tx); | ||
|
||
if let Err(err) = sns_executor::run(keys, &conf, cancel_rx).await { | ||
tracing::error!("Worker failed: {:?}", err); | ||
} | ||
Comment on lines
+53
to
+55
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which thread(s) execute the SnS? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean I guess we expect to run multiple workers, not multiple threads, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rayon thread-pool threads. In
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, yes, it is multithreaded itself! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's multi-threaded over blocks of a single Ciphertext64 . There is also issue to reconsider this. |
||
} |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,41 @@ | ||||
use clap::{command, Parser}; | ||||
|
||||
#[derive(Parser, Debug, Clone)] | ||||
#[command(version, about, long_about = None)] | ||||
pub struct Args { | ||||
/// Work items batch size | ||||
#[arg(long, default_value_t = 4)] | ||||
pub work_items_batch_size: u32, | ||||
|
||||
/// NOTIFY/LISTEN channel for database that the worker listen to | ||||
#[arg(long)] | ||||
pub pg_listen_channel: String, | ||||
|
||||
/// NOTIFY/LISTEN channel for database that the worker notify to | ||||
#[arg(long)] | ||||
pub pg_notify_channel: String, | ||||
|
||||
/// Polling interval in seconds | ||||
#[arg(long, default_value_t = 60)] | ||||
pub pg_polling_interval: u32, | ||||
|
||||
/// Postgres pool connections | ||||
#[arg(long, default_value_t = 10)] | ||||
pub pg_pool_connections: u32, | ||||
Comment on lines
+23
to
+24
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Btw, why is the default value 10? Do we need a pool of so many? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TBH, not sure what should be the value for now. I experienced connection issues with So, it just follows the recommendation from
|
||||
|
||||
/// Postgres database url. If unspecified DATABASE_URL environment variable is used | ||||
#[arg(long)] | ||||
pub database_url: Option<String>, | ||||
|
||||
/// KeySet file. If unspecified the the keys are read from the database (not implemented) | ||||
#[arg(long)] | ||||
pub keys_file_path: Option<String>, | ||||
|
||||
/// sns-executor service name in OTLP traces (not implemented) | ||||
#[arg(long, default_value = "sns-executor")] | ||||
pub service_name: String, | ||||
} | ||||
|
||||
pub fn parse_args() -> Args { | ||||
Args::parse() | ||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
pub mod daemon_cli; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are the migrations always the same (i.e., targeting the same DB)?
if so we could have a symbolic link instead for the full directory
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or we could just put the files in the fhevm-engine-common crate or somewhere else, but reusable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure why the migrations folder is duplicated in both fhevm-db and coprocessor, but for now, I'm leaving it as is to prevent conflicts with the tfhe-worker changes.