@@ -23,6 +23,10 @@ static std::string JoinPath(const std::string& basePath, const std::string& path
23
23
return prefixPathSplit;
24
24
}
25
25
26
+ bool TLogMessage::TPrimaryKeyLogMessage::operator <(const TLogMessage::TPrimaryKeyLogMessage& o) const {
27
+ return App < o.App || App == o.App && Host < o.Host || App == o.App && Host == o.Host && Timestamp < o.Timestamp ;
28
+ }
29
+
26
30
TRunArgs GetRunArgs () {
27
31
28
32
std::string database = std::getenv (" YDB_DATABASE" );
@@ -57,26 +61,26 @@ TStatus CreateLogTable(TTableClient& client, const std::string& table) {
57
61
return status;
58
62
}
59
63
60
- TStatistic GetLogBatch (uint64_t logOffset, std::vector<TLogMessage>& logBatch, std::set<TLogMessage>& setMessage) {
64
+ TStatistic GetLogBatch (uint64_t logOffset, std::vector<TLogMessage>& logBatch, std::set<TLogMessage::TPrimaryKeyLogMessage >& setMessage) {
61
65
logBatch.clear ();
62
66
uint32_t correctSumApp = 0 ;
63
67
uint32_t correctSumHost = 0 ;
64
68
uint32_t correctRowCount = 0 ;
65
69
66
70
for (size_t i = 0 ; i < BATCH_SIZE; ++i) {
67
71
TLogMessage message;
68
- message.pk .App = " App_" + ToString (logOffset % 10 );
69
- message.pk .Host = " 192.168.0." + ToString (logOffset % 11 );
72
+ message.pk .App = " App_" + std::to_string (logOffset % 10 );
73
+ message.pk .Host = " 192.168.0." + std::to_string (logOffset % 11 );
70
74
message.pk .Timestamp = TInstant::Now () + TDuration::MilliSeconds (i % 1000 );
71
75
message.HttpCode = 200 ;
72
76
message.Message = i % 2 ? " GET / HTTP/1.1" : " GET /images/logo.png HTTP/1.1" ;
73
77
logBatch.emplace_back (message);
74
78
75
- if (!setMessage.contains (message)) {
79
+ if (!setMessage.contains (message. pk )) {
76
80
correctSumApp += logOffset % 10 ;
77
81
correctSumHost += logOffset % 11 ;
78
82
++correctRowCount;
79
- setMessage.insert (message);
83
+ setMessage.insert (message. pk );
80
84
}
81
85
82
86
}
@@ -108,65 +112,49 @@ TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const
108
112
return status;
109
113
}
110
114
111
- static TStatus ScanQuerySelect (TTableClient& client, const std::string& path, std::vector <TResultSet>& vectorResultSet) {
115
+ static TStatus SelectTransaction (TSession session, const std::string& path,
116
+ std::optional<TResultSet>& resultSet) {
112
117
std::filesystem::path filesystemPath (path);
113
118
auto query = std::format (R"(
114
- --!syntax_v1
115
119
PRAGMA TablePathPrefix("{}");
116
120
117
- SELECT *
121
+ SELECT
122
+ SUM(CAST(SUBSTRING(CAST(App as string), 4) as Int32)),
123
+ SUM(CAST(SUBSTRING(CAST(Host as string), 10) as Int32)),
124
+ COUNT(*)
118
125
FROM {}
119
- )" , filesystemPath.parent_path ().c_str (), filesystemPath.filename ().c_str ());
126
+ )" , filesystemPath.parent_path ().string (), filesystemPath.filename ().string ());
120
127
121
- auto resultScanQuery = client.StreamExecuteScanQuery (query).GetValueSync ();
122
-
123
- if (!resultScanQuery.IsSuccess ()) {
124
- return resultScanQuery;
125
- }
128
+ auto txControl =
129
+ TTxControl::BeginTx (TTxSettings::SerializableRW ())
130
+ .CommitTx ();
126
131
127
- bool eos = false ;
128
-
129
- while (!eos) {
130
- auto streamPart = resultScanQuery.ReadNext ().ExtractValueSync ();
131
-
132
- if (!streamPart.IsSuccess ()) {
133
- eos = true ;
134
- if (!streamPart.EOS ()) {
135
- return streamPart;
136
- }
137
- continue ;
138
- }
132
+ auto result = session.ExecuteDataQuery (query, txControl).GetValueSync ();
139
133
140
- if (streamPart.HasResultSet ()) {
141
- auto rs = streamPart.ExtractResultSet ();
142
- vectorResultSet.push_back (rs);
143
- }
134
+ if (result.IsSuccess ()) {
135
+ resultSet = result.GetResultSet (0 );
144
136
}
145
- return TStatus (EStatus::SUCCESS, NYql::TIssues ());
137
+
138
+ return result;
146
139
}
147
140
148
- TStatistic ScanQuerySelect (TTableClient& client, const std::string& path) {
149
- std::vector <TResultSet> vectorResultSet ;
150
- ThrowOnError (client.RetryOperationSync ([path, &vectorResultSet](TTableClient& client ) {
151
- return ScanQuerySelect (client , path, vectorResultSet );
141
+ TStatistic Select (TTableClient& client, const std::string& path) {
142
+ std::optional <TResultSet> resultSet ;
143
+ ThrowOnError (client.RetryOperationSync ([path, &resultSet](TSession session ) {
144
+ return SelectTransaction (session , path, resultSet );
152
145
}));
153
146
154
- uint32_t sumApp = 0 ;
155
- uint32_t sumHost = 0 ;
156
- uint32_t rowCount = 0 ;
147
+ TResultSetParser parser (*resultSet);
157
148
158
- for (TResultSet& resultSet : vectorResultSet) {
159
- TResultSetParser parser (resultSet);
160
-
161
- while (parser.TryNextRow ()) {
149
+ uint64_t sumApp = 0 ;
150
+ uint64_t sumHost = 0 ;
151
+ uint64_t rowCount = 0 ;
162
152
163
- ++rowCount;
164
- sumApp += ToString (parser.ColumnParser (" App" ).GetOptionalUtf8 ()).back () - ' 0' ;
165
- std::string strHost = ToString (parser.ColumnParser (" Host" ).GetOptionalUtf8 ());
166
- char penCharStrHost = strHost[strHost.size () - 2 ];
167
- sumHost += strHost.back () - ' 0' + (penCharStrHost == ' .' ? 0 : (penCharStrHost - ' 0' ) * 10 );
168
- }
169
-
153
+ if (parser.TryNextRow ()) {
154
+
155
+ sumApp = *parser.ColumnParser (" column0" ).GetOptionalInt64 ();
156
+ sumHost = *parser.ColumnParser (" column1" ).GetOptionalInt64 ();
157
+ rowCount = parser.ColumnParser (" column2" ).GetUint64 ();
170
158
}
171
159
172
160
return {sumApp, sumHost, rowCount};
0 commit comments