/*
 * KvStoreBench.cc
 *
 *  Created on: Aug 23, 2012
 *      Author: eleanor
 */
#include "test/kv_store_bench.h"
#include "key_value_store/key_value_structure.h"
#include "key_value_store/kv_flat_btree_async.h"
#include "include/rados/librados.hpp"
#include "test/omap_bench.h"
#include "common/ceph_argparse.h"

#include <cstdlib>
#include <cstring>
#include <ctime>
#include <iostream>
#include <sstream>
#include <string>
#include <unistd.h>

namespace {

/**
 * Pick the operation code for a draw in [0, 100).
 *
 * probs maps cumulative percentages to operation codes (the default is
 * 25 -> 'i', 50 -> 'u', 75 -> 'd', 100 -> 'r'), so the operation for a draw
 * is the first entry whose key is strictly greater than the draw. Using
 * upper_bound rather than lower_bound gives each bucket exactly its
 * configured width.
 *
 * If the percentages add up to less than 100, draws beyond the last bucket
 * are attributed to the last operation instead of dereferencing end().
 *
 * @param probs cumulative percentage -> operation code
 * @param draw  value in [0, 100)
 * @return the chosen operation code, or '\0' if probs is empty
 */
char choose_op(const map<int, char> &probs, int draw) {
  if (probs.empty()) {
    return '\0';
  }
  map<int, char>::const_iterator it = probs.upper_bound(draw);
  if (it == probs.end()) {
    --it;
  }
  return it->second;
}

/// True if op is one of the four operation codes the benchmark dispatches.
bool is_known_op(char op) {
  return op == 'i' || op == 'u' || op == 'd' || op == 'r';
}

} // anonymous namespace

/**
 * Set the default workload and algorithm parameters and the default
 * 25/25/25/25 insert/update/delete/read distribution.
 */
KvStoreBench::KvStoreBench()
: entries(30),
  ops(100),
  clients(5),
  key_size(5),
  val_size(7),
  max_ops_in_flight(8),
  clear_first(false),
  k(2),
  cache_size(10),
  cache_refresh(1),
  client_name("admin"),
  verbose(false),
  kvs(NULL),
  data_lock("data lock"),
  ops_in_flight(0),
  ops_in_flight_lock("KvStoreBench::ops_in_flight_lock"),
  rados_id("admin"),
  pool_name("rbd"),
  io_ctx_ready(false)
{
  probs[25] = 'i';
  probs[50] = 'u';
  probs[75] = 'd';
  probs[100] = 'r';
}

/**
 * Remove this client's "<name>.done-setting" marker object (if an io context
 * was created) and release the key value store.
 */
KvStoreBench::~KvStoreBench()
{
  if (io_ctx_ready) {
    librados::ObjectWriteOperation owo;
    owo.remove();
    io_ctx.operate(client_name + ".done-setting", &owo);
  }
  delete kvs;
}

/**
 * Parse the command line, create the key value store, connect to rados and
 * open the io context for the pool.
 *
 * @param argc number of entries in argv
 * @param argv command line arguments
 * @return 0 on success, otherwise the negative error code of the step that
 *         failed
 */
int KvStoreBench::setup(int argc, const char** argv) {
  vector<const char*> args;
  argv_to_vec(argc,argv,args);
  srand(time(NULL));

  stringstream help;
  help
    << "Usage: KvStoreBench [options]\n"
    << "Generate latency and throughput statistics for the key value store\n"
    << "\n"
    << "There are two sets of options - workload options affect the kind of\n"
    << "test to run, while algorithm options affect how the key value\n"
    << "store handles the workload.\n"
    << "\n"
    << "There are about entries / k objects in the store to begin with.\n"
    << "Higher k values reduce the likelihood of splits and the likelihood\n"
    << "multiple writers simultaneously failing to write because an object \n"
    << "is full, but having a high k also means there will be more object\n"
    << "contention.\n"
    << "\n"
    << "WORKLOAD OPTIONS\n"
    << " --name client name (default admin)\n"
    << " --entries number of key/value pairs to store initially\n"
    << " (default " << entries << ")\n"
    << " --ops number of operations to run\n"
    << " --keysize number of characters per key (default " << key_size << ")\n"
    << " --valsize number of characters per value (default " << val_size << ")\n"
    << " -t number of operations in flight concurrently\n"
    << " (default " << max_ops_in_flight << ")\n"
    << " --clients tells this instance how many total clients are. Note that\n"
    << " changing this does not change the number of clients.\n"
    << " -d percent (1-100) of operations that should be of each type\n"
    << " (default 25 25 25 25)\n"
    << " -r random seed to use (default time(0))\n"
    << "ALGORITHM OPTIONS\n"
    << " --kval k, where each object has a number of entries\n"
    << " >= k and <= 2k.\n"
    << " --cache-size number of index entries to keep in cache\n"
    << " (default " << cache_size << ")\n"
    << " --cache-refresh percent (1-100) of cache-size to read each \n"
    << " time the index is read\n"
    << "OTHER OPTIONS\n"
    << " --verbosity-on display debug output\n"
    << " --clear-first delete all existing objects in the pool before running tests\n";

  for (unsigned i = 0; i < args.size(); i++) {
    // Flags without a value are accepted at any position, not only as the
    // last argument.
    if (strcmp(args[i], "--verbosity-on") == 0) {
      verbose = true;
      continue;
    }
    if (strcmp(args[i], "--clear-first") == 0) {
      clear_first = true;
      continue;
    }
    if (strcmp(args[i], "--help") == 0) {
      cout << help.str() << std::endl;
      exit(1);
    }

    // Everything below consumes at least one following argument.
    if (i + 1 >= args.size()) {
      continue;
    }

    if (strcmp(args[i], "--ops") == 0) {
      ops = atoi(args[i+1]);
    } else if (strcmp(args[i], "--entries") == 0) {
      entries = atoi(args[i+1]);
    } else if (strcmp(args[i], "--kval") == 0) {
      k = atoi(args[i+1]);
    } else if (strcmp(args[i], "--keysize") == 0) {
      key_size = atoi(args[i+1]);
    } else if (strcmp(args[i], "--valsize") == 0) {
      val_size = atoi(args[i+1]);
    } else if (strcmp(args[i], "--cache-size") == 0) {
      cache_size = atoi(args[i+1]);
    } else if (strcmp(args[i], "--cache-refresh") == 0) {
      // Guard the division: atoi yields 0 for "0" and for non-numeric input.
      const int percent = atoi(args[i+1]);
      if (percent <= 0) {
        cout << "Invalid argument after --cache-refresh: it must be a"
             << " positive percentage." << std::endl;
      } else {
        cache_refresh = 100 / percent;
      }
    } else if (strcmp(args[i], "-t") == 0) {
      max_ops_in_flight = atoi(args[i+1]);
    } else if (strcmp(args[i], "--clients") == 0) {
      clients = atoi(args[i+1]);
    } else if (strcmp(args[i], "-d") == 0) {
      if (i + 4 >= args.size()) {
        cout << "Invalid arguments after -d: there must be 4 of them."
             << std::endl;
        continue;
      } else {
        // Rebuild the cumulative distribution. Operations with a zero share
        // get no bucket; otherwise they would overwrite the previous
        // operation's entry at the same cumulative key.
        static const char op_codes[4] = { 'i', 'u', 'd', 'r' };
        probs.clear();
        int sum = 0;
        for (int j = 0; j < 4; j++) {
          const int percent = atoi(args[i + 1 + j]);
          sum += percent;
          if (percent > 0) {
            probs[sum] = op_codes[j];
          }
        }
        if (sum != 100) {
          cout << "Invalid arguments after -d: they must add to 100."
               << std::endl;
        }
      }
    } else if (strcmp(args[i], "--name") == 0) {
      client_name = args[i+1];
    } else if (strcmp(args[i], "-r") == 0) {
      srand(atoi(args[i+1]));
    }
  }

  KvFlatBtreeAsync * kvba = new KvFlatBtreeAsync(k, client_name, cache_size,
      cache_refresh, verbose);
  kvs = kvba;

  int r = rados.init(rados_id.c_str());
  if (r < 0) {
    cout << "error during init" << std::endl;
    return r;
  }
  r = rados.conf_parse_argv(argc, argv);
  if (r < 0) {
    cout << "error during parsing args" << std::endl;
    return r;
  }
  r = rados.conf_parse_env(NULL);
  if (r < 0) {
    cout << "error during parsing env" << std::endl;
    return r;
  }
  r = rados.conf_read_file(NULL);
  if (r < 0) {
    cout << "error during read file" << std::endl;
    return r;
  }
  r = rados.connect();
  if (r < 0) {
    cout << "error during connect: " << r << std::endl;
    return r;
  }
  r = rados.ioctx_create(pool_name.c_str(), io_ctx);
  if (r < 0) {
    cout << "error creating io ctx" << std::endl;
    rados.shutdown();
    return r;
  }
  io_ctx_ready = true;

  if (clear_first) {
    // Remove every object currently in the pool.
    librados::NObjectIterator it;
    for (it = io_ctx.nobjects_begin(); it != io_ctx.nobjects_end(); ++it) {
      librados::ObjectWriteOperation rm;
      rm.remove();
      io_ctx.operate(it->get_oid(), &rm);
    }
  }

  // -17 is tolerated here (EEXIST on Linux; presumably the store's objects
  // already exist -- NOTE(review): confirm against KvFlatBtreeAsync::setup).
  int err = kvs->setup(argc, argv);
  if (err < 0 && err != -17) {
    cout << "error during setup of kvs: " << err << std::endl;
    return err;
  }

  return 0;
}

/**
 * Build a random alphanumeric string using rand().
 *
 * @param len number of characters to generate
 * @return a string of len characters drawn from [0-9A-Za-z]
 */
string KvStoreBench::random_string(int len) {
  string ret;
  const string alphanum = "0123456789"
      "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
      "abcdefghijklmnopqrstuvwxyz";
  for (int i = 0; i < len; ++i) {
    // Modulo the full size so that the last character ('z') can be chosen.
    ret.push_back(alphanum[rand() % alphanum.size()]);
  }
  return ret;
}

/**
 * Produce the next key/value pair for the workload.
 *
 * @param new_elem if true, generate a fresh random key and remember it in
 *        key_set; if false, pick an already remembered key (the first one
 *        not less than a random string, or the largest key if there is
 *        none) and pair it with a fresh random value
 * @return the key/value pair; the key is empty if new_elem is false and no
 *         key has been remembered yet
 */
pair<string, bufferlist> KvStoreBench::rand_distr(bool new_elem) {
  pair<string, bufferlist> ret;
  if (new_elem) {
    ret = make_pair(random_string(key_size),
        KvFlatBtreeAsync::to_bl(random_string(val_size)));
    key_set.insert(ret.first);
  } else {
    if (key_set.size() == 0) {
      return make_pair("",KvFlatBtreeAsync::to_bl(""));
    }
    string get_string = random_string(key_size);
    std::set<string>::iterator it = key_set.lower_bound(get_string);
    if (it == key_set.end()) {
      ret.first = *(key_set.rbegin());
    } else {
      ret.first = *it;
    }
    ret.second = KvFlatBtreeAsync::to_bl(random_string(val_size));
  }
  return ret;
}

/**
 * Insert the initial `entries` random key/value pairs and synchronise with
 * the other clients through "<name>.done-setting" marker objects.
 *
 * A client whose name is not "admin" and does not end in '0' first polls for
 * the previous client's marker. Every client then writes its entries,
 * creates its own marker and polls for the last client's marker.
 *
 * NOTE(review): the initial keys are not added to key_set, so update, delete
 * and read operations only ever target keys inserted later by 'i'
 * operations -- confirm this is intended.
 *
 * @return 0 on success (or if entries is 0), a negative error code if
 *         writing the entries failed
 */
int KvStoreBench::test_random_insertions() {
  int err;
  if (entries == 0) {
    return 0;
  }

  // Name of the previous client, derived from the numeric value of our name.
  stringstream prev_ss;
  prev_ss << (atoi(client_name.c_str()) - 1);
  string prev_rid = prev_ss.str();

  // Name of the last client: our name minus its last two characters,
  // followed by clients - 1.
  stringstream last_ss;
  if (client_name.size() > 1) {
    last_ss << client_name.substr(0,client_name.size() - 2);
  }
  last_ss << clients - 1;
  string last_rid = client_name == "admin" ? "admin" : last_ss.str();

  // Use the configured sizes so that --keysize / --valsize also apply to the
  // initial entries (the defaults are 5 and 7).
  map<string, bufferlist> big_map;
  for (int i = 0; i < entries; i++) {
    bufferlist bfr;
    bfr.append(random_string(val_size));
    big_map[random_string(key_size)] = bfr;
  }

  uint64_t size;
  time_t mtime;
  if (client_name[client_name.size() - 1] != '0' && client_name != "admin") {
    // Poll until the previous client's marker object can be stat'ed.
    do {
      librados::ObjectReadOperation oro;
      oro.stat(&size, &mtime, &err);
      err = io_ctx.operate(prev_rid + ".done-setting", &oro, NULL);
      if (verbose) cout << "reading " << prev_rid << ": err = " << err
                        << std::endl;
    } while (err != 0);
    cout << "detected " << prev_rid << ".done-setting" << std::endl;
  }

  cout << "testing random insertions";
  err = kvs->set_many(big_map);
  if (err < 0) {
    cout << "error setting things" << std::endl;
    return err;
  }

  librados::ObjectWriteOperation owo;
  owo.create(true);
  io_ctx.operate(client_name + ".done-setting", &owo);
  cout << "created " << client_name + ".done-setting. waiting for "
       << last_rid << ".done-setting" << std::endl;

  // Poll until the last client's marker object can be stat'ed.
  do {
    librados::ObjectReadOperation oro;
    oro.stat(&size, &mtime, &err);
    err = io_ctx.operate(last_rid + ".done-setting", &oro, NULL);
  } while (err != 0);
  cout << "detected " << last_rid << ".done-setting" << std::endl;

  return err;
}

/**
 * Completion callback for the asynchronous operations started by
 * test_teuthology_aio.
 *
 * Records latency and throughput for the finished operation, decrements
 * ops_in_flight, signals op_avail and deletes the timed_args. The callback
 * owns arg: nothing else may delete it once the operation was started.
 *
 * @param err result of the operation; negative values other than -61 are
 *        reported on cerr
 * @param arg the timed_args that was passed when the operation was started
 */
void KvStoreBench::aio_callback_timed(int * err, void *arg) {
  timed_args *args = reinterpret_cast<timed_args *>(arg);
  Mutex * ops_in_flight_lock = &args->kvsb->ops_in_flight_lock;
  Mutex * data_lock = &args->kvsb->data_lock;
  Cond * op_avail = &args->kvsb->op_avail;
  int *ops_in_flight = &args->kvsb->ops_in_flight;
  if (*err < 0 && *err != -61) {
    cerr << "Error during " << args->op << " operation: " << *err << std::endl;
  }

  args->sw.stop_time();
  double time = args->sw.get_time();
  args->sw.clear();

  data_lock->Lock();
  //latency
  args->kvsb->data.latency_jf.open_object_section("latency");
  args->kvsb->data.latency_jf.dump_float(string(1, args->op).c_str(), time);
  args->kvsb->data.latency_jf.close_section();

  //throughput
  args->kvsb->data.throughput_jf.open_object_section("throughput");
  args->kvsb->data.throughput_jf.dump_unsigned(string(1, args->op).c_str(),
      ceph_clock_now());
  args->kvsb->data.throughput_jf.close_section();
  data_lock->Unlock();

  ops_in_flight_lock->Lock();
  (*ops_in_flight)--;
  op_avail->Signal();
  ops_in_flight_lock->Unlock();

  delete args;
}

/**
 * Run the asynchronous benchmark: insert the initial entries, then issue
 * `ops` random operations with at most max_ops_in_flight outstanding, wait
 * for all of them to complete and print the collected timing data.
 *
 * @param distr member function that generates the next key/value pair
 * @param probs cumulative percentage -> operation code
 * @return 0 on success, a negative error code if the initial insertions or
 *         waiting for a free slot failed
 */
int KvStoreBench::test_teuthology_aio(next_gen_t distr,
    const map<int, char> &probs)
{
  int err = 0;
  cout << "inserting initial entries..." << std::endl;
  err = test_random_insertions();
  if (err < 0) {
    return err;
  }
  cout << "finished inserting initial entries. Waiting 10 seconds for everyone"
       << " to catch up..." << std::endl;

  sleep(10);

  cout << "done waiting. Starting random operations..." << std::endl;

  Mutex::Locker l(ops_in_flight_lock);
  for (int i = 0; i < ops; i++) {
    assert(ops_in_flight <= max_ops_in_flight);
    if (ops_in_flight == max_ops_in_flight) {
      int wait_err = op_avail.Wait(ops_in_flight_lock);
      if (wait_err < 0) {
        assert(false);
        return wait_err;
      }
      assert(ops_in_flight < max_ops_in_flight);
    }
    cout << "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t" << i + 1 << " / " << ops
         << std::endl;

    const char op = choose_op(probs, rand() % 100);
    if (!is_known_op(op)) {
      // Nothing to dispatch for an unrecognised operation code.
      continue;
    }

    // Only inserts generate a new key; the others reuse a known one.
    pair<string, bufferlist> kv = (this->*distr)(op == 'i');
    if (kv.first == "") {
      // No key available for this operation: retry this slot.
      i--;
      continue;
    }

    // Ownership of cb_args passes to aio_callback_timed, which deletes it
    // when the operation completes. It must not be deleted here as well.
    timed_args * cb_args = new timed_args(this);
    cb_args->op = op;

    switch (op) {
    case 'i':
      ops_in_flight++;
      cb_args->sw.start_time();
      kvs->aio_set(kv.first, kv.second, false, aio_callback_timed,
          cb_args, &cb_args->err);
      break;
    case 'u':
      ops_in_flight++;
      cb_args->sw.start_time();
      kvs->aio_set(kv.first, kv.second, true, aio_callback_timed,
          cb_args, &cb_args->err);
      break;
    case 'd':
      key_set.erase(kv.first);
      ops_in_flight++;
      cb_args->sw.start_time();
      kvs->aio_remove(kv.first, aio_callback_timed, cb_args, &cb_args->err);
      break;
    case 'r':
      ops_in_flight++;
      cb_args->sw.start_time();
      kvs->aio_get(kv.first, &cb_args->val, aio_callback_timed,
          cb_args, &cb_args->err);
      break;
    }
  }

  // Wait for all outstanding operations to complete.
  while(ops_in_flight > 0) {
    op_avail.Wait(ops_in_flight_lock);
  }

  print_time_data();
  return err;
}

/**
 * Run the synchronous benchmark: insert the initial entries, then execute
 * `ops` random operations one at a time, recording the latency of each, and
 * print the collected timing data.
 *
 * @param distr member function that generates the next key/value pair
 * @param probs cumulative percentage -> operation code
 * @return 0 on success, otherwise the negative error code of the first
 *         operation that failed (-61 is tolerated for update, remove and
 *         get)
 */
int KvStoreBench::test_teuthology_sync(next_gen_t distr,
    const map<int, char> &probs)
{
  int err = 0;
  err = test_random_insertions();
  if (err < 0) {
    return err;
  }
  sleep(10);
  for (int i = 0; i < ops; i++) {
    StopWatch sw;
    pair<char, double> d;
    cout << "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t" << i + 1 << " / "
         << ops << std::endl;

    d.first = choose_op(probs, rand() % 100);
    if (!is_known_op(d.first)) {
      // Nothing to run or time for an unrecognised operation code.
      continue;
    }

    // Only inserts generate a new key; the others reuse a known one.
    pair<string, bufferlist> kv = (this->*distr)(d.first == 'i');
    if (kv.first == "") {
      // No key available for this operation: retry this slot.
      i--;
      continue;
    }

    switch (d.first) {
    case 'i':
      sw.start_time();
      err = kvs->set(kv.first, kv.second, true);
      sw.stop_time();
      if (err < 0) {
        cout << "Error setting " << kv << ": " << err << std::endl;
        return err;
      }
      break;
    case 'u':
      sw.start_time();
      err = kvs->set(kv.first, kv.second, true);
      sw.stop_time();
      if (err < 0 && err != -61) {
        cout << "Error updating " << kv << ": " << err << std::endl;
        return err;
      }
      break;
    case 'd':
      key_set.erase(kv.first);
      sw.start_time();
      err = kvs->remove(kv.first);
      sw.stop_time();
      if (err < 0 && err != -61) {
        cout << "Error removing " << kv << ": " << err << std::endl;
        return err;
      }
      break;
    case 'r':
      sw.start_time();
      err = kvs->get(kv.first, &kv.second);
      sw.stop_time();
      if (err < 0 && err != -61) {
        cout << "Error getting " << kv << ": " << err << std::endl;
        return err;
      }
      break;
    }

    double time = sw.get_time();
    d.second = time;
    sw.clear();
    //latency
    data.latency_jf.open_object_section("latency");
    data.latency_jf.dump_float(string(1, d.first).c_str(),
        time);
    data.latency_jf.close_section();
  }

  print_time_data();
  return err;
}

/**
 * Flush the collected latency and throughput records to cout.
 */
void KvStoreBench::print_time_data() {
  cout << "========================================================\n";
  cout << "latency:" << std::endl;
  data.latency_jf.flush(cout);
  cout << std::endl;
  cout << "throughput:" << std::endl;
  data.throughput_jf.flush(cout);
  cout << "\n========================================================"
       << std::endl;
}

/**
 * Run the benchmark: asynchronously if more than one operation may be in
 * flight, synchronously otherwise.
 *
 * @return the result of the selected test
 */
int KvStoreBench::teuthology_tests() {
  int err = 0;
  if (max_ops_in_flight > 1) {
    // Propagate the result instead of discarding it.
    err = test_teuthology_aio(&KvStoreBench::rand_distr, probs);
  } else {
    err = test_teuthology_sync(&KvStoreBench::rand_distr, probs);
  }
  return err;
}

/**
 * Entry point: set up the benchmark from the command line and run it.
 */
int main(int argc, const char** argv) {
  KvStoreBench kvsb;
  int err = kvsb.setup(argc, argv);
  if (err == 0) cout << "setup successful" << std::endl;
  else{
    cout << "error " << err << std::endl;
    return err;
  }
  err = kvsb.teuthology_tests();
  if (err < 0) return err;
  return 0;
}