Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMake/resolve_dependency_modules/clp.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ include_guard(GLOBAL)
FetchContent_Declare(
clp
GIT_REPOSITORY https://github.com/y-scope/clp.git
GIT_TAG 0bfaf5caec80d17527a2e91ba21fdede88b44f19)
GIT_TAG 581bd46198a97a89b174849851cc3f1f2100e466)

set(CLP_BUILD_CLP_REGEX_UTILS
OFF
Expand Down
1 change: 0 additions & 1 deletion velox/connectors/clp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#set(CLP_SRC_DIR ${clp_SOURCE_DIR}/components/core/src)
add_subdirectory(search_lib)

velox_add_library(
Expand Down
33 changes: 33 additions & 0 deletions velox/connectors/clp/ClpConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,24 @@

#include <boost/algorithm/string.hpp>

#include "velox/common/base/Exceptions.h"
#include "velox/common/config/Config.h"
#include "velox/connectors/clp/ClpConfig.h"
#include "velox/connectors/clp/search_lib/ClpPackageS3AuthProvider.h"

namespace facebook::velox::connector::clp {

namespace {

ClpConfig::S3AuthProvider stringToS3AuthProvider(const std::string& strValue) {
auto upperValue = boost::algorithm::to_upper_copy(strValue);
VELOX_CHECK(!upperValue.empty());
if (upperValue == "CLP_PACKAGE") {
return ClpConfig::S3AuthProvider::kClpPackage;
}
VELOX_UNSUPPORTED("Unsupported s3 auth provider type: {}.", strValue);
}

ClpConfig::StorageType stringToStorageType(const std::string& strValue) {
auto upperValue = boost::algorithm::to_upper_copy(strValue);
if (upperValue == "FS") {
Expand All @@ -35,6 +47,27 @@ ClpConfig::StorageType stringToStorageType(const std::string& strValue) {

} // namespace

ClpConfig::ClpConfig(std::shared_ptr<const config::ConfigBase> config) {
VELOX_CHECK_NOT_NULL(config, "Config is null for CLP initialization");
config_ = std::move(config);

// Set up S3 environment variables needed by CLP using configured auth
// provider
switch (
stringToS3AuthProvider(config_->get<std::string>(kAuthProvider, ""))) {
case ClpConfig::S3AuthProvider::kClpPackage:
s3AuthProvider_ = std::make_shared<ClpPackageS3AuthProvider>(config_);
break;
default:
VELOX_FAIL();
}
VELOX_CHECK(s3AuthProvider_->exportAuthEnvironmentVariables());
}

std::shared_ptr<ClpS3AuthProviderBase> ClpConfig::s3AuthProvider() const {
return s3AuthProvider_;
}

ClpConfig::StorageType ClpConfig::storageType() const {
return stringToStorageType(config_->get<std::string>(kStorageType, "FS"));
}
Expand Down
18 changes: 11 additions & 7 deletions velox/connectors/clp/ClpConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,41 @@

#pragma once

#include "velox/common/config/Config.h"

namespace facebook::velox::config {
class ConfigBase;
}
} // namespace facebook::velox::config

namespace facebook::velox::connector::clp {

class ClpS3AuthProviderBase;

class ClpConfig {
public:
enum class S3AuthProvider {
kClpPackage,
};

enum class StorageType {
kFs,
kS3,
};

static constexpr const char* kAuthProvider = "clp.s3-auth-provider";
static constexpr const char* kStorageType = "clp.storage-type";

explicit ClpConfig(std::shared_ptr<const config::ConfigBase> config) {
VELOX_CHECK_NOT_NULL(config, "Config is null for CLP initialization");
config_ = std::move(config);
}
explicit ClpConfig(std::shared_ptr<const config::ConfigBase> config);

[[nodiscard]] const std::shared_ptr<const config::ConfigBase>& config()
const {
return config_;
}

StorageType storageType() const;
std::shared_ptr<ClpS3AuthProviderBase> s3AuthProvider() const;

private:
std::shared_ptr<const config::ConfigBase> config_;
std::shared_ptr<ClpS3AuthProviderBase> s3AuthProvider_;
};

} // namespace facebook::velox::connector::clp
6 changes: 5 additions & 1 deletion velox/connectors/clp/ClpDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "velox/connectors/clp/ClpColumnHandle.h"
#include "velox/connectors/clp/ClpConnectorSplit.h"
#include "velox/connectors/clp/ClpDataSource.h"

#include "search_lib/ClpS3AuthProviderBase.h"
#include "velox/connectors/clp/ClpTableHandle.h"
#include "velox/connectors/clp/search_lib/ClpCursor.h"
#include "velox/connectors/clp/search_lib/ClpVectorLoader.h"
Expand All @@ -37,6 +39,7 @@ ClpDataSource::ClpDataSource(
: pool_(pool), outputType_(outputType) {
auto clpTableHandle = std::dynamic_pointer_cast<ClpTableHandle>(tableHandle);
storageType_ = clpConfig->storageType();
s3AuthProvider_ = clpConfig->s3AuthProvider();

for (const auto& outputName : outputType->names()) {
auto columnHandle = columnHandles.find(outputName);
Expand Down Expand Up @@ -106,7 +109,8 @@ void ClpDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
clp_s::InputSource::Filesystem, clpSplit->path_);
} else if (storageType_ == ClpConfig::StorageType::kS3) {
cursor_ = std::make_unique<search_lib::ClpCursor>(
clp_s::InputSource::Network, clpSplit->path_);
clp_s::InputSource::Network,
s3AuthProvider_->constructS3Url(clpSplit->path_));
}

auto pushDownQuery = clpSplit->kqlQuery_;
Expand Down
3 changes: 3 additions & 0 deletions velox/connectors/clp/ClpDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class BaseColumnReader;

namespace facebook::velox::connector::clp {

class ClpS3AuthProviderBase;

class ClpDataSource : public DataSource {
public:
ClpDataSource(
Expand Down Expand Up @@ -101,6 +103,7 @@ class ClpDataSource : public DataSource {
std::vector<search_lib::Field> fields_;

std::unique_ptr<search_lib::ClpCursor> cursor_;
std::shared_ptr<ClpS3AuthProviderBase> s3AuthProvider_;
};

} // namespace facebook::velox::connector::clp
4 changes: 4 additions & 0 deletions velox/connectors/clp/search_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ velox_add_library(
STATIC
ClpCursor.cpp
ClpCursor.h
ClpPackageS3AuthProvider.cpp
ClpPackageS3AuthProvider.h
ClpQueryRunner.cpp
ClpQueryRunner.h
ClpS3AuthProviderBase.cpp
ClpS3AuthProviderBase.h
ClpVectorLoader.cpp
ClpVectorLoader.h)

Expand Down
56 changes: 56 additions & 0 deletions velox/connectors/clp/search_lib/ClpPackageS3AuthProvider.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/clp/search_lib/ClpPackageS3AuthProvider.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/config/Config.h"

namespace facebook::velox::connector::clp {

std::string ClpPackageS3AuthProvider::constructS3Url(
std::string_view splitPath) {
VELOX_CHECK(!splitPath.empty(), "splitPath cannot be empty");
return fmt::format("{}/{}", this->endPoint_, splitPath);
}

bool ClpPackageS3AuthProvider::exportAuthEnvironmentVariables() {
this->endPoint_ = config_->get<std::string>(kEndPoint, "");
VELOX_CHECK(
!this->endPoint_.empty(), fmt::format("{} cannot be empty", kEndPoint));
if ('/' == this->endPoint_.back()) {
this->endPoint_.pop_back();
}

auto accessKeyId = config_->get<std::string>(kAccessKeyId, "");
auto secretAccessKey = config_->get<std::string>(kSecretAccessKey, "");
auto sessionToken = config_->get<std::string>(kSessionToken, "");
VELOX_CHECK(
!accessKeyId.empty(), fmt::format("{} cannot be empty", kAccessKeyId));
VELOX_CHECK(
!secretAccessKey.empty(),
fmt::format("{} cannot be empty", kSecretAccessKey));
setEnvironmentVariable(kEnvAwsAccessKeyId, accessKeyId);
setEnvironmentVariable(kEnvAwsSecretAccessKey, secretAccessKey);
if (!sessionToken.empty()) {
setEnvironmentVariable(kEnvAwsSessionToken, sessionToken);
} else {
unsetEnvironmentVariable(kEnvAwsSessionToken);
}

return true;
}

} // namespace facebook::velox::connector::clp
50 changes: 50 additions & 0 deletions velox/connectors/clp/search_lib/ClpPackageS3AuthProvider.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/connectors/clp/search_lib/ClpS3AuthProviderBase.h"

namespace facebook::velox::config {
class ConfigBase;
} // namespace facebook::velox::config

namespace facebook::velox::connector::clp {

class ClpPackageS3AuthProvider : public ClpS3AuthProviderBase {
public:
explicit ClpPackageS3AuthProvider(
std::shared_ptr<const config::ConfigBase> config)
: ClpS3AuthProviderBase(config) {}

static constexpr const char* kAccessKeyId = "clp.s3-access-key-id";
static constexpr const char* kEndPoint = "clp.s3-end-point";
static constexpr const char* kSecretAccessKey = "clp.s3-secret-access-key";
static constexpr const char* kSessionToken = "clp.s3-session-token";

static constexpr const char* kEnvAwsAccessKeyId = "AWS_ACCESS_KEY_ID";
static constexpr const char* kEnvAwsSecretAccessKey = "AWS_SECRET_ACCESS_KEY";
static constexpr const char* kEnvAwsSessionToken = "AWS_SESSION_TOKEN";

std::string constructS3Url(std::string_view splitPath) override;

bool exportAuthEnvironmentVariables() override;

private:
std::string endPoint_;
};

} // namespace facebook::velox::connector::clp
73 changes: 73 additions & 0 deletions velox/connectors/clp/search_lib/ClpS3AuthProviderBase.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cstdlib>
#include <cstring>

#include "velox/common/base/Exceptions.h"
#include "velox/connectors/clp/search_lib/ClpS3AuthProviderBase.h"

namespace facebook::velox::config {
class ConfigBase;
} // namespace facebook::velox::config

namespace facebook::velox::connector::clp {

void ClpS3AuthProviderBase::setEnvironmentVariable(
std::string_view key,
std::string_view value) {
int err{0};
const auto keyStr = std::string(key);
const auto* keyCStr = keyStr.c_str();
const auto valueStr = std::string(value);
const auto* valueCStr = valueStr.c_str();
#ifdef _WIN32
// Windows version
err = _putenv_s(keyCStr, valueCStr);
#elif defined(__unix__) || defined(__APPLE__)
// Unix/macOS version
err = setenv(keyCStr, valueCStr, 1);
#else
VELOX_UNSUPPORTED("Unsupported OS");
#endif
VELOX_CHECK_EQ(0, err);

// Sanity check
auto valueForCheck = std::getenv(keyCStr);
VELOX_CHECK_EQ(0, std::strcmp(valueCStr, valueForCheck));
}

void ClpS3AuthProviderBase::unsetEnvironmentVariable(std::string_view key) {
int err{0};
const auto keyStr = std::string(key);
const auto* keyCStr = keyStr.c_str();
#ifdef _WIN32
// Windows version
err = _putenv(fmt::format("{}=", keyCStr));
#elif defined(__unix__) || defined(__APPLE__)
// Unix/macOS version
err = unsetenv(keyCStr);
#else
VELOX_UNSUPPORTED("Unsupported OS");
#endif
VELOX_CHECK_EQ(0, err);

// Sanity check
auto valueForCheck = std::getenv(keyCStr);
VELOX_CHECK_NULL(valueForCheck);
}

} // namespace facebook::velox::connector::clp
Loading