-
Thank you very much for your excellent work on this library.

Question

My guess is that there is a limit on concurrent requests somehow, and thus the client is blocking on either send or receive, but I am unclear where / why.

Background
```go
conn, err := grpc.NewClient("unix:///tmp/shacl_validator.sock",
	grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
	return SitemapCrawlStats{}, fmt.Errorf("failed to connect to gRPC server: %w", err)
}
defer conn.Close()
grpcClient := protoBuild.NewShaclValidatorClient(conn)
```
- My Tonic Server Code
```rust
use std::fs;
use std::path::Path;

use tokio::net::UnixListener;
use tokio::signal;
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
// plus the generated ShaclValidatorServer and this project's Validator type

#[tokio::main(flavor = "multi_thread")] // defaults to the number of CPUs on the system
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let path = "/tmp/shacl_validator.sock";

    // Remove the socket file if it already exists
    if Path::new(path).exists() {
        fs::remove_file(path)?;
    }
    std::fs::create_dir_all(Path::new(path).parent().unwrap())?;

    let uds = UnixListener::bind(path)?;
    let uds_stream = UnixListenerStream::new(uds);

    // the validator just runs one CPU-intensive function to validate a payload, then returns
    let validator = Validator::default();
    println!("Starting gRPC server on {}", path);

    // Run the server and listen for Ctrl+C
    let server = Server::builder()
        .add_service(ShaclValidatorServer::new(validator))
        .serve_with_incoming_shutdown(uds_stream, async {
            signal::ctrl_c()
                .await
                .expect("failed to install Ctrl+C handler");
        });

    let result = server.await;

    // Clean up the socket file on shutdown
    if Path::new(path).exists() {
        println!("Cleaning up socket file at {}", path);
        fs::remove_file(path)?;
    }
    result?;
    Ok(())
}
```
Timing Background

Thank you very much!
-
Nearly all of my experience with tonic is via http2. For reference, at my day job we have a Go SDK customers use (on their own servers) that connects via normal TLS http2 to remote servers (that I own) running tonic. P999 latency for payloads of this size is on the order of 1-3 milliseconds, assuming they're in the same cloud and depending on whether or not they're in the same AZ. I'm not saying this to brag, but to reassure you that multilingual gRPC with < 1s latency is a very reasonable ask 😆 ❤️

First and easiest, a bit of tokio trivia: your main is fine, except that it runs your server "off the runtime." This can cause delays, mostly for new connections (possibly more). Try changing one line:

```rust
let result = tokio::spawn(server).await;
```

This makes your server run on a runtime thread, so any task it spawns will be from within the runtime, and that makes the tasks start much more quickly. I don't think this is costing you a second, though.

Second, and probably a wild goose chase: is your Go client making only one connection for all the concurrency? I've had to go to great lengths to make high concurrency on one connection perform well.

Third, and probably the root: 200 milliseconds is too long to hold a cooperative multitasking thread. Tokio doesn't know which task you want to execute first, so in the abstract they run in unordered fashion, and your completed work items delay other work items. In Rust, tasks don't have any way to implicitly, preemptively yield in the middle of their work. They will occupy their thread unconditionally until they reach an `.await` point.

**Make your cpu work async:** Make that 200-300ms "validation" code asynchronous. Assuming you're doing a bunch of computation in some loop, you would put an await in there every thousand iterations or something to that effect, like this:

```rust
tokio::task::consume_budget().await;
```

**Run your cpu work off the runtime:** Not all thread-bound work is created equal. Consider how easily a CPU accomplishes thread::sleep() or reading bytes from a drive, versus a hard loop computing the nth digit of pi. I am not sure what kind of work your validation is, so if I had to give you a silver-bullet recommendation, I'd ask you to move that 200-300ms to a different thread pool. You can do that with:

```rust
let validation_result = tokio::task::spawn_blocking(move || {
    run_200ms_validation(the_request)
});
```

or by creating a dedicated thread pool of your own.

Lastly, some obligatory engineering theory, because I can't help myself: if you have 100 computations that each take 0.2 seconds, that's 20 seconds of CPU time that has to come from somewhere. By Little's Law, a 16-core CPU can only retire a CPU-bound 200ms task at a rate of 16 cores / 0.2 s per task = 80 requests per second. If you make requests at a higher rate than that, you are queueing and your latency will increase, irrespective of libraries, tech stacks, architectures, or anything else.
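To make the "make your cpu work async" option concrete, here is a minimal sketch of what a cooperatively yielding validation loop could look like. This is not the author's actual code; `Triple`, `ValidationReport`, and `check_triple` are hypothetical stand-ins for whatever the real validation iterates over:

```rust
// Hypothetical stand-ins for the real validation types (assumptions,
// not the actual SHACL code discussed in this thread).
struct Triple;

#[derive(Default)]
struct ValidationReport {
    violations: usize,
}

fn check_triple(_triple: &Triple, _report: &mut ValidationReport) {
    // ... real constraint checking would go here ...
}

// Sketch of the "make your cpu work async" option: the CPU loop stays on the
// tokio runtime but voluntarily yields every thousand iterations.
async fn validate_cooperatively(triples: &[Triple]) -> ValidationReport {
    let mut report = ValidationReport::default();
    for (i, triple) in triples.iter().enumerate() {
        check_triple(triple, &mut report);
        if i % 1000 == 0 {
            // consume_budget() only yields back to the scheduler once the
            // task's cooperative budget is exhausted, so calling it often is cheap.
            tokio::task::consume_budget().await;
        }
    }
    report
}
```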
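For the "run your cpu work off the runtime" option, a dedicated pool can also be built with rayon instead of using spawn_blocking. A minimal sketch, assuming the rayon and tokio crates and keeping the reply's hypothetical `run_200ms_validation` placeholder (not the author's actual code):

```rust
use tokio::sync::oneshot;

// Run one piece of CPU-bound work on a dedicated rayon pool and hand the
// result back to async code over a oneshot channel.
fn spawn_on_pool<T, F>(pool: &rayon::ThreadPool, work: F) -> oneshot::Receiver<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    pool.spawn(move || {
        // The receiver may already be gone (e.g. the request was cancelled),
        // so a failed send is simply ignored.
        let _ = tx.send(work());
    });
    rx
}

// Usage sketch (the pool would be built once at startup and stored,
// not rebuilt per request):
//
//   let pool = rayon::ThreadPoolBuilder::new().build()?; // sized to the CPU count by default
//   let rx = spawn_on_pool(&pool, move || run_200ms_validation(the_request));
//   let report = rx.await.expect("validation task dropped");
```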
-
Thank you so much for your answer! That indeed made a huge speedup, and after some additional caching on the schema validation I am doing, I am now getting < 5 ms round trip times. This is awesome! Thanks! Context for what I did for others:
```rust
// Run the server and listen for Ctrl+C
let server = Server::builder()
    // this service is the one I custom defined in tonic
    .add_service(ShaclValidatorServer::new(validator))
    .serve_with_incoming_shutdown(uds_stream, async {
        signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    });
// Make sure that the server runs on the runtime
let result = tokio::spawn(server).await?;
```
```rust
#[tonic::async_trait]
impl ShaclValidator for Validator {
    /// Validates the triples in the request using both dataset-oriented and location-oriented validation.
    /// Returns a ValidationReply with the validation result.
    async fn validate(
        &self,
        request: Request<TurtleValidationRequest>,
    ) -> Result<Response<ValidationReply>, Status> {
        println!("Received request");
        let start = std::time::Instant::now();
        let req = request.into_inner();

        let triples = Arc::new(req.triples);
        let dataset_triples = triples.clone();
        let location_triples = triples.clone();
        let dataset_schema = self.dataset_schema.clone();
        let location_schema = self.location_schema.clone();

        // takes about 200ms but now occurs in such a way that the main async runtime is not blocked
        let dataset_handle = tokio::task::spawn_blocking(move || {
            let start_validation = std::time::Instant::now();
            let res = validate_triples(&dataset_schema, &dataset_triples);
            println!("Dataset validation took: {:?}", start_validation.elapsed());
            res
        });

        // takes about 200ms but now occurs in such a way that the main async runtime is not blocked
        let location_handle = tokio::task::spawn_blocking(move || {
            let start_validation = std::time::Instant::now();
            let res = validate_triples(&location_schema, &location_triples);
            println!("Location validation took: {:?}", start_validation.elapsed());
            res
        });

        let (dataset_validation_report, location_validation_report) =
            tokio::try_join!(dataset_handle, location_handle)
                .map_err(|e| Status::internal(format!("Join error: {:?}", e)))?;
```

This got it down to around 400 ms; I think some of the concurrent allocations were still making it slow?
println!("Received request");
let start = std::time::Instant::now();
let req = request.into_inner();
let dataset_validation_report = self.validate_dataset_oriented(&req.triples);
let location_validation_report = self.validate_location_oriented(&req.triples);
println!("Validation took {:?}", start.elapsed()); This ended up getting me <3ms on average for my round trip
Thank you very much again; I appreciate it!