Skip to content

Commit e5abf8a

Browse files
tlj77facebook-github-bot
authored andcommitted
Add visibility rule for TPEH stream events listeners
Summary: ^ Insert another layer of blocker blocking outside users. Reviewed By: sazonovkirill Differential Revision: D75272079 fbshipit-source-id: 53d03fa48f892aa18f3f03c24fa0a20b7bce1a4a
1 parent 4668264 commit e5abf8a

File tree

3 files changed

+108
-46
lines changed

3 files changed

+108
-46
lines changed

third-party/thrift/src/thrift/lib/cpp/ContextStack.cpp

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include <folly/tracing/StaticTracepoint.h>
2121

22+
#include <thrift/lib/cpp/StreamEventHandler.h>
2223
#include <thrift/lib/cpp2/async/InterceptorFlags.h>
2324
#include <thrift/lib/cpp2/detail/EventHandlerRuntime.h>
2425
#include <thrift/lib/cpp2/server/Cpp2ConnContext.h>
@@ -369,7 +370,9 @@ void ContextStack::onStreamSubscribe() {
369370

370371
if (handlers_) {
371372
for (size_t i = 0; i < handlers_->size(); i++) {
372-
(*handlers_)[i]->onStreamSubscribe(contextAt(i));
373+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
374+
streamEventHandler->onStreamSubscribe(contextAt(i));
375+
}
373376
}
374377
}
375378
}
@@ -382,7 +385,9 @@ void ContextStack::onStreamCredit(uint32_t credits) {
382385
methodNamePrefixed_);
383386
if (handlers_) {
384387
for (size_t i = 0; i < handlers_->size(); i++) {
385-
(*handlers_)[i]->onStreamCredit(contextAt(i), credits);
388+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
389+
streamEventHandler->onStreamCredit(contextAt(i), credits);
390+
}
386391
}
387392
}
388393
}
@@ -395,7 +400,9 @@ void ContextStack::onStreamNext() {
395400
methodNamePrefixed_);
396401
if (handlers_) {
397402
for (size_t i = 0; i < handlers_->size(); i++) {
398-
(*handlers_)[i]->onStreamNext(contextAt(i));
403+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
404+
streamEventHandler->onStreamNext(contextAt(i));
405+
}
399406
}
400407
}
401408
}
@@ -409,7 +416,9 @@ void ContextStack::onStreamPauseReceive() {
409416

410417
if (handlers_) {
411418
for (size_t i = 0; i < handlers_->size(); i++) {
412-
(*handlers_)[i]->onStreamPauseReceive(contextAt(i));
419+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
420+
streamEventHandler->onStreamPauseReceive(contextAt(i));
421+
}
413422
}
414423
}
415424
}
@@ -423,7 +432,9 @@ void ContextStack::onStreamResumeReceive() {
423432

424433
if (handlers_) {
425434
for (size_t i = 0; i < handlers_->size(); i++) {
426-
(*handlers_)[i]->onStreamResumeReceive(contextAt(i));
435+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
436+
streamEventHandler->onStreamResumeReceive(contextAt(i));
437+
}
427438
}
428439
}
429440
}
@@ -438,7 +449,9 @@ void ContextStack::handleStreamErrorWrapped(
438449

439450
if (handlers_) {
440451
for (size_t i = 0; i < handlers_->size(); i++) {
441-
(*handlers_)[i]->handleStreamErrorWrapped(contextAt(i), ew);
452+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
453+
streamEventHandler->handleStreamErrorWrapped(contextAt(i), ew);
454+
}
442455
}
443456
}
444457
}
@@ -452,7 +465,9 @@ void ContextStack::onStreamFinally(details::STREAM_ENDING_TYPES endReason) {
452465

453466
if (handlers_) {
454467
for (size_t i = 0; i < handlers_->size(); i++) {
455-
(*handlers_)[i]->onStreamFinally(contextAt(i), endReason);
468+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
469+
streamEventHandler->onStreamFinally(contextAt(i), endReason);
470+
}
456471
}
457472
}
458473
}
@@ -465,7 +480,9 @@ void ContextStack::onSinkSubscribe() {
465480
methodNamePrefixed_);
466481
if (handlers_) {
467482
for (size_t i = 0; i < handlers_->size(); i++) {
468-
(*handlers_)[i]->onSinkSubscribe(contextAt(i));
483+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
484+
streamEventHandler->onSinkSubscribe(contextAt(i));
485+
}
469486
}
470487
}
471488
}
@@ -478,7 +495,9 @@ void ContextStack::onSinkNext() {
478495
methodNamePrefixed_);
479496
if (handlers_) {
480497
for (size_t i = 0; i < handlers_->size(); i++) {
481-
(*handlers_)[i]->onSinkNext(contextAt(i));
498+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
499+
streamEventHandler->onSinkNext(contextAt(i));
500+
}
482501
}
483502
}
484503
}
@@ -491,7 +510,9 @@ void ContextStack::onSinkCancel() {
491510
methodNamePrefixed_);
492511
if (handlers_) {
493512
for (size_t i = 0; i < handlers_->size(); i++) {
494-
(*handlers_)[i]->onSinkCancel(contextAt(i));
513+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
514+
streamEventHandler->onSinkCancel(contextAt(i));
515+
}
495516
}
496517
}
497518
}
@@ -504,7 +525,9 @@ void ContextStack::onSinkCredit(uint32_t credits) {
504525
methodNamePrefixed_);
505526
if (handlers_) {
506527
for (size_t i = 0; i < handlers_->size(); i++) {
507-
(*handlers_)[i]->onSinkCredit(contextAt(i), credits);
528+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
529+
streamEventHandler->onSinkCredit(contextAt(i), credits);
530+
}
508531
}
509532
}
510533
}
@@ -518,7 +541,9 @@ void ContextStack::onSinkFinally(details::SINK_ENDING_TYPES endReason) {
518541

519542
if (handlers_) {
520543
for (size_t i = 0; i < handlers_->size(); i++) {
521-
(*handlers_)[i]->onSinkFinally(contextAt(i), endReason);
544+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
545+
streamEventHandler->onSinkFinally(contextAt(i), endReason);
546+
}
522547
}
523548
}
524549
}
@@ -532,7 +557,9 @@ void ContextStack::handleSinkError(const folly::exception_wrapper& ew) {
532557

533558
if (handlers_) {
534559
for (size_t i = 0; i < handlers_->size(); i++) {
535-
(*handlers_)[i]->handleSinkError(contextAt(i), ew);
560+
if (auto* streamEventHandler = (*handlers_)[i]->getStreamEventHandler()) {
561+
streamEventHandler->handleSinkError(contextAt(i), ew);
562+
}
536563
}
537564
}
538565
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <folly/ExceptionWrapper.h>
20+
21+
namespace apache::thrift {
22+
namespace details {
23+
enum class STREAM_ENDING_TYPES {
24+
COMPLETE = 0,
25+
ERROR = 1,
26+
CANCEL = 2,
27+
};
28+
29+
enum class SINK_ENDING_TYPES {
30+
COMPLETE = 0,
31+
COMPLETE_WITH_ERROR = 1,
32+
ERROR = 2,
33+
};
34+
} // namespace details
35+
36+
// EXPERIMENTAL: DO NOT USE WITHOUT TALKING TO THRIFT TEAM
37+
class StreamEventHandler {
38+
public:
39+
virtual ~StreamEventHandler() {}
40+
virtual void onStreamSubscribe(void*) {}
41+
virtual void onStreamNext(void*) {}
42+
virtual void onStreamCredit(void*, uint32_t) {}
43+
virtual void onStreamPauseReceive(void*) {}
44+
virtual void onStreamResumeReceive(void*) {}
45+
virtual void handleStreamErrorWrapped(
46+
void*, const folly::exception_wrapper&) {}
47+
virtual void onStreamFinally(void*, details::STREAM_ENDING_TYPES) {}
48+
49+
virtual void onSinkSubscribe(void*) {}
50+
virtual void onSinkNext(void*) {}
51+
virtual void onSinkCancel(void*) {}
52+
virtual void onSinkCredit(void*, uint32_t) {}
53+
virtual void handleSinkError(void*, const folly::exception_wrapper&) {}
54+
virtual void onSinkFinally(void*, details::SINK_ENDING_TYPES) {}
55+
};
56+
57+
} // namespace apache::thrift

third-party/thrift/src/thrift/lib/cpp/TProcessorEventHandler.h

Lines changed: 11 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,11 @@
2222
#include <folly/Demangle.h>
2323
#include <folly/ExceptionWrapper.h>
2424
#include <thrift/lib/cpp/SerializedMessage.h>
25+
#include <thrift/lib/cpp/StreamEventHandler.h>
2526
#include <thrift/lib/cpp/server/TConnectionContext.h>
2627
#include <thrift/lib/cpp/transport/THeader.h>
2728

2829
namespace apache::thrift {
29-
namespace details {
30-
enum class STREAM_ENDING_TYPES {
31-
COMPLETE = 0,
32-
ERROR = 1,
33-
CANCEL = 2,
34-
};
35-
36-
enum class SINK_ENDING_TYPES {
37-
COMPLETE = 0,
38-
COMPLETE_WITH_ERROR = 1,
39-
ERROR = 2,
40-
};
41-
} // namespace details
4230

4331
using server::TConnectionContext;
4432

@@ -197,28 +185,18 @@ class TProcessorEventHandler {
197185
return userException(ctx, fn_name, type, what);
198186
}
199187

200-
// Experimental: callbacks for stream events, please do not use without asking
201-
// help from Thrift team.
202-
virtual void onStreamSubscribe(void*) {}
203-
virtual void onStreamNext(void*) {}
204-
virtual void onStreamCredit(void*, uint32_t) {}
205-
virtual void onStreamPauseReceive(void*) {}
206-
virtual void onStreamResumeReceive(void*) {}
207-
virtual void handleStreamErrorWrapped(
208-
void*, const folly::exception_wrapper&) {}
209-
virtual void onStreamFinally(void*, details::STREAM_ENDING_TYPES) {}
210-
211-
// Experimental: callbacks for sink events, please do not use without asking
212-
// help from Thrift team.
213-
virtual void onSinkSubscribe(void*) {}
214-
virtual void onSinkNext(void*) {}
215-
virtual void onSinkCancel(void*) {}
216-
virtual void onSinkCredit(void*, uint32_t) {}
217-
virtual void handleSinkError(void*, const folly::exception_wrapper&) {}
218-
virtual void onSinkFinally(void*, details::SINK_ENDING_TYPES) {}
188+
StreamEventHandler* getStreamEventHandler() {
189+
return streamEventHandler_.get();
190+
}
219191

220192
protected:
221-
TProcessorEventHandler() {}
193+
TProcessorEventHandler() : streamEventHandler_(nullptr) {}
194+
explicit TProcessorEventHandler(
195+
std::unique_ptr<StreamEventHandler> streamEventHandler)
196+
: streamEventHandler_(std::move(streamEventHandler)) {}
197+
198+
private:
199+
std::unique_ptr<StreamEventHandler> streamEventHandler_;
222200
};
223201

224202
/**

0 commit comments

Comments
 (0)