Skip to content

Commit 4999b7e

Browse files
authored
YQL: Load binaries from YT binary cache (#11295)
1 parent 3c5b82e commit 4999b7e

File tree

6 files changed

+98
-25
lines changed

6 files changed

+98
-25
lines changed

ydb/library/yql/providers/yt/common/yql_yt_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx)
391391
REGISTER_SETTING(*this, LLVMNodeCountLimit);
392392
REGISTER_SETTING(*this, SamplingIoBlockSize);
393393
REGISTER_SETTING(*this, BinaryTmpFolder);
394+
REGISTER_SETTING(*this, BinaryCacheFolder);
394395
REGISTER_SETTING(*this, BinaryExpirationInterval);
395396
REGISTER_SETTING(*this, FolderInlineDataLimit);
396397
REGISTER_SETTING(*this, FolderInlineItemsLimit);

ydb/library/yql/providers/yt/common/yql_yt_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ struct TYtSettings {
101101
NCommon::TConfSetting<TString, false> DefaultCluster;
102102
NCommon::TConfSetting<TString, false> StaticPool;
103103
NCommon::TConfSetting<TString, false> BinaryTmpFolder;
104+
NCommon::TConfSetting<TString, false> BinaryCacheFolder;
104105
NCommon::TConfSetting<TDuration, false> BinaryExpirationInterval;
105106
NCommon::TConfSetting<bool, false> IgnoreTypeV3;
106107
NCommon::TConfSetting<bool, false> _UseMultisetAttributes;

ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,40 @@ std::pair<TString, NYT::TTransactionId> TTransactionCache::TEntry::GetBinarySnap
327327
return std::make_pair(snapshotPath, snapshotTx->GetId());
328328
}
329329

330+
// Tries to take a snapshot of a binary that is already stored in the binary cache folder.
//
// The cached node is looked up at <binaryCacheFolder>/<md5[0,2)>/<md5[2,4)>/<md5>; if the
// folder starts with NYT::TConfig::Get()->Prefix, that prefix is stripped first.
// A snapshot lock is taken on the node inside BinarySnapshotTx (started here on first use),
// and the locked node is addressed as "#<locked node id>". Successful results are remembered
// in BinarySnapshots, keyed by the remote path, so the lock is not requested again.
//
// Returns (snapshot path, id of the snapshot transaction), or Nothing() when taking the lock
// fails with TErrorResponse; a warning is logged in that case.
// fileName is used only in log messages.
TMaybe<std::pair<TString, NYT::TTransactionId>> TTransactionCache::TEntry::GetBinarySnapshotFromCache(TString binaryCacheFolder, const TString& md5, const TString& fileName) {
    const TString& ytPrefix = NYT::TConfig::Get()->Prefix;
    if (binaryCacheFolder.StartsWith(ytPrefix)) {
        binaryCacheFolder = binaryCacheFolder.substr(ytPrefix.size());
    }

    YQL_ENSURE(md5.size() > 4);
    // Two directory levels are derived from the leading characters of the hash.
    const TString firstLevel = md5.substr(0, 2);
    const TString secondLevel = md5.substr(2, 2);
    const TString remotePath = TFsPath(binaryCacheFolder) / firstLevel / secondLevel / md5;

    ITransactionPtr tx;
    with_lock(Lock_) {
        if (!BinarySnapshotTx) {
            BinarySnapshotTx = Client->StartTransaction(TStartTransactionOptions().Attributes(TransactionSpec));
        }
        tx = BinarySnapshotTx;
        if (const auto* cached = BinarySnapshots.FindPtr(remotePath)) {
            return std::make_pair(*cached, tx->GetId());
        }
    }

    // The lock request is issued outside of Lock_, so the mutex is not held during the call.
    TString snapshotPath;
    try {
        const NYT::ILockPtr nodeLock = tx->Lock(remotePath, NYT::ELockMode::LM_SNAPSHOT);
        snapshotPath = TStringBuilder() << '#' << GetGuidAsString(nodeLock->GetLockedNodeId());
    } catch (const TErrorResponse& err) {
        // Best effort: a failed lock is reported to the caller as Nothing().
        YQL_CLOG(WARN, ProviderYt) << "Can't load binary for \"" << fileName << "\" from BinaryCacheFolder: " << err.what();
        return Nothing();
    }

    with_lock(Lock_) {
        BinarySnapshots[remotePath] = snapshotPath;
    }

    YQL_CLOG(DEBUG, ProviderYt) << "Snapshot \"" << fileName << "\" -> \"" << remotePath << "\" -> "
        << snapshotPath << ", tx=" << GetGuidAsString(tx->GetId());

    return std::make_pair(snapshotPath, tx->GetId());
}
363+
330364
void TTransactionCache::TEntry::CreateDefaultTmpFolder() {
331365
if (DefaultTmpFolder) {
332366
Client->Create(DefaultTmpFolder, NYT::NT_MAP, NYT::TCreateOptions().Recursive(true).IgnoreExisting(true));

ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ class TTransactionCache {
119119
void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended = false);
120120

121121
std::pair<TString, NYT::TTransactionId> GetBinarySnapshot(TString remoteTmpFolder, const TString& md5, const TString& localPath, TDuration expirationInterval);
122+
TMaybe<std::pair<TString, NYT::TTransactionId>> GetBinarySnapshotFromCache(TString binaryCacheFolder, const TString& md5, const TString& fileName);
122123

123124
void CreateDefaultTmpFolder();
124125

ydb/library/yql/providers/yt/gateway/native/yql_yt_spec.cpp

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -558,14 +558,7 @@ void FillUserJobSpecImpl(NYT::TUserJobSpec& spec,
558558
}
559559
}
560560

561-
const TString binTmpFolder = settings->BinaryTmpFolder.Get().GetOrElse(TString());
562-
if (!localRun && binTmpFolder) {
563-
const TDuration binExpiration = settings->BinaryExpirationInterval.Get().GetOrElse(TDuration());
564-
TTransactionCache::TEntry::TPtr entry = execCtx.GetOrCreateEntry(settings);
565-
TString bin = mrJobBin.empty() ? GetPersistentExecPath() : mrJobBin;
566-
const auto binSize = TFileStat(bin).Size;
567-
YQL_ENSURE(binSize != 0);
568-
561+
if (!localRun) {
569562
if (mrJobBin.empty()) {
570563
mrJobBinMd5 = GetPersistentExecPathMd5();
571564
} else if (!mrJobBinMd5) {
@@ -576,10 +569,33 @@ void FillUserJobSpecImpl(NYT::TUserJobSpec& spec,
576569
mrJobBinMd5 = MD5::File(mrJobBin);
577570
}
578571
}
572+
}
579573

580-
auto mrJobSnapshot = entry->GetBinarySnapshot(binTmpFolder, *mrJobBinMd5, bin, binExpiration);
581-
spec.JobBinaryCypressPath(mrJobSnapshot.first, mrJobSnapshot.second);
574+
const TString binTmpFolder = settings->BinaryTmpFolder.Get().GetOrElse(TString());
575+
const TString binCacheFolder = settings->BinaryCacheFolder.Get().GetOrElse(TString());
576+
if (!localRun && (binTmpFolder || binCacheFolder)) {
577+
TString bin = mrJobBin.empty() ? GetPersistentExecPath() : mrJobBin;
578+
const auto binSize = TFileStat(bin).Size;
579+
YQL_ENSURE(binSize != 0);
582580
fileMemUsage += binSize;
581+
TTransactionCache::TEntry::TPtr entry = execCtx.GetOrCreateEntry(settings);
582+
bool useBinCache = false;
583+
if (binCacheFolder) {
584+
if (auto snapshot = entry->GetBinarySnapshotFromCache(binCacheFolder, *mrJobBinMd5, "mrjob")) {
585+
spec.JobBinaryCypressPath(snapshot->first, snapshot->second);
586+
useBinCache = true;
587+
}
588+
}
589+
if (!useBinCache) {
590+
if (binTmpFolder) {
591+
const TDuration binExpiration = settings->BinaryExpirationInterval.Get().GetOrElse(TDuration());
592+
auto mrJobSnapshot = entry->GetBinarySnapshot(binTmpFolder, *mrJobBinMd5, bin, binExpiration);
593+
spec.JobBinaryCypressPath(mrJobSnapshot.first, mrJobSnapshot.second);
594+
} else if (!mrJobBin.empty()) {
595+
spec.JobBinaryLocalPath(mrJobBin, mrJobBinMd5);
596+
}
597+
598+
}
583599
}
584600
else if (!mrJobBin.empty()) {
585601
const auto binSize = TFileStat(mrJobBin).Size;

ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -444,23 +444,43 @@ void TGatewayTransformer::ApplyUserJobSpec(NYT::TUserJobSpec& spec, bool localRu
444444
spec.AddLocalFile(file.first, opts);
445445
}
446446
const TString binTmpFolder = Settings_->BinaryTmpFolder.Get().GetOrElse(TString());
447-
if (localRun || !binTmpFolder) {
448-
for (auto& file: *DeferredUdfFiles_) {
449-
TAddLocalFileOptions opts;
450-
if (!fakeChecksum && file.second.Hash) {
451-
opts.MD5CheckSum(file.second.Hash);
447+
const TString binCacheFolder = Settings_->BinaryCacheFolder.Get().GetOrElse(TString());
448+
if (!localRun && binCacheFolder) {
449+
auto udfFiles = std::move(*DeferredUdfFiles_);
450+
TTransactionCache::TEntry::TPtr entry = GetEntry();
451+
for (auto& file: udfFiles) {
452+
YQL_ENSURE(!file.second.Hash.Empty());
453+
if (auto snapshot = entry->GetBinarySnapshotFromCache(binCacheFolder, file.second.Hash, file.first)) {
454+
spec.AddFile(TRichYPath(snapshot->first).TransactionId(snapshot->second)
455+
.FileName(TFsPath(file.first)
456+
.GetName())
457+
.Executable(true)
458+
.BypassArtifactCache(file.second.BypassArtifactCache));
459+
} else {
460+
DeferredUdfFiles_->push_back(file);
452461
}
453-
YQL_ENSURE(TFileStat(file.first).Size != 0);
454-
opts.BypassArtifactCache(file.second.BypassArtifactCache);
455-
spec.AddLocalFile(file.first, opts);
456462
}
457-
} else {
458-
const TDuration binExpiration = Settings_->BinaryExpirationInterval.Get().GetOrElse(TDuration());
459-
auto entry = GetEntry();
460-
for (auto& file: *DeferredUdfFiles_) {
461-
YQL_ENSURE(TFileStat(file.first).Size != 0);
462-
auto snapshot = entry->GetBinarySnapshot(binTmpFolder, file.second.Hash, file.first, binExpiration);
463-
spec.AddFile(TRichYPath(snapshot.first).TransactionId(snapshot.second).FileName(TFsPath(file.first).GetName()).Executable(true).BypassArtifactCache(file.second.BypassArtifactCache));
463+
}
464+
if (!DeferredUdfFiles_->empty()) {
465+
if (localRun || !binTmpFolder) {
466+
for (auto& file: *DeferredUdfFiles_) {
467+
TAddLocalFileOptions opts;
468+
if (!fakeChecksum && file.second.Hash) {
469+
opts.MD5CheckSum(file.second.Hash);
470+
}
471+
YQL_ENSURE(TFileStat(file.first).Size != 0);
472+
opts.BypassArtifactCache(file.second.BypassArtifactCache);
473+
spec.AddLocalFile(file.first, opts);
474+
}
475+
} else {
476+
const TDuration binExpiration = Settings_->BinaryExpirationInterval.Get().GetOrElse(TDuration());
477+
auto entry = GetEntry();
478+
for (auto& file: *DeferredUdfFiles_) {
479+
YQL_ENSURE(TFileStat(file.first).Size != 0);
480+
YQL_ENSURE(!file.second.Hash.Empty());
481+
auto snapshot = entry->GetBinarySnapshot(binTmpFolder, file.second.Hash, file.first, binExpiration);
482+
spec.AddFile(TRichYPath(snapshot.first).TransactionId(snapshot.second).FileName(TFsPath(file.first).GetName()).Executable(true).BypassArtifactCache(file.second.BypassArtifactCache));
483+
}
464484
}
465485
}
466486
RemoteFiles_->clear();

0 commit comments

Comments
 (0)