4 * Created on: Aug 23, 2012
8 #include "test/kv_store_bench.h"
9 #include "key_value_store/key_value_structure.h"
10 #include "key_value_store/kv_flat_btree_async.h"
11 #include "include/rados/librados.hpp"
12 #include "test/omap_bench.h"
13 #include "common/ceph_argparse.h"
// Default constructor. Only part of the member-initializer list is visible
// in this excerpt; the two entries shown give names to the Mutex members.
22 KvStoreBench::KvStoreBench()
// Mutex that aio_callback_timed reaches through args->kvsb->data_lock.
36 data_lock("data lock"),
// Mutex held by test_teuthology_aio while it inspects ops_in_flight, and
// locked by aio_callback_timed.
38 ops_in_flight_lock("KvStoreBench::ops_in_flight_lock"),
// Destructor. Submits a write operation against the object named
// <client_name>.done-setting, the same marker object that
// test_random_insertions creates.
// NOTE(review): what the operation contains is configured on a line outside
// this excerpt (presumably a removal of the marker -- TODO confirm). The
// result of operate() is not checked on the visible line.
49 KvStoreBench::~KvStoreBench()
52 librados::ObjectWriteOperation owo;
54 io_ctx.operate(client_name + ".done-setting", &owo);
// Parses the command line into benchmark settings, brings up the librados
// handle and I/O context, and calls setup() on the key-value store.
//
// argc, argv: the process arguments. They are copied into args for local
//   parsing and are also passed unchanged to rados.conf_parse_argv() and
//   kvs->setup().
// Returns an int status. The return statements are on lines outside this
//   excerpt, so the exact values are not documented here.
59 int KvStoreBench::setup(int argc, const char** argv) {
60 vector<const char*> args;
61 argv_to_vec(argc,argv,args);
// Usage text. It is printed by the --help branch of the option loop below.
66 << "Usage: KvStoreBench [options]\n"
67 << "Generate latency and throughput statistics for the key value store\n"
69 << "There are two sets of options - workload options affect the kind of\n"
70 << "test to run, while algorithm options affect how the key value\n"
71 << "store handles the workload.\n"
73 << "There are about entries / k objects in the store to begin with.\n"
74 << "Higher k values reduce the likelihood of splits and the likelihood\n"
// NOTE(review): the next literal spells failing as faling. It is runtime
// output, so it is flagged here rather than changed.
75 << "multiple writers simultaneously faling to write because an object \n"
76 << "is full, but having a high k also means there will be more object\n"
79 << "WORKLOAD OPTIONS\n"
80 << " --name <client name> client name (default admin)\n"
81 << " --entries <number> number of key/value pairs to store initially\n"
82 << " (default " << entries << ")\n"
83 << " --ops <number> number of operations to run\n"
84 << " --keysize <number> number of characters per key (default " << key_size << ")\n"
85 << " --valsize <number> number of characters per value (default " << val_size << ")\n"
86 << " -t <number> number of operations in flight concurrently\n"
87 << " (default " << max_ops_in_flight << ")\n"
88 << " --clients <number> tells this instance how many total clients are. Note that\n"
89 << " changing this does not change the number of clients."
// NOTE(review): the literal on the previous line has no trailing newline
// escape, so the -d entry below continues on the same output line.
90 << " -d <insert> <update> <delete> <read> percent (1-100) of operations that should be of each type\n"
91 << " (default 25 25 25 25)\n"
92 << " -r <number> random seed to use (default time(0))\n"
93 << "ALGORITHM OPTIONS\n"
94 << " --kval k, where each object has a number of entries\n"
95 << " >= k and <= 2k.\n"
96 << " --cache-size number of index entries to keep in cache\n"
97 << " (default " << cache_size << ")\n"
98 << " --cache-refresh percent (1-100) of cache-size to read each \n"
99 << " time the index is read\n"
101 << " --verbosity-on display debug output\n"
102 << " --clear-first delete all existing objects in the pool before running tests\n";
// Option loop. The guard on the next-but-one line means an option that takes
// a value is only examined when at least one more argument follows it.
// Values are converted with atoi(), which does no validation and yields 0
// for non-numeric text.
103 for (unsigned i = 0; i < args.size(); i++) {
104 if(i < args.size() - 1) {
105 if (strcmp(args[i], "--ops") == 0) {
106 ops = atoi(args[i+1]);
107 } else if (strcmp(args[i], "--entries") == 0) {
108 entries = atoi(args[i+1]);
// The statement that consumes the --kval value is on a line not shown here.
109 } else if (strcmp(args[i], "--kval") == 0) {
111 } else if (strcmp(args[i], "--keysize") == 0) {
112 key_size = atoi(args[i+1]);
113 } else if (strcmp(args[i], "--valsize") == 0) {
114 val_size = atoi(args[i+1]);
115 } else if (strcmp(args[i], "--cache-size") == 0) {
116 cache_size = atoi(args[i+1]);
117 } else if (strcmp(args[i], "--cache-refresh") == 0) {
// NOTE(review): integer division by the user-supplied value. A value of 0,
// or non-numeric text that atoi() turns into 0, divides by zero. Values
// above 100 truncate the result to 0.
118 cache_refresh = 100 / atoi(args[i+1]);
119 } else if (strcmp(args[i], "-t") == 0) {
120 max_ops_in_flight = atoi(args[i+1]);
121 } else if (strcmp(args[i], "--clients") == 0) {
122 clients = atoi(args[i+1]);
// -d must be followed by four values (insert, update, delete and read
// percentages according to the usage text above).
123 } else if (strcmp(args[i], "-d") == 0) {
124 if (i + 4 >= args.size()) {
125 cout << "Invalid arguments after -d: there must be 4 of them."
// Running total of the four percentages. The comparison that triggers the
// diagnostic further down is on a line not shown here.
130 int sum = atoi(args[i + 1]);
132 sum += atoi(args[i + 2]);
134 sum += atoi(args[i + 3]);
136 sum += atoi(args[i + 4]);
139 cout << "Invalid arguments after -d: they must add to 100."
143 } else if (strcmp(args[i], "--name") == 0) {
144 client_name = args[i+1];
// Re-seed rand() with the user-supplied seed.
145 } else if (strcmp(args[i], "-r") == 0) {
146 srand(atoi(args[i+1]));
// NOTE(review): the brace structure around the flag-style options below is
// not fully visible. Confirm whether they are reachable when they are not
// the last argument, given the i < args.size() - 1 guard above.
148 } else if (strcmp(args[i], "--verbosity-on") == 0) {
150 } else if (strcmp(args[i], "--clear-first") == 0) {
152 } else if (strcmp(args[i], "--help") == 0) {
153 cout << help.str() << std::endl;
// Build the store implementation from the parsed algorithm options.
// NOTE(review): allocated with a raw new. Where the pointer is stored and
// released is not visible in this excerpt.
158 KvFlatBtreeAsync * kvba = new KvFlatBtreeAsync(k, client_name, cache_size,
159 cache_refresh, verbose);
// librados bring-up: init with rados_id, then configuration from argv, from
// the environment and from the configuration file. Each step has its own
// diagnostic; the checks on r are on lines not shown here.
162 int r = rados.init(rados_id.c_str());
164 cout << "error during init" << std::endl;
167 r = rados.conf_parse_argv(argc, argv);
169 cout << "error during parsing args" << std::endl;
172 r = rados.conf_parse_env(NULL);
174 cout << "error during parsing env" << std::endl;
177 r = rados.conf_read_file(NULL);
179 cout << "error during read file" << std::endl;
184 cout << "error during connect: " << r << std::endl;
// Open an I/O context on the pool named by pool_name.
187 r = rados.ioctx_create(pool_name.c_str(), io_ctx);
189 cout << "error creating io ctx" << std::endl;
// Walk every object in the pool and apply the write operation rm to it.
// NOTE(review): presumably this is the --clear-first path and rm is a
// removal; the guarding condition and the setup of rm are not visible.
196 librados::NObjectIterator it;
197 for (it = io_ctx.nobjects_begin(); it != io_ctx.nobjects_end(); ++it) {
198 librados::ObjectWriteOperation rm;
200 io_ctx.operate(it->get_oid(), &rm);
// Initialize the store. A result of -17 is tolerated (presumably -EEXIST,
// meaning the store was already set up -- TODO confirm).
204 int err = kvs->setup(argc, argv);
205 if (err < 0 && err != -17) {
206 cout << "error during setup of kvs: " << err << std::endl;
213 string KvStoreBench::random_string(int len) {
215 string alphanum = "0123456789"
216 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
217 "abcdefghijklmnopqrstuvwxyz";
218 for (int i = 0; i < len; ++i) {
219 ret.push_back(alphanum[rand() % (alphanum.size() - 1)]);
// Picks the key/value pair for the next operation.
//
// new_elem: presumably chooses between the two paths below (a fresh random
//   key versus an existing key). The branch on it is on a line outside this
//   excerpt -- TODO confirm.
// Returns the (key, value) pair. An empty key is returned when key_set holds
//   no keys to choose from.
225 pair<string, bufferlist> KvStoreBench::rand_distr(bool new_elem) {
226 pair<string, bufferlist> ret;
// Fresh pair: a random key of key_size characters and a random value of
// val_size characters. The key is recorded in key_set.
228 ret = make_pair(random_string(key_size),
229 KvFlatBtreeAsync::to_bl(random_string(val_size)));
230 key_set.insert(ret.first);
// Nothing to pick from: signal this with an empty key.
232 if (key_set.size() == 0) {
233 return make_pair("",KvFlatBtreeAsync::to_bl(""));
// Choose an existing key: generate a random probe and take the first stored
// key that is not less than it.
235 string get_string = random_string(key_size);
236 std::set<string>::iterator it = key_set.lower_bound(get_string);
// The probe sorts after every stored key, so fall back to the largest key.
237 if (it == key_set.end()) {
238 ret.first = *(key_set.rbegin());
// Attach a freshly generated random value.
242 ret.second = KvFlatBtreeAsync::to_bl(random_string(val_size));
// Loads the initial data set into the store with kvs->set_many() and
// coordinates with other benchmark clients through marker objects named
// <client>.done-setting.
//
// Returns an int status; the return statements are on lines outside this
// excerpt.
247 int KvStoreBench::test_random_insertions() {
// Name of the previous client: the numeric value of client_name minus one.
// atoi() yields 0 for a non-numeric name, which makes prev_rid equal to -1.
252 stringstream prev_ss;
253 prev_ss << (atoi(client_name.c_str()) - 1);
254 string prev_rid = prev_ss.str();
// Name of the last client: a prefix of client_name followed by clients - 1,
// or admin when this client is admin.
// NOTE(review): the prefix drops the final two characters of client_name.
// Confirm that this matches the client naming scheme.
255 stringstream last_ss;
256 if (client_name.size() > 1) {
257 last_ss << client_name.substr(0,client_name.size() - 2);
259 last_ss << clients - 1;
260 string last_rid = client_name == "admin" ? "admin" : last_ss.str();
// Build the initial data set. Keys are 5 random characters and values are 7
// random characters; these literals are used instead of key_size and
// val_size. Duplicate random keys overwrite each other, so big_map can end
// up with fewer than entries elements. bfr is declared on a line not shown.
262 map<string, bufferlist> big_map;
263 for (int i = 0; i < entries; i++) {
265 bfr.append(random_string(7));
266 big_map[random_string(5)] = bfr;
// Clients whose name does not end in 0, other than admin, stat the marker
// object of the previous client first (presumably polling until it exists;
// the surrounding loop is not visible).
// NOTE(review): indexing with size() - 1 assumes client_name is not empty.
271 if (client_name[client_name.size() - 1] != '0' && client_name != "admin") {
273 librados::ObjectReadOperation oro;
274 oro.stat(&uint, &t, &err);
275 err = io_ctx.operate(prev_rid + ".done-setting", &oro, NULL);
276 if (verbose) cout << "reading " << prev_rid << ": err = " << err
279 cout << "detected " << prev_rid << ".done-setting" << std::endl;
// Write the whole data set in one call.
282 cout << "testing random insertions";
283 err = kvs->set_many(big_map);
285 cout << "error setting things" << std::endl;
// Publish the marker object of this client. The contents of owo are set on a
// line not shown, and the result of operate() is not checked here.
289 librados::ObjectWriteOperation owo;
291 io_ctx.operate(client_name + ".done-setting", &owo);
292 cout << "created " << client_name + ".done-setting. waiting for "
293 << last_rid << ".done-setting" << std::endl;
// Stat the marker object of the last client (presumably inside a wait loop
// that is not visible).
296 librados::ObjectReadOperation oro;
297 oro.stat(&uint, &t, &err);
298 err = io_ctx.operate(last_rid + ".done-setting", &oro, NULL);
300 cout << "detected " << last_rid << ".done-setting" << std::endl;
// Completion callback handed to the kvs->aio_set, aio_remove and aio_get
// calls in test_teuthology_aio.
//
// err: points at the result code of the finished operation.
// arg: the timed_args object that was created for the operation. It carries
//   the owning KvStoreBench (kvsb), the operation code (op) and the
//   stopwatch (sw).
305 void KvStoreBench::aio_callback_timed(int * err, void *arg) {
306 timed_args *args = reinterpret_cast<timed_args *>(arg);
// Shortcuts to the shared state of the owning benchmark object.
307 Mutex * ops_in_flight_lock = &args->kvsb->ops_in_flight_lock;
308 Mutex * data_lock = &args->kvsb->data_lock;
309 Cond * op_avail = &args->kvsb->op_avail;
310 int *ops_in_flight = &args->kvsb->ops_in_flight;
// Report failures, except for code -61 (presumably -ENODATA, i.e. the key
// was not found -- TODO confirm).
311 if (*err < 0 && *err != -61) {
312 cerr << "Error during " << args->op << " operation: " << *err << std::endl;
// Stop the stopwatch of this operation and read the elapsed time.
315 args->sw.stop_time();
316 double time = args->sw.get_time();
// Record one latency entry keyed by the one-character operation code. The
// use of data_lock around these updates is on lines not shown here.
321 args->kvsb->data.latency_jf.open_object_section("latency");
322 args->kvsb->data.latency_jf.dump_float(string(1, args->op).c_str(),
324 args->kvsb->data.latency_jf.close_section();
// Record one throughput entry keyed the same way.
327 args->kvsb->data.throughput_jf.open_object_section("throughput");
328 args->kvsb->data.throughput_jf.dump_unsigned(string(1, args->op).c_str(),
330 args->kvsb->data.throughput_jf.close_section();
// Critical section on the in-flight counter. The statements between Lock()
// and Unlock() are not visible (presumably they decrement *ops_in_flight and
// signal op_avail -- TODO confirm).
334 ops_in_flight_lock->Lock();
337 ops_in_flight_lock->Unlock();
// Asynchronous workload driver: loads the initial entries, then issues ops
// random operations while keeping at most max_ops_in_flight of them
// outstanding, and finally waits for all of them to complete.
//
// distr: pointer to the member function that produces the key/value pair
//   for an operation. It is called with true for the first operation kind
//   below and with false for the others.
// probs: ordered map that is searched with lower_bound() on a random number
//   in [0, 99]; the mapped char is the operation code.
// Returns an int status; the return statements are on lines not shown here.
342 int KvStoreBench::test_teuthology_aio(next_gen_t distr,
343 const map<int, char> &probs)
346 cout << "inserting initial entries..." << std::endl;
347 err = test_random_insertions();
351 cout << "finished inserting initial entries. Waiting 10 seconds for everyone"
352 << " to catch up..." << std::endl;
356 cout << "done waiting. Starting random operations..." << std::endl;
// ops_in_flight_lock is held for the rest of this scope. It is the mutex
// passed to op_avail.Wait() below.
358 Mutex::Locker l(ops_in_flight_lock);
359 for (int i = 0; i < ops; i++) {
// Throttle: when the limit is reached, wait on op_avail before issuing the
// next operation.
360 assert(ops_in_flight <= max_ops_in_flight);
361 if (ops_in_flight == max_ops_in_flight) {
362 int err = op_avail.Wait(ops_in_flight_lock);
367 assert(ops_in_flight < max_ops_in_flight);
// Progress output.
369 cout << "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t" << i + 1 << " / "
// Per-operation state that is handed to aio_callback_timed.
// NOTE(review): allocated with a raw new; the matching delete is not visible
// in this excerpt.
371 timed_args * cb_args = new timed_args(this);
372 pair<string, bufferlist> kv;
// Pick the operation code for this iteration.
// NOTE(review): lower_bound() returns end() when probs has no key that is
// greater than or equal to random, and the result is dereferenced without a
// check. This is only safe while probs contains a key of at least 99.
373 int random = (rand() % 100);
374 cb_args->op = probs.lower_bound(random)->second;
375 switch (cb_args->op) {
// Set with a newly generated key; aio_set() gets false as third argument.
// The handling of an empty key is on lines not shown here.
377 kv = (((KvStoreBench *)this)->*distr)(true);
378 if (kv.first == "") {
384 cb_args->sw.start_time();
385 kvs->aio_set(kv.first, kv.second, false, aio_callback_timed,
386 cb_args, &cb_args->err);
// Set on an existing key; aio_set() gets true as third argument.
389 kv = (((KvStoreBench *)this)->*distr)(false);
390 if (kv.first == "") {
396 cb_args->sw.start_time();
397 kvs->aio_set(kv.first, kv.second, true, aio_callback_timed,
398 cb_args, &cb_args->err);
// Remove an existing key. The key leaves key_set before the asynchronous
// removal is started.
401 kv = (((KvStoreBench *)this)->*distr)(false);
402 if (kv.first == "") {
407 key_set.erase(kv.first);
409 cb_args->sw.start_time();
410 kvs->aio_remove(kv.first, aio_callback_timed, cb_args, &cb_args->err);
// Read an existing key into cb_args->val.
413 kv = (((KvStoreBench *)this)->*distr)(false);
414 if (kv.first == "") {
421 cb_args->sw.start_time();
422 kvs->aio_get(kv.first, &cb_args->val, aio_callback_timed,
423 cb_args, &cb_args->err);
// Drain: wait until every outstanding operation has completed.
430 while(ops_in_flight > 0) {
431 op_avail.Wait(ops_in_flight_lock);
// Synchronous workload driver: loads the initial entries, then runs ops
// random operations one after another and records a latency entry for each.
//
// distr: pointer to the member function that produces the key/value pair
//   for an operation. It is called with true for the first operation kind
//   below and with false for the others.
// probs: ordered map that is searched with lower_bound() on a random number
//   in [0, 99]; the mapped char is the operation code.
// Returns an int status; the return statements are on lines not shown here.
438 int KvStoreBench::test_teuthology_sync(next_gen_t distr,
439 const map<int, char> &probs)
442 err = test_random_insertions();
447 for (int i = 0; i < ops; i++) {
// d.first holds the operation code of this iteration.
449 pair<char, double> d;
// Progress output.
450 cout << "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t" << i + 1 << " / "
452 pair<string, bufferlist> kv;
// NOTE(review): lower_bound() returns end() when probs has no key that is
// greater than or equal to random, and the result is dereferenced without a
// check. This is only safe while probs contains a key of at least 99.
453 int random = (rand() % 100);
454 d.first = probs.lower_bound(random)->second;
// Set with a newly generated key.
// NOTE(review): set() gets true as third argument here, the same as the
// next path, whereas test_teuthology_aio passes false for its first
// aio_set() and true for its second. Confirm whether this is intended.
457 kv = (((KvStoreBench *)this)->*distr)(true);
458 if (kv.first == "") {
463 err = kvs->set(kv.first, kv.second, true);
466 cout << "Error setting " << kv << ": " << err << std::endl;
// Set on an existing key. Code -61 is not reported as an error (presumably
// -ENODATA -- TODO confirm).
471 kv = (((KvStoreBench *)this)->*distr)(false);
472 if (kv.first == "") {
477 err = kvs->set(kv.first, kv.second, true);
479 if (err < 0 && err != -61) {
480 cout << "Error updating " << kv << ": " << err << std::endl;
// Remove an existing key. The key leaves key_set before the removal runs.
485 kv = (((KvStoreBench *)this)->*distr)(false);
486 if (kv.first == "") {
490 key_set.erase(kv.first);
492 err = kvs->remove(kv.first);
494 if (err < 0 && err != -61) {
495 cout << "Error removing " << kv << ": " << err << std::endl;
// Read an existing key; the value is written into kv.second.
500 kv = (((KvStoreBench *)this)->*distr)(false);
501 if (kv.first == "") {
507 err = kvs->get(kv.first, &kv.second);
509 if (err < 0 && err != -61) {
510 cout << "Error getting " << kv << ": " << err << std::endl;
// Record the elapsed time of this operation, keyed by its operation code.
// sw is declared and started on lines not shown here.
516 double time = sw.get_time();
520 data.latency_jf.open_object_section("latency");
521 data.latency_jf.dump_float(string(1, d.first).c_str(),
523 data.latency_jf.close_section();
// Writes the collected measurements to cout: a separator, the contents of
// data.latency_jf under a latency heading, the contents of
// data.throughput_jf under a throughput heading, and a closing separator.
// Takes no arguments and returns nothing.
530 void KvStoreBench::print_time_data() {
531 cout << "========================================================\n";
532 cout << "latency:" << std::endl;
// flush() emits whatever has been recorded in the formatter so far.
533 data.latency_jf.flush(cout);
535 cout << "throughput:" << std::endl;
536 data.throughput_jf.flush(cout);
// The end of this statement is on a line outside this excerpt.
537 cout << "\n========================================================"
541 int KvStoreBench::teuthology_tests() {
543 if (max_ops_in_flight > 1) {
544 test_teuthology_aio(&KvStoreBench::rand_distr, probs);
546 err = test_teuthology_sync(&KvStoreBench::rand_distr, probs);
// Program entry point: configures the benchmark from the command line and
// runs the teuthology workload.
//
// argc, argv: forwarded unchanged to kvsb.setup().
// Returns the negative status of teuthology_tests() when it fails. The other
// return paths are on lines outside this excerpt.
551 int main(int argc, const char** argv) {
// kvsb is declared on a line not shown here.
553 int err = kvsb.setup(argc, argv);
554 if (err == 0) cout << "setup successful" << std::endl;
// Diagnostic for a failed setup; the branch head is on a line not shown.
556 cout << "error " << err << std::endl;
559 err = kvsb.teuthology_tests();
560 if (err < 0) return err;