Skip to content

Commit 08ae991

Browse files
author
Liubov Didkivska
authored
Change PrefetchHelper to template class (#1045)
Refactor PrefetchHelper, QueryMetadataJob, DownloadItemsJob to work with different root types. With PrefetchHelper we can start query data handles for different types. User should only specify such functions as 'query' - to request metadata needed to download data, 'filter' - to manage results received after query, 'download' - download data. Relates-To: OLPEDGE-1900 Signed-off-by: Liubov Didkivska <ext-liubov.didkivska@here.com>
1 parent ee555b0 commit 08ae991

File tree

7 files changed

+427
-389
lines changed

7 files changed

+427
-389
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright (C) 2020 HERE Europe B.V.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
#pragma once
21+
22+
#include <olp/core/client/CancellationContext.h>
23+
#include <olp/core/logging/Log.h>
24+
#include <olp/dataservice/read/Types.h>
25+
26+
#include "Common.h"
27+
#include "ExtendedApiResponse.h"
28+
#include "ExtendedApiResponseHelpers.h"
29+
30+
namespace olp {
31+
namespace dataservice {
32+
namespace read {
33+
34+
using ExtendedDataResponse = ExtendedApiResponse<model::Data, client::ApiError,
35+
client::NetworkStatistics>;
36+
37+
// Prototype of function used to download data using data handle.
38+
using DownloadFunc = std::function<ExtendedDataResponse(
39+
std::string, client::CancellationContext)>;
40+
41+
template <typename ItemType, typename PrefetchResult>
42+
using AppendResultFunc =
43+
std::function<void(ExtendedDataResponse response, ItemType item,
44+
PrefetchResult& prefetch_result)>;
45+
46+
template <typename ItemType, typename PrefetchResult>
47+
class DownloadItemsJob {
48+
public:
49+
DownloadItemsJob(DownloadFunc download,
50+
AppendResultFunc<ItemType, PrefetchResult> append_result,
51+
Callback<PrefetchResult> user_callback,
52+
PrefetchStatusCallback status_callback)
53+
: download_(std::move(download)),
54+
append_result_(std::move(append_result)),
55+
user_callback_(std::move(user_callback)),
56+
status_callback_(std::move(status_callback)) {}
57+
58+
void Initialize(size_t items_count, client::NetworkStatistics statistics) {
59+
download_task_count_ = total_download_task_count_ = items_count;
60+
accumulated_statistics_ = statistics;
61+
}
62+
63+
ExtendedDataResponse Download(const std::string& data_handle,
64+
client::CancellationContext context) {
65+
return download_(data_handle, context);
66+
}
67+
size_t GetAccumulatedBytes(const olp::client::NetworkStatistics& statistics) {
68+
// This narrow cast is necessary to avoid narrowing compiler errors like
69+
// -Wc++11-narrowing when building for 32bit targets.
70+
const auto bytes_transferred =
71+
statistics.GetBytesDownloaded() + statistics.GetBytesUploaded();
72+
return static_cast<size_t>(bytes_transferred &
73+
std::numeric_limits<size_t>::max());
74+
}
75+
76+
void CompleteItem(ItemType item, ExtendedDataResponse response) {
77+
std::lock_guard<std::mutex> lock(mutex_);
78+
accumulated_statistics_ += GetNetworkStatistics(response);
79+
80+
if (response.IsSuccessful()) {
81+
requests_succeeded_++;
82+
} else {
83+
requests_failed_++;
84+
}
85+
86+
append_result_(response, item, prefetch_result_);
87+
88+
if (status_callback_) {
89+
status_callback_(PrefetchStatus{
90+
requests_succeeded_ + requests_failed_, total_download_task_count_,
91+
GetAccumulatedBytes(accumulated_statistics_)});
92+
}
93+
94+
if (!--download_task_count_) {
95+
OLP_SDK_LOG_DEBUG_F("DownloadItemsJob",
96+
"Download complete, succeeded=%zu, failed=%zu",
97+
requests_succeeded_, requests_failed_);
98+
99+
OnPrefetchCompleted(std::move(prefetch_result_));
100+
}
101+
}
102+
103+
void OnPrefetchCompleted(Response<PrefetchResult> result) {
104+
auto prefetch_callback = std::move(user_callback_);
105+
prefetch_callback(std::move(result));
106+
}
107+
108+
private:
109+
DownloadFunc download_;
110+
AppendResultFunc<ItemType, PrefetchResult> append_result_;
111+
Callback<PrefetchResult> user_callback_;
112+
PrefetchStatusCallback status_callback_;
113+
size_t download_task_count_{0};
114+
size_t total_download_task_count_{0};
115+
size_t requests_succeeded_{0};
116+
size_t requests_failed_{0};
117+
client::NetworkStatistics accumulated_statistics_;
118+
PrefetchResult prefetch_result_;
119+
std::mutex mutex_;
120+
};
121+
122+
} // namespace read
123+
} // namespace dataservice
124+
} // namespace olp
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright (C) 2020 HERE Europe B.V.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
#pragma once
21+
22+
#include <olp/core/client/CancellationContext.h>
23+
#include <olp/core/client/PendingRequests.h>
24+
#include <olp/core/logging/Log.h>
25+
#include <olp/dataservice/read/Types.h>
26+
27+
#include "Common.h"
28+
#include "DownloadItemsJob.h"
29+
#include "ExtendedApiResponse.h"
30+
#include "ExtendedApiResponseHelpers.h"
31+
#include "QueryMetadataJob.h"
32+
33+
namespace olp {
34+
namespace dataservice {
35+
namespace read {
36+
37+
template <typename PrefetchItemsResult>
38+
using PrefetchItemsResponseCallback = Callback<PrefetchItemsResult>;
39+
40+
class PrefetchHelper {
41+
public:
42+
template <typename ItemType, typename QueryType, typename PrefetchResult>
43+
static client::CancellationToken Prefetch(
44+
const std::vector<QueryType>& roots,
45+
QueryItemsFunc<ItemType, QueryType> query,
46+
FilterItemsFunc<ItemType> filter, DownloadFunc download,
47+
AppendResultFunc<ItemType, PrefetchResult> append_result,
48+
Callback<PrefetchResult> user_callback,
49+
PrefetchStatusCallback status_callback,
50+
std::shared_ptr<thread::TaskScheduler> task_scheduler,
51+
std::shared_ptr<client::PendingRequests> pending_requests) {
52+
client::CancellationContext execution_context;
53+
54+
auto download_job =
55+
std::make_shared<DownloadItemsJob<ItemType, PrefetchResult>>(
56+
std::move(download), std::move(append_result),
57+
std::move(user_callback), std::move(status_callback));
58+
59+
auto query_job =
60+
std::make_shared<QueryMetadataJob<ItemType, QueryType, PrefetchResult>>(
61+
std::move(query), std::move(filter), download_job, task_scheduler,
62+
pending_requests, execution_context);
63+
64+
query_job->Initialize(roots.size());
65+
66+
OLP_SDK_LOG_DEBUG_F("PrefetchJob", "Starting queries, requests=%zu",
67+
roots.size());
68+
69+
execution_context.ExecuteOrCancelled([&]() {
70+
VectorOfTokens tokens;
71+
std::transform(std::begin(roots), std::end(roots),
72+
std::back_inserter(tokens), [&](QueryType root) {
73+
return AddTask(
74+
task_scheduler, pending_requests,
75+
[=](client::CancellationContext context) {
76+
return query_job->Query(root, context);
77+
},
78+
[=](QueryItemsResponse<ItemType> response) {
79+
query_job->CompleteQuery(std::move(response));
80+
});
81+
});
82+
return CreateToken(std::move(tokens));
83+
});
84+
85+
return client::CancellationToken(
86+
[execution_context]() mutable { execution_context.CancelOperation(); });
87+
}
88+
};
89+
90+
} // namespace read
91+
} // namespace dataservice
92+
} // namespace olp

0 commit comments

Comments
 (0)