Skip to content

Commit fd748da

Browse files
authored
Merge pull request #116 from ruccho/feature/backpressure
Add backpressure control and allow asynchronous flushing
2 parents 1f499bc + 3e2c268 commit fd748da

File tree

11 files changed

+203
-25
lines changed

11 files changed

+203
-25
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ Once the handler sends a request, these settings become immutable and cannot be
231231
|Http2MaxSendBufferSize|Gets or sets the maximum write buffer size for each HTTP/2 stream. Default is currently 1MB, but may change.|
232232
|Http2InitialMaxSendStreams|Gets or sets the initial maximum of locally initiated (send) streams. This value will be overwritten by the value included in the initial SETTINGS frame received from the peer as part of a connection preface.|
233233
|UnixDomainSocketPath|Gets or sets the path to a Unix Domain Socket to be used as HTTP communication channel instead of the default TCP.|
234+
|ResponsePipeOptions|Gets or sets the options for the pipe used to receive the response body.|
234235

235236
Most of them expose [hyper client settings](https://docs.rs/hyper-util/latest/hyper_util/client/legacy/struct.Builder.html), so please check those as well.
236237

native/yaha_native/src/binding.rs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use hyper::{
99
Request, StatusCode, Uri, Version,
1010
};
1111
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
12-
use tokio::select;
12+
use tokio::{select, sync::oneshot};
1313
use tokio_util::sync::CancellationToken;
1414

1515
use crate::interop::{ByteBuffer, StringBuffer};
@@ -65,7 +65,7 @@ pub extern "C" fn yaha_init_context(
6565
status_code: i32,
6666
version: YahaHttpVersion,
6767
),
68-
on_receive: extern "C" fn(req_seq: i32, state: NonZeroIsize, length: usize, buf: *const u8),
68+
on_receive: extern "C" fn(req_seq: i32, state: NonZeroIsize, length: usize, buf: *const u8, task_handle: usize),
6969
on_complete: extern "C" fn(req_seq: i32, state: NonZeroIsize, reason: CompletionReason, h2_error_code: u32),
7070
) -> *mut YahaNativeContext {
7171
let runtime_ctx = YahaNativeRuntimeContextInternal::from_raw_context(runtime_ctx);
@@ -86,7 +86,7 @@ pub extern "C" fn yaha_dispose_context(ctx: *mut YahaNativeContext) {
8686
ctx.on_status_code_and_headers_receive = _sentinel_on_status_code_and_headers_receive;
8787
}
8888
extern "C" fn _sentinel_on_complete(_: i32, _: NonZeroIsize, _: CompletionReason, _: u32) { panic!("The context has already disposed: on_complete"); }
89-
extern "C" fn _sentinel_on_receive(_: i32, _: NonZeroIsize, _: usize, _: *const u8) { panic!("The context has already disposed: on_receive"); }
89+
extern "C" fn _sentinel_on_receive(_: i32, _: NonZeroIsize, _: usize, _: *const u8, _: usize) { panic!("The context has already disposed: on_receive"); }
9090
extern "C" fn _sentinel_on_status_code_and_headers_receive(_: i32, _: NonZeroIsize, _: i32, _: YahaHttpVersion) { panic!("The context has already disposed: on_status_code_and_headers_receive"); }
9191

9292
#[no_mangle]
@@ -347,7 +347,6 @@ pub extern "C" fn yaha_client_config_unix_domain_socket_path(
347347
ctx.uds_socket_path.get_or_insert(uds_socket_path.into());
348348
}
349349

350-
351350
#[no_mangle]
352351
pub extern "C" fn yaha_build_client(ctx: *mut YahaNativeContext) {
353352
let ctx = YahaNativeContextInternal::from_raw_context(ctx);
@@ -582,7 +581,30 @@ pub extern "C" fn yaha_request_begin(
582581
Ok(frame) => {
583582
if frame.is_data() {
584583
let data = frame.into_data().unwrap();
585-
(ctx.on_receive)(seq, state, data.len(), data.as_ptr());
584+
let (tx, rx) = oneshot::channel::<Result<(), String>>();
585+
let tx = Box::into_raw(Box::new(tx)) as usize;
586+
587+
(ctx.on_receive)(seq, state, data.len(), data.as_ptr(), tx);
588+
match rx.await {
589+
Ok(result) => {
590+
if let Err(err) = result {
591+
// the sender reports an error
592+
LAST_ERROR.with(|v| {
593+
*v.borrow_mut() = Some(err);
594+
});
595+
(ctx.on_complete)(seq, state, CompletionReason::Error, 0);
596+
return;
597+
}
598+
},
599+
Err(e) => {
600+
// the sender is dropped without sending
601+
LAST_ERROR.with(|v| {
602+
*v.borrow_mut() = Some("on_receive() has not completed correctly.".to_string());
603+
});
604+
(ctx.on_complete)(seq, state, CompletionReason::Error, 0);
605+
return;
606+
}
607+
}
586608
} else if frame.is_trailers() {
587609

588610
trailer_received = true;
@@ -815,3 +837,14 @@ pub extern "C" fn yaha_request_destroy(
815837
let req_ctx = crate::context::to_internal_arc(req_ctx);
816838
true
817839
}
840+
841+
#[no_mangle]
842+
pub extern "C" fn yaha_complete_task(task_handle: usize, error: *const StringBuffer) {
843+
let tx = unsafe { Box::from_raw(task_handle as *mut oneshot::Sender<Result<(), String>>) };
844+
if error.is_null() {
845+
tx.send(Ok(())).unwrap();
846+
} else {
847+
let error = unsafe { (*error).to_str().to_string() };
848+
tx.send(Err(error)).unwrap();
849+
}
850+
}

native/yaha_native/src/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{primitives::{CompletionReason, YahaHttpVersion}};
3333

3434
type OnStatusCodeAndHeadersReceive =
3535
extern "C" fn(req_seq: i32, state: NonZeroIsize, status_code: i32, version: YahaHttpVersion);
36-
type OnReceive = extern "C" fn(req_seq: i32, state: NonZeroIsize, length: usize, buf: *const u8);
36+
type OnReceive = extern "C" fn(req_seq: i32, state: NonZeroIsize, length: usize, buf: *const u8, task_handle: usize);
3737
type OnComplete = extern "C" fn(req_seq: i32, state: NonZeroIsize, reason: CompletionReason, h2_error_code: u32);
3838
type OnServerCertificateVerificationHandler = extern "C" fn(callback_state: NonZeroIsize, server_name: *const u8, server_name_len: usize, certificate_der: *const u8, certificate_der_len: usize, now: u64) -> bool;
3939

src/YetAnotherHttpHandler/NativeHttpHandlerCore.cs

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ internal class NativeHttpHandlerCore : IDisposable
2323
private readonly YahaContextSafeHandle _handle;
2424
private GCHandle? _onVerifyServerCertificateHandle; // The handle must be released in Dispose if it is allocated.
2525
private bool _disposed = false;
26+
private PipeOptions? _responsePipeOptions;
2627

2728
// NOTE: We need to keep the callback delegates in advance.
2829
// The delegates are kept on the Rust side, so it will crash if they are garbage collected.
@@ -199,6 +200,12 @@ private unsafe void Initialize(YahaNativeContext* ctx, NativeClientSettings sett
199200
}
200201
}
201202

203+
if (settings.ResponsePipeOptions is { } responsePipeOptions)
204+
{
205+
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Info($"Option '{nameof(settings.ResponsePipeOptions)}' = {responsePipeOptions}");
206+
_responsePipeOptions = responsePipeOptions;
207+
}
208+
202209
NativeMethods.yaha_build_client(ctx);
203210

204211
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Info($"{nameof(NativeHttpHandlerCore)} created");
@@ -327,7 +334,7 @@ private unsafe RequestContext UnsafeSend(YahaContextSafeHandle ctxHandle, YahaRe
327334
NativeMethods.yaha_request_set_has_body(ctx, reqCtx, request.Content != null);
328335

329336
// Prepare a request context
330-
var requestContextManaged = new RequestContext(_handle, reqCtxHandle, request, requestSequence, cancellationToken);
337+
var requestContextManaged = new RequestContext(_handle, reqCtxHandle, request, requestSequence, _responsePipeOptions, cancellationToken);
331338
requestContextManaged.Allocate();
332339
if (cancellationToken.IsCancellationRequested)
333340
{
@@ -450,13 +457,63 @@ private static unsafe bool OnServerCertificateVerification(IntPtr callbackState,
450457
}
451458

452459
[MonoPInvokeCallback(typeof(NativeMethods.yaha_init_context_on_receive_delegate))]
453-
private static unsafe void OnReceive(int reqSeq, IntPtr state, UIntPtr length, byte* buf)
460+
private static unsafe void OnReceive(int reqSeq, IntPtr state, UIntPtr length, byte* buf, nuint taskHandle)
454461
{
455-
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{reqSeq}:State:0x{state:X}] Response data received: Length={length}");
462+
try
463+
{
464+
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{reqSeq}:State:0x{state:X}] Response data received: Length={length}");
456465

457-
var bufSpan = new Span<byte>(buf, (int)length);
458-
var requestContext = RequestContext.FromHandle(state);
459-
requestContext.Response.Write(bufSpan);
466+
var bufSpan = new Span<byte>(buf, (int)length);
467+
var requestContext = RequestContext.FromHandle(state);
468+
var write = requestContext.Response.WriteAsync(bufSpan);
469+
470+
if (write.IsCompleted)
471+
{
472+
write.GetAwaiter().GetResult();
473+
CompleteTask(taskHandle);
474+
}
475+
else
476+
{
477+
// backpressure is occurred
478+
var awaiter = write.GetAwaiter();
479+
awaiter.UnsafeOnCompleted(() =>
480+
{
481+
try
482+
{
483+
awaiter.GetResult();
484+
}
485+
catch (Exception ex)
486+
{
487+
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Error($"[ReqSeq:{reqSeq}:State:0x{state:X}] Failed to flush response data: {ex}");
488+
CompleteTask(taskHandle, ex.ToString());
489+
return;
490+
}
491+
492+
CompleteTask(taskHandle);
493+
});
494+
}
495+
}
496+
catch (Exception ex)
497+
{
498+
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Error($"[ReqSeq:{reqSeq}:State:0x{state:X}] Failed to flush response data: {ex}");
499+
CompleteTask(taskHandle, ex.ToString());
500+
}
501+
}
502+
503+
private static unsafe void CompleteTask(nuint taskHandle, string? error = null)
504+
{
505+
if (error is null)
506+
{
507+
NativeMethods.yaha_complete_task(taskHandle, (StringBuffer*)0);
508+
return;
509+
}
510+
511+
using var messageUtf8 = new TempUtf8String(error);
512+
fixed (byte* messagePtr = messageUtf8.Span)
513+
{
514+
var sb = new StringBuffer(messagePtr, messageUtf8.Span.Length);
515+
NativeMethods.yaha_complete_task((nuint)(nint)taskHandle, &sb);
516+
}
460517
}
461518

462519
[MonoPInvokeCallback(typeof(NativeMethods.yaha_init_context_on_complete_delegate))]

src/YetAnotherHttpHandler/NativeMethods.Uwp.g.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ internal static unsafe partial class NativeMethods
3333
public delegate void yaha_init_context_on_status_code_and_headers_receive_delegate(int req_seq, nint state, int status_code, YahaHttpVersion version);
3434

3535
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
36-
public delegate void yaha_init_context_on_receive_delegate(int req_seq, nint state, nuint length, byte* buf);
36+
public delegate void yaha_init_context_on_receive_delegate(int req_seq, nint state, nuint length, byte* buf, nuint task_handle);
3737

3838
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
3939
public delegate void yaha_init_context_on_complete_delegate(int req_seq, nint state, CompletionReason reason, uint h2_error_code);
@@ -172,6 +172,9 @@ internal static unsafe partial class NativeMethods
172172
[return: MarshalAs(UnmanagedType.U1)]
173173
public static extern bool yaha_request_destroy(YahaNativeContext* ctx, YahaNativeRequestContext* req_ctx);
174174

175+
[DllImport(__DllName, EntryPoint = "yaha_complete_task", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
176+
public static extern void yaha_complete_task(nuint task_handle, StringBuffer* error);
177+
175178

176179
}
177180

src/YetAnotherHttpHandler/NativeMethods.g.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ internal static unsafe partial class NativeMethods
3838
public delegate void yaha_init_context_on_status_code_and_headers_receive_delegate(int req_seq, nint state, int status_code, YahaHttpVersion version);
3939

4040
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
41-
public delegate void yaha_init_context_on_receive_delegate(int req_seq, nint state, nuint length, byte* buf);
41+
public delegate void yaha_init_context_on_receive_delegate(int req_seq, nint state, nuint length, byte* buf, nuint task_handle);
4242

4343
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
4444
public delegate void yaha_init_context_on_complete_delegate(int req_seq, nint state, CompletionReason reason, uint h2_error_code);
@@ -177,6 +177,9 @@ internal static unsafe partial class NativeMethods
177177
[return: MarshalAs(UnmanagedType.U1)]
178178
public static extern bool yaha_request_destroy(YahaNativeContext* ctx, YahaNativeRequestContext* req_ctx);
179179

180+
[DllImport(__DllName, EntryPoint = "yaha_complete_task", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
181+
public static extern void yaha_complete_task(nuint task_handle, StringBuffer* error);
182+
180183

181184
}
182185

src/YetAnotherHttpHandler/RequestContext.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ internal class RequestContext : IDisposable
3333
public PipeWriter Writer => _pipe.Writer;
3434
public IntPtr Handle => GCHandle.ToIntPtr(_handle);
3535

36-
internal RequestContext(YahaContextSafeHandle ctx, YahaRequestContextSafeHandle requestContext, HttpRequestMessage requestMessage, int requestSequence, CancellationToken cancellationToken)
36+
internal RequestContext(YahaContextSafeHandle ctx, YahaRequestContextSafeHandle requestContext, HttpRequestMessage requestMessage, int requestSequence, PipeOptions? responsePipeOptions, CancellationToken cancellationToken)
3737
{
3838
_ctxHandle = ctx;
3939
_requestContextHandle = requestContext;
40-
_response = new ResponseContext(requestMessage, this, cancellationToken);
40+
_response = new ResponseContext(requestMessage, this, responsePipeOptions, cancellationToken);
4141
_readRequestTask = default;
4242
_requestSequence = requestSequence;
4343
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

src/YetAnotherHttpHandler/ResponseContext.cs

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,18 @@ namespace Cysharp.Net.Http
1313
internal class ResponseContext
1414
{
1515
private readonly RequestContext _requestContext;
16-
private readonly Pipe _pipe = new Pipe(System.IO.Pipelines.PipeOptions.Default);
16+
private readonly Pipe _pipe;
1717
private readonly TaskCompletionSource<HttpResponseMessage> _responseTask;
1818
private readonly HttpResponseMessage _message;
1919
private readonly CancellationToken _cancellationToken;
2020
private readonly CancellationTokenRegistration _tokenRegistration;
2121
private readonly object _writeLock = new object();
2222
private bool _completed = false;
23+
private Task<FlushResult>? _latestFlushTask;
2324

24-
internal ResponseContext(HttpRequestMessage requestMessage, RequestContext requestContext, CancellationToken cancellationToken)
25+
internal ResponseContext(HttpRequestMessage requestMessage, RequestContext requestContext, PipeOptions? pipeOptions, CancellationToken cancellationToken)
2526
{
27+
_pipe = new Pipe(pipeOptions ?? PipeOptions.Default);
2628
_requestContext = requestContext;
2729
_responseTask = new TaskCompletionSource<HttpResponseMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
2830
_cancellationToken = cancellationToken;
@@ -42,22 +44,27 @@ internal ResponseContext(HttpRequestMessage requestMessage, RequestContext reque
4244
#endif
4345
}
4446

45-
public void Write(ReadOnlySpan<byte> data)
47+
public ValueTask<FlushResult> WriteAsync(ReadOnlySpan<byte> data)
4648
{
47-
// NOTE: Currently, this method is called from the rust-side thread (tokio-worker-thread),
48-
// so care must be taken because throwing a managed exception will cause a crash.
4949
lock (_writeLock)
5050
{
51-
if (_completed) return;
51+
if (_completed) return default;
52+
53+
WaitForLatestFlush();
5254

5355
var buffer = _pipe.Writer.GetSpan(data.Length);
5456
data.CopyTo(buffer);
5557
_pipe.Writer.Advance(data.Length);
56-
var t = _pipe.Writer.FlushAsync();
57-
if (!t.IsCompleted)
58+
59+
var flush = _pipe.Writer.FlushAsync(_cancellationToken);
60+
if (flush.IsCompleted)
5861
{
59-
t.AsTask().GetAwaiter().GetResult();
62+
_latestFlushTask = null;
63+
return flush;
6064
}
65+
66+
_latestFlushTask = flush.AsTask();
67+
return new ValueTask<FlushResult>(_latestFlushTask);
6168
}
6269
}
6370

@@ -105,6 +112,7 @@ public void Complete()
105112
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Trace($"[ReqSeq:{_requestContext.RequestSequence}] Response completed. (_completed={_completed})");
106113
lock (_writeLock)
107114
{
115+
WaitForLatestFlush();
108116
_pipe.Writer.Complete();
109117
_completed = true;
110118
}
@@ -139,6 +147,7 @@ public void CompleteAsFailed(string errorMessage, uint h2ErrorCode)
139147
}
140148
#endif
141149
_responseTask.TrySetException(ex);
150+
WaitForLatestFlush();
142151
_pipe.Writer.Complete(ex);
143152
_completed = true;
144153
}
@@ -152,6 +161,7 @@ public void Cancel()
152161
{
153162
_requestContext.TryAbort();
154163
_responseTask.TrySetCanceled(_cancellationToken);
164+
WaitForLatestFlush();
155165
_pipe.Writer.Complete(new OperationCanceledException(_cancellationToken));
156166
_completed = true;
157167
}
@@ -177,6 +187,23 @@ public async Task<HttpResponseMessage> GetResponseAsync()
177187
}
178188
}
179189

190+
private void WaitForLatestFlush()
191+
{
192+
// PipeWriter is not thread-safe, so we need to wait for the latest flush task to complete before writing to the pipe.
193+
194+
if (_latestFlushTask is { IsCompleted: false } latestFlushTask)
195+
{
196+
try
197+
{
198+
latestFlushTask.Wait();
199+
}
200+
catch (Exception)
201+
{
202+
// It is safe to ignore an exception thrown by the latest flush task because it will be caught by NativeHttpHandlerCore.OnReceive().
203+
}
204+
}
205+
}
206+
180207
internal static class Http2ErrorCode
181208
{
182209
// https://github.com/dotnet/aspnetcore/blob/release/8.0/src/Shared/ServerInfrastructure/Http2/Http2ErrorCode.cs

0 commit comments

Comments
 (0)