Skip to content

Commit 10ccd9f

Browse files
committed
quic: For streams allow ReadableStream as body
After the change the body parameter for all created QuicStreams can take a ReadableStream of Uint8Array, Arraybuffers etc.. We introduce a DataQueueFeeder class, that may be also used for other related mechanisms. A locally created unidirectional Stream can not have a reader. Therefore the interface is changed to handle this case. (undercovered when writing the tests). Furthermore, a ResumeStream must be added After AddStream in QuicSession, as the ResumeStream beforehand triggered with set_outbound is a no-op, as Stream was not added to Session beforehand. Fixes: #60234
1 parent 3ac88a7 commit 10ccd9f

File tree

10 files changed

+482
-9
lines changed

10 files changed

+482
-9
lines changed

doc/api/quic.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ added: v23.8.0
475475
-->
476476

477477
* `options` {Object}
478-
* `body` {ArrayBuffer | ArrayBufferView | Blob}
478+
* `body` {ArrayBuffer | ArrayBufferView | Blob | ReadableStream<Uint8Array>}
479479
* `sendOrder` {number}
480480
* Returns: {Promise} for a {quic.QuicStream}
481481

@@ -489,7 +489,7 @@ added: v23.8.0
489489
-->
490490

491491
* `options` {Object}
492-
* `body` {ArrayBuffer | ArrayBufferView | Blob}
492+
* `body` {ArrayBuffer | ArrayBufferView | Blob | ReadableStream<Uint8Array>}
493493
* `sendOrder` {number}
494494
* Returns: {Promise} for a {quic.QuicStream}
495495

@@ -820,7 +820,7 @@ The callback to invoke when the stream is reset. Read/write.
820820
added: v23.8.0
821821
-->
822822

823-
* Type: {ReadableStream}
823+
* Type: {ReadableStream | undefined}
824824

825825
### `stream.session`
826826

lib/internal/quic/quic.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const {
3434
Endpoint: Endpoint_,
3535
Http3Application: Http3,
3636
setCallbacks,
37+
DataQueueFeeder,
3738

3839
// The constants to be exposed to end users for various options.
3940
CC_ALGO_RENO_STR: CC_ALGO_RENO,
@@ -114,6 +115,10 @@ const {
114115
buildNgHeaderString,
115116
} = require('internal/http2/util');
116117

118+
const {
119+
isReadableStream,
120+
} = require('internal/webstreams/readablestream');
121+
117122
const kEmptyObject = { __proto__: null };
118123

119124
const {
@@ -546,6 +551,37 @@ setCallbacks({
546551
function validateBody(body) {
547552
// TODO(@jasnell): Support streaming sources
548553
if (body === undefined) return body;
554+
if (isReadableStream(body)) {
555+
const feeder = new DataQueueFeeder();
556+
const reader = body.getReader();
557+
558+
const feeding = async () => {
559+
await feeder.ready();
560+
let cont = true;
561+
562+
while (cont) {
563+
let read;
564+
try {
565+
read = await reader.read();
566+
} catch (error) {
567+
feeder.error(error);
568+
}
569+
const { value, done } = read;
570+
try {
571+
cont = await feeder.submit(value, done);
572+
} catch (error) {
573+
reader.cancel(error.toString());
574+
break;
575+
}
576+
}
577+
if (!cont) {
578+
reader.releaseLock();
579+
}
580+
581+
};
582+
feeding();
583+
return feeder;
584+
}
549585
// Transfer ArrayBuffers...
550586
if (isArrayBuffer(body)) {
551587
return ArrayBufferPrototypeTransfer(body);
@@ -578,6 +614,7 @@ function validateBody(body) {
578614
'ArrayBuffer',
579615
'ArrayBufferView',
580616
'Blob',
617+
'ReadableStream',
581618
], body);
582619
}
583620

src/dataqueue/queue.cc

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#include <memory>
1919
#include <vector>
2020

21+
#include "../quic/streams.h"
22+
2123
namespace node {
2224

2325
using v8::ArrayBufferView;
@@ -1061,9 +1063,81 @@ class FdEntry final : public EntryImpl {
10611063
friend class ReaderImpl;
10621064
};
10631065

1066+
} // namespace
1067+
// ============================================================================
1068+
1069+
class FeederEntry final : public EntryImpl {
1070+
public:
1071+
FeederEntry(DataQueueFeeder* feeder) : feeder_(feeder) {
1072+
}
1073+
1074+
static std::unique_ptr<FeederEntry> Create(DataQueueFeeder* feeder) {
1075+
return std::make_unique<FeederEntry>(feeder);
1076+
}
1077+
1078+
std::shared_ptr<DataQueue::Reader> get_reader() override {
1079+
return ReaderImpl::Create(this);
1080+
}
1081+
1082+
std::unique_ptr<Entry> slice(
1083+
uint64_t start, std::optional<uint64_t> end = std::nullopt) override {
1084+
// we are not idempotent
1085+
return std::unique_ptr<Entry>(nullptr);
1086+
}
1087+
1088+
std::optional<uint64_t> size() const override {
1089+
return std::optional<uint64_t>();
1090+
}
1091+
1092+
bool is_idempotent() const override { return false; }
1093+
1094+
SET_NO_MEMORY_INFO()
1095+
SET_MEMORY_INFO_NAME(FeederEntry)
1096+
SET_SELF_SIZE(FeederEntry)
1097+
1098+
private:
1099+
DataQueueFeeder* feeder_;
1100+
1101+
class ReaderImpl final : public DataQueue::Reader,
1102+
public std::enable_shared_from_this<ReaderImpl> {
1103+
public:
1104+
static std::shared_ptr<ReaderImpl> Create(FeederEntry* entry) {
1105+
return std::make_shared<ReaderImpl>(entry);
1106+
};
1107+
1108+
explicit ReaderImpl(FeederEntry* entry) : entry_(entry) {
1109+
}
1110+
1111+
~ReaderImpl() {
1112+
entry_->feeder_->DrainAndClose();
1113+
}
1114+
1115+
int Pull(Next next,
1116+
int options,
1117+
DataQueue::Vec* data,
1118+
size_t count,
1119+
size_t max_count_hint = bob::kMaxCountHint) override {
1120+
if (entry_->feeder_->Done()) {
1121+
std::move(next)(bob::STATUS_EOS, nullptr, 0, [](uint64_t) {});
1122+
return bob::STATUS_EOS;
1123+
}
1124+
entry_->feeder_->addPendingPull(
1125+
DataQueueFeeder::PendingPull(std::move(next)));
1126+
entry_->feeder_->tryWakePulls();
1127+
return bob::STATUS_WAIT;
1128+
}
1129+
1130+
SET_NO_MEMORY_INFO()
1131+
SET_MEMORY_INFO_NAME(FeederEntry::Reader)
1132+
SET_SELF_SIZE(ReaderImpl)
1133+
1134+
private:
1135+
FeederEntry* entry_;
1136+
};
1137+
};
1138+
10641139
// ============================================================================
10651140

1066-
} // namespace
10671141

10681142
std::shared_ptr<DataQueue> DataQueue::CreateIdempotent(
10691143
std::vector<std::unique_ptr<Entry>> list) {
@@ -1137,6 +1211,11 @@ std::unique_ptr<DataQueue::Entry> DataQueue::CreateFdEntry(Environment* env,
11371211
return FdEntry::Create(env, path);
11381212
}
11391213

1214+
std::unique_ptr<DataQueue::Entry> DataQueue::CreateFeederEntry(
1215+
DataQueueFeeder* feeder) {
1216+
return FeederEntry::Create(feeder);
1217+
}
1218+
11401219
void DataQueue::Initialize(Environment* env, v8::Local<v8::Object> target) {
11411220
// Nothing to do here currently.
11421221
}

src/dataqueue/queue.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
#include <vector>
1717

1818
namespace node {
19+
using v8::Local;
20+
using v8::Value;
1921

2022
// Represents a sequenced collection of data sources that can be
2123
// consumed as a single logical stream of data. Sources can be
@@ -124,6 +126,8 @@ namespace node {
124126
// For non-idempotent DataQueues, only a single reader is ever allowed for
125127
// the DataQueue, and the data can only ever be read once.
126128

129+
class DataQueueFeeder;
130+
127131
class DataQueue : public MemoryRetainer {
128132
public:
129133
struct Vec {
@@ -224,6 +228,8 @@ class DataQueue : public MemoryRetainer {
224228
static std::unique_ptr<Entry> CreateFdEntry(Environment* env,
225229
v8::Local<v8::Value> path);
226230

231+
static std::unique_ptr<Entry> CreateFeederEntry(DataQueueFeeder* feeder);
232+
227233
// Creates a Reader for the given queue. If the queue is idempotent,
228234
// any number of readers can be created, all of which are guaranteed
229235
// to provide the same data. Otherwise, only a single reader is

src/quic/bindingdata.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
#include <unordered_map>
1616
#include "defs.h"
1717

18-
namespace node::quic {
18+
namespace node {
19+
class DataQueueFeeder;
20+
namespace quic {
1921

2022
class Endpoint;
2123
class Packet;
@@ -24,6 +26,7 @@ class Packet;
2426

2527
// The FunctionTemplates the BindingData will store for us.
2628
#define QUIC_CONSTRUCTORS(V) \
29+
V(dataqueuefeeder) \
2730
V(endpoint) \
2831
V(http3application) \
2932
V(logstream) \
@@ -68,6 +71,7 @@ class Packet;
6871
V(ciphers, "ciphers") \
6972
V(crl, "crl") \
7073
V(cubic, "cubic") \
74+
V(dataqueuefeeder, "DataQueueFeeder") \
7175
V(disable_stateless_reset, "disableStatelessReset") \
7276
V(enable_connect_protocol, "enableConnectProtocol") \
7377
V(enable_datagrams, "enableDatagrams") \
@@ -264,6 +268,7 @@ struct CallbackScope final : public CallbackScopeBase {
264268
explicit CallbackScope(T* ptr) : CallbackScopeBase(ptr->env()), ref(ptr) {}
265269
};
266270

267-
} // namespace node::quic
271+
} // namespace quic
272+
} // namespace node
268273

269274
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

src/quic/quic.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ void CreatePerIsolateProperties(IsolateData* isolate_data,
2727
Endpoint::InitPerIsolate(isolate_data, target);
2828
Session::InitPerIsolate(isolate_data, target);
2929
Stream::InitPerIsolate(isolate_data, target);
30+
DataQueueFeeder::InitPerIsolate(isolate_data, target);
3031
}
3132

3233
void CreatePerContextProperties(Local<Object> target,
@@ -38,13 +39,15 @@ void CreatePerContextProperties(Local<Object> target,
3839
Endpoint::InitPerContext(realm, target);
3940
Session::InitPerContext(realm, target);
4041
Stream::InitPerContext(realm, target);
42+
DataQueueFeeder::InitPerContext(realm, target);
4143
}
4244

4345
void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
4446
BindingData::RegisterExternalReferences(registry);
4547
Endpoint::RegisterExternalReferences(registry);
4648
Session::RegisterExternalReferences(registry);
4749
Stream::RegisterExternalReferences(registry);
50+
DataQueueFeeder::RegisterExternalReferences(registry);
4851
}
4952

5053
} // namespace quic

src/quic/session.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1940,6 +1940,8 @@ BaseObjectPtr<Stream> Session::CreateStream(
19401940
if (auto stream = Stream::Create(this, id, std::move(data_source)))
19411941
[[likely]] {
19421942
AddStream(stream, option);
1943+
ResumeStream(id); // ok, we need to resume, as the Resume before fails
1944+
// as the stream was not added yet
19431945
return stream;
19441946
}
19451947
return {};

0 commit comments

Comments
 (0)