Skip to content

Commit 1afa3ba

Browse files
authored
AsyncDecoding and HttpGateway have been fixed (#9118)
1 parent ca964ba commit 1afa3ba

File tree

3 files changed

+22
-33
lines changed

3 files changed

+22
-33
lines changed

ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,7 @@ friend class IHTTPGateway;
746746
}
747747

748748
size_t FillHandlers() {
749-
const std::unique_lock lock(Sync);
749+
const std::unique_lock lock(SyncRef());
750750
for (auto it = Streams.cbegin(); Streams.cend() != it;) {
751751
if (const auto& stream = it->lock()) {
752752
const auto streamHandle = stream->GetHandle();
@@ -795,7 +795,7 @@ friend class IHTTPGateway;
795795
TEasyCurl::TPtr easy;
796796
long httpResponseCode = 0L;
797797
{
798-
const std::unique_lock lock(Sync);
798+
const std::unique_lock lock(SyncRef());
799799
if (const auto it = Allocated.find(handle); Allocated.cend() != it) {
800800
easy = std::move(it->second);
801801
TString codeLabel;
@@ -847,7 +847,7 @@ friend class IHTTPGateway;
847847
void Fail(CURLMcode result) {
848848
std::stack<TEasyCurl::TPtr> works;
849849
{
850-
const std::unique_lock lock(Sync);
850+
const std::unique_lock lock(SyncRef());
851851

852852
for (auto& item : Allocated) {
853853
works.emplace(std::move(item.second));
@@ -868,7 +868,7 @@ friend class IHTTPGateway;
868868
void Upload(TString url, THeaders headers, TString body, TOnResult callback, bool put, TRetryPolicy::TPtr retryPolicy) final {
869869
Rps->Inc();
870870

871-
const std::unique_lock lock(Sync);
871+
const std::unique_lock lock(SyncRef());
872872
auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move(body), std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList());
873873
Await.emplace(std::move(easy));
874874
Wakeup(0U);
@@ -877,7 +877,7 @@ friend class IHTTPGateway;
877877
void Delete(TString url, THeaders headers, TOnResult callback, TRetryPolicy::TPtr retryPolicy) final {
878878
Rps->Inc();
879879

880-
const std::unique_lock lock(Sync);
880+
const std::unique_lock lock(SyncRef());
881881
auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::DELETE, 0, std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList());
882882
Await.emplace(std::move(easy));
883883
Wakeup(0U);
@@ -898,7 +898,7 @@ friend class IHTTPGateway;
898898
callback(TResult(CURLE_OK, TIssues{error}));
899899
return;
900900
}
901-
const std::unique_lock lock(Sync);
901+
const std::unique_lock lock(SyncRef());
902902
auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList());
903903
Await.emplace(std::move(easy));
904904
Wakeup(sizeLimit);
@@ -915,13 +915,14 @@ friend class IHTTPGateway;
915915
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final
916916
{
917917
auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, InitConfig, DnsGateway.GetDNSCurlList());
918-
const std::unique_lock lock(Sync);
918+
const std::unique_lock lock(SyncRef());
919919
const auto handle = stream->GetHandle();
920920
TEasyCurlStream::TWeakPtr weak = stream;
921921
Streams.emplace_back(stream);
922922
Allocated.emplace(handle, std::move(stream));
923923
Wakeup(0ULL);
924-
return [weak](TIssue issue) {
924+
return [weak, sync=Sync](TIssue issue) {
925+
const std::unique_lock lock(*sync);
925926
if (const auto& stream = weak.lock())
926927
stream->Cancel(issue);
927928
};
@@ -932,7 +933,7 @@ friend class IHTTPGateway;
932933
}
933934

934935
void OnRetry(TEasyCurlBuffer::TPtr easy) {
935-
const std::unique_lock lock(Sync);
936+
const std::unique_lock lock(SyncRef());
936937
const size_t sizeLimit = easy->GetSizeLimit();
937938
Await.emplace(std::move(easy));
938939
Wakeup(sizeLimit);
@@ -950,6 +951,10 @@ friend class IHTTPGateway;
950951
}
951952

952953
private:
954+
std::mutex& SyncRef() {
955+
return *Sync;
956+
}
957+
953958
CURLM* Handle = nullptr;
954959

955960
std::queue<TEasyCurlBuffer::TPtr> Await;
@@ -959,7 +964,7 @@ friend class IHTTPGateway;
959964
std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated;
960965
std::priority_queue<std::pair<TInstant, TEasyCurlBuffer::TPtr>> Delayed;
961966

962-
std::mutex Sync;
967+
std::shared_ptr<std::mutex> Sync = std::make_shared<std::mutex>();
963968
std::thread Thread;
964969
std::atomic<bool> IsStopped = false;
965970

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,11 @@ struct TReadSpec {
167167
NDB::ColumnsWithTypeAndName CHColumns;
168168
std::shared_ptr<arrow::Schema> ArrowSchema;
169169
NDB::FormatSettings Settings;
170-
TString Format, Compression;
170+
// It's very important to keep here std::string instead of TString
171+
// because of the cast from TString to std::string is using the MutRef (it isn't thread-safe).
172+
// This behaviour can be found in the getInputFormat call
173+
std::string Format;
174+
TString Compression;
171175
ui64 SizeLimit = 0;
172176
ui32 BlockLengthPosition = 0;
173177
std::vector<ui32> ColumnReorder;
@@ -1375,12 +1379,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
13751379
DecodedChunkSizeHist,
13761380
HttpInflightSize,
13771381
HttpDataRps,
1378-
DeferredQueueSize,
1379-
ReadSpec->Format,
1380-
ReadSpec->Compression,
1381-
ReadSpec->ArrowSchema,
1382-
ReadSpec->RowSpec,
1383-
ReadSpec->Settings
1382+
DeferredQueueSize
13841383
);
13851384

13861385
if (!UseRuntimeListing) {

ydb/library/yql/providers/s3/common/source_context.h

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,7 @@ struct TSourceContext {
3434
, NMonitoring::THistogramPtr decodedChunkSizeHist
3535
, NMonitoring::TDynamicCounters::TCounterPtr httpInflightSize
3636
, NMonitoring::TDynamicCounters::TCounterPtr httpDataRps
37-
, NMonitoring::TDynamicCounters::TCounterPtr deferredQueueSize
38-
, const TString format
39-
, const TString compression
40-
, std::shared_ptr<arrow::Schema> schema
41-
, std::unordered_map<TStringBuf, NKikimr::NMiniKQL::TType*, THash<TStringBuf>> rowTypes
42-
, NDB::FormatSettings settings)
37+
, NMonitoring::TDynamicCounters::TCounterPtr deferredQueueSize)
4338
: SourceId(sourceId)
4439
, Limit(limit)
4540
, ActorSystem(actorSystem)
@@ -54,11 +49,6 @@ struct TSourceContext {
5449
, HttpInflightSize(httpInflightSize)
5550
, HttpDataRps(httpDataRps)
5651
, DeferredQueueSize(deferredQueueSize)
57-
, Format(format)
58-
, Compression(compression)
59-
, Schema(schema)
60-
, RowTypes(rowTypes)
61-
, Settings(settings)
6252
{
6353
}
6454

@@ -105,11 +95,6 @@ struct TSourceContext {
10595
NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
10696
NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
10797
NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
108-
const TString Format;
109-
const TString Compression;
110-
std::shared_ptr<arrow::Schema> Schema;
111-
std::unordered_map<TStringBuf, NKikimr::NMiniKQL::TType*, THash<TStringBuf>> RowTypes;
112-
NDB::FormatSettings Settings;
11398
private:
11499
std::atomic_uint64_t Value;
115100
std::mutex Mutex;

0 commit comments

Comments
 (0)