diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp index 2c5813ed57ad..071ed4593d88 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp +++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp @@ -35,7 +35,7 @@ namespace { LOG_WARN_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); #define LOG_T(s) \ - LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); + LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); class TDqChannelStorageActor : public IDqChannelStorageActor, public NActors::TActorBootstrapped @@ -115,7 +115,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor, LOG_T("[TEvGet] blobId: " << msg.BlobId_); auto opBegin = TInstant::Now(); - + auto loadingBlobInfo = TLoadingBlobInfo{std::move(msg.Promise_), opBegin}; LoadingBlobs_.emplace(msg.BlobId_, std::move(loadingBlobInfo)); @@ -203,7 +203,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor, // BlobId -> blob size + promise that blob is saved std::unordered_map WritingBlobs_; - + // BlobId -> promise with requested blob std::unordered_map LoadingBlobs_; diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage.cpp b/ydb/library/yql/dq/actors/spilling/compute_storage.cpp index 04dab7ff070f..8c2690032d3c 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage.cpp +++ b/ydb/library/yql/dq/actors/spilling/compute_storage.cpp @@ -28,7 +28,7 @@ NThreading::TFuture TDqComputeStorage::Put(TC } NThreading::TFuture> TDqComputeStorage::Get(TKey key) { - return GetInternal(key, false); + return Extract(key); } NThreading::TFuture TDqComputeStorage::Delete(TKey key) { @@ -41,15 +41,10 @@ NThreading::TFuture TDqComputeStorage::Delete(TKey key) { } NThreading::TFuture> TDqComputeStorage::Extract(TKey key) { - return GetInternal(key, true); -} - -NThreading::TFuture> TDqComputeStorage::GetInternal(TKey key, bool removeBlobAfterRead) { - auto promise = NThreading::NewPromise>(); auto future = promise.GetFuture(); - ActorSystem_->Send(ComputeStorageActorId_, new TEvGet(key, std::move(promise), removeBlobAfterRead)); + ActorSystem_->Send(ComputeStorageActorId_, new TEvGet(key, std::move(promise))); return future; } diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage.h b/ydb/library/yql/dq/actors/spilling/compute_storage.h index 6b7314525c9c..97cb93d5afb9 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage.h +++ b/ydb/library/yql/dq/actors/spilling/compute_storage.h @@ -30,8 +30,6 @@ class TDqComputeStorage : public NKikimr::NMiniKQL::ISpiller NThreading::TFuture Delete(TKey key) override; private: - NThreading::TFuture> GetInternal(TKey key, bool removeBlobAfterRead); - NActors::TActorSystem* ActorSystem_; IDqComputeStorageActor* ComputeStorageActor_; NActors::TActorId ComputeStorageActorId_; diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp index e0c8b3c21cbf..29c86727ba80 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp +++ b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp @@ -41,7 +41,6 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped> BlobPromise; TInstant OpBegin; }; @@ -59,8 +58,7 @@ class TDqComputeStorageActor : public NActors::TActorBootstrappedComputeReadTime += opDuration.MilliSeconds(); } - if (blobInfo.RemoveAfterRead) { - UpdateStatsAfterBlobDeletion(msg.Blob.Size(), msg.BlobId); - } + UpdateStatsAfterBlobDeletion(msg.Blob.Size(), msg.BlobId); auto owner = std::make_shared(std::move(msg.Blob)); TChunkedBuffer res(TStringBuf(reinterpret_cast(owner->Data()), owner->Size()), owner); @@ -277,9 +271,9 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped spillingTaskCounters) { return new TDqComputeStorageActor(txId, spillerName, wakeupCallback, errorCallback, spillingTaskCounters); } -} // namespace NYql::NDq +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h index ddd86432f81e..ace0d712a0c3 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h +++ b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h @@ -27,7 +27,6 @@ struct TDqComputeStorageActorEvents { enum { EvPut = EventSpaceBegin(NActors::TEvents::EEventSpace::ES_USERSPACE) + 30000, EvGet, - EvExtract, EvDelete }; }; @@ -44,16 +43,14 @@ struct TEvPut : NActors::TEventLocal { - TEvGet(IDqComputeStorageActor::TKey key, NThreading::TPromise>&& promise, bool removeBlobAfterRead) + TEvGet(IDqComputeStorageActor::TKey key, NThreading::TPromise>&& promise) : Key_(key) , Promise_(std::move(promise)) - , RemoveBlobAfterRead_(removeBlobAfterRead) { } IDqComputeStorageActor::TKey Key_; NThreading::TPromise> Promise_; - bool RemoveBlobAfterRead_; }; struct TEvDelete : NActors::TEventLocal { @@ -67,7 +64,7 @@ struct TEvDelete : NActors::TEventLocal Promise_; }; -IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, TWakeUpCallback wakeupCallback, +IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, TWakeUpCallback wakeupCallback, TErrorCallback errorCallback, TIntrusivePtr spillingTaskCounters); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/spilling.h b/ydb/library/yql/dq/actors/spilling/spilling.h index 767ede0bdd76..920d7f902298 100644 --- a/ydb/library/yql/dq/actors/spilling/spilling.h +++ b/ydb/library/yql/dq/actors/spilling/spilling.h @@ -30,11 +30,10 @@ struct TEvDqSpilling { struct TEvRead : public NActors::TEventLocal { ui64 BlobId; - bool RemoveBlob; TMaybe Timeout; - TEvRead(ui64 blobId, bool removeBlob = false, TMaybe timeout = {}) - : BlobId(blobId), RemoveBlob(removeBlob), Timeout(timeout) {} + TEvRead(ui64 blobId, TMaybe timeout = {}) + : BlobId(blobId), Timeout(timeout) {} }; struct TEvReadResult : public NActors::TEventLocal {