Skip to content
This repository was archived by the owner on Aug 19, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions src/kubernetes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,14 @@ class KubernetesReader::NonRetriableError

KubernetesReader::KubernetesReader(const Configuration& config,
HealthChecker* health_checker)
: KubernetesReader(config, health_checker, SleeperImpl::New()) {}

KubernetesReader::KubernetesReader(const Configuration& config,
HealthChecker* health_checker,
std::unique_ptr<Sleeper> sleeper)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename sleeper to be a bit more descriptive? This only seems to be used during retry backoffs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. The sleep functionality is to induce the passage of time, so time-related tool names like "Timer", "Stopwatch", "AlarmClock", etc are usually appropriate. Would it make sense to reuse the existing Timer class and broaden its interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to rename. What would you like?

: config_(config), environment_(config), health_checker_(health_checker),
service_account_directory_(kServiceAccountDirectory) {}
service_account_directory_(kServiceAccountDirectory),
sleeper_(std::move(sleeper)) {}

std::string KubernetesReader::SecretPath(const std::string& secret) const {
return service_account_directory_ + "/" + secret;
Expand Down Expand Up @@ -837,21 +843,29 @@ void KubernetesReader::WatchMaster(
if (verbose) {
LOG(INFO) << "WatchMaster(" << name << ") completed " << body(response);
}
if (status(response) >= 300) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unrelated fix not mentioned in the PR description. Mitigations, in order of preference, would be to pull it (and the associated test changes) out into a separate PR, or at least mention it in the description.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the PR description. The fix is intertwined with the new tests, so it's not really possible to pull out into a separate PR.

throw boost::system::system_error(
boost::system::errc::make_error_code(
boost::system::errc::not_connected),
format::Substitute("Server responded with '{{message}}' ({{code}})",
{{"message", status_message(response)},
{"code", format::str(status(response))}}));
}
// Connection closed without an error; reset failure count.
failures = 0;
} catch (const boost::system::system_error& e) {
LOG(ERROR) << "Failed to query " << endpoint << ": " << e.what();
++failures;
if (failures > config_.KubernetesUpdaterWatchMaxConnectionFailures()) {
if (failures >= config_.KubernetesUpdaterWatchMaxConnectionFailures()) {
LOG(ERROR) << "WatchMaster(" << name << "): Exiting after "
<< failures << " failures";
throw QueryException(endpoint + " -> " + e.what());
}
++failures;
double backoff = fmin(pow(1.5, failures), 30);
if (verbose) {
LOG(INFO) << "Backing off for " << backoff << " seconds";
}
std::this_thread::sleep_for(time::seconds(backoff));
sleeper_->SleepFor(backoff);
}
}
}
Expand Down Expand Up @@ -1310,10 +1324,17 @@ void KubernetesReader::WatchEndpoints(
KubernetesUpdater::KubernetesUpdater(const Configuration& config,
HealthChecker* health_checker,
MetadataStore* store)
: reader_(config, health_checker), PollingMetadataUpdater(
config, store, "KubernetesUpdater",
config.KubernetesUpdaterIntervalSeconds(),
[=]() { return reader_.MetadataQuery(); }) { }
: KubernetesUpdater(config, health_checker, store, SleeperImpl::New()) {}

KubernetesUpdater::KubernetesUpdater(const Configuration& config,
HealthChecker* health_checker,
MetadataStore* store,
std::unique_ptr<Sleeper> sleeper)
: reader_(config, health_checker, std::move(sleeper)),
PollingMetadataUpdater(
config, store, "KubernetesUpdater",
config.KubernetesUpdaterIntervalSeconds(),
[=]() { return reader_.MetadataQuery(); }) { }

void KubernetesUpdater::ValidateDynamicConfiguration() const
throw(ConfigurationValidationError) {
Expand Down
6 changes: 6 additions & 0 deletions src/kubernetes.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class KubernetesReader {
public:
KubernetesReader(const Configuration& config,
HealthChecker* health_checker);
KubernetesReader(const Configuration& config,
HealthChecker* health_checker,
std::unique_ptr<Sleeper> sleeper);
// A Kubernetes metadata query function.
std::vector<MetadataUpdater::ResourceMetadata> MetadataQuery() const;

Expand Down Expand Up @@ -218,12 +221,15 @@ class KubernetesReader {
HealthChecker* health_checker_;
Environment environment_;
std::string service_account_directory_;
std::unique_ptr<Sleeper> sleeper_;
};

class KubernetesUpdater : public PollingMetadataUpdater {
public:
KubernetesUpdater(const Configuration& config, HealthChecker* health_checker,
MetadataStore* store);
KubernetesUpdater(const Configuration& config, HealthChecker* health_checker,
MetadataStore* store, std::unique_ptr<Sleeper> sleeper);
~KubernetesUpdater() {
if (node_watch_thread_.joinable()) {
node_watch_thread_.join();
Expand Down
23 changes: 23 additions & 0 deletions src/time.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <ctime>
#include <memory>
#include <string>
#include <thread>

#include "logging.h"

Expand Down Expand Up @@ -146,6 +147,28 @@ class ExpirationImpl : public Expiration {
typename Clock::time_point token_expiration_;
};

// Abstract class representing an object that can sleep.
// Used for mocking out std::this_thread::sleep_for() in tests.
class Sleeper {
public:
virtual ~Sleeper() = default;

// Sleeps for the given number of seconds.
virtual void SleepFor(double seconds) = 0;
};

// Implementation of the Sleeper interface using std::this_thread::sleep_for().
class SleeperImpl : public Sleeper {
public:
static std::unique_ptr<Sleeper> New() {
return std::unique_ptr<Sleeper>(new SleeperImpl());
}

void SleepFor(double seconds) override {
std::this_thread::sleep_for(time::seconds(seconds));
}
};

}

#endif // TIME_H_
22 changes: 17 additions & 5 deletions test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ GTEST_HEADERS=$(GTEST_DIR)/include/gtest/*.h \
$(GTEST_DIR)/include/gtest/internal/*.h
GTEST_SRCS_=$(GTEST_SOURCEDIR)/*.cc $(GTEST_SOURCEDIR)/*.h $(GTEST_HEADERS)
GMOCK_DIR=$(LIBDIR)/googletest/googlemock
GMOCK_SOURCEDIR=$(GMOCK_DIR)/src
GMOCK_HEADERS=$(GMOCK_DIR)/include/gtest/*.h \
$(GMOCK_DIR)/include/gtest/internal/*.h
GMOCK_SRCS_=$(GMOCK_SOURCEDIR)/*.cc $(GMOCK_SOURCEDIR)/*.h $(GMOCK_HEADERS)

# TODO: Factor out the common variables.
CPP_NETLIB_DIR=$(LIBDIR)/cpp-netlib
Expand Down Expand Up @@ -85,6 +89,7 @@ UTIL_SOURCES=$(TEST_DIR)/fake_clock.cc $(TEST_DIR)/fake_http_server.cc
UTIL_OBJS=$(UTIL_SOURCES:$(TEST_DIR)/%.cc=%.o)

GTEST_LIB=gtest_lib.a
GMOCK_LIB=gmock_lib.a

all: $(SRC_DIR)/build-cpp-netlib $(SRC_DIR)/build-yaml-cpp $(TESTS)

Expand All @@ -97,14 +102,14 @@ test: $(TESTS)
done

clean:
$(RM) $(TESTS) $(GTEST_LIB) $(TEST_OBJS) $(UTIL_OBJS)
$(RM) $(TESTS) $(GTEST_LIB) $(GMOCK_LIB) $(TEST_OBJS) $(UTIL_OBJS)

purge: clean
$(RM) init-submodules
(cd .. && git submodule deinit -f $(GTEST_MODULE:../%=%))
(cd .. && git submodule deinit -f $(GTEST_MODULE:../%=%) && git submodule deinit -f $(GMOCK_MODULE:../%=%))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no GMOCK_MODULE defined yet.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we end up deinit 2 paths, you can use single command:

git submodule deinit -f $(GTEST_MODULE:../%=%) ${GMOCK_MODULE:../%=%)}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer needed.


init-submodules:
(cd .. && git submodule update --init $(GTEST_MODULE:../%=%))
(cd .. && git submodule update --init $(GTEST_MODULE:../%=%) && git submodule update --init $(GMOCK_MODULE:../%=%))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer needed.

touch init-submodules

$(SRC_DIR)/init-submodules:
Expand All @@ -121,21 +126,28 @@ $(SRC_DIR)/%.o: $(SRC_DIR)/build-cpp-netlib $(SRC_DIR)/build-yaml-cpp $(SRC_DIR)
cd $(SRC_DIR) && $(MAKE) $(@:$(SRC_DIR)/%=%)

$(GTEST_SOURCEDIR)/gtest-all.cc $(GTEST_SOURCEDIR)/gtest_main.cc: init-submodules
$(GMOCK_SOURCEDIR)/gmock-all.cc $(GMOCK_SOURCEDIR)/gmock_main.cc: init-submodules

gtest-all.o: $(GTEST_SOURCEDIR)/gtest-all.cc
$(CXX) -c $(CPPFLAGS) -I$(GTEST_DIR) $(CXXFLAGS) -o $@ $^
gmock-all.o: $(GMOCK_SOURCEDIR)/gmock-all.cc
$(CXX) -c $(CPPFLAGS) -I$(GMOCK_DIR) $(CXXFLAGS) -o $@ $^

gtest_main.o: $(GTEST_SOURCEDIR)/gtest_main.cc
$(CXX) $(CPPFLAGS) $(CXXFLAGS) -c -o $@ $^
gmock_main.o: $(GMOCK_SOURCEDIR)/gmock_main.cc
$(CXX) $(CPPFLAGS) $(CXXFLAGS) -c -o $@ $^

$(GTEST_LIB): gtest-all.o gtest_main.o
$(AR) $(ARFLAGS) $@ $^
$(GMOCK_LIB): gmock-all.o gmock_main.o
$(AR) $(ARFLAGS) $@ $^

$(TESTS): $(GTEST_LIB) $(CPP_NETLIB_LIBS) $(YAML_CPP_LIBS)
$(TESTS): $(GTEST_LIB) $(GMOCK_LIB) $(CPP_NETLIB_LIBS) $(YAML_CPP_LIBS)

# All unittest objects depend on GTEST_LIB.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And GMOCK_LIB.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer needed.

# Some headers need CPP_NETLIB_LIBS and YAML_CPP_LIBS.
$(TESTS:%=%.o): $(GTEST_LIB) $(CPP_NETLIB_LIBS) $(YAML_CPP_LIBS)
$(TESTS:%=%.o): $(GTEST_LIB) $(GMOCK_LIB) $(CPP_NETLIB_LIBS) $(YAML_CPP_LIBS)

api_server_unittest: api_server_unittest.o $(SRC_DIR)/api_server.o $(SRC_DIR)/configuration.o $(SRC_DIR)/store.o $(SRC_DIR)/json.o $(SRC_DIR)/resource.o $(SRC_DIR)/logging.o $(SRC_DIR)/time.o $(SRC_DIR)/health_checker.o
$(CXX) $(LDFLAGS) $^ $(LDLIBS) -o $@
Expand Down
26 changes: 25 additions & 1 deletion test/fake_http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,18 @@ void FakeServer::SendStreamResponse(const std::string& path,
stream_it->second.SendToAllQueues(response);
}

void FakeServer::TerminateAllStreams() {
bool FakeServer::TerminateAllStreams(time::seconds timeout) {
// Send sentinel (empty string) to all queues.
for (auto& s : handler_.path_streams) {
s.second.SendToAllQueues("");
}

// Block until all queues removed.
bool success = true;
for (auto& s : handler_.path_streams) {
success = success && s.second.WaitUntilNoWatchers(timeout);
}
return success;
}

void FakeServer::Handler::operator()(Server::request const &request,
Expand All @@ -116,6 +123,7 @@ void FakeServer::Handler::operator()(Server::request const &request,
while (true) {
std::string s = stream.GetNextResponse(&my_queue);
if (s.empty()) {
stream.RemoveQueue(&my_queue);
break;
}
connection->write(s);
Expand Down Expand Up @@ -195,13 +203,29 @@ void FakeServer::Handler::Stream::AddQueue(std::queue<std::string>* queue) {
cv_.notify_all();
}

void FakeServer::Handler::Stream::RemoveQueue(std::queue<std::string>* queue) {
{
std::lock_guard<std::mutex> lk(mutex_);
queues_.erase(std::remove(queues_.begin(), queues_.end(), queue),
queues_.end());
}
cv_.notify_all();
}

bool FakeServer::Handler::Stream::WaitForOneWatcher(time::seconds timeout) {
std::unique_lock<std::mutex> queues_lock(mutex_);
return cv_.wait_for(queues_lock,
timeout,
[this]{ return queues_.size() > 0; });
}

bool FakeServer::Handler::Stream::WaitUntilNoWatchers(time::seconds timeout) {
std::unique_lock<std::mutex> queues_lock(mutex_);
return cv_.wait_for(queues_lock,
timeout,
[this]{ return queues_.empty(); });
}

void FakeServer::Handler::Stream::SendToAllQueues(const std::string& response) {
{
std::lock_guard<std::mutex> lk(mutex_);
Expand Down
4 changes: 3 additions & 1 deletion test/fake_http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class FakeServer {
void SendStreamResponse(const std::string& path, const std::string& response);

// Closes all open streams on the server.
void TerminateAllStreams();
bool TerminateAllStreams(time::seconds timeout);

private:
struct Handler;
Expand All @@ -95,7 +95,9 @@ class FakeServer {
class Stream {
public:
void AddQueue(std::queue<std::string>* queue);
void RemoveQueue(std::queue<std::string>* queue);
bool WaitForOneWatcher(time::seconds timeout);
bool WaitUntilNoWatchers(time::seconds timeout);
void SendToAllQueues(const std::string& response);
std::string GetNextResponse(std::queue<std::string>* queue);

Expand Down
Loading