Skip to content

Commit 6e06c44

Browse files
authored
Change PublishToBatch() to use sync repo. (#1047)
write::VersionedLayerClient::PublishToBatch use async approach. We need to make it similar to dataservice::read clients. Making it sync will simplify the code. Also added unit and integration tests to improve coverage. Resolves: OLPEDGE-1404 Signed-off-by: Kostiantyn Zvieriev <ext-kostiantyn.zvieriev@here.com>
1 parent 08ae991 commit 6e06c44

File tree

8 files changed

+1330
-188
lines changed

8 files changed

+1330
-188
lines changed

olp-cpp-sdk-dataservice-write/src/CatalogSettings.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ CatalogSettings::CatalogSettings(const client::HRN catalog,
4848
}
4949

5050
CatalogSettings::LayerSettingsResult CatalogSettings::GetLayerSettingsFromModel(
51-
const model::Catalog& catalog, const std::string& layer_id) {
51+
const model::Catalog& catalog, const std::string& layer_id) const {
5252
const auto& layers = catalog.GetLayers();
5353

5454
auto layer_it = std::find_if(
@@ -70,7 +70,7 @@ CatalogSettings::LayerSettingsResult CatalogSettings::GetLayerSettingsFromModel(
7070

7171
CatalogSettings::LayerSettingsResult CatalogSettings::GetLayerSettings(
7272
client::CancellationContext context, BillingTag billing_tag,
73-
const std::string& layer_id) {
73+
const std::string& layer_id) const {
7474
const auto catalog_settings_key = catalog_.ToString() + "::catalog";
7575

7676
if (!cache_->Contains(catalog_settings_key)) {

olp-cpp-sdk-dataservice-write/src/CatalogSettings.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ class CatalogSettings {
5252

5353
LayerSettingsResult GetLayerSettings(client::CancellationContext context,
5454
BillingTag billing_tag,
55-
const std::string& layer_id);
55+
const std::string& layer_id) const;
5656

5757
private:
58-
LayerSettingsResult GetLayerSettingsFromModel(const model::Catalog& catalog,
59-
const std::string& layer_id);
58+
LayerSettingsResult GetLayerSettingsFromModel(
59+
const model::Catalog& catalog, const std::string& layer_id) const;
6060

6161
client::HRN catalog_;
6262
std::shared_ptr<cache::KeyValueCache> cache_;

olp-cpp-sdk-dataservice-write/src/VersionedLayerClientImpl.cpp

Lines changed: 78 additions & 170 deletions
Original file line numberDiff line numberDiff line change
@@ -465,195 +465,103 @@ olp::client::CancellationToken VersionedLayerClientImpl::PublishToBatch(
465465
const model::Publication& pub,
466466
const model::PublishPartitionDataRequest& request,
467467
PublishPartitionDataCallback callback) {
468-
std::string publication_id = pub.GetId().value_or("");
469-
if (publication_id.empty()) {
470-
callback(client::ApiError(client::ErrorCode::InvalidArgument,
471-
"Invalid publication", true));
472-
return {};
473-
}
468+
auto publish_task =
469+
[=](client::CancellationContext context) -> PublishPartitionDataResponse {
470+
if (!pub.GetId()) {
471+
return {{client::ErrorCode::InvalidArgument,
472+
"Invalid publication: publication ID missing", true}};
473+
}
474+
const auto& publication_id = pub.GetId().get();
474475

475-
std::string layer_id = request.GetLayerId();
476-
if (layer_id.empty()) {
477-
callback(client::ApiError(client::ErrorCode::InvalidArgument,
478-
"Invalid request", true));
479-
return {};
480-
}
476+
const auto& layer_id = request.GetLayerId();
477+
if (layer_id.empty()) {
478+
return {{client::ErrorCode::InvalidArgument,
479+
"Invalid publication: layer ID missing", true}};
480+
}
481481

482-
const auto data_handle = GenerateUuid();
483-
std::shared_ptr<model::PublishPartition> partition =
484-
std::make_shared<model::PublishPartition>();
485-
partition->SetPartition(request.GetPartitionId().value_or(""));
486-
partition->SetData(request.GetData());
487-
partition->SetDataHandle(data_handle);
482+
const auto data_handle = GenerateUuid();
483+
model::PublishPartition partition;
484+
partition.SetPartition(request.GetPartitionId().value_or(""));
485+
partition.SetData(request.GetData());
486+
partition.SetDataHandle(data_handle);
488487

489-
auto self = shared_from_this();
490-
auto cancel_context = std::make_shared<client::CancellationContext>();
491-
auto id = tokenList_.GetNextId();
492-
auto cancel_function = [=]() {
493-
self->tokenList_.RemoveTask(id);
494-
callback(client::ApiError(client::ErrorCode::Cancelled,
495-
"Operation cancelled.", true));
496-
};
497-
498-
auto uploadPartition_callback = [=](UploadPartitionResponse response) {
499-
self->tokenList_.RemoveTask(id);
500-
if (!response.IsSuccessful()) {
501-
callback(std::move(response.GetError()));
502-
} else {
503-
model::ResponseOkSingle res;
504-
res.SetTraceID(partition->GetPartition().value_or(""));
505-
callback(std::move(res));
488+
auto layer_settings_response = catalog_settings_.GetLayerSettings(
489+
context, request.GetBillingTag(), layer_id);
490+
if (!layer_settings_response.IsSuccessful()) {
491+
return layer_settings_response.GetError();
506492
}
507-
};
508493

509-
auto uploadBlob_callback = [=](UploadBlobResponse response) {
510-
if (!response.IsSuccessful()) {
511-
self->tokenList_.RemoveTask(id);
512-
callback(std::move(response.GetError()));
513-
} else {
514-
self->UploadPartition(publication_id, partition, layer_id, cancel_context,
515-
uploadPartition_callback);
494+
auto layer_settings = layer_settings_response.GetResult();
495+
if (layer_settings.content_type.empty()) {
496+
auto errmsg = boost::format(
497+
"Unable to find the Layer ID (%1%) "
498+
"provided in the request in the "
499+
"Catalog specified when creating "
500+
"this VersionedLayerClient instance.") %
501+
layer_id;
502+
return {{client::ErrorCode::InvalidArgument, errmsg.str()}};
516503
}
517-
};
518504

519-
auto catalogModel_callback = [=](boost::optional<client::ApiError> error) {
520-
if (error) {
521-
self->tokenList_.RemoveTask(id);
522-
callback(std::move(*error));
523-
} else {
505+
auto upload_blob_response =
506+
UploadBlob(partition, data_handle, layer_settings.content_type,
507+
layer_settings.content_encoding, layer_id,
508+
request.GetBillingTag(), context);
509+
if (!upload_blob_response.IsSuccessful()) {
510+
return upload_blob_response.GetError();
524511
}
525-
};
526-
527-
cancel_context->ExecuteOrCancelled(
528-
[=]() -> client::CancellationToken {
529-
return self->InitApiClients(
530-
cancel_context, [=](boost::optional<client::ApiError> err) {
531-
if (err) {
532-
callback(err.get());
533-
return;
534-
}
535512

536-
auto layer_settings_response = catalog_settings_.GetLayerSettings(
537-
*cancel_context, request.GetBillingTag(), layer_id);
538-
if (!layer_settings_response.IsSuccessful()) {
539-
callback(layer_settings_response.GetError());
540-
return;
541-
}
542-
auto layer_settings = layer_settings_response.GetResult();
543-
if (layer_settings.content_type.empty()) {
544-
auto errmsg = boost::format(
545-
"Unable to find the Layer ID (%1%) "
546-
"provided in the request in the "
547-
"Catalog specified when creating "
548-
"this VersionedLayerClient instance.") %
549-
layer_id;
550-
callback(client::ApiError(client::ErrorCode::InvalidArgument,
551-
errmsg.str()));
552-
return;
553-
}
513+
auto upload_partition_response =
514+
UploadPartition(publication_id, partition, layer_id, context);
515+
if (!upload_partition_response.IsSuccessful()) {
516+
return upload_partition_response.GetError();
517+
}
554518

555-
self->UploadBlob(publication_id, partition, data_handle,
556-
layer_settings.content_type, layer_settings.content_encoding,
557-
layer_id, request.GetBillingTag(),
558-
cancel_context, uploadBlob_callback);
559-
});
560-
},
561-
cancel_function);
519+
model::ResponseOkSingle res;
520+
res.SetTraceID(partition.GetPartition().value_or(""));
521+
return res;
522+
};
562523

563-
auto token = client::CancellationToken(
564-
[cancel_context]() { cancel_context->CancelOperation(); });
565-
tokenList_.AddTask(id, token);
566-
return token;
524+
return AddTask(settings_.task_scheduler, pending_requests_,
525+
std::move(publish_task), std::move(callback));
567526
}
568527

569-
void VersionedLayerClientImpl::UploadPartition(
570-
std::string publication_id,
571-
std::shared_ptr<model::PublishPartition> partition, std::string layer_id,
572-
std::shared_ptr<client::CancellationContext> cancel_context,
573-
const UploadPartitionCallback& callback) {
574-
auto self = shared_from_this();
575-
auto cancel_function = [callback]() {
576-
callback(client::ApiError(client::ErrorCode::Cancelled,
577-
"Operation cancelled.", true));
578-
};
579-
580-
std::shared_ptr<model::PublishPartition> publishPartition =
581-
std::make_shared<model::PublishPartition>();
582-
publishPartition->SetPartition(partition->GetPartition().value_or(""));
583-
publishPartition->SetDataHandle(partition->GetDataHandle().value_or(""));
528+
UploadPartitionResponse VersionedLayerClientImpl::UploadPartition(
529+
const std::string& publication_id, const model::PublishPartition& partition,
530+
const std::string& layer_id, client::CancellationContext context) {
531+
auto olp_client_response = ApiClientLookup::LookupApiClient(
532+
catalog_, context, "publish", "v2", settings_);
533+
if (!olp_client_response.IsSuccessful()) {
534+
return olp_client_response.GetError();
535+
}
584536

585-
auto uploadPartition_callback = [=](UploadPartitionsResponse response) {
586-
if (!response.IsSuccessful()) {
587-
callback(std::move(response.GetError()));
588-
} else {
589-
callback(response.MoveResult());
590-
}
591-
};
537+
auto publish_client = olp_client_response.MoveResult();
592538

593-
auto uploadPartition_function = [=]() -> client::CancellationToken {
594-
model::PublishPartitions partitions;
595-
partitions.SetPartitions({*publishPartition});
596-
return PublishApi::UploadPartitions(*self->apiclient_publish_, partitions,
597-
publication_id, layer_id, boost::none,
598-
uploadPartition_callback);
599-
};
539+
model::PublishPartition publish_partition;
540+
publish_partition.SetPartition(partition.GetPartition().value_or(""));
541+
publish_partition.SetDataHandle(partition.GetDataHandle().value_or(""));
542+
model::PublishPartitions partitions;
543+
partitions.SetPartitions({publish_partition});
600544

601-
cancel_context->ExecuteOrCancelled(
602-
[=]() -> client::CancellationToken {
603-
return self->InitApiClients(
604-
cancel_context, [=](boost::optional<client::ApiError> err) {
605-
if (err) {
606-
callback(err.get());
607-
return;
608-
}
609-
cancel_context->ExecuteOrCancelled(uploadPartition_function,
610-
cancel_function);
611-
});
612-
},
613-
cancel_function);
545+
return PublishApi::UploadPartitions(publish_client, partitions,
546+
publication_id, layer_id, boost::none,
547+
context);
614548
}
615549

616-
void VersionedLayerClientImpl::UploadBlob(
617-
std::string /*publication_id*/,
618-
std::shared_ptr<model::PublishPartition> partition, std::string data_handle,
619-
std::string content_type, std::string content_encoding,
620-
std::string layer_id, BillingTag billing_tag,
621-
std::shared_ptr<client::CancellationContext> cancel_context,
622-
const UploadBlobCallback& callback) {
623-
auto self = shared_from_this();
624-
auto cancel_function = [callback]() {
625-
callback(client::ApiError(client::ErrorCode::Cancelled,
626-
"Operation cancelled.", true));
627-
};
628-
629-
auto uploadBlob_callback = [=](UploadBlobResponse response) {
630-
if (!response.IsSuccessful()) {
631-
callback(std::move(response.GetError()));
632-
} else {
633-
callback(response.MoveResult());
634-
}
635-
};
636-
637-
auto uploadBlob_function = [=]() -> client::CancellationToken {
638-
return BlobApi::PutBlob(*self->apiclient_blob_, layer_id, content_type,
639-
content_encoding, data_handle, partition->GetData(),
640-
billing_tag, uploadBlob_callback);
641-
};
550+
UploadBlobResponse VersionedLayerClientImpl::UploadBlob(
551+
const model::PublishPartition& partition, const std::string& data_handle,
552+
const std::string& content_type, const std::string& content_encoding,
553+
const std::string& layer_id, BillingTag billing_tag,
554+
client::CancellationContext context) {
555+
auto olp_client_response = ApiClientLookup::LookupApiClient(
556+
catalog_, context, "blob", "v1", settings_);
557+
if (!olp_client_response.IsSuccessful()) {
558+
return olp_client_response.GetError();
559+
}
642560

643-
cancel_context->ExecuteOrCancelled(
644-
[=]() -> client::CancellationToken {
645-
return self->InitApiClients(cancel_context,
646-
[=](boost::optional<client::ApiError> err) {
647-
if (err) {
648-
callback(err.get());
649-
return;
650-
}
651-
652-
cancel_context->ExecuteOrCancelled(
653-
uploadBlob_function, cancel_function);
654-
});
655-
},
656-
cancel_function);
561+
auto blob_client = olp_client_response.MoveResult();
562+
return BlobApi::PutBlob(blob_client, layer_id, content_type, content_encoding,
563+
data_handle, partition.GetData(), billing_tag,
564+
context);
657565
}
658566

659567
client::CancellableFuture<CheckDataExistsResponse>

olp-cpp-sdk-dataservice-write/src/VersionedLayerClientImpl.h

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -115,19 +115,18 @@ class VersionedLayerClientImpl
115115
std::shared_ptr<client::CancellationContext> cancel_context,
116116
InitApiClientsCallback callback);
117117

118-
void UploadBlob(std::string publication_id,
119-
std::shared_ptr<model::PublishPartition> partition,
120-
std::string data_handle, std::string content_type,
121-
std::string content_encoding, std::string layer_id,
122-
BillingTag billing_tag,
123-
std::shared_ptr<client::CancellationContext> cancel_context,
124-
const UploadBlobCallback& callback);
125-
126-
void UploadPartition(
127-
std::string publication_id,
128-
std::shared_ptr<model::PublishPartition> partition, std::string layer_id,
129-
std::shared_ptr<client::CancellationContext> cancel_context,
130-
const UploadPartitionCallback& callback);
118+
UploadBlobResponse UploadBlob(const model::PublishPartition& partition,
119+
const std::string& data_handle,
120+
const std::string& content_type,
121+
const std::string& content_encoding,
122+
const std::string& layer_id,
123+
BillingTag billing_tag,
124+
client::CancellationContext context);
125+
126+
UploadPartitionResponse UploadPartition(
127+
const std::string& publication_id,
128+
const model::PublishPartition& partition, const std::string& layer_id,
129+
client::CancellationContext context);
131130

132131
client::HRN catalog_;
133132
client::OlpClientSettings settings_;

olp-cpp-sdk-dataservice-write/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ set(OLP_SDK_DATASERVICE_WRITE_TEST_SOURCES
2323
StartBatchRequestTest.cpp
2424
StreamLayerClientImplTest.cpp
2525
TimeUtilsTest.cpp
26+
VersionedLayerClientImplPublishToBatchTest.cpp
2627
VersionedLayerClientImplTest.cpp
2728
)
2829

0 commit comments

Comments
 (0)