Skip to content

Commit a3f26a3

Browse files
committed
Intermediate changes
commit_hash:d1545de5448760526e5b01472ffac8185967334a
1 parent 52daccf commit a3f26a3

File tree

11 files changed

+43
-49
lines changed

11 files changed

+43
-49
lines changed

yql/essentials/utils/runnable.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#pragma once
2+
13
#include <util/generic/ptr.h>
24

35
namespace NYql {

yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ auto defaultTaskFunction = [] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic
7070
Sleep(TDuration::Seconds(4));
7171
return ETaskStatus::Completed;
7272
}
73-
return ETaskStatus::Aborted;
73+
return ETaskStatus::Failed;
7474
};
7575

7676
Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
@@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
128128

129129
TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction};
130130
auto factory = MakeFmrJobFactory(settings);
131-
TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
131+
TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
132132
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
133133
worker->Start();
134134
Sleep(TDuration::Seconds(1));
@@ -146,7 +146,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
146146
}
147147
TFmrJobFactorySettings settings{.NumThreads = 10, .Function = defaultTaskFunction};
148148
auto factory = MakeFmrJobFactory(settings);
149-
TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
149+
TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
150150
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
151151
worker->Start();
152152
Sleep(TDuration::Seconds(6));
@@ -184,12 +184,12 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
184184
Sleep(TDuration::Seconds(1));
185185
return ETaskStatus::Completed;
186186
}
187-
return ETaskStatus::Aborted;
187+
return ETaskStatus::Failed;
188188
};
189189

190190
TFmrJobFactorySettings settings{.NumThreads = 10, .Function = func};
191191
auto factory = MakeFmrJobFactory(settings);
192-
TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
192+
TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
193193
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
194194
worker->Start();
195195
Sleep(TDuration::Seconds(5));
@@ -211,7 +211,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
211211

212212
TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction};
213213
auto factory = MakeFmrJobFactory(settings);
214-
TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
214+
TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
215215
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
216216
worker->Start();
217217

@@ -229,7 +229,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
229229

230230
TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction};
231231
auto factory = MakeFmrJobFactory(settings);
232-
TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
232+
TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
233233
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
234234
worker->Start();
235235

@@ -259,11 +259,11 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
259259
return ETaskStatus::Completed;
260260
}
261261
}
262-
return ETaskStatus::Aborted;
262+
return ETaskStatus::Failed;
263263
};
264264
TFmrJobFactorySettings settings{.NumThreads =3, .Function=func};
265265
auto factory = MakeFmrJobFactory(settings);
266-
TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
266+
TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
267267
TFmrWorkerProxy workerProxy(coordinator, factory, workerSettings);
268268

269269
workerProxy.Start();
@@ -290,12 +290,12 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
290290
Sleep(TDuration::Seconds(2));
291291
throw std::runtime_error{"Function crashed"};
292292
}
293-
return ETaskStatus::Aborted;
293+
return ETaskStatus::Failed;
294294
};
295295

296296
TFmrJobFactorySettings settings{.NumThreads = 3, .Function = func};
297297
auto factory = MakeFmrJobFactory(settings);
298-
TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
298+
TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
299299
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
300300
worker->Start();
301301
Sleep(TDuration::Seconds(4));

yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ struct TOperationInfo {
2121
};
2222

2323
struct TIdempotencyKeyInfo {
24-
TString operationId;
24+
TString OperationId;
2525
TInstant OperationCreationTime;
2626
};
2727

@@ -47,13 +47,13 @@ class TFmrCoordinator: public IFmrCoordinator {
4747
TGuard<TMutex> guard(Mutex_);
4848
TMaybe<TString> IdempotencyKey = request.IdempotencyKey;
4949
if (IdempotencyKey && IdempotencyKeys_.contains(*IdempotencyKey)) {
50-
auto operationId = IdempotencyKeys_[*IdempotencyKey].operationId;
50+
auto operationId = IdempotencyKeys_[*IdempotencyKey].OperationId;
5151
auto& operationInfo = Operations_[operationId];
5252
return NThreading::MakeFuture(TStartOperationResponse(operationInfo.OperationStatus, operationId));
5353
}
5454
auto operationId = GenerateId();
5555
if (IdempotencyKey) {
56-
IdempotencyKeys_[*IdempotencyKey] = TIdempotencyKeyInfo{.operationId = operationId, .OperationCreationTime=TInstant::Now()};
56+
IdempotencyKeys_[*IdempotencyKey] = TIdempotencyKeyInfo{.OperationId = operationId, .OperationCreationTime=TInstant::Now()};
5757
}
5858

5959
TString taskId = GenerateId();
@@ -107,17 +107,16 @@ class TFmrCoordinator: public IFmrCoordinator {
107107
TGuard<TMutex> guard(Mutex_);
108108

109109
ui32 workerId = request.WorkerId;
110-
YQL_ENSURE(workerId >= 1 && workerId <= WorkersNum_);
111-
if (! workerToVolatileId_.contains(workerId)) {
112-
workerToVolatileId_[workerId] = request.VolatileId;
113-
} else if (request.VolatileId != workerToVolatileId_[workerId]) {
114-
workerToVolatileId_[workerId] = request.VolatileId;
110+
YQL_ENSURE(workerId >= 0 && workerId < WorkersNum_);
111+
if (! WorkerToVolatileId_.contains(workerId)) {
112+
WorkerToVolatileId_[workerId] = request.VolatileId;
113+
} else if (request.VolatileId != WorkerToVolatileId_[workerId]) {
114+
WorkerToVolatileId_[workerId] = request.VolatileId;
115115
for (auto& [taskId, taskInfo]: Tasks_) {
116116
auto taskStatus = Tasks_[taskId].TaskStatus;
117117
auto operationId = Tasks_[taskId].OperationId;
118118
if (taskStatus == ETaskStatus::InProgress) {
119119
TaskToDeleteIds_.insert(taskId); // Task is currently running, send signal to worker to cancel
120-
TString sessionId = Operations_[operationId].SessionId;
121120
TFmrError error{
122121
.Component = EFmrComponent::Coordinator, .ErrorMessage = "Max retries limit exceeded", .OperationId = operationId};
123122
SetUnfinishedTaskStatus(taskId, ETaskStatus::Failed, error);
@@ -145,7 +144,7 @@ class TFmrCoordinator: public IFmrCoordinator {
145144
}
146145

147146
for (auto& taskId: TaskToDeleteIds_) {
148-
SetUnfinishedTaskStatus(taskId, ETaskStatus::Aborted);
147+
SetUnfinishedTaskStatus(taskId, ETaskStatus::Failed);
149148
}
150149
return NThreading::MakeFuture(THeartbeatResponse{.TasksToRun = tasksToRun, .TaskToDeleteIds = TaskToDeleteIds_});
151150
}
@@ -159,7 +158,7 @@ class TFmrCoordinator: public IFmrCoordinator {
159158
auto currentTime = TInstant::Now();
160159
for (auto it = IdempotencyKeys_.begin(); it != IdempotencyKeys_.end();) {
161160
auto operationCreationTime = it->second.OperationCreationTime;
162-
auto operationId = it->second.operationId;
161+
auto operationId = it->second.OperationId;
163162
if (currentTime - operationCreationTime > IdempotencyKeyStoreTime_) {
164163
it = IdempotencyKeys_.erase(it);
165164
if (Operations_.contains(operationId)) {
@@ -229,7 +228,7 @@ class TFmrCoordinator: public IFmrCoordinator {
229228

230229
TMutex Mutex_;
231230
const ui32 WorkersNum_;
232-
std::unordered_map<ui32, TString> workerToVolatileId_; // worker id -> volatile id
231+
std::unordered_map<ui32, TString> WorkerToVolatileId_; // worker id -> volatile id
233232
const TIntrusivePtr<IRandomProvider> RandomProvider_;
234233
std::thread ClearIdempotencyKeysThread_;
235234
std::atomic<bool> StopCoordinator_;

yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Y_UNIT_TEST_SUITE(FmrFactoryTests) {
1818
*operationResults = "operation_result";
1919
return ETaskStatus::Completed;
2020
}
21-
return ETaskStatus::Aborted;
21+
return ETaskStatus::Failed;
2222
};
2323
TFmrJobFactorySettings settings{.NumThreads =3, .Function=func};
2424
auto factory = MakeFmrJobFactory(settings);
@@ -44,7 +44,7 @@ Y_UNIT_TEST_SUITE(FmrFactoryTests) {
4444
return ETaskStatus::Completed;
4545
}
4646
}
47-
return ETaskStatus::Aborted;
47+
return ETaskStatus::Failed;
4848
};
4949
TFmrJobFactorySettings settings{.NumThreads =3, .Function=func};
5050

@@ -56,7 +56,7 @@ Y_UNIT_TEST_SUITE(FmrFactoryTests) {
5656
cancelFlag->store(true);
5757
auto taskResult = futureTaskStatus.GetValueSync();
5858
ETaskStatus taskStatus = taskResult->TaskStatus;
59-
UNIT_ASSERT_VALUES_EQUAL(taskStatus, ETaskStatus::Aborted);
59+
UNIT_ASSERT_VALUES_EQUAL(taskStatus, ETaskStatus::Failed);
6060
UNIT_ASSERT_NO_DIFF(*operationResults, "computing_result");
6161
}
6262
}

yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,11 @@ class TFmrJobFactory: public IFmrJobFactory {
4040
return future;
4141
}
4242

43-
private:
44-
void Start() {
43+
void Start() override {
4544
ThreadPool_ = CreateThreadPool(NumThreads_);
4645
}
4746

48-
void Stop() {
47+
void Stop() override {
4948
ThreadPool_->Stop();
5049
}
5150

yt/yql/providers/yt/fmr/job_factory/interface/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ SRCS(
66

77
PEERDIR(
88
library/cpp/threading/future
9+
yql/essentials/utils
910
)
1011

1112
YQL_LAST_ABI_VERSION()

yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
#include <library/cpp/threading/future/core/future.h>
44
#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
5+
#include <yql/essentials/utils/runnable.h>
56

67
namespace NYql::NFmr {
78

8-
class IFmrJobFactory: public TThrRefBase {
9+
class IFmrJobFactory: public IRunnable {
910
public:
1011
using TPtr = TIntrusivePtr<IFmrJobFactory>;
1112

yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ enum class ETaskStatus {
2121
Accepted,
2222
InProgress,
2323
Failed,
24-
Completed,
25-
Aborted
24+
Completed
2625
};
2726

2827
enum class ETaskType {

yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) {
2828
*operationResults = "operation_result";
2929
return ETaskStatus::Completed;
3030
}
31-
return ETaskStatus::Aborted;
31+
return ETaskStatus::Failed;
3232
};
3333
TFmrJobFactorySettings settings{.NumThreads = 3, .Function = func};
3434

3535
auto factory = MakeFmrJobFactory(settings);
36-
TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
36+
TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
3737
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
3838
worker->Start();
3939
coordinator->StartOperation(CreateOperationRequest()).GetValueSync();
@@ -56,11 +56,11 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) {
5656
}
5757
}
5858
*operationResults = "operation_cancelled";
59-
return ETaskStatus::Aborted;
59+
return ETaskStatus::Failed;
6060
};
6161
TFmrJobFactorySettings settings{.NumThreads =3, .Function=func};
6262
auto factory = MakeFmrJobFactory(settings);
63-
TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
63+
TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
6464
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
6565
worker->Start();
6666
auto operationId = coordinator->StartOperation(CreateOperationRequest()).GetValueSync().OperationId;
@@ -82,12 +82,12 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) {
8282
(*operationResult)++;
8383
return ETaskStatus::Completed;
8484
}
85-
return ETaskStatus::Aborted;
85+
return ETaskStatus::Failed;
8686
};
8787
TFmrJobFactorySettings settings{.NumThreads =3, .Function=func};
8888
auto factory = MakeFmrJobFactory(settings);
89-
TFmrWorkerSettings firstWorkerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
90-
TFmrWorkerSettings secondWorkerSettings{.WorkerId = 2, .RandomProvider = CreateDeterministicRandomProvider(2)};
89+
TFmrWorkerSettings firstWorkerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
90+
TFmrWorkerSettings secondWorkerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(2)};
9191
auto firstWorker = MakeFmrWorker(coordinator, factory, firstWorkerSettings);
9292
auto secondWorker = MakeFmrWorker(coordinator, factory, secondWorkerSettings);
9393
firstWorker->Start();

yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#include <library/cpp/threading/future/wait/wait.h>
22
#include <thread>
3-
#include <ranges>
43
#include <util/system/mutex.h>
54
#include <yql/essentials/utils/yql_panic.h>
65
#include "yql_yt_worker_impl.h"
@@ -12,15 +11,14 @@ namespace {
1211
struct TFmrWorkerState {
1312
TMutex Mutex;
1413
std::unordered_map<TString, TTaskResult::TPtr> TaskStatuses;
15-
std::unordered_map<TString, NThreading::TFuture<TTaskResult::TPtr>> TaskFutures;
1614
};
1715

1816
class TFmrWorker: public IFmrWorker {
1917
public:
2018
TFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings)
2119
: Coordinator_(coordinator),
2220
JobFactory_(jobFactory),
23-
WorkerState_(std::make_shared<TFmrWorkerState>(TMutex(), std::unordered_map<TString, TTaskResult::TPtr>{}, std::unordered_map<TString, NThreading::TFuture<TTaskResult::TPtr>>{})),
21+
WorkerState_(std::make_shared<TFmrWorkerState>(TMutex(), std::unordered_map<TString, TTaskResult::TPtr>{})),
2422
StopWorker_(false),
2523
RandomProvider_(settings.RandomProvider),
2624
WorkerId_(settings.WorkerId),
@@ -86,12 +84,9 @@ class TFmrWorker: public IFmrWorker {
8684
with_lock(state->Mutex) {
8785
YQL_ENSURE(state->TaskStatuses.contains(task->TaskId));
8886
state->TaskStatuses[task->TaskId] = finalTaskStatus;
89-
state->TaskFutures.erase(task->TaskId);
9087
}
9188
}
9289
});
93-
YQL_ENSURE(!WorkerState_->TaskFutures.contains(taskId));
94-
WorkerState_->TaskFutures[taskId] = future;
9590
}
9691
}
9792
Sleep(TimeToSleepBetweenRequests_);
@@ -107,10 +102,8 @@ class TFmrWorker: public IFmrWorker {
107102
taskInfo.second->store(true);
108103
}
109104
StopWorker_ = true;
110-
auto futuresView = std::views::values(WorkerState_->TaskFutures);
111-
taskFutures = std::vector<NThreading::TFuture<TTaskResult::TPtr>>{futuresView.begin(), futuresView.end()};
112105
}
113-
NThreading::WaitAll(taskFutures).GetValueSync();
106+
JobFactory_->Stop();
114107
if (MainThread_.joinable()) {
115108
MainThread_.join();
116109
}

0 commit comments

Comments
 (0)