Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMake/resolve_dependency_modules/clp.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ include_guard(GLOBAL)
FetchContent_Declare(
clp
GIT_REPOSITORY https://github.com/y-scope/clp.git
GIT_TAG 0bfaf5caec80d17527a2e91ba21fdede88b44f19)
GIT_TAG 581bd46198a97a89b174849851cc3f1f2100e466)

set(CLP_BUILD_CLP_REGEX_UTILS
OFF
Expand Down
1 change: 0 additions & 1 deletion velox/connectors/clp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#set(CLP_SRC_DIR ${clp_SOURCE_DIR}/components/core/src)
add_subdirectory(search_lib)

velox_add_library(
Expand Down
32 changes: 32 additions & 0 deletions velox/connectors/clp/ClpConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,24 @@

#include <boost/algorithm/string.hpp>

#include "velox/common/base/Exceptions.h"
#include "velox/common/config/Config.h"
#include "velox/connectors/clp/ClpConfig.h"
#include "velox/connectors/clp/search_lib/ClpPackageS3AuthProvider.h"

namespace facebook::velox::connector::clp {

namespace {

/// Maps a configured S3 auth provider name onto ClpConfig::S3AuthProvider.
/// The comparison is case-insensitive (the input is upper-cased first).
///
/// @param strValue Provider name as read from the connector config.
/// @return ClpConfig::S3AuthProvider::kClpPackage for "CLP_PACKAGE".
/// Fails the VELOX_CHECK on an empty name and raises VELOX_UNSUPPORTED for any
/// other name.
ClpConfig::S3AuthProvider stringToS3AuthProvider(const std::string& strValue) {
  const auto upperValue = boost::algorithm::to_upper_copy(strValue);
  VELOX_CHECK(!upperValue.empty());

  // Reject everything except the single provider currently known.
  if (upperValue != "CLP_PACKAGE") {
    VELOX_UNSUPPORTED("Unsupported s3 auth provider type: {}.", strValue);
  }
  return ClpConfig::S3AuthProvider::kClpPackage;
}

ClpConfig::StorageType stringToStorageType(const std::string& strValue) {
auto upperValue = boost::algorithm::to_upper_copy(strValue);
if (upperValue == "FS") {
Expand All @@ -35,6 +47,26 @@ ClpConfig::StorageType stringToStorageType(const std::string& strValue) {

} // namespace

/// Wraps the connector config and, for S3-backed storage, creates the
/// configured S3 auth provider and exports its credentials to the process
/// environment.
///
/// @param config Connector configuration; must not be null.
///
/// For any storage type other than S3 no auth provider is created, so
/// s3AuthProvider() returns null. Previously the provider was required
/// unconditionally, which made a config relying on the "FS" storage default
/// fail because the auth provider entry defaults to an empty string.
ClpConfig::ClpConfig(std::shared_ptr<const config::ConfigBase> config) {
  VELOX_CHECK_NOT_NULL(config, "Config is null for CLP initialization");
  config_ = std::move(config);

  // S3 credentials are only needed when splits are read from S3.
  if (storageType() != StorageType::kS3) {
    return;
  }

  // Pick the auth provider implementation requested by the user and let it
  // export the environment variables CLP reads its S3 credentials from.
  const auto providerName = config_->get<std::string>(kAuthProvider, "");
  switch (stringToS3AuthProvider(providerName)) {
    case S3AuthProvider::kClpPackage:
      s3AuthProvider_ = std::make_shared<ClpPackageS3AuthProvider>(config_);
      break;
    default:
      VELOX_FAIL("Unhandled s3 auth provider type: {}.", providerName);
  }
  VELOX_CHECK(
      s3AuthProvider_->exportAuthEnvironmentVariables(),
      "Failed to export S3 auth environment variables.");
}

/// Returns a copy of the shared pointer stored in s3AuthProvider_.
std::shared_ptr<ClpS3AuthProviderBase> ClpConfig::s3AuthProvider() const {
return s3AuthProvider_;
}

/// Returns the storage type selected by the `clp.storage-type` config entry.
/// A missing entry is treated as "FS"; the string is converted by
/// stringToStorageType().
ClpConfig::StorageType ClpConfig::storageType() const {
  const auto configuredType = config_->get<std::string>(kStorageType, "FS");
  return stringToStorageType(configuredType);
}
Expand Down
18 changes: 11 additions & 7 deletions velox/connectors/clp/ClpConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,41 @@

#pragma once

#include "velox/common/config/Config.h"

namespace facebook::velox::config {
class ConfigBase;
}
} // namespace facebook::velox::config

namespace facebook::velox::connector::clp {

class ClpS3AuthProviderBase;

/// Typed view over the connector configuration used by the CLP connector.
/// Holds the underlying ConfigBase plus the S3 auth provider.
class ClpConfig {
public:
/// S3 auth provider implementations that can be selected through
/// kAuthProvider.
enum class S3AuthProvider {
kClpPackage,
};

/// Where splits are read from.
// NOTE(review): presumably kFs is a filesystem path and kS3 a network/S3
// location — confirm against stringToStorageType and ClpDataSource::addSplit.
enum class StorageType {
kFs,
kS3,
};

/// Config key naming the S3 auth provider.
static constexpr const char* kAuthProvider = "clp.s3-auth-provider";
/// Config key naming the storage type.
static constexpr const char* kStorageType = "clp.storage-type";

// NOTE(review): this inline constructor and the declaration that follows it
// have the same signature; they look like the before/after sides of one
// change. Only one of them can remain in the class — confirm which.
explicit ClpConfig(std::shared_ptr<const config::ConfigBase> config) {
VELOX_CHECK_NOT_NULL(config, "Config is null for CLP initialization");
config_ = std::move(config);
}
explicit ClpConfig(std::shared_ptr<const config::ConfigBase> config);

/// Returns the wrapped connector config.
[[nodiscard]] const std::shared_ptr<const config::ConfigBase>& config()
const {
return config_;
}

/// Returns the configured storage type.
StorageType storageType() const;
/// Returns the S3 auth provider held by this config.
std::shared_ptr<ClpS3AuthProviderBase> s3AuthProvider() const;

private:
// Underlying connector configuration.
std::shared_ptr<const config::ConfigBase> config_;
// S3 auth provider returned by s3AuthProvider().
std::shared_ptr<ClpS3AuthProviderBase> s3AuthProvider_;
};

} // namespace facebook::velox::connector::clp
5 changes: 4 additions & 1 deletion velox/connectors/clp/ClpDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "velox/connectors/clp/ClpColumnHandle.h"
#include "velox/connectors/clp/ClpConnectorSplit.h"
#include "velox/connectors/clp/ClpDataSource.h"

#include "search_lib/ClpS3AuthProviderBase.h"
#include "velox/connectors/clp/ClpTableHandle.h"
#include "velox/connectors/clp/search_lib/ClpCursor.h"
#include "velox/connectors/clp/search_lib/ClpVectorLoader.h"
Expand All @@ -37,6 +39,7 @@ ClpDataSource::ClpDataSource(
: pool_(pool), outputType_(outputType) {
auto clpTableHandle = std::dynamic_pointer_cast<ClpTableHandle>(tableHandle);
storageType_ = clpConfig->storageType();
s3AuthProvider_ = clpConfig->s3AuthProvider();

for (const auto& outputName : outputType->names()) {
auto columnHandle = columnHandles.find(outputName);
Expand Down Expand Up @@ -106,7 +109,7 @@ void ClpDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
clp_s::InputSource::Filesystem, clpSplit->path_);
} else if (storageType_ == ClpConfig::StorageType::kS3) {
cursor_ = std::make_unique<search_lib::ClpCursor>(
clp_s::InputSource::Network, clpSplit->path_);
clp_s::InputSource::Network, s3AuthProvider_->constructS3Url(clpSplit->path_));
}

auto pushDownQuery = clpSplit->kqlQuery_;
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/clp/ClpDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class ClpDataSource : public DataSource {
std::vector<search_lib::Field> fields_;

std::unique_ptr<search_lib::ClpCursor> cursor_;
std::shared_ptr<ClpS3AuthProviderBase> s3AuthProvider_;
};

} // namespace facebook::velox::connector::clp
4 changes: 4 additions & 0 deletions velox/connectors/clp/search_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ velox_add_library(
STATIC
ClpCursor.cpp
ClpCursor.h
ClpPackageS3AuthProvider.cpp
ClpPackageS3AuthProvider.h
ClpQueryRunner.cpp
ClpQueryRunner.h
ClpS3AuthProviderBase.cpp
ClpS3AuthProviderBase.h
ClpVectorLoader.cpp
ClpVectorLoader.h)

Expand Down
61 changes: 61 additions & 0 deletions velox/connectors/clp/search_lib/ClpPackageS3AuthProvider.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <glog/logging.h>

#include "velox/common/base/Exceptions.h"
#include "velox/common/config/Config.h"
#include "velox/connectors/clp/search_lib/ClpPackageS3AuthProvider.h"

namespace facebook::velox::connector::clp {

const std::string ClpPackageS3AuthProvider::constructS3Url(
std::string_view splitPath) {
if (this->endPoint_.empty()) {
this->endPoint_ = config_->get<std::string>(kEndPoint, "");
}
if ('/' == this->endPoint_.back()) {
this->endPoint_.pop_back();
}
return fmt::format("{}/{}", this->endPoint_, splitPath);
}

/// Exports the S3 credentials found in the connector config to the process
/// environment (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and, when present,
/// AWS_SESSION_TOKEN). If no session token is configured, AWS_SESSION_TOKEN is
/// removed from the environment.
///
/// Credential values are deliberately never written to the log; only the
/// names of the variables being changed are logged.
///
/// @return true once all variables have been updated.
/// Fails a VELOX_CHECK if the access key id or the secret access key is
/// missing from the config.
bool ClpPackageS3AuthProvider::exportAuthEnvironmentVariables() const {
  const auto accessKeyId = config_->get<std::string>(kAccessKeyId, "");
  const auto secretAccessKey = config_->get<std::string>(kSecretAccessKey, "");
  const auto sessionToken = config_->get<std::string>(kSessionToken, "");
  VELOX_CHECK(!accessKeyId.empty(), "'{}' must be set.", kAccessKeyId);
  VELOX_CHECK(!secretAccessKey.empty(), "'{}' must be set.", kSecretAccessKey);

  LOG(INFO) << "Setting " << kEnvAwsAccessKeyId << " environment variable.";
  setupEnvironmentVariable(kEnvAwsAccessKeyId, accessKeyId);

  LOG(INFO) << "Setting " << kEnvAwsSecretAccessKey
            << " environment variable.";
  setupEnvironmentVariable(kEnvAwsSecretAccessKey, secretAccessKey);

  if (!sessionToken.empty()) {
    LOG(INFO) << "Setting " << kEnvAwsSessionToken << " environment variable.";
    setupEnvironmentVariable(kEnvAwsSessionToken, sessionToken);
  } else {
    // Remove any existing token so it is not combined with the new key pair.
    LOG(INFO) << "Unsetting " << kEnvAwsSessionToken
              << " environment variable.";
    unsetEnvironmentVariable(kEnvAwsSessionToken);
  }

  return true;
}

} // namespace facebook::velox::connector::clp
50 changes: 50 additions & 0 deletions velox/connectors/clp/search_lib/ClpPackageS3AuthProvider.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/connectors/clp/search_lib/ClpS3AuthProviderBase.h"

namespace facebook::velox::config {
class ConfigBase;
} // namespace facebook::velox::config

namespace facebook::velox::connector::clp {

/// S3 auth provider that takes the endpoint and credentials from the
/// `clp.s3-*` entries of the connector config.
class ClpPackageS3AuthProvider : public ClpS3AuthProviderBase {
 public:
  /// Connector config keys read by this provider.
  static constexpr const char* kAccessKeyId = "clp.s3-access-key-id";
  static constexpr const char* kEndPoint = "clp.s3-end-point";
  static constexpr const char* kSecretAccessKey = "clp.s3-secret-access-key";
  static constexpr const char* kSessionToken = "clp.s3-session-token";

  /// Names of the environment variables written (or removed) by
  /// exportAuthEnvironmentVariables().
  static constexpr const char* kEnvAwsAccessKeyId = "AWS_ACCESS_KEY_ID";
  static constexpr const char* kEnvAwsSecretAccessKey = "AWS_SECRET_ACCESS_KEY";
  static constexpr const char* kEnvAwsSessionToken = "AWS_SESSION_TOKEN";

  /// @param config Connector config, forwarded to the base class.
  explicit ClpPackageS3AuthProvider(
      std::shared_ptr<const config::ConfigBase> config)
      : ClpS3AuthProviderBase(config) {}

  /// Joins the configured endpoint and `splitPath` into a URL.
  const std::string constructS3Url(std::string_view splitPath) override;

  /// Exports the configured credentials as environment variables.
  bool exportAuthEnvironmentVariables() const override;

 private:
  // Endpoint cached by constructS3Url() after its first config lookup.
  std::string endPoint_;
};

} // namespace facebook::velox::connector::clp
67 changes: 67 additions & 0 deletions velox/connectors/clp/search_lib/ClpS3AuthProviderBase.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cstdlib>
#include <cstring>

#include "velox/common/base/Exceptions.h"
#include "velox/connectors/clp/search_lib/ClpS3AuthProviderBase.h"

namespace facebook::velox::config {
class ConfigBase;
} // namespace facebook::velox::config

namespace facebook::velox::connector::clp {

void ClpS3AuthProviderBase::setupEnvironmentVariable(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could rename this to setEnvironmentVariable?

std::string_view key,
std::string_view value) {
int err{0};
#ifdef _WIN32
// Windows version
err = _putenv_s(std::string(key).c_str(), std::string(value).c_str());
#elif defined(__unix__) || defined(__APPLE__)
// Unix/macOS version
err = setenv(std::string(key).c_str(), std::string(value).c_str(), 1);
#else
VELOX_UNSUPPORTED("Unsuppported OS");
#endif
VELOX_CHECK_EQ(0, err);

// Sanity check
auto valueForCheck = std::getenv(std::string(key).c_str());
VELOX_CHECK_EQ(0, std::strcmp(std::string(value).c_str(), valueForCheck));
}

/// Removes the environment variable `key` from the current process and
/// verifies that it is no longer visible through getenv.
///
/// @param key Name of the environment variable to remove.
/// Fails a VELOX_CHECK if the platform call reports an error or if the
/// variable is still set afterwards; raises VELOX_UNSUPPORTED on platforms
/// other than Windows, Unix and macOS.
void ClpS3AuthProviderBase::unsetEnvironmentVariable(std::string_view key) {
  // The C environment APIs need a null-terminated name.
  const std::string keyStr{key};

  int err{0};
#ifdef _WIN32
  // Windows version: "NAME=" with an empty value removes the variable.
  // _putenv takes a const char*, so pass the C string, not the std::string.
  err = _putenv((keyStr + "=").c_str());
#elif defined(__unix__) || defined(__APPLE__)
  // Unix/macOS version
  err = unsetenv(keyStr.c_str());
#else
  VELOX_UNSUPPORTED("Unsuppported OS");
#endif
  VELOX_CHECK_EQ(0, err);

  // Sanity check
  const char* valueForCheck = std::getenv(keyStr.c_str());
  VELOX_CHECK_NULL(valueForCheck);
}

} // namespace facebook::velox::connector::clp
Loading
Loading