Skip to content

Delete spilled channel BLOBs after read #20687

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace {
LOG_WARN_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s);

#define LOG_T(s) \
LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s);
LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s);

class TDqChannelStorageActor : public IDqChannelStorageActor,
public NActors::TActorBootstrapped<TDqChannelStorageActor>
Expand Down Expand Up @@ -115,7 +115,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
LOG_T("[TEvGet] blobId: " << msg.BlobId_);

auto opBegin = TInstant::Now();

auto loadingBlobInfo = TLoadingBlobInfo{std::move(msg.Promise_), opBegin};
LoadingBlobs_.emplace(msg.BlobId_, std::move(loadingBlobInfo));

Expand Down Expand Up @@ -203,7 +203,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,

// BlobId -> blob size + promise that blob is saved
std::unordered_map<ui64, TWritingBlobInfo> WritingBlobs_;

// BlobId -> promise with requested blob
std::unordered_map<ui64, TLoadingBlobInfo> LoadingBlobs_;

Expand Down
9 changes: 2 additions & 7 deletions ydb/library/yql/dq/actors/spilling/compute_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ NThreading::TFuture<NKikimr::NMiniKQL::ISpiller::TKey> TDqComputeStorage::Put(TC
}

NThreading::TFuture<std::optional<TChunkedBuffer>> TDqComputeStorage::Get(TKey key) {
return GetInternal(key, false);
return Extract(key);
}

NThreading::TFuture<void> TDqComputeStorage::Delete(TKey key) {
Expand All @@ -41,15 +41,10 @@ NThreading::TFuture<void> TDqComputeStorage::Delete(TKey key) {
}

NThreading::TFuture<std::optional<TChunkedBuffer>> TDqComputeStorage::Extract(TKey key) {
return GetInternal(key, true);
}

NThreading::TFuture<std::optional<TChunkedBuffer>> TDqComputeStorage::GetInternal(TKey key, bool removeBlobAfterRead) {

auto promise = NThreading::NewPromise<std::optional<TChunkedBuffer>>();
auto future = promise.GetFuture();

ActorSystem_->Send(ComputeStorageActorId_, new TEvGet(key, std::move(promise), removeBlobAfterRead));
ActorSystem_->Send(ComputeStorageActorId_, new TEvGet(key, std::move(promise)));
return future;
}

Expand Down
2 changes: 0 additions & 2 deletions ydb/library/yql/dq/actors/spilling/compute_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ class TDqComputeStorage : public NKikimr::NMiniKQL::ISpiller
NThreading::TFuture<void> Delete(TKey key) override;

private:
NThreading::TFuture<std::optional<TChunkedBuffer>> GetInternal(TKey key, bool removeBlobAfterRead);

NActors::TActorSystem* ActorSystem_;
IDqComputeStorageActor* ComputeStorageActor_;
NActors::TActorId ComputeStorageActorId_;
Expand Down
20 changes: 7 additions & 13 deletions ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
};

struct TLoadingBlobInfo {
bool RemoveAfterRead;
NThreading::TPromise<std::optional<TChunkedBuffer>> BlobPromise;
TInstant OpBegin;
};
Expand All @@ -59,8 +58,7 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
}

void Bootstrap() {
auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, SpillerName_,
SelfId(), false);
auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, SpillerName_, SelfId(), true);
SpillingActorId_ = Register(spillingActor);
Become(&TDqComputeStorageActor::WorkState);
}
Expand Down Expand Up @@ -131,12 +129,10 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
return;
}

bool removeBlobAfterRead = msg.RemoveBlobAfterRead_;

auto loadingBlobInfo = TLoadingBlobInfo{removeBlobAfterRead, std::move(msg.Promise_), opBegin};
auto loadingBlobInfo = TLoadingBlobInfo{std::move(msg.Promise_), opBegin};
LoadingBlobs_.emplace(msg.Key_, std::move(loadingBlobInfo));

SendInternal(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.Key_, removeBlobAfterRead));
SendInternal(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.Key_));
}

void HandleWork(TEvDelete::TPtr& ev) {
Expand All @@ -149,7 +145,7 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor

DeletingBlobs_.emplace(msg.Key_, std::move(msg.Promise_));

SendInternal(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.Key_, true));
SendInternal(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.Key_));
}

void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) {
Expand Down Expand Up @@ -209,9 +205,7 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
SpillingTaskCounters_->ComputeReadTime += opDuration.MilliSeconds();
}

if (blobInfo.RemoveAfterRead) {
UpdateStatsAfterBlobDeletion(msg.Blob.Size(), msg.BlobId);
}
UpdateStatsAfterBlobDeletion(msg.Blob.Size(), msg.BlobId);

auto owner = std::make_shared<TBuffer>(std::move(msg.Blob));
TChunkedBuffer res(TStringBuf(reinterpret_cast<const char*>(owner->Data()), owner->Size()), owner);
Expand Down Expand Up @@ -277,9 +271,9 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor

} // anonymous namespace

IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, TWakeUpCallback wakeupCallback,
IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, TWakeUpCallback wakeupCallback,
TErrorCallback errorCallback, TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters) {
return new TDqComputeStorageActor(txId, spillerName, wakeupCallback, errorCallback, spillingTaskCounters);
}

} // namespace NYql::NDq
} // namespace NYql::NDq
7 changes: 2 additions & 5 deletions ydb/library/yql/dq/actors/spilling/compute_storage_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ struct TDqComputeStorageActorEvents {
enum {
EvPut = EventSpaceBegin(NActors::TEvents::EEventSpace::ES_USERSPACE) + 30000,
EvGet,
EvExtract,
EvDelete
};
};
Expand All @@ -44,16 +43,14 @@ struct TEvPut : NActors::TEventLocal<TEvPut, TDqComputeStorageActorEvents::EvPut
};

struct TEvGet : NActors::TEventLocal<TEvGet, TDqComputeStorageActorEvents::EvGet> {
TEvGet(IDqComputeStorageActor::TKey key, NThreading::TPromise<std::optional<TChunkedBuffer>>&& promise, bool removeBlobAfterRead)
TEvGet(IDqComputeStorageActor::TKey key, NThreading::TPromise<std::optional<TChunkedBuffer>>&& promise)
: Key_(key)
, Promise_(std::move(promise))
, RemoveBlobAfterRead_(removeBlobAfterRead)
{
}

IDqComputeStorageActor::TKey Key_;
NThreading::TPromise<std::optional<TChunkedBuffer>> Promise_;
bool RemoveBlobAfterRead_;
};

struct TEvDelete : NActors::TEventLocal<TEvDelete, TDqComputeStorageActorEvents::EvDelete> {
Expand All @@ -67,7 +64,7 @@ struct TEvDelete : NActors::TEventLocal<TEvDelete, TDqComputeStorageActorEvents:
NThreading::TPromise<void> Promise_;
};

IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, TWakeUpCallback wakeupCallback,
IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, TWakeUpCallback wakeupCallback,
TErrorCallback errorCallback, TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters);

} // namespace NYql::NDq
5 changes: 2 additions & 3 deletions ydb/library/yql/dq/actors/spilling/spilling.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ struct TEvDqSpilling {

struct TEvRead : public NActors::TEventLocal<TEvRead, TDqSpillingEvents::EvRead> {
ui64 BlobId;
bool RemoveBlob;
TMaybe<TDuration> Timeout;

TEvRead(ui64 blobId, bool removeBlob = false, TMaybe<TDuration> timeout = {})
: BlobId(blobId), RemoveBlob(removeBlob), Timeout(timeout) {}
TEvRead(ui64 blobId, TMaybe<TDuration> timeout = {})
: BlobId(blobId), Timeout(timeout) {}
};

struct TEvReadResult : public NActors::TEventLocal<TEvReadResult, TDqSpillingEvents::EvReadResult> {
Expand Down
Loading