Skip to content

Commit 969ac09

Browse files
committed
Merge branch 'rightlib' into merge-libs-250522-0050
2 parents 2747776 + 4034575 commit 969ac09

File tree

20 files changed

+201
-84
lines changed

20 files changed

+201
-84
lines changed

util/charset/utf8.cpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,29 @@ namespace {
8181
Y_ASSERT(false);
8282
return false;
8383
}
84+
85+
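// Returns the size in bytes of the longest valid UTF-8 prefix of |sb| that is at most |size| bytes long. If an invalid sequence is met before the limit: stops there when |robust| is set, throws yexception otherwise.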
86+
size_t GetLongestUtf8PrefixSize(TStringBuf sb, size_t size, bool robust) {
87+
const unsigned char* beg = reinterpret_cast<const unsigned char*>(sb.data());
88+
const unsigned char* end = reinterpret_cast<const unsigned char*>(sb.data() + sb.size());
89+
const unsigned char* cur = beg;
90+
while (cur < end && static_cast<size_t>(cur - beg) < size) {
91+
size_t runeLen;
92+
if (RECODE_OK != GetUTF8CharLen(runeLen, cur, end)) {
93+
if (robust) {
94+
break;
95+
} else {
96+
ythrow yexception() << "invalid UTF-8 char at pos " << (cur - beg);
97+
}
98+
}
99+
if ((cur - beg) + runeLen > size) {
100+
break;
101+
}
102+
cur += runeLen;
103+
}
104+
return cur - beg;
105+
}
106+
84107
} // namespace
85108

86109
extern const wchar32 BROKEN_RUNE = 0xFFFD;
@@ -169,3 +192,25 @@ TString ToUpperUTF8(TStringBuf s) {
169192
TString ToUpperUTF8(const char* s) {
170193
return ToUpperUTF8(TStringBuf(s));
171194
}
195+
196+
void Utf8TruncateInplace(TString& s, size_t size) {
197+
const size_t prefixSize = GetLongestUtf8PrefixSize(TStringBuf{s}, size, false);
198+
if (prefixSize != s.size()) {
199+
s.resize(prefixSize);
200+
}
201+
}
202+
203+
void Utf8TruncateInplaceRobust(TString& s, size_t size) {
204+
const size_t prefixSize = GetLongestUtf8PrefixSize(TStringBuf{s}, size, true);
205+
if (prefixSize != s.size()) {
206+
s.resize(prefixSize);
207+
}
208+
}
209+
210+
TStringBuf Utf8Truncate(TStringBuf sb Y_LIFETIME_BOUND, size_t size) {
211+
return sb.substr(0, GetLongestUtf8PrefixSize(sb, size, false));
212+
}
213+
214+
TStringBuf Utf8TruncateRobust(TStringBuf sb Y_LIFETIME_BOUND, size_t size) noexcept {
215+
return sb.substr(0, GetLongestUtf8PrefixSize(sb, size, true));
216+
}

util/charset/utf8.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,3 +442,11 @@ bool ToUpperUTF8Impl(const char* beg, size_t n, TString& newString);
442442
TString ToUpperUTF8(const TString& s);
443443
TString ToUpperUTF8(TStringBuf s);
444444
TString ToUpperUTF8(const char* s);
445+
446+
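//! truncates a UTF-8 string to at most |size| bytes without splitting a character; throws yexception if invalid UTF-8 is encountered before the limit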
447+
void Utf8TruncateInplace(TString& s, size_t size);
448+
//! truncates to the longest valid UTF-8 prefix of at most |size| bytes; stops at the first invalid sequence instead of throwing
449+
void Utf8TruncateInplaceRobust(TString& s, size_t size);
450+
[[nodiscard]] TStringBuf Utf8Truncate(TStringBuf sb Y_LIFETIME_BOUND, size_t size);
451+
//! on invalid input returns the longest valid UTF-8 prefix of at most |size| bytes instead of throwing
452+
[[nodiscard]] TStringBuf Utf8TruncateRobust(TStringBuf sb Y_LIFETIME_BOUND, size_t size) noexcept;

util/charset/utf8_ut.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,52 @@ Y_UNIT_TEST_SUITE(TUtfUtilTest) {
123123
wtextScalar.remove(wtextSSE.size());
124124
UNIT_ASSERT(wtextScalar == wtextSSE);
125125
}
126+
127+
Y_UNIT_TEST(TestUtf8TruncateInplace) {
128+
TString s = "Съешь ещё этих мягких французских булок, да выпей же чаю.";
129+
Utf8TruncateInplace(s, 0u);
130+
UNIT_ASSERT_EQUAL(s, "");
131+
132+
s = "Съешь ещё этих мягких французских булок, да выпей же чаю.";
133+
Utf8TruncateInplace(s, 10u);
134+
UNIT_ASSERT_EQUAL(s, "Съешь");
135+
136+
s = "Съешь ещё этих мягких французских булок, да выпей же чаю.";
137+
TString s_copy = s;
138+
Utf8TruncateInplace(s, s.size());
139+
UNIT_ASSERT_EQUAL(s, s_copy);
140+
141+
Utf8TruncateInplace(s, Max());
142+
UNIT_ASSERT_EQUAL(s, s_copy);
143+
}
144+
145+
Y_UNIT_TEST(TestUtf8TruncateCorrupted) {
146+
const TString s = "Съешь ещё этих мягких французских булок, да выпей же чаю.";
147+
TStringBuf corrupted{s, 0u, 21u};
148+
UNIT_ASSERT_EXCEPTION_CONTAINS(Y_UNUSED(Utf8Truncate(corrupted, 21u)), yexception, "invalid UTF-8 char");
149+
UNIT_ASSERT_NO_EXCEPTION(Y_UNUSED(Utf8TruncateRobust(corrupted, 21u)));
150+
TStringBuf fixed = Utf8TruncateRobust(corrupted, 21u);
151+
UNIT_ASSERT_LE(fixed.size(), 21u);
152+
UNIT_ASSERT_EQUAL(fixed, "Съешь ещё э");
153+
}
154+
155+
Y_UNIT_TEST(TestUtf8CutInvalidSuffixInplace) {
156+
TString s = "Съешь ещё этих мягких французских булок, да выпей же чаю.";
157+
s.resize(21);
158+
UNIT_ASSERT_UNEQUAL(s, "Съешь ещё э");
159+
Utf8TruncateInplaceRobust(s, s.size());
160+
UNIT_ASSERT_EQUAL(s, "Съешь ещё э");
161+
}
162+
163+
Y_UNIT_TEST(TestUtf8CutInvalidSuffix) {
164+
TStringBuf sb = "Съешь ещё этих мягких французских булок, да выпей же чаю."sv;
165+
UNIT_ASSERT_EQUAL(Utf8TruncateRobust(sb, sb.size()), sb);
166+
UNIT_ASSERT_EQUAL(Utf8TruncateRobust(sb.substr(0, 21), sb.size()), "Съешь ещё э"sv);
167+
}
168+
169+
Y_UNIT_TEST(TestUtf8TruncateCornerCases) {
170+
UNIT_ASSERT_EQUAL(Utf8Truncate("①②③"sv, 4).size(), 3);
171+
UNIT_ASSERT_VALUES_EQUAL(Utf8Truncate("foobar"sv, Max()), "foobar"sv);
172+
}
173+
126174
} // Y_UNIT_TEST_SUITE(TUtfUtilTest)

util/generic/hash.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "fwd.h"
44

55
#include "hash_table.h"
6+
#include "mapfindptr.h"
67

78
template <class Key, class T, class HashFcn, class EqualKey, class Alloc>
89
class THashMap: public TMapOps<THashMap<Key, T, HashFcn, EqualKey, Alloc>> {

util/generic/hash_table.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,18 @@
11
#pragma once
22

33
#include "fwd.h"
4-
#include "mapfindptr.h"
54

65
#include <util/memory/alloc.h>
76
#include <util/system/compiler.h>
87
#include <util/system/type_name.h>
98
#include <util/system/yassert.h>
109
#include <util/str_stl.h>
1110
#include "yexception.h"
12-
#include "typetraits.h"
1311
#include "utility.h"
1412

1513
#include <algorithm>
16-
#include <initializer_list>
1714
#include <memory>
1815
#include <tuple>
19-
#include <utility>
20-
21-
#include <cstdlib>
2216

2317
#include "hash_primes.h"
2418

@@ -32,7 +26,7 @@ struct TSelect1st {
3226
template <class Value>
3327
struct __yhashtable_node {
3428
/** If the first bit is not set, then this is a pointer to the next node in
35-
* the list of nodes for the current bucket. Otherwise this is a pointer of
29+
* the list of nodes for the current bucket. Otherwise, this is a pointer of
3630
* type __yhashtable_node**, pointing back into the buckets array.
3731
*
3832
* This trick makes it possible to use only one node pointer in a hash table

yql/essentials/core/type_ann/type_ann_core.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4443,6 +4443,10 @@ namespace NTypeAnnImpl {
44434443
}
44444444

44454445
const auto sourceType = input->Head().GetTypeAnn();
4446+
if (HasError(sourceType, ctx.Expr)) {
4447+
return IGraphTransformer::TStatus::Error;
4448+
}
4449+
44464450
const auto options = CastResult<Strong>(sourceType, targetType);
44474451
if (!(options & NKikimr::NUdf::ECastOptions::Impossible)) {
44484452
auto type = targetType;

yt/yt/client/api/delegating_client.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -899,22 +899,22 @@ class TDelegatingClient
899899
(cookie, options))
900900

901901
// Shuffle Service
902-
DELEGATE_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (
902+
DELEGATE_METHOD(TFuture<TSignedShuffleHandlePtr>, StartShuffle, (
903903
const std::string& account,
904904
int partitionCount,
905905
NObjectClient::TTransactionId transactionId,
906906
const TStartShuffleOptions& options),
907907
(account, partitionCount, transactionId, options))
908908

909909
DELEGATE_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (
910-
const TShuffleHandlePtr& shuffleHandle,
910+
const TSignedShuffleHandlePtr& shuffleHandle,
911911
int partitionIndex,
912912
std::optional<std::pair<int, int>> writerIndexRange,
913913
const TShuffleReaderOptions& options),
914914
(shuffleHandle, partitionIndex, writerIndexRange, options))
915915

916916
DELEGATE_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (
917-
const TShuffleHandlePtr& shuffleHandle,
917+
const TSignedShuffleHandlePtr& shuffleHandle,
918918
const std::string& partitionColumn,
919919
std::optional<int> writerIndex,
920920
const TShuffleWriterOptions& options),

yt/yt/client/api/rpc_proxy/client_impl.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2781,7 +2781,7 @@ TFuture<TFlowExecuteResult> TClient::FlowExecute(
27812781
}));
27822782
}
27832783

2784-
TFuture<TShuffleHandlePtr> TClient::StartShuffle(
2784+
TFuture<TSignedShuffleHandlePtr> TClient::StartShuffle(
27852785
const std::string& account,
27862786
int partitionCount,
27872787
TTransactionId parentTransactionId,
@@ -2803,12 +2803,12 @@ TFuture<TShuffleHandlePtr> TClient::StartShuffle(
28032803
}
28042804

28052805
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspStartShufflePtr& rsp) {
2806-
return ConvertTo<TShuffleHandlePtr>(TYsonString(rsp->shuffle_handle()));
2806+
return ConvertTo<TSignedShuffleHandlePtr>(TYsonStringBuf(rsp->signed_shuffle_handle()));
28072807
}));
28082808
}
28092809

28102810
TFuture<IRowBatchReaderPtr> TClient::CreateShuffleReader(
2811-
const TShuffleHandlePtr& shuffleHandle,
2811+
const TSignedShuffleHandlePtr& signedShuffleHandle,
28122812
int partitionIndex,
28132813
std::optional<std::pair<int, int>> writerIndexRange,
28142814
const TShuffleReaderOptions& options)
@@ -2818,7 +2818,7 @@ TFuture<IRowBatchReaderPtr> TClient::CreateShuffleReader(
28182818
auto req = proxy.ReadShuffleData();
28192819
InitStreamingRequest(*req);
28202820

2821-
req->set_shuffle_handle(ConvertToYsonString(shuffleHandle).ToString());
2821+
req->set_signed_shuffle_handle(ConvertToYsonString(signedShuffleHandle).ToString());
28222822
req->set_partition_index(partitionIndex);
28232823
if (options.Config) {
28242824
req->set_reader_config(ConvertToYsonString(options.Config).ToString());
@@ -2836,7 +2836,7 @@ TFuture<IRowBatchReaderPtr> TClient::CreateShuffleReader(
28362836
}
28372837

28382838
TFuture<IRowBatchWriterPtr> TClient::CreateShuffleWriter(
2839-
const TShuffleHandlePtr& shuffleHandle,
2839+
const TSignedShuffleHandlePtr& signedShuffleHandle,
28402840
const std::string& partitionColumn,
28412841
std::optional<int> writerIndex,
28422842
const TShuffleWriterOptions& options)
@@ -2845,7 +2845,7 @@ TFuture<IRowBatchWriterPtr> TClient::CreateShuffleWriter(
28452845
auto req = proxy.WriteShuffleData();
28462846
InitStreamingRequest(*req);
28472847

2848-
req->set_shuffle_handle(ConvertToYsonString(shuffleHandle).ToString());
2848+
req->set_signed_shuffle_handle(ConvertToYsonString(signedShuffleHandle).ToString());
28492849
req->set_partition_column(ToProto(partitionColumn));
28502850
if (options.Config) {
28512851
req->set_writer_config(ConvertToYsonString(options.Config).ToString());

yt/yt/client/api/rpc_proxy/client_impl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -599,20 +599,20 @@ class TClient
599599
const TFlowExecuteOptions& options = {}) override;
600600

601601
// Shuffle service client
602-
TFuture<TShuffleHandlePtr> StartShuffle(
602+
TFuture<TSignedShuffleHandlePtr> StartShuffle(
603603
const std::string& account,
604604
int partitionCount,
605605
NObjectClient::TTransactionId parentTransactionId,
606606
const TStartShuffleOptions& options) override;
607607

608608
TFuture<IRowBatchReaderPtr> CreateShuffleReader(
609-
const TShuffleHandlePtr& shuffleHandle,
609+
const TSignedShuffleHandlePtr& shuffleHandle,
610610
int partitionIndex,
611611
std::optional<std::pair<int, int>> writerIndexRange,
612612
const TShuffleReaderOptions& options) override;
613613

614614
TFuture<IRowBatchWriterPtr> CreateShuffleWriter(
615-
const TShuffleHandlePtr& shuffleHandle,
615+
const TSignedShuffleHandlePtr& shuffleHandle,
616616
const std::string& partitionColumn,
617617
std::optional<int> writerIndex,
618618
const TShuffleWriterOptions& options) override;

yt/yt/client/api/shuffle_client.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ struct TShuffleHandle
2323

2424
DEFINE_REFCOUNTED_TYPE(TShuffleHandle)
2525

26+
YT_DEFINE_STRONG_TYPEDEF(TSignedShuffleHandlePtr, NSignature::TSignaturePtr);
27+
2628
void FormatValue(TStringBuilderBase* builder, const TShuffleHandlePtr& shuffleHandle, TStringBuf spec);
2729

2830
////////////////////////////////////////////////////////////////////////////////
@@ -51,20 +53,20 @@ struct IShuffleClient
5153
{
5254
virtual ~IShuffleClient() = default;
5355

54-
virtual TFuture<TShuffleHandlePtr> StartShuffle(
56+
virtual TFuture<TSignedShuffleHandlePtr> StartShuffle(
5557
const std::string& account,
5658
int partitionCount,
5759
NObjectClient::TTransactionId parentTransactionId,
5860
const TStartShuffleOptions& options) = 0;
5961

6062
virtual TFuture<IRowBatchReaderPtr> CreateShuffleReader(
61-
const TShuffleHandlePtr& shuffleHandle,
63+
const TSignedShuffleHandlePtr& shuffleHandle,
6264
int partitionIndex,
6365
std::optional<std::pair<int, int>> writerIndexRange = {},
6466
const TShuffleReaderOptions& options = {}) = 0;
6567

6668
virtual TFuture<IRowBatchWriterPtr> CreateShuffleWriter(
67-
const TShuffleHandlePtr& shuffleHandle,
69+
const TSignedShuffleHandlePtr& shuffleHandle,
6870
const std::string& partitionColumn,
6971
std::optional<int> writerIndex = {},
7072
const TShuffleWriterOptions& options = {}) = 0;

0 commit comments

Comments
 (0)