|
67 | 67 | #include <util/system/execpath.h>
|
68 | 68 | #include <util/system/guard.h>
|
69 | 69 | #include <util/system/shellcommand.h>
|
| 70 | +#include <util/system/mutex.h> |
70 | 71 | #include <util/ysaveload.h>
|
71 | 72 |
|
72 | 73 | #include <algorithm>
|
@@ -2811,76 +2812,73 @@ class TYtNativeGateway : public IYtGateway {
|
2811 | 2812 | TTableInfoResult& result)
|
2812 | 2813 | {
|
2813 | 2814 | TVector<NYT::TNode> attributes(tables.size());
|
2814 |
| - TVector<TMaybe<NYT::TNode>> linkAttributes(tables.size()); |
| 2815 | + NSorted::TSimpleMap<size_t, TString> requestSchemasIdxs; |
2815 | 2816 | {
|
| 2817 | + TMutex lock; |
2816 | 2818 | auto batchGet = tx->CreateBatchRequest();
|
2817 | 2819 | TVector<TFuture<void>> batchRes(Reserve(idxs.size()));
|
2818 | 2820 | for (auto& idx: idxs) {
|
2819 |
| - batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "&/@").Apply( |
2820 |
| - [&attributes, &linkAttributes, idx] (const TFuture<NYT::TNode>& res) { |
2821 |
| - try { |
2822 |
| - NYT::TNode attrs = res.GetValue(); |
2823 |
| - auto type = GetTypeFromAttributes(attrs, false); |
2824 |
| - if (type == "link") { |
2825 |
| - linkAttributes[idx.first] = attrs; |
2826 |
| - } else { |
2827 |
| - attributes[idx.first] = attrs; |
| 2821 | + batchRes.push_back(batchGet->Get(idx.second + "/@").Apply([&attributes, &requestSchemasIdxs, &lock, idx] (const TFuture<NYT::TNode>& res) { |
| 2822 | + attributes[idx.first] = res.GetValue(); |
| 2823 | + if (attributes[idx.first].HasKey("schema") && attributes[idx.first]["schema"].IsEntity()) { |
| 2824 | + with_lock (lock) { |
| 2825 | + requestSchemasIdxs.push_back(idx); |
2828 | 2826 | }
|
2829 |
| - } catch (const TErrorResponse& e) { |
2830 |
| - // Yt returns NoSuchTransaction as inner issue for ResolveError |
2831 |
| - if (!e.IsResolveError() || e.IsNoSuchTransaction()) { |
2832 |
| - throw; |
2833 |
| - } |
2834 |
| - // Just ignore. Original table path may be deleted at this time |
2835 | 2827 | }
|
2836 | 2828 | }));
|
2837 | 2829 | }
|
2838 | 2830 | batchGet->ExecuteBatch();
|
2839 | 2831 | WaitExceptionOrAll(batchRes).GetValue();
|
2840 | 2832 | }
|
2841 |
| - |
| 2833 | + if (!requestSchemasIdxs.empty()) { |
| 2834 | + YQL_CLOG(INFO, ProviderYt) << "Additional request of @schema for " << requestSchemasIdxs.size() << " table(s)"; |
| 2835 | + auto batchGet = tx->CreateBatchRequest(); |
| 2836 | + TVector<TFuture<void>> batchRes(Reserve(requestSchemasIdxs.size())); |
| 2837 | + auto getOpts = TGetOptions() |
| 2838 | + .AttributeFilter(TAttributeFilter() |
| 2839 | + .AddAttribute("schema") |
| 2840 | + ); |
| 2841 | + for (auto& idx: requestSchemasIdxs) { |
| 2842 | + batchRes.push_back(batchGet->Get(idx.second + "/@", getOpts).Apply([&attributes, idx] (const TFuture<NYT::TNode>& res) { |
| 2843 | + attributes[idx.first]["schema"] = res.GetValue().At("schema"); |
| 2844 | + })); |
| 2845 | + } |
| 2846 | + batchGet->ExecuteBatch(); |
| 2847 | + WaitExceptionOrAll(batchRes).GetValue(); |
| 2848 | + } |
2842 | 2849 | {
|
2843 |
| - auto schemaAttrFilter = TAttributeFilter().AddAttribute("schema"); |
2844 |
| - TVector<NYT::TNode> schemas(tables.size()); |
2845 |
| - |
2846 | 2850 | auto batchGet = tx->CreateBatchRequest();
|
2847 | 2851 | TVector<TFuture<void>> batchRes;
|
2848 |
| - for (auto& idx : idxs) { |
2849 |
| - batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "/@", TGetOptions().AttributeFilter(schemaAttrFilter)) |
2850 |
| - .Apply([idx, &schemas] (const TFuture<NYT::TNode>& res) { |
2851 |
| - try { |
2852 |
| - schemas[idx.first] = res.GetValue(); |
2853 |
| - } catch (const TErrorResponse& e) { |
2854 |
| - // Yt returns NoSuchTransaction as inner issue for ResolveError |
2855 |
| - if (!e.IsResolveError() || e.IsNoSuchTransaction()) { |
2856 |
| - throw; |
2857 |
| - } |
2858 |
| - // Just ignore. Original table path may be deleted at this time |
2859 |
| - } |
2860 |
| - })); |
2861 |
| - |
2862 |
| - if (linkAttributes[idx.first]) { |
2863 |
| - const auto& linkAttr = *linkAttributes[idx.first]; |
2864 |
| - batchRes.push_back(batchGet->Get(idx.second + "/@").Apply( |
2865 |
| - [idx, &linkAttr, &attributes](const TFuture<NYT::TNode>& f) { |
| 2852 | + auto getOpts = TGetOptions() |
| 2853 | + .AttributeFilter(TAttributeFilter() |
| 2854 | + .AddAttribute("type") |
| 2855 | + .AddAttribute(TString{QB2Premapper}) |
| 2856 | + .AddAttribute(TString{YqlRowSpecAttribute}) |
| 2857 | + ); |
| 2858 | + for (auto& idx: idxs) { |
| 2859 | + batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "&/@", getOpts).Apply([idx, &attributes](const TFuture<NYT::TNode>& f) { |
| 2860 | + try { |
2866 | 2861 | NYT::TNode attrs = f.GetValue();
|
2867 |
| - attributes[idx.first] = attrs; |
2868 |
| - // override some attributes by the link ones |
2869 |
| - if (linkAttr.HasKey(QB2Premapper)) { |
2870 |
| - attributes[idx.first][QB2Premapper] = linkAttr[QB2Premapper]; |
| 2862 | + if (GetTypeFromAttributes(attrs, false) == "link") { |
| 2863 | + // override some attributes by the link ones |
| 2864 | + if (attrs.HasKey(QB2Premapper)) { |
| 2865 | + attributes[idx.first][QB2Premapper] = attrs[QB2Premapper]; |
| 2866 | + } |
| 2867 | + if (attrs.HasKey(YqlRowSpecAttribute)) { |
| 2868 | + attributes[idx.first][YqlRowSpecAttribute] = attrs[YqlRowSpecAttribute]; |
| 2869 | + } |
2871 | 2870 | }
|
2872 |
| - if (linkAttr.HasKey(YqlRowSpecAttribute)) { |
2873 |
| - attributes[idx.first][YqlRowSpecAttribute] = linkAttr[YqlRowSpecAttribute]; |
| 2871 | + } catch (const TErrorResponse& e) { |
| 2872 | + // Yt returns NoSuchTransaction as inner issue for ResolveError |
| 2873 | + if (!e.IsResolveError() || e.IsNoSuchTransaction()) { |
| 2874 | + throw; |
2874 | 2875 | }
|
2875 |
| - })); |
2876 |
| - } |
| 2876 | + // Just ignore. Original table path may be deleted at this time |
| 2877 | + } |
| 2878 | + })); |
2877 | 2879 | }
|
2878 | 2880 | batchGet->ExecuteBatch();
|
2879 | 2881 | WaitExceptionOrAll(batchRes).GetValue();
|
2880 |
| - |
2881 |
| - for (auto& idx: idxs) { |
2882 |
| - attributes[idx.first]["schema"] = schemas[idx.first]["schema"]; |
2883 |
| - } |
2884 | 2882 | }
|
2885 | 2883 |
|
2886 | 2884 | auto batchGet = tx->CreateBatchRequest();
|
@@ -2912,6 +2910,7 @@ class TYtNativeGateway : public IYtGateway {
|
2912 | 2910 | }
|
2913 | 2911 |
|
2914 | 2912 | statInfo->Id = attrs["id"].AsString();
|
| 2913 | + YQL_ENSURE(statInfo->Id == TStringBuf(idx.second).Skip(1)); |
2915 | 2914 | statInfo->TableRevision = attrs["revision"].IntCast<ui64>();
|
2916 | 2915 | statInfo->Revision = GetContentRevision(attrs);
|
2917 | 2916 |
|
|
0 commit comments