This repository was archived by the owner on Jun 11, 2025. It is now read-only.

Initial implementation of both sns-executor and sns-worker #277

Merged · 13 commits · Feb 4, 2025
3 changes: 2 additions & 1 deletion fhevm-engine/Cargo.toml
@@ -1,6 +1,7 @@
[workspace]
resolver = "2"
members = ["coprocessor", "executor", "fhevm-engine-common", "listener"]
members = ["coprocessor", "executor", "fhevm-engine-common", "listener", "sns-executor"]


[workspace.package]
authors = ["Zama"]
@@ -0,0 +1,4 @@
ALTER TABLE ciphertexts
ADD COLUMN IF NOT EXISTS large_ct BYTEA,
ADD COLUMN IF NOT EXISTS is_sent BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS is_allowed BOOLEAN NOT NULL DEFAULT FALSE;
@@ -0,0 +1,4 @@
ALTER TABLE ciphertexts

Contributor:

Are migrations always the same (i.e., the same DB)? If so, we could use a symbolic link for the whole directory instead.

Collaborator:

Or we could just put the files in the fhevm-engine-common crate, or somewhere else reusable.

Contributor Author:

I'm not sure why the migrations folder is duplicated in both fhevm-db and coprocessor, but for now, I'm leaving it as is to prevent conflicts with the tfhe-worker changes.

ADD COLUMN IF NOT EXISTS large_ct BYTEA,
ADD COLUMN IF NOT EXISTS is_sent BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS is_allowed BOOLEAN NOT NULL DEFAULT FALSE;
24 changes: 24 additions & 0 deletions fhevm-engine/fhevm-engine-common/src/types.rs
@@ -1,8 +1,10 @@
use anyhow::Result;
use bigdecimal::num_bigint::BigInt;
use tfhe::integer::bigint::StaticUnsignedBigInt;
use tfhe::integer::ciphertext::BaseRadixCiphertext;
use tfhe::integer::U256;
use tfhe::prelude::{CiphertextList, FheDecrypt};
use tfhe::shortint::Ciphertext;
use tfhe::{CompressedCiphertextList, CompressedCiphertextListBuilder};

use crate::utils::{safe_deserialize, safe_serialize};
@@ -389,6 +391,28 @@ impl SupportedFheCiphertexts {
}
}

pub fn to_ciphertext64(self) -> BaseRadixCiphertext<Ciphertext> {
match self {
SupportedFheCiphertexts::FheBool(v) => {
BaseRadixCiphertext::from(vec![v.into_raw_parts()])
}
SupportedFheCiphertexts::FheUint4(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint8(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint16(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint32(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint64(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint128(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint160(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint256(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheBytes64(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheBytes128(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheBytes256(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::Scalar(_) => {
panic!("scalar cannot be converted to regular ciphertext")
}
}
}

pub fn type_num(&self) -> i16 {
match self {
// values taken to match with solidity library
45 changes: 45 additions & 0 deletions fhevm-engine/sns-executor/Cargo.toml
@@ -0,0 +1,45 @@
[package]
name = "sns-executor"
version = "0.1.0"
authors.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
# workspace dependencies
bincode = { workspace = true }
clap = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
rayon = { workspace = true }
sha3 = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
anyhow = { workspace = true }
serde = { workspace = true }
hex = "0.4"

aligned-vec = "0.5.0"
num-traits = "=0.2.19"
sqlx = { version = "0.7", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid"] }

serde_json = "=1.0"
Comment on lines +22 to +28

Collaborator:

Nit: Could combine these under "crates.io" dependencies.

Also, we could use sqlx from the workspace dependency.

Contributor Author:

Indeed, I'll move it into the workspace with the next PR.


# local dependencies
fhevm-engine-common = { path = "../fhevm-engine-common" }

# arch-specific dependencies
[target.'cfg(target_arch = "x86_64")'.dependencies]
tfhe = { workspace = true, features = ["x86_64-unix"] }
[target.'cfg(target_arch = "aarch64")'.dependencies]
tfhe = { workspace = true, features = ["aarch64-unix"] }

[[bin]]
name = "sns_worker"
path = "src/bin/sns_worker.rs"


[features]
decrypt_128 = []
29 changes: 29 additions & 0 deletions fhevm-engine/sns-executor/README.md
@@ -0,0 +1,29 @@
# SnS executor

## Description

### Library crate (sns-executor)

Executes a loop that (sketched below):
- Retrieves `(handle, compressed_ct)` pairs marked as `allowed` from the PG `ciphertexts` table.
- Computes `large_ct` using the SnS algorithm.
- Updates the `large_ct` column corresponding to the specified handle.
- Sends a signal indicating the availability of newly computed `large_ct`.
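
A minimal sketch of that loop, assuming sqlx's `PgListener`, the channels shown in the run example below, and the columns added by the migration. The compressed-ciphertext column name, the helper names, and the query shapes are illustrative, not the crate's actual internals; the real worker also polls on an interval and supports cancellation.

```
use sqlx::postgres::{PgListener, PgPoolOptions};

// Illustrative worker loop: wake up on "allowed_handles", expand pending rows,
// then notify "computed_handles". LIMIT 4 mirrors the default batch size.
async fn run_loop(db_url: &str) -> anyhow::Result<()> {
    let pool = PgPoolOptions::new().max_connections(10).connect(db_url).await?;
    let mut listener = PgListener::connect(db_url).await?;
    listener.listen("allowed_handles").await?;

    loop {
        // Block until a new batch of allowed handles is announced.
        listener.recv().await?;

        // The `ciphertext` column name is assumed for the compressed input.
        let rows: Vec<(Vec<u8>, Vec<u8>)> = sqlx::query_as(
            "SELECT handle, ciphertext FROM ciphertexts
             WHERE is_allowed = TRUE AND large_ct IS NULL LIMIT 4",
        )
        .fetch_all(&pool)
        .await?;

        for (handle, compressed_ct) in rows {
            let large_ct = expand_with_sns(&compressed_ct)?; // placeholder for the SnS step
            sqlx::query("UPDATE ciphertexts SET large_ct = $1 WHERE handle = $2")
                .bind(large_ct)
                .bind(handle)
                .execute(&pool)
                .await?;
        }

        // Signal that new large_ct values are available.
        sqlx::query("SELECT pg_notify('computed_handles', '')")
            .execute(&pool)
            .await?;
    }
}

// Placeholder: the actual SnS expansion lives in the sns-executor library.
fn expand_with_sns(_compressed_ct: &[u8]) -> anyhow::Result<Vec<u8>> {
    unimplemented!()
}
```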

#### Features
**decrypt_128** - Decrypts each `large_ct` and prints it as plaintext (for testing purposes only).

### Binary (sns-worker)

Runs the sns-executor. See also `src/bin/utils/daemon_cli.rs`.


## How to run an sns-worker

```
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/coprocessor \
cargo run --release -- \
--pg-listen-channel "allowed_handles" \
--pg-notify-channel "computed_handles" \
--keys-file-path "./default_keys.bin"
```
56 changes: 56 additions & 0 deletions fhevm-engine/sns-executor/src/bin/sns_worker.rs
@@ -0,0 +1,56 @@
use serde::{de::DeserializeOwned, Serialize};
use sns_executor::DBConfig;
use std::fs;
use tokio::{signal::unix, sync::broadcast};

mod utils;

fn read_element<T: DeserializeOwned + Serialize>(file_path: String) -> anyhow::Result<T> {
let read_element = fs::read(file_path.clone())?;
Ok(bincode::deserialize_from(read_element.as_slice())?)
}

fn handle_sigint(cancel_tx: broadcast::Sender<()>) {
tokio::spawn(async move {
let mut signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
signal.recv().await;
cancel_tx.send(()).unwrap();
});
}

#[tokio::main]
async fn main() {
let args = utils::daemon_cli::parse_args();

// Read keys from the file path, if specified
let mut keys = None;
if let Some(path) = args.keys_file_path {
keys = Some(read_element(path).expect("Failed to read keys."));
}

let db_url = args
.database_url
.clone()
.unwrap_or_else(|| std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined"));

tracing_subscriber::fmt().json().with_level(true).init();

let conf = sns_executor::Config {
db: DBConfig {
url: db_url,
listen_channel: args.pg_listen_channel,
notify_channel: args.pg_notify_channel,
batch_limit: args.work_items_batch_size,
polling_interval: args.pg_polling_interval,
max_connections: args.pg_pool_connections,
},
};

// Handle SIGINT signals
let (cancel_tx, cancel_rx) = broadcast::channel(1);
handle_sigint(cancel_tx);

if let Err(err) = sns_executor::run(keys, &conf, cancel_rx).await {
tracing::error!("Worker failed: {:?}", err);
}
Comment on lines +53 to +55

Collaborator:

Which thread(s) execute the SnS?

Collaborator:

I mean I guess we expect to run multiple workers, not multiple threads, right?

Contributor Author:

Rayon thread-pool threads. In `to_large_ciphertext`, there is a `par_iter`:

    let res = blocks
        .par_iter()
        .map(|current_block| self.to_large_ciphertext_block(current_block))
        .collect::<anyhow::Result<Vec<Ciphertext128Block>>>()?;

Collaborator:

Ah, yes, it is multithreaded itself!

Contributor Author:

It's multi-threaded over the blocks of a single `Ciphertext64`. There is also an issue to reconsider this: #287
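
For readers unfamiliar with the pattern, a self-contained sketch of fanning work out over blocks with rayon's `par_iter`; `Block`, `LargeBlock`, and `expand_block` are placeholders, not the actual TFHE types used here:

```
use rayon::prelude::*;

// Placeholder stand-ins for a shortint block and its 128-bit expansion.
struct Block(u64);
struct LargeBlock(u128);

fn expand_block(b: &Block) -> anyhow::Result<LargeBlock> {
    // The real code runs the SnS expansion on each block here.
    Ok(LargeBlock(b.0 as u128))
}

fn to_large_ciphertext(blocks: &[Block]) -> anyhow::Result<Vec<LargeBlock>> {
    // Each block is expanded on a rayon worker thread; the first error aborts the collect.
    blocks.par_iter().map(expand_block).collect()
}
```

A single worker thus already parallelizes across the blocks of one ciphertext.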

}
41 changes: 41 additions & 0 deletions fhevm-engine/sns-executor/src/bin/utils/daemon_cli.rs
@@ -0,0 +1,41 @@
use clap::{command, Parser};

#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
pub struct Args {
/// Work items batch size
#[arg(long, default_value_t = 4)]
pub work_items_batch_size: u32,

/// NOTIFY/LISTEN database channel that the worker listens to
#[arg(long)]
pub pg_listen_channel: String,

/// NOTIFY/LISTEN database channel that the worker notifies on
#[arg(long)]
pub pg_notify_channel: String,

/// Polling interval in seconds
#[arg(long, default_value_t = 60)]
pub pg_polling_interval: u32,

/// Postgres pool connections
#[arg(long, default_value_t = 10)]
pub pg_pool_connections: u32,
Comment on lines +23 to +24

Collaborator:

Btw, why is the default value 10? Do we need a pool of so many?

Contributor Author:

TBH, not sure what the value should be for now. I experienced connection issues with pg_pool_connections=1, so it just follows the recommendation from Coprocessor. See also:

    #[arg(long, default_value_t = 10)]
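
For reference, a minimal sketch of how a pool-size setting like this is typically applied with sqlx 0.7; the helper name is hypothetical and the actual wiring inside sns-executor may differ:

```
use sqlx::postgres::PgPoolOptions;

// Hypothetical helper: build a Postgres pool sized by --pg-pool-connections.
async fn connect_pool(db_url: &str, max_connections: u32) -> Result<sqlx::PgPool, sqlx::Error> {
    PgPoolOptions::new()
        .max_connections(max_connections)
        .connect(db_url)
        .await
}
```

A pool of one connection serializes all database access, so concurrent work items can stall waiting for it.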


/// Postgres database URL. If unspecified, the DATABASE_URL environment variable is used
#[arg(long)]
pub database_url: Option<String>,

/// KeySet file. If unspecified, the keys are read from the database (not implemented)
#[arg(long)]
pub keys_file_path: Option<String>,

/// sns-executor service name in OTLP traces (not implemented)
#[arg(long, default_value = "sns-executor")]
pub service_name: String,
}

pub fn parse_args() -> Args {
Args::parse()
}
1 change: 1 addition & 0 deletions fhevm-engine/sns-executor/src/bin/utils/mod.rs
@@ -0,0 +1 @@
pub mod daemon_cli;