@@ -185,11 +185,15 @@ class TMain : public TMainBase {
185
185
.Handler1 ([this ](const NLastGetopt::TOptsParser* option) {
186
186
TStringBuf topicName, others;
187
187
TStringBuf (option->CurVal ()).Split (' @' , topicName, others);
188
+
188
189
TStringBuf path, partitionCountStr;
189
190
TStringBuf (others).Split (' :' , path, partitionCountStr);
190
191
size_t partitionCount = !partitionCountStr.empty () ? FromString<size_t >(partitionCountStr) : 1 ;
192
+ if (!partitionCount) {
193
+ ythrow yexception () << " Topic partition count should be at least one" ;
194
+ }
191
195
if (topicName.empty () || path.empty ()) {
192
- ythrow yexception () << " Incorrect PQ file mapping, expected form topic@path[:partitions_count]" << Endl ;
196
+ ythrow yexception () << " Incorrect PQ file mapping, expected form topic@path[:partitions_count]" ;
193
197
}
194
198
if (!PqFilesMapping.emplace (topicName, NYql::TDummyTopic (" pq" , TString (topicName), TString (path), partitionCount)).second ) {
195
199
ythrow yexception () << " Got duplicated topic name: " << topicName;
@@ -199,15 +203,7 @@ class TMain : public TMainBase {
199
203
options.AddLongOption (" cnacel-on-file-finish" , " Cancel emulate YDS topics when topic file finished" )
200
204
.RequiredArgument (" topic" )
201
205
.Handler1 ([this ](const NLastGetopt::TOptsParser* option) {
202
- TStringBuf topicName;
203
- TStringBuf filePath;
204
- TStringBuf (option->CurVal ()).Split (' @' , topicName, filePath);
205
- if (topicName.empty () || filePath.empty ()) {
206
- ythrow yexception () << " Incorrect PQ file mapping, expected form topic@file" ;
207
- }
208
- if (!PqFilesMapping.emplace (topicName, filePath).second ) {
209
- ythrow yexception () << " Got duplicated topic name: " << topicName;
210
- }
206
+ TopicsSettings[option->CurVal ()].CancelOnFileFinish = true ;
211
207
});
212
208
213
209
// Outputs
@@ -259,11 +255,18 @@ class TMain : public TMainBase {
259
255
260
256
if (!PqFilesMapping.empty ()) {
261
257
auto fileGateway = MakeIntrusive<NYql::TDummyPqGateway>();
262
- for (const auto & [_, topic] : PqFilesMapping) {
258
+ for (auto [_, topic] : PqFilesMapping) {
259
+ if (const auto it = TopicsSettings.find (topic.TopicName ); it != TopicsSettings.end ()) {
260
+ topic.CancelOnFileFinish = it->second .CancelOnFileFinish ;
261
+ TopicsSettings.erase (it);
262
+ }
263
263
fileGateway->AddDummyTopic (topic);
264
264
}
265
265
RunnerOptions.FqSettings .PqGateway = std::move (fileGateway);
266
266
}
267
+ if (!TopicsSettings.empty ()) {
268
+ ythrow yexception () << " Found topic settings for not existing topic: '" << TopicsSettings.begin ()->first << " '" ;
269
+ }
267
270
268
271
#ifdef PROFILE_MEMORY_ALLOCATIONS
269
272
if (RunnerOptions.FqSettings .VerboseLevel >= EVerbose::Info) {
@@ -302,6 +305,11 @@ class TMain : public TMainBase {
302
305
TExecutionOptions ExecutionOptions;
303
306
TRunnerOptions RunnerOptions;
304
307
std::unordered_map<TString, NYql::TDummyTopic> PqFilesMapping;
308
+
309
+ struct TTopicSettings {
310
+ bool CancelOnFileFinish = false ;
311
+ };
312
+ std::unordered_map<TString, TTopicSettings> TopicsSettings;
305
313
};
306
314
307
315
} // anonymous namespace
0 commit comments