X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftest%2Fcommon%2FThrottle.cc;fp=src%2Fceph%2Fsrc%2Ftest%2Fcommon%2FThrottle.cc;h=836f22fe7a2c8e52bb2eb484ed554056e17e4c79;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/test/common/Throttle.cc b/src/ceph/src/test/common/Throttle.cc new file mode 100644 index 0000000..836f22f --- /dev/null +++ b/src/ceph/src/test/common/Throttle.cc @@ -0,0 +1,451 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Cloudwatt + * + * Author: Loic Dachary + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Library Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Library Public License for more details. + * + */ + +#include +#include +#include "gtest/gtest.h" +#include "common/Mutex.h" +#include "common/Thread.h" +#include "common/Throttle.h" +#include "common/ceph_argparse.h" +#include "common/backport14.h" + +#include +#include +#include +#include +#include +#include + +class ThrottleTest : public ::testing::Test { +protected: + + class Thread_get : public Thread { + public: + Throttle &throttle; + int64_t count; + bool waited; + + Thread_get(Throttle& _throttle, int64_t _count) : + throttle(_throttle), + count(_count), + waited(false) + { + } + + void *entry() override { + usleep(5); + waited = throttle.get(count); + throttle.put(count); + return NULL; + } + }; +}; + +TEST_F(ThrottleTest, Throttle) { + int64_t throttle_max = 10; + Throttle throttle(g_ceph_context, "throttle", throttle_max); + ASSERT_EQ(throttle.get_max(), throttle_max); + ASSERT_EQ(throttle.get_current(), 0); +} + +TEST_F(ThrottleTest, take) { + int64_t throttle_max = 10; + Throttle throttle(g_ceph_context, "throttle", throttle_max); + ASSERT_EQ(throttle.take(throttle_max), throttle_max); + ASSERT_EQ(throttle.take(throttle_max), throttle_max * 2); +} + +TEST_F(ThrottleTest, get) { + int64_t throttle_max = 10; + Throttle throttle(g_ceph_context, "throttle"); + + // test increasing max from 0 to throttle_max + { + ASSERT_FALSE(throttle.get(throttle_max, throttle_max)); + ASSERT_EQ(throttle.get_max(), throttle_max); + ASSERT_EQ(throttle.put(throttle_max), 0); + } + + ASSERT_FALSE(throttle.get(5)); + ASSERT_EQ(throttle.put(5), 0); + + ASSERT_FALSE(throttle.get(throttle_max)); + ASSERT_FALSE(throttle.get_or_fail(1)); + ASSERT_FALSE(throttle.get(1, throttle_max + 1)); + ASSERT_EQ(throttle.put(throttle_max + 1), 0); + ASSERT_FALSE(throttle.get(0, throttle_max)); + ASSERT_FALSE(throttle.get(throttle_max)); + ASSERT_FALSE(throttle.get_or_fail(1)); + ASSERT_EQ(throttle.put(throttle_max), 0); + + useconds_t delay = 1; + + bool waited; + + do { + cout << "Trying (1) with delay " << delay << "us\n"; + + ASSERT_FALSE(throttle.get(throttle_max)); + ASSERT_FALSE(throttle.get_or_fail(throttle_max)); + + Thread_get t(throttle, 7); + t.create("t_throttle_1"); + usleep(delay); + ASSERT_EQ(throttle.put(throttle_max), 0); + t.join(); + + if (!(waited = t.waited)) + delay *= 2; + } while(!waited); + + delay = 1; + do { + cout << "Trying (2) with delay " << delay << "us\n"; + + ASSERT_FALSE(throttle.get(throttle_max / 2)); + ASSERT_FALSE(throttle.get_or_fail(throttle_max)); + + Thread_get t(throttle, throttle_max); + t.create("t_throttle_2"); + usleep(delay); + + Thread_get u(throttle, 1); + u.create("u_throttle_2"); + usleep(delay); + + throttle.put(throttle_max / 2); + + t.join(); + u.join(); + + if (!(waited = t.waited && u.waited)) + delay *= 2; + } while(!waited); + +} + +TEST_F(ThrottleTest, get_or_fail) { + { + Throttle throttle(g_ceph_context, "throttle"); + + ASSERT_TRUE(throttle.get_or_fail(5)); + ASSERT_TRUE(throttle.get_or_fail(5)); + } + + { + int64_t throttle_max = 10; + Throttle throttle(g_ceph_context, "throttle", throttle_max); + + ASSERT_TRUE(throttle.get_or_fail(throttle_max)); + ASSERT_EQ(throttle.put(throttle_max), 0); + + ASSERT_TRUE(throttle.get_or_fail(throttle_max * 2)); + ASSERT_FALSE(throttle.get_or_fail(1)); + ASSERT_FALSE(throttle.get_or_fail(throttle_max * 2)); + ASSERT_EQ(throttle.put(throttle_max * 2), 0); + + ASSERT_TRUE(throttle.get_or_fail(throttle_max)); + ASSERT_FALSE(throttle.get_or_fail(1)); + ASSERT_EQ(throttle.put(throttle_max), 0); + } +} + +TEST_F(ThrottleTest, wait) { + int64_t throttle_max = 10; + Throttle throttle(g_ceph_context, "throttle"); + + // test increasing max from 0 to throttle_max + { + ASSERT_FALSE(throttle.wait(throttle_max)); + ASSERT_EQ(throttle.get_max(), throttle_max); + } + + useconds_t delay = 1; + + bool waited; + + do { + cout << "Trying (3) with delay " << delay << "us\n"; + + ASSERT_FALSE(throttle.get(throttle_max / 2)); + ASSERT_FALSE(throttle.get_or_fail(throttle_max)); + + Thread_get t(throttle, throttle_max); + t.create("t_throttle_3"); + usleep(delay); + + // + // Throttle::_reset_max(int64_t m) used to contain a test + // that blocked the following statement, only if + // the argument was greater than throttle_max. + // Although a value lower than throttle_max would cover + // the same code in _reset_max, the throttle_max * 100 + // value is left here to demonstrate that the problem + // has been solved. + // + throttle.wait(throttle_max * 100); + usleep(delay); + t.join(); + ASSERT_EQ(throttle.get_current(), throttle_max / 2); + + if (!(waited = t.waited)) { + delay *= 2; + // undo the changes we made + throttle.put(throttle_max / 2); + throttle.wait(throttle_max); + } + } while(!waited); +} + + +TEST_F(ThrottleTest, destructor) { + EXPECT_DEATH({ + int64_t throttle_max = 10; + auto throttle = ceph::make_unique(g_ceph_context, "throttle", + throttle_max); + + + ASSERT_FALSE(throttle->get(5)); + unique_ptr t = ceph::make_unique(*throttle, 7); + t->create("t_throttle"); + bool blocked; + useconds_t delay = 1; + do { + usleep(delay); + if (throttle->get_or_fail(1)) { + throttle->put(1); + blocked = false; + } else { + blocked = true; + } + delay *= 2; + } while (!blocked); + }, ".*"); +} + +std::pair > test_backoff( + double low_threshhold, + double high_threshhold, + double expected_throughput, + double high_multiple, + double max_multiple, + uint64_t max, + double put_delay_per_count, + unsigned getters, + unsigned putters) +{ + std::mutex l; + std::condition_variable c; + uint64_t total = 0; + std::list in_queue; + bool stop_getters = false; + bool stop_putters = false; + + auto wait_time = std::chrono::duration(0); + uint64_t waits = 0; + + uint64_t total_observed_total = 0; + uint64_t total_observations = 0; + + BackoffThrottle throttle(g_ceph_context, "backoff_throttle_test", 5); + bool valid = throttle.set_params( + low_threshhold, + high_threshhold, + expected_throughput, + high_multiple, + max_multiple, + max, + 0); + assert(valid); + + auto getter = [&]() { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 10); + + std::unique_lock g(l); + while (!stop_getters) { + g.unlock(); + + uint64_t to_get = dis(gen); + auto waited = throttle.get(to_get); + + g.lock(); + wait_time += waited; + waits += to_get; + total += to_get; + in_queue.push_back(to_get); + c.notify_one(); + } + }; + + auto putter = [&]() { + std::unique_lock g(l); + while (!stop_putters || !in_queue.empty()) { + if (in_queue.empty()) { + c.wait(g); + continue; + } + + uint64_t c = in_queue.front(); + + total_observed_total += total; + total_observations++; + in_queue.pop_front(); + assert(total <= max); + + g.unlock(); + std::this_thread::sleep_for( + c * std::chrono::duration(put_delay_per_count*putters)); + g.lock(); + + total -= c; + throttle.put(c); + } + }; + + vector gts(getters); + for (auto &&i: gts) i = std::thread(getter); + + vector pts(putters); + for (auto &&i: pts) i = std::thread(putter); + + std::this_thread::sleep_for(std::chrono::duration(5)); + { + std::unique_lock g(l); + stop_getters = true; + c.notify_all(); + } + for (auto &&i: gts) i.join(); + gts.clear(); + + { + std::unique_lock g(l); + stop_putters = true; + c.notify_all(); + } + for (auto &&i: pts) i.join(); + pts.clear(); + + return make_pair( + ((double)total_observed_total)/((double)total_observations), + wait_time / waits); +} + +TEST(BackoffThrottle, destruct) { + EXPECT_DEATH({ + auto throttle = ceph::make_unique( + g_ceph_context, "destructor test", 10); + ASSERT_TRUE(throttle->set_params(0.4, 0.6, 1000, 2, 10, 6, nullptr)); + + throttle->get(5); + { + auto& t = *throttle; + std::thread([&t]() { + usleep(5); + t.get(6); + }); + } + // No equivalent of get_or_fail() + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + }, ".*"); +} + +TEST(BackoffThrottle, undersaturated) +{ + auto results = test_backoff( + 0.4, + 0.6, + 1000, + 2, + 10, + 100, + 0.0001, + 3, + 6); + ASSERT_LT(results.first, 45); + ASSERT_GT(results.first, 35); + ASSERT_LT(results.second.count(), 0.0002); + ASSERT_GT(results.second.count(), 0.00005); +} + +TEST(BackoffThrottle, balanced) +{ + auto results = test_backoff( + 0.4, + 0.6, + 1000, + 2, + 10, + 100, + 0.001, + 7, + 2); + ASSERT_LT(results.first, 60); + ASSERT_GT(results.first, 40); + ASSERT_LT(results.second.count(), 0.002); + ASSERT_GT(results.second.count(), 0.0005); +} + +TEST(BackoffThrottle, oversaturated) +{ + auto results = test_backoff( + 0.4, + 0.6, + 10000000, + 2, + 10, + 100, + 0.001, + 1, + 3); + ASSERT_LT(results.first, 101); + ASSERT_GT(results.first, 85); + ASSERT_LT(results.second.count(), 0.002); + ASSERT_GT(results.second.count(), 0.0005); +} + +TEST(OrderedThrottle, destruct) { + EXPECT_DEATH({ + auto throttle = ceph::make_unique(1, false); + throttle->start_op(nullptr); + { + auto& t = *throttle; + std::thread([&t]() { + usleep(5); + t.start_op(nullptr); + }); + } + // No equivalent of get_or_fail() + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + }, ".*"); +} + +/* + * Local Variables: + * compile-command: "cd ../.. ; + * make unittest_throttle ; + * ./unittest_throttle # --gtest_filter=ThrottleTest.destructor \ + * --log-to-stderr=true --debug-filestore=20 + * " + * End: + */