Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / kv_store_bench.cc
1 /*
2  * KvStoreBench.cc
3  *
4  *  Created on: Aug 23, 2012
5  *      Author: eleanor
6  */
7
8 #include "test/kv_store_bench.h"
9 #include "key_value_store/key_value_structure.h"
10 #include "key_value_store/kv_flat_btree_async.h"
11 #include "include/rados/librados.hpp"
12 #include "test/omap_bench.h"
13 #include "common/ceph_argparse.h"
14
15
16 #include <string>
17 #include <climits>
18 #include <iostream>
19 #include <sstream>
20 #include <cmath>
21
22 KvStoreBench::KvStoreBench()
23 : entries(30),
24   ops(100),
25   clients(5),
26   key_size(5),
27   val_size(7),
28   max_ops_in_flight(8),
29   clear_first(false),
30   k(2),
31   cache_size(10),
32   cache_refresh(1),
33   client_name("admin"),
34   verbose(false),
35   kvs(NULL),
36   data_lock("data lock"),
37   ops_in_flight(0),
38   ops_in_flight_lock("KvStoreBench::ops_in_flight_lock"),
39   rados_id("admin"),
40   pool_name("rbd"),
41   io_ctx_ready(false)
42 {
43   probs[25] = 'i';
44   probs[50] = 'u';
45   probs[75] = 'd';
46   probs[100] = 'r';
47 }
48
49 KvStoreBench::~KvStoreBench()
50 {
51   if (io_ctx_ready) {
52     librados::ObjectWriteOperation owo;
53     owo.remove();
54     io_ctx.operate(client_name + ".done-setting", &owo);
55   }
56   delete kvs;
57 }
58
59 int KvStoreBench::setup(int argc, const char** argv) {
60   vector<const char*> args;
61   argv_to_vec(argc,argv,args);
62   srand(time(NULL));
63
64   stringstream help;
65   help
66       << "Usage: KvStoreBench [options]\n"
67       << "Generate latency and throughput statistics for the key value store\n"
68       << "\n"
69       << "There are two sets of options - workload options affect the kind of\n"
70       << "test to run, while algorithm options affect how the key value\n"
71       << "store handles the workload.\n"
72       << "\n"
73       << "There are about entries / k objects in the store to begin with.\n"
74       << "Higher k values reduce the likelihood of splits and the likelihood\n"
75       << "multiple writers simultaneously faling to write because an object \n"
76       << "is full, but having a high k also means there will be more object\n"
77       << "contention.\n"
78       << "\n"
79       << "WORKLOAD OPTIONS\n"
80       << "   --name <client name>                          client name (default admin)\n"
81       << "   --entries <number>                            number of key/value pairs to store initially\n"
82       << "                                                 (default " << entries << ")\n"
83       << "   --ops <number>                                number of operations to run\n"
84       << "   --keysize <number>                            number of characters per key (default " << key_size << ")\n"
85       << "   --valsize <number>                            number of characters per value (default " << val_size << ")\n"
86       << "   -t <number>                                   number of operations in flight concurrently\n"
87       << "                                                 (default " << max_ops_in_flight << ")\n"
88       << "   --clients <number>                            tells this instance how many total clients are. Note that\n"
89       << "                                                 changing this does not change the number of clients."
90       << "   -d <insert> <update> <delete> <read>          percent (1-100) of operations that should be of each type\n"
91       << "                                                 (default 25 25 25 25)\n"
92       << "   -r <number>                                   random seed to use (default time(0))\n"
93       << "ALGORITHM OPTIONS\n"
94       << "   --kval                                        k, where each object has a number of entries\n"
95       << "                                                 >= k and <= 2k.\n"
96       << "   --cache-size                                  number of index entries to keep in cache\n"
97       << "                                                 (default " << cache_size << ")\n"
98       << "   --cache-refresh                               percent (1-100) of cache-size to read each \n"
99       << "                                                 time the index is read\n"
100       << "OTHER OPTIONS\n"
101       << "   --verbosity-on                                display debug output\n"
102       << "   --clear-first                                 delete all existing objects in the pool before running tests\n";
103   for (unsigned i = 0; i < args.size(); i++) {
104     if(i < args.size() - 1) {
105       if (strcmp(args[i], "--ops") == 0) {
106         ops = atoi(args[i+1]);
107       } else if (strcmp(args[i], "--entries") == 0) {
108         entries = atoi(args[i+1]);
109       } else if (strcmp(args[i], "--kval") == 0) {
110         k = atoi(args[i+1]);
111       } else if (strcmp(args[i], "--keysize") == 0) {
112         key_size = atoi(args[i+1]);
113       } else if (strcmp(args[i], "--valsize") == 0) {
114         val_size = atoi(args[i+1]);
115       } else if (strcmp(args[i], "--cache-size") == 0) {
116         cache_size = atoi(args[i+1]);
117       } else if (strcmp(args[i], "--cache-refresh") == 0) {
118         cache_refresh = 100 / atoi(args[i+1]);
119       } else if (strcmp(args[i], "-t") == 0) {
120         max_ops_in_flight = atoi(args[i+1]);
121       } else if (strcmp(args[i], "--clients") == 0) {
122         clients = atoi(args[i+1]);
123       } else if (strcmp(args[i], "-d") == 0) {
124         if (i + 4 >= args.size()) {
125           cout << "Invalid arguments after -d: there must be 4 of them."
126               << std::endl;
127           continue;
128         } else {
129           probs.clear();
130           int sum = atoi(args[i + 1]);
131           probs[sum] = 'i';
132           sum += atoi(args[i + 2]);
133           probs[sum] = 'u';
134           sum += atoi(args[i + 3]);
135           probs[sum] = 'd';
136           sum += atoi(args[i + 4]);
137           probs[sum] = 'r';
138           if (sum != 100) {
139             cout << "Invalid arguments after -d: they must add to 100."
140                 << std::endl;
141           }
142         }
143       } else if (strcmp(args[i], "--name") == 0) {
144         client_name = args[i+1];
145       } else if (strcmp(args[i], "-r") == 0) {
146         srand(atoi(args[i+1]));
147       }
148     } else if (strcmp(args[i], "--verbosity-on") == 0) {
149       verbose = true;
150     } else if (strcmp(args[i], "--clear-first") == 0) {
151       clear_first = true;
152     } else if (strcmp(args[i], "--help") == 0) {
153       cout << help.str() << std::endl;
154       exit(1);
155     }
156   }
157
158   KvFlatBtreeAsync * kvba = new KvFlatBtreeAsync(k, client_name, cache_size,
159       cache_refresh, verbose);
160   kvs = kvba;
161
162   int r = rados.init(rados_id.c_str());
163   if (r < 0) {
164     cout << "error during init" << std::endl;
165     return r;
166   }
167   r = rados.conf_parse_argv(argc, argv);
168   if (r < 0) {
169     cout << "error during parsing args" << std::endl;
170     return r;
171   }
172   r = rados.conf_parse_env(NULL);
173   if (r < 0) {
174     cout << "error during parsing env" << std::endl;
175     return r;
176   }
177   r = rados.conf_read_file(NULL);
178   if (r < 0) {
179     cout << "error during read file" << std::endl;
180     return r;
181   }
182   r = rados.connect();
183   if (r < 0) {
184     cout << "error during connect: " << r << std::endl;
185     return r;
186   }
187   r = rados.ioctx_create(pool_name.c_str(), io_ctx);
188   if (r < 0) {
189     cout << "error creating io ctx" << std::endl;
190     rados.shutdown();
191     return r;
192   }
193   io_ctx_ready = true;
194
195   if (clear_first) {
196     librados::NObjectIterator it;
197     for (it = io_ctx.nobjects_begin(); it != io_ctx.nobjects_end(); ++it) {
198       librados::ObjectWriteOperation rm;
199       rm.remove();
200       io_ctx.operate(it->get_oid(), &rm);
201     }
202   }
203
204   int err = kvs->setup(argc, argv);
205   if (err < 0 && err != -17) {
206     cout << "error during setup of kvs: " << err << std::endl;
207     return err;
208   }
209
210   return 0;
211 }
212
213 string KvStoreBench::random_string(int len) {
214   string ret;
215   string alphanum = "0123456789"
216       "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
217       "abcdefghijklmnopqrstuvwxyz";
218   for (int i = 0; i < len; ++i) {
219     ret.push_back(alphanum[rand() % (alphanum.size() - 1)]);
220   }
221
222   return ret;
223 }
224
225 pair<string, bufferlist> KvStoreBench::rand_distr(bool new_elem) {
226   pair<string, bufferlist> ret;
227   if (new_elem) {
228     ret = make_pair(random_string(key_size),
229         KvFlatBtreeAsync::to_bl(random_string(val_size)));
230     key_set.insert(ret.first);
231   } else {
232     if (key_set.size() == 0) {
233       return make_pair("",KvFlatBtreeAsync::to_bl(""));
234     }
235     string get_string = random_string(key_size);
236     std::set<string>::iterator it = key_set.lower_bound(get_string);
237     if (it == key_set.end()) {
238       ret.first = *(key_set.rbegin());
239     } else {
240       ret.first = *it;
241     }
242     ret.second = KvFlatBtreeAsync::to_bl(random_string(val_size));
243   }
244   return ret;
245 }
246
247 int KvStoreBench::test_random_insertions() {
248   int err;
249   if (entries == 0) {
250     return 0;
251   }
252   stringstream prev_ss;
253   prev_ss << (atoi(client_name.c_str()) - 1);
254   string prev_rid = prev_ss.str();
255   stringstream last_ss;
256   if (client_name.size() > 1) {
257     last_ss << client_name.substr(0,client_name.size() - 2);
258   }
259   last_ss << clients - 1;
260   string last_rid = client_name == "admin" ? "admin" : last_ss.str();
261
262   map<string, bufferlist> big_map;
263   for (int i = 0; i < entries; i++) {
264     bufferlist bfr;
265     bfr.append(random_string(7));
266     big_map[random_string(5)] = bfr;
267   }
268
269   uint64_t uint;
270   time_t t;
271   if (client_name[client_name.size() - 1] != '0' && client_name != "admin") {
272     do {
273       librados::ObjectReadOperation oro;
274       oro.stat(&uint, &t, &err);
275       err = io_ctx.operate(prev_rid + ".done-setting", &oro, NULL);
276       if (verbose) cout << "reading " << prev_rid << ": err = " << err
277           << std::endl;
278     } while (err != 0);
279     cout << "detected " << prev_rid << ".done-setting" << std::endl;
280   }
281
282   cout << "testing random insertions";
283   err = kvs->set_many(big_map);
284   if (err < 0) {
285     cout << "error setting things" << std::endl;
286     return err;
287   }
288
289   librados::ObjectWriteOperation owo;
290   owo.create(true);
291   io_ctx.operate(client_name + ".done-setting", &owo);
292   cout << "created " << client_name + ".done-setting. waiting for "
293       << last_rid << ".done-setting" << std::endl;
294
295   do {
296     librados::ObjectReadOperation oro;
297     oro.stat(&uint, &t, &err);
298     err = io_ctx.operate(last_rid + ".done-setting", &oro, NULL);
299   } while (err != 0);
300   cout << "detected " << last_rid << ".done-setting" << std::endl;
301
302   return err;
303 }
304
305 void KvStoreBench::aio_callback_timed(int * err, void *arg) {
306   timed_args *args = reinterpret_cast<timed_args *>(arg);
307   Mutex * ops_in_flight_lock = &args->kvsb->ops_in_flight_lock;
308   Mutex * data_lock = &args->kvsb->data_lock;
309   Cond * op_avail = &args->kvsb->op_avail;
310   int *ops_in_flight = &args->kvsb->ops_in_flight;
311   if (*err < 0 && *err != -61) {
312     cerr << "Error during " << args->op << " operation: " << *err << std::endl;
313   }
314
315   args->sw.stop_time();
316   double time = args->sw.get_time();
317   args->sw.clear();
318
319   data_lock->Lock();
320   //latency
321   args->kvsb->data.latency_jf.open_object_section("latency");
322   args->kvsb->data.latency_jf.dump_float(string(1, args->op).c_str(),
323       time);
324   args->kvsb->data.latency_jf.close_section();
325
326   //throughput
327   args->kvsb->data.throughput_jf.open_object_section("throughput");
328   args->kvsb->data.throughput_jf.dump_unsigned(string(1, args->op).c_str(),
329       ceph_clock_now());
330   args->kvsb->data.throughput_jf.close_section();
331
332   data_lock->Unlock();
333
334   ops_in_flight_lock->Lock();
335   (*ops_in_flight)--;
336   op_avail->Signal();
337   ops_in_flight_lock->Unlock();
338
339   delete args;
340 }
341
342 int KvStoreBench::test_teuthology_aio(next_gen_t distr,
343     const map<int, char> &probs)
344 {
345   int err = 0;
346   cout << "inserting initial entries..." << std::endl;
347   err = test_random_insertions();
348   if (err < 0) {
349     return err;
350   }
351   cout << "finished inserting initial entries. Waiting 10 seconds for everyone"
352       << " to catch up..." << std::endl;
353
354   sleep(10);
355
356   cout << "done waiting. Starting random operations..." << std::endl;
357
358   Mutex::Locker l(ops_in_flight_lock);
359   for (int i = 0; i < ops; i++) {
360     assert(ops_in_flight <= max_ops_in_flight);
361     if (ops_in_flight == max_ops_in_flight) {
362       int err = op_avail.Wait(ops_in_flight_lock);
363       if (err < 0) {
364         assert(false);
365         return err;
366       }
367       assert(ops_in_flight < max_ops_in_flight);
368     }
369     cout << "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t" << i + 1 << " / "
370         << ops << std::endl;
371     timed_args * cb_args = new timed_args(this);
372     pair<string, bufferlist> kv;
373     int random = (rand() % 100);
374     cb_args->op = probs.lower_bound(random)->second;
375     switch (cb_args->op) {
376     case 'i':
377       kv = (((KvStoreBench *)this)->*distr)(true);
378       if (kv.first == "") {
379         i--;
380         delete cb_args;
381         continue;
382       }
383       ops_in_flight++;
384       cb_args->sw.start_time();
385       kvs->aio_set(kv.first, kv.second, false, aio_callback_timed,
386           cb_args, &cb_args->err);
387       break;
388     case 'u':
389       kv = (((KvStoreBench *)this)->*distr)(false);
390       if (kv.first == "") {
391         i--;
392         delete cb_args;
393         continue;
394       }
395       ops_in_flight++;
396       cb_args->sw.start_time();
397       kvs->aio_set(kv.first, kv.second, true, aio_callback_timed,
398           cb_args, &cb_args->err);
399       break;
400     case 'd':
401       kv = (((KvStoreBench *)this)->*distr)(false);
402       if (kv.first == "") {
403         i--;
404         delete cb_args;
405         continue;
406       }
407       key_set.erase(kv.first);
408       ops_in_flight++;
409       cb_args->sw.start_time();
410       kvs->aio_remove(kv.first, aio_callback_timed, cb_args, &cb_args->err);
411       break;
412     case 'r':
413       kv = (((KvStoreBench *)this)->*distr)(false);
414       if (kv.first == "") {
415         i--;
416         delete cb_args;
417         continue;
418       }
419       bufferlist val;
420       ops_in_flight++;
421       cb_args->sw.start_time();
422       kvs->aio_get(kv.first, &cb_args->val, aio_callback_timed,
423           cb_args, &cb_args->err);
424       break;
425     }
426
427     delete cb_args;
428   }
429
430   while(ops_in_flight > 0) {
431     op_avail.Wait(ops_in_flight_lock);
432   }
433
434   print_time_data();
435   return err;
436 }
437
438 int KvStoreBench::test_teuthology_sync(next_gen_t distr,
439     const map<int, char> &probs)
440 {
441   int err = 0;
442   err = test_random_insertions();
443   if (err < 0) {
444     return err;
445   }
446   sleep(10);
447   for (int i = 0; i < ops; i++) {
448     StopWatch sw;
449     pair<char, double> d;
450     cout << "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t" << i + 1 << " / "
451         << ops << std::endl;
452     pair<string, bufferlist> kv;
453     int random = (rand() % 100);
454     d.first = probs.lower_bound(random)->second;
455     switch (d.first) {
456     case 'i':
457       kv = (((KvStoreBench *)this)->*distr)(true);
458       if (kv.first == "") {
459         i--;
460         continue;
461       }
462       sw.start_time();
463       err = kvs->set(kv.first, kv.second, true);
464       sw.stop_time();
465       if (err < 0) {
466         cout << "Error setting " << kv << ": " << err << std::endl;
467         return err;
468       }
469       break;
470     case 'u':
471       kv = (((KvStoreBench *)this)->*distr)(false);
472       if (kv.first == "") {
473         i--;
474         continue;
475       }
476       sw.start_time();
477       err = kvs->set(kv.first, kv.second, true);
478       sw.stop_time();
479       if (err < 0 && err != -61) {
480         cout << "Error updating " << kv << ": " << err << std::endl;
481         return err;
482       }
483       break;
484     case 'd':
485       kv = (((KvStoreBench *)this)->*distr)(false);
486       if (kv.first == "") {
487         i--;
488         continue;
489       }
490       key_set.erase(kv.first);
491       sw.start_time();
492       err = kvs->remove(kv.first);
493       sw.stop_time();
494       if (err < 0 && err != -61) {
495         cout << "Error removing " << kv << ": " << err << std::endl;
496         return err;
497       }
498       break;
499     case 'r':
500       kv = (((KvStoreBench *)this)->*distr)(false);
501       if (kv.first == "") {
502         i--;
503         continue;
504       }
505       bufferlist val;
506       sw.start_time();
507       err = kvs->get(kv.first, &kv.second);
508       sw.stop_time();
509       if (err < 0 && err != -61) {
510         cout << "Error getting " << kv << ": " << err << std::endl;
511         return err;
512       }
513       break;
514     }
515
516     double time = sw.get_time();
517     d.second = time;
518     sw.clear();
519     //latency
520     data.latency_jf.open_object_section("latency");
521     data.latency_jf.dump_float(string(1, d.first).c_str(),
522         time);
523     data.latency_jf.close_section();
524   }
525
526   print_time_data();
527   return err;
528 }
529
530 void KvStoreBench::print_time_data() {
531   cout << "========================================================\n";
532   cout << "latency:" << std::endl;
533   data.latency_jf.flush(cout);
534   cout << std::endl;
535   cout << "throughput:" << std::endl;
536   data.throughput_jf.flush(cout);
537   cout << "\n========================================================"
538        << std::endl;
539 }
540
541 int KvStoreBench::teuthology_tests() {
542   int err = 0;
543   if (max_ops_in_flight > 1) {
544     test_teuthology_aio(&KvStoreBench::rand_distr, probs);
545   } else {
546     err = test_teuthology_sync(&KvStoreBench::rand_distr, probs);
547   }
548   return err;
549 }
550
551 int main(int argc, const char** argv) {
552   KvStoreBench kvsb;
553   int err = kvsb.setup(argc, argv);
554   if (err == 0) cout << "setup successful" << std::endl;
555   else{
556     cout << "error " << err << std::endl;
557     return err;
558   }
559   err = kvsb.teuthology_tests();
560   if (err < 0) return err;
561   return 0;
562 };