Skip to content

Commit c46068e

Browse files
author
ermolovd
committed
fix buffered stream interleaving for proto format
commit_hash:66e6b75f97ea253ddbf4c97a180f9f971c59f93b
1 parent 8e841b7 commit c46068e

File tree

4 files changed

+79
-0
lines changed

4 files changed

+79
-0
lines changed

yt/cpp/mapreduce/io/proto_table_writer.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,4 +231,21 @@ void TLenvalProtoSingleTableWriter::AddRow(const Message& row, size_t tableIndex
231231

232232
////////////////////////////////////////////////////////////////////////////////
233233

234+
void LenvalEncodeProto(IZeroCopyOutput* output, const ::google::protobuf::Message& message)
235+
{
236+
i32 size = message.ByteSizeLong();
237+
output->Write(&size, sizeof(size));
238+
239+
TProtobufZeroCopyOutputStream adapter(output);
240+
auto result = message.SerializeToZeroCopyStream(&adapter);
241+
242+
adapter.ThrowOnError();
243+
244+
if (!result) {
245+
ythrow yexception() << "Failed to serialize protobuf message";
246+
}
247+
}
248+
249+
////////////////////////////////////////////////////////////////////////////////
250+
234251
} // namespace NYT

yt/cpp/mapreduce/io/proto_table_writer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,8 @@ class TLenvalProtoSingleTableWriter
7979

8080
////////////////////////////////////////////////////////////////////////////////
8181

82+
void LenvalEncodeProto(IZeroCopyOutput* output, const ::google::protobuf::Message& message);
83+
84+
////////////////////////////////////////////////////////////////////////////////
85+
8286
} // namespace NYT

yt/yt/core/misc/protobuf_helpers.cpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,46 @@ TProtobufOutputStreamAdaptor::TProtobufOutputStreamAdaptor(IOutputStream* stream
617617
, CopyingOutputStreamAdaptor(this)
618618
{ }
619619

620+
TProtobufZeroCopyOutputStream::TProtobufZeroCopyOutputStream(IZeroCopyOutput* stream)
621+
: Stream_(stream)
622+
{ }
623+
624+
bool TProtobufZeroCopyOutputStream::Next(void** data, int* size)
625+
{
626+
try {
627+
size_t sizetSize = Stream_->Next(data);
628+
constexpr int maxSize = std::numeric_limits<int>::max();
629+
if (sizetSize > maxSize) {
630+
Stream_->Undo(sizetSize - maxSize);
631+
sizetSize = maxSize;
632+
}
633+
*size = sizetSize;
634+
} catch (const std::exception&) {
635+
Error_ = std::current_exception();
636+
return false;
637+
}
638+
ByteCount_ += *size;
639+
return true;
640+
}
641+
642+
void TProtobufZeroCopyOutputStream::BackUp(int count)
643+
{
644+
ByteCount_ -= count;
645+
Stream_->Undo(count);
646+
}
647+
648+
int64_t TProtobufZeroCopyOutputStream::ByteCount() const
649+
{
650+
return ByteCount_;
651+
}
652+
653+
void TProtobufZeroCopyOutputStream::ThrowOnError() const
654+
{
655+
if (Error_) {
656+
std::rethrow_exception(Error_);
657+
}
658+
}
659+
620660
////////////////////////////////////////////////////////////////////////////////
621661

622662
} // namespace NYT

yt/yt/core/misc/protobuf_helpers.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,24 @@ class TProtobufOutputStreamAdaptor
548548
explicit TProtobufOutputStreamAdaptor(IOutputStream* stream);
549549
};
550550

551+
class TProtobufZeroCopyOutputStream
552+
: public ::google::protobuf::io::ZeroCopyOutputStream
553+
{
554+
public:
555+
explicit TProtobufZeroCopyOutputStream(IZeroCopyOutput* stream);
556+
557+
bool Next(void** data, int* size) override;
558+
void BackUp(int count) override;
559+
int64_t ByteCount() const override;
560+
561+
void ThrowOnError() const;
562+
563+
private:
564+
IZeroCopyOutput* const Stream_;
565+
std::exception_ptr Error_;
566+
int64_t ByteCount_ = 0;
567+
};
568+
551569
////////////////////////////////////////////////////////////////////////////////
552570

553571
} // namespace NYT

0 commit comments

Comments
 (0)