Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / common / Throttle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
7  *
8  * Author: Loic Dachary <loic@dachary.org>
9  *
10  * This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU Library Public License as published by
12  * the Free Software Foundation; either version 2, or (at your option)
13  * any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU Library Public License for more details.
19  *
20  */
21
22 #include <stdio.h>
23 #include <signal.h>
24 #include "gtest/gtest.h"
25 #include "common/Mutex.h"
26 #include "common/Thread.h"
27 #include "common/Throttle.h"
28 #include "common/ceph_argparse.h"
29 #include "common/backport14.h"
30
31 #include <thread>
32 #include <atomic>
33 #include <chrono>
34 #include <mutex>
35 #include <list>
36 #include <random>
37
38 class ThrottleTest : public ::testing::Test {
39 protected:
40
41   class Thread_get : public Thread {
42   public:
43     Throttle &throttle;
44     int64_t count;
45     bool waited;
46
47     Thread_get(Throttle& _throttle, int64_t _count) :
48       throttle(_throttle),
49       count(_count),
50       waited(false)
51     {
52     }
53
54     void *entry() override {
55       usleep(5);
56       waited = throttle.get(count);
57       throttle.put(count);
58       return NULL;
59     }
60   };
61 };
62
63 TEST_F(ThrottleTest, Throttle) {
64   int64_t throttle_max = 10;
65   Throttle throttle(g_ceph_context, "throttle", throttle_max);
66   ASSERT_EQ(throttle.get_max(), throttle_max);
67   ASSERT_EQ(throttle.get_current(), 0);
68 }
69
70 TEST_F(ThrottleTest, take) {
71   int64_t throttle_max = 10;
72   Throttle throttle(g_ceph_context, "throttle", throttle_max);
73   ASSERT_EQ(throttle.take(throttle_max), throttle_max);
74   ASSERT_EQ(throttle.take(throttle_max), throttle_max * 2);
75 }
76
77 TEST_F(ThrottleTest, get) {
78   int64_t throttle_max = 10;
79   Throttle throttle(g_ceph_context, "throttle");
80
81   // test increasing max from 0 to throttle_max
82   {
83     ASSERT_FALSE(throttle.get(throttle_max, throttle_max));
84     ASSERT_EQ(throttle.get_max(), throttle_max);
85     ASSERT_EQ(throttle.put(throttle_max), 0);
86   }
87
88   ASSERT_FALSE(throttle.get(5));
89   ASSERT_EQ(throttle.put(5), 0);
90
91   ASSERT_FALSE(throttle.get(throttle_max));
92   ASSERT_FALSE(throttle.get_or_fail(1));
93   ASSERT_FALSE(throttle.get(1, throttle_max + 1));
94   ASSERT_EQ(throttle.put(throttle_max + 1), 0);
95   ASSERT_FALSE(throttle.get(0, throttle_max));
96   ASSERT_FALSE(throttle.get(throttle_max));
97   ASSERT_FALSE(throttle.get_or_fail(1));
98   ASSERT_EQ(throttle.put(throttle_max), 0);
99
100   useconds_t delay = 1;
101
102   bool waited;
103
104   do {
105     cout << "Trying (1) with delay " << delay << "us\n";
106
107     ASSERT_FALSE(throttle.get(throttle_max));
108     ASSERT_FALSE(throttle.get_or_fail(throttle_max));
109
110     Thread_get t(throttle, 7);
111     t.create("t_throttle_1");
112     usleep(delay);
113     ASSERT_EQ(throttle.put(throttle_max), 0);
114     t.join();
115
116     if (!(waited = t.waited))
117       delay *= 2;
118   } while(!waited);
119
120   delay = 1;
121   do {
122     cout << "Trying (2) with delay " << delay << "us\n";
123
124     ASSERT_FALSE(throttle.get(throttle_max / 2));
125     ASSERT_FALSE(throttle.get_or_fail(throttle_max));
126
127     Thread_get t(throttle, throttle_max);
128     t.create("t_throttle_2");
129     usleep(delay);
130
131     Thread_get u(throttle, 1);
132     u.create("u_throttle_2");
133     usleep(delay);
134
135     throttle.put(throttle_max / 2);
136
137     t.join();
138     u.join();
139
140     if (!(waited = t.waited && u.waited))
141       delay *= 2;
142   } while(!waited);
143
144 }
145
146 TEST_F(ThrottleTest, get_or_fail) {
147   {
148     Throttle throttle(g_ceph_context, "throttle");
149
150     ASSERT_TRUE(throttle.get_or_fail(5));
151     ASSERT_TRUE(throttle.get_or_fail(5));
152   }
153
154   {
155     int64_t throttle_max = 10;
156     Throttle throttle(g_ceph_context, "throttle", throttle_max);
157
158     ASSERT_TRUE(throttle.get_or_fail(throttle_max));
159     ASSERT_EQ(throttle.put(throttle_max), 0);
160
161     ASSERT_TRUE(throttle.get_or_fail(throttle_max * 2));
162     ASSERT_FALSE(throttle.get_or_fail(1));
163     ASSERT_FALSE(throttle.get_or_fail(throttle_max * 2));
164     ASSERT_EQ(throttle.put(throttle_max * 2), 0);
165
166     ASSERT_TRUE(throttle.get_or_fail(throttle_max));
167     ASSERT_FALSE(throttle.get_or_fail(1));
168     ASSERT_EQ(throttle.put(throttle_max), 0);
169   }
170 }
171
172 TEST_F(ThrottleTest, wait) {
173   int64_t throttle_max = 10;
174   Throttle throttle(g_ceph_context, "throttle");
175
176   // test increasing max from 0 to throttle_max
177   {
178     ASSERT_FALSE(throttle.wait(throttle_max));
179     ASSERT_EQ(throttle.get_max(), throttle_max);
180   }
181
182   useconds_t delay = 1;
183
184   bool waited;
185
186   do {
187     cout << "Trying (3) with delay " << delay << "us\n";
188
189     ASSERT_FALSE(throttle.get(throttle_max / 2));
190     ASSERT_FALSE(throttle.get_or_fail(throttle_max));
191
192     Thread_get t(throttle, throttle_max);
193     t.create("t_throttle_3");
194     usleep(delay);
195
196     //
197     // Throttle::_reset_max(int64_t m) used to contain a test
198     // that blocked the following statement, only if
199     // the argument was greater than throttle_max.
200     // Although a value lower than throttle_max would cover
201     // the same code in _reset_max, the throttle_max * 100
202     // value is left here to demonstrate that the problem
203     // has been solved.
204     //
205     throttle.wait(throttle_max * 100);
206     usleep(delay);
207     t.join();
208     ASSERT_EQ(throttle.get_current(), throttle_max / 2);
209
210     if (!(waited = t.waited)) {
211       delay *= 2;
212       // undo the changes we made
213       throttle.put(throttle_max / 2);
214       throttle.wait(throttle_max);
215     }
216   } while(!waited);
217 }
218
219
220 TEST_F(ThrottleTest, destructor) {
221   EXPECT_DEATH({
222       int64_t throttle_max = 10;
223       auto throttle = ceph::make_unique<Throttle>(g_ceph_context, "throttle",
224                                                   throttle_max);
225
226
227       ASSERT_FALSE(throttle->get(5));
228       unique_ptr<Thread_get> t = ceph::make_unique<Thread_get>(*throttle, 7);
229       t->create("t_throttle");
230       bool blocked;
231       useconds_t delay = 1;
232       do {
233         usleep(delay);
234         if (throttle->get_or_fail(1)) {
235           throttle->put(1);
236           blocked = false;
237         } else {
238           blocked = true;
239         }
240         delay *= 2;
241       } while (!blocked);
242     }, ".*");
243 }
244
245 std::pair<double, std::chrono::duration<double> > test_backoff(
246   double low_threshhold,
247   double high_threshhold,
248   double expected_throughput,
249   double high_multiple,
250   double max_multiple,
251   uint64_t max,
252   double put_delay_per_count,
253   unsigned getters,
254   unsigned putters)
255 {
256   std::mutex l;
257   std::condition_variable c;
258   uint64_t total = 0;
259   std::list<uint64_t> in_queue;
260   bool stop_getters = false;
261   bool stop_putters = false;
262
263   auto wait_time = std::chrono::duration<double>(0);
264   uint64_t waits = 0;
265
266   uint64_t total_observed_total = 0;
267   uint64_t total_observations = 0;
268
269   BackoffThrottle throttle(g_ceph_context, "backoff_throttle_test", 5);
270   bool valid = throttle.set_params(
271     low_threshhold,
272     high_threshhold,
273     expected_throughput,
274     high_multiple,
275     max_multiple,
276     max,
277     0);
278   assert(valid);
279
280   auto getter = [&]() {
281     std::random_device rd;
282     std::mt19937 gen(rd());
283     std::uniform_int_distribution<> dis(0, 10);
284
285     std::unique_lock<std::mutex> g(l);
286     while (!stop_getters) {
287       g.unlock();
288
289       uint64_t to_get = dis(gen);
290       auto waited = throttle.get(to_get);
291
292       g.lock();
293       wait_time += waited;
294       waits += to_get;
295       total += to_get;
296       in_queue.push_back(to_get);
297       c.notify_one();
298     }
299   };
300
301   auto putter = [&]() {
302     std::unique_lock<std::mutex> g(l);
303     while (!stop_putters || !in_queue.empty()) {
304       if (in_queue.empty()) {
305         c.wait(g);
306         continue;
307       }
308
309       uint64_t c = in_queue.front();
310
311       total_observed_total += total;
312       total_observations++;
313       in_queue.pop_front();
314       assert(total <= max);
315
316       g.unlock();
317       std::this_thread::sleep_for(
318         c * std::chrono::duration<double>(put_delay_per_count*putters));
319       g.lock();
320
321       total -= c;
322       throttle.put(c);
323     }
324   };
325
326   vector<std::thread> gts(getters);
327   for (auto &&i: gts) i = std::thread(getter);
328
329   vector<std::thread> pts(putters);
330   for (auto &&i: pts) i = std::thread(putter);
331
332   std::this_thread::sleep_for(std::chrono::duration<double>(5));
333   {
334     std::unique_lock<std::mutex> g(l);
335     stop_getters = true;
336     c.notify_all();
337   }
338   for (auto &&i: gts) i.join();
339   gts.clear();
340
341   {
342     std::unique_lock<std::mutex> g(l);
343     stop_putters = true;
344     c.notify_all();
345   }
346   for (auto &&i: pts) i.join();
347   pts.clear();
348
349   return make_pair(
350     ((double)total_observed_total)/((double)total_observations),
351     wait_time / waits);
352 }
353
354 TEST(BackoffThrottle, destruct) {
355   EXPECT_DEATH({
356       auto throttle = ceph::make_unique<BackoffThrottle>(
357         g_ceph_context, "destructor test", 10);
358       ASSERT_TRUE(throttle->set_params(0.4, 0.6, 1000, 2, 10, 6, nullptr));
359
360       throttle->get(5);
361       {
362         auto& t = *throttle;
363         std::thread([&t]() {
364             usleep(5);
365             t.get(6);
366           });
367       }
368       // No equivalent of get_or_fail()
369       std::this_thread::sleep_for(std::chrono::milliseconds(250));
370     }, ".*");
371 }
372
373 TEST(BackoffThrottle, undersaturated)
374 {
375   auto results = test_backoff(
376     0.4,
377     0.6,
378     1000,
379     2,
380     10,
381     100,
382     0.0001,
383     3,
384     6);
385   ASSERT_LT(results.first, 45);
386   ASSERT_GT(results.first, 35);
387   ASSERT_LT(results.second.count(), 0.0002);
388   ASSERT_GT(results.second.count(), 0.00005);
389 }
390
391 TEST(BackoffThrottle, balanced)
392 {
393   auto results = test_backoff(
394     0.4,
395     0.6,
396     1000,
397     2,
398     10,
399     100,
400     0.001,
401     7,
402     2);
403   ASSERT_LT(results.first, 60);
404   ASSERT_GT(results.first, 40);
405   ASSERT_LT(results.second.count(), 0.002);
406   ASSERT_GT(results.second.count(), 0.0005);
407 }
408
409 TEST(BackoffThrottle, oversaturated)
410 {
411   auto results = test_backoff(
412     0.4,
413     0.6,
414     10000000,
415     2,
416     10,
417     100,
418     0.001,
419     1,
420     3);
421   ASSERT_LT(results.first, 101);
422   ASSERT_GT(results.first, 85);
423   ASSERT_LT(results.second.count(), 0.002);
424   ASSERT_GT(results.second.count(), 0.0005);
425 }
426
427 TEST(OrderedThrottle, destruct) {
428   EXPECT_DEATH({
429       auto throttle = ceph::make_unique<OrderedThrottle>(1, false);
430       throttle->start_op(nullptr);
431       {
432         auto& t = *throttle;
433         std::thread([&t]() {
434             usleep(5);
435             t.start_op(nullptr);
436           });
437       }
438       // No equivalent of get_or_fail()
439       std::this_thread::sleep_for(std::chrono::milliseconds(250));
440     }, ".*");
441 }
442
443 /*
444  * Local Variables:
445  * compile-command: "cd ../.. ;
446  *   make unittest_throttle ;
447  *   ./unittest_throttle # --gtest_filter=ThrottleTest.destructor \
448  *       --log-to-stderr=true --debug-filestore=20
449  * "
450  * End:
451  */