1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <condition_variable>
11 #include "os/ObjectStore.h"
13 #include "global/global_init.h"
15 #include "common/strtol.h"
16 #include "common/ceph_argparse.h"
18 #define dout_context g_ceph_context
19 #define dout_subsys ceph_subsys_filestore
// Print the help text for ceph_objectstore_bench to derr, then append the
// generic server options by calling generic_server_usage().
// NOTE(review): the flag names that pair with each description below are not
// visible in this excerpt; only the description strings are shown.
23 derr << "usage: ceph_objectstore_bench [flags]\n"
25 " total size in bytes\n"
27 " block size in bytes for each write\n"
29 " number of times to repeat the write cycle\n"
31 " number of threads to carry out this workload\n"
33 " have each thread write to a separate object\n" << dendl;
34 generic_server_usage();
37 // helper class for bytes with units
// Wraps a size_t byte count (member v) so it can be filled from a string via
// parse() and converted back to a plain size_t where arithmetic is needed.
// The converting constructor is deliberately left implicit (hence the
// cppcheck suppression) so integer literals can initialise a byte_units.
40 // cppcheck-suppress noExplicitConstructor
41 byte_units(size_t v) : v(v) {}
// Parse val into v; error text, if any, is written through err.
43 bool parse(const std::string &val, std::string *err);
// Implicit conversion back to the raw byte count.
45 operator size_t() const { return v; }
// Convert val to a byte count with strict_sistrtoll() and store it in v.
// strict_sistrtoll() receives err and is expected to fill it on failure.
// NOTE(review): the return statement is outside this excerpt; callers in
// main() treat a false result as a parse failure — confirm.
// NOTE(review): presumably strict_sistrtoll accepts SI suffixes (K, M, ...);
// verify against common/strtol.h.
48 bool byte_units::parse(const std::string &val, std::string *err)
50 v = strict_sistrtoll(val.c_str(), err);
// Stream a byte_units value as "<number> <unit>", where <unit> is picked from
// the units[] table below.
54 std::ostream& operator<<(std::ostream &out, const byte_units &amount)
// Unit suffixes in ascending order; `unit` indexes into this table.
56 static const char* units[] = { "B", "KB", "MB", "GB", "TB", "PB", "EB" };
57 static const int max_units = sizeof(units)/sizeof(*units);
// Keep scaling while the value is at least 1024.
// NOTE(review): the bound `unit < max_units` would allow unit to reach
// max_units, one past the last valid index of units[]; `max_units - 1` looks
// like the intended limit. The loop body is not visible here — confirm.
61 while (v >= 1024 && unit < max_units) {
62 // preserve significant bytes
// i.e. stop scaling when the value is below 1048576 and is not an exact
// multiple of 1024, so the printed number does not lose precision.
63 if (v < 1048576 && (v % 1024 != 0))
68 return out << v << ' ' << units[unit];
// Benchmark configuration (fragment; the remaining field declarations are not
// visible in this excerpt).
73 byte_units block_size;
// Defaults: size = 1048576, block_size = 4096, one repeat, one thread, and
// multi_object off. The usage text describes size and block size as bytes.
78 : size(1048576), block_size(4096),
79 repeats(1), threads(1),
80 multi_object(false) {}
// Completion Context that holds pointers to a mutex, a condition variable and
// a done flag supplied by the code that waits for the completion.
83 class C_NotifyCond : public Context {
85 std::condition_variable *cond;
// Only stores the three pointers; the pointed-to objects must outlive this
// context (osbench_worker passes the addresses of its own locals).
88 C_NotifyCond(std::mutex *mutex, std::condition_variable *cond, bool *done)
89 : mutex(mutex), cond(cond), done(done) {}
// Completion hook: takes *mutex before touching the shared state.
// NOTE(review): the rest of the body is not visible — presumably it sets
// *done and notifies *cond; confirm.
90 void finish(int r) override {
91 std::lock_guard<std::mutex> lock(*mutex);
// Worker body run by each benchmark thread. For every one of cfg.repeats
// cycles it builds a vector of write transactions against object oid in
// collection cid, queues them on os, and blocks until the completion
// callback reports done.
//   os              - object store to write to
//   cfg             - benchmark settings (size, block_size, repeats)
//   cid / oid       - target collection and object
//   starting_offset - first write offset; must be < cfg.size and a multiple
//                     of cfg.block_size (asserted below)
97 void osbench_worker(ObjectStore *os, const Config &cfg,
98 const coll_t cid, const ghobject_t oid,
99 uint64_t starting_offset)
// One block_size-sized buffer is appended to data, which every write reuses.
102 data.append(buffer::create(cfg.block_size));
104 dout(0) << "Writing " << cfg.size
105 << " in blocks of " << cfg.block_size << dendl;
107 assert(starting_offset < cfg.size);
108 assert(starting_offset % cfg.block_size == 0);
110 ObjectStore::Sequencer sequencer("osbench");
112 for (int i = 0; i < cfg.repeats; ++i) {
// Each cycle restarts at starting_offset with cfg.size bytes left to write.
113 uint64_t offset = starting_offset;
114 size_t len = cfg.size;
116 vector<ObjectStore::Transaction> tls;
118 std::cout << "Write cycle " << i << std::endl;
// Write a full block, or only the remaining bytes when fewer are left.
120 size_t count = len < cfg.block_size ? len : (size_t)cfg.block_size;
// NOTE(review): the transaction is heap-allocated and its contents are moved
// into tls; the matching delete is not visible in this excerpt — confirm the
// heap object is freed (a stack-local Transaction would avoid the question).
122 auto t = new ObjectStore::Transaction;
123 t->write(cid, oid, offset, count, data);
124 tls.push_back(std::move(*t));
// NOTE(review): the statement guarded by this test is not visible; presumably
// the offset wraps back into [0, cfg.size) — confirm.
128 if (offset > cfg.size)
133 // set up the finisher
135 std::condition_variable cond;
// Queue the cycle's transactions with a C_NotifyCond completion context that
// refers to the local mutex/cond/done.
138 os->queue_transactions(&sequencer, tls, nullptr,
139 new C_NotifyCond(&mutex, &cond, &done));
// Block until done becomes true; the predicate form of wait() re-checks the
// flag after every wakeup, so spurious wakeups are harmless.
141 std::unique_lock<std::mutex> lock(mutex);
142 cond.wait(lock, [&done](){ return done; });
// Entry point of the benchmark. Visible steps, in order: initialise the Ceph
// context, parse the benchmark flags into cfg, create the ObjectStore, prepare
// the data and journal directories, mkfs and mount the store, create a
// collection and the target object(s), run cfg.threads worker threads, report
// throughput, and finally submit a cleanup transaction over the objects.
150 int main(int argc, const char *argv[])
154 // command-line arguments
155 vector<const char*> args;
156 argv_to_vec(argc, argv, args);
159 auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
160 CODE_ENVIRONMENT_UTILITY, 0);
// Walk the arguments and pick out the benchmark-specific flags; an argument
// that matches none of them is reported as an error further down.
163 vector<const char*>::iterator i = args.begin();
164 while (i != args.end()) {
165 if (ceph_argparse_double_dash(args, i))
168 if (ceph_argparse_witharg(args, i, &val, "--size", (char*)nullptr)) {
170 if (!cfg.size.parse(val, &err)) {
171 derr << "error parsing size: " << err << dendl;
174 } else if (ceph_argparse_witharg(args, i, &val, "--block-size", (char*)nullptr)) {
176 if (!cfg.block_size.parse(val, &err)) {
177 derr << "error parsing block-size: " << err << dendl;
180 } else if (ceph_argparse_witharg(args, i, &val, "--repeats", (char*)nullptr)) {
// NOTE(review): atoi() has no error reporting and yields 0 for non-numeric
// input, so a typo silently becomes zero repeats; the same applies to
// --threads below.
181 cfg.repeats = atoi(val.c_str());
182 } else if (ceph_argparse_witharg(args, i, &val, "--threads", (char*)nullptr)) {
183 cfg.threads = atoi(val.c_str());
184 } else if (ceph_argparse_flag(args, i, "--multi-object", (char*)nullptr)) {
185 cfg.multi_object = true;
187 derr << "Error: can't understand argument: " << *i << "\n" << dendl;
192 common_init_finish(g_ceph_context);
194 // create object store
// Log the effective configuration before creating the store.
195 dout(0) << "objectstore " << g_conf->osd_objectstore << dendl;
196 dout(0) << "data " << g_conf->osd_data << dendl;
197 dout(0) << "journal " << g_conf->osd_journal << dendl;
198 dout(0) << "size " << cfg.size << dendl;
199 dout(0) << "block-size " << cfg.block_size << dendl;
200 dout(0) << "repeats " << cfg.repeats << dendl;
201 dout(0) << "threads " << cfg.threads << dendl;
// The unique_ptr owns the store returned by ObjectStore::create().
203 auto os = std::unique_ptr<ObjectStore>(
204 ObjectStore::create(g_ceph_context,
205 g_conf->osd_objectstore,
207 g_conf->osd_journal));
209 //Checking data folder: create if needed or error if it's not empty
// NOTE(review): the matching closedir() for this handle is not visible in
// this excerpt — confirm it is released.
210 DIR *dir = ::opendir(g_conf->osd_data.c_str());
// NOTE(review): the directory is created by running a shell command built
// from the configured path without quoting; paths containing spaces or shell
// metacharacters would be misinterpreted.
212 std::string cmd("mkdir -p ");
213 cmd+=g_conf->osd_data;
214 int r = ::system( cmd.c_str() );
216 derr << "Failed to create data directory, ret = " << r << dendl;
// Three successful readdir() calls mean the directory has at least three
// entries. NOTE(review): presumably the first two account for "." and "..",
// so a third entry signals existing content — confirm.
221 bool non_empty = readdir(dir) != NULL && readdir(dir) != NULL && readdir(dir) != NULL;
223 derr << "Data directory '"<<g_conf->osd_data<<"' isn't empty, please clean it first."<< dendl;
229 //Create folders for journal if needed
// The journal's parent directory is everything before the last '/'.
// NOTE(review): if osd_journal contains no '/', rfind() returns npos and
// substr(0, npos) yields the whole journal path rather than a directory.
230 string journal_base = g_conf->osd_journal.substr(0, g_conf->osd_journal.rfind('/'));
232 if (stat(journal_base.c_str(), &sb) != 0 ){
233 std::string cmd("mkdir -p ");
235 int r = ::system( cmd.c_str() );
237 derr << "Failed to create journal directory, ret = " << r << dendl;
243 derr << "bad objectstore type " << g_conf->osd_objectstore << dendl;
// mkfs and mount both report failure with a negative return value.
246 if (os->mkfs() < 0) {
247 derr << "mkfs failed" << dendl;
250 if (os->mount() < 0) {
251 derr << "mount failed" << dendl;
255 dout(10) << "created objectstore " << os.get() << dendl;
257 // create a collection
259 const coll_t cid(pg);
261 ObjectStore::Sequencer osr(__func__);
262 ObjectStore::Transaction t;
263 t.create_collection(cid, 0);
// NOTE(review): the result of apply_transaction() is discarded here, unlike
// the object-creation calls below that capture it in r.
264 os->apply_transaction(&osr, std::move(t));
267 // create the objects
268 std::vector<ghobject_t> oids;
// With --multi-object each thread gets its own "osbench-thread-<i>" object;
// otherwise all threads share the single "osbench" object.
269 if (cfg.multi_object) {
270 oids.reserve(cfg.threads);
271 for (int i = 0; i < cfg.threads; i++) {
272 std::stringstream oss;
273 oss << "osbench-thread-" << i;
274 oids.emplace_back(hobject_t(sobject_t(oss.str(), CEPH_NOSNAP)));
276 ObjectStore::Sequencer osr(__func__);
277 ObjectStore::Transaction t;
278 t.touch(cid, oids[i]);
279 int r = os->apply_transaction(&osr, std::move(t));
283 oids.emplace_back(hobject_t(sobject_t("osbench", CEPH_NOSNAP)));
285 ObjectStore::Sequencer osr(__func__);
286 ObjectStore::Transaction t;
287 t.touch(cid, oids.back());
288 int r = os->apply_transaction(&osr, std::move(t));
292 // run the worker threads
293 std::vector<std::thread> workers;
294 workers.reserve(cfg.threads);
296 using namespace std::chrono;
// t1/t2 bracket thread creation through the loop over workers below.
297 auto t1 = high_resolution_clock::now();
298 for (int i = 0; i < cfg.threads; i++) {
299 const auto &oid = cfg.multi_object ? oids[i] : oids[0];
// cfg is passed by reference (std::ref). Thread i starts at offset
// i * size / threads, which osbench_worker asserts is a multiple of
// block_size.
300 workers.emplace_back(osbench_worker, os.get(), std::ref(cfg),
301 cid, oid, i * cfg.size / cfg.threads);
// NOTE(review): the loop body is not visible; presumably each worker is
// joined here — confirm.
303 for (auto &worker : workers)
305 auto t2 = high_resolution_clock::now();
// Throughput figures: total bytes = size * repeats * threads; rate and iops
// are scaled from microseconds to per-second values.
// NOTE(review): a duration.count() of 0 would make the two divisions below
// divide by zero.
308 auto duration = duration_cast<microseconds>(t2 - t1);
309 byte_units total = cfg.size * cfg.repeats * cfg.threads;
310 byte_units rate = (1000000LL * total) / duration.count();
311 size_t iops = (1000000LL * total / cfg.block_size) / duration.count();
312 dout(0) << "Wrote " << total << " in "
313 << duration.count() << "us, at a rate of " << rate << "/s and "
314 << iops << " iops" << dendl;
316 // remove the objects
// A single transaction is built over all oids and applied once.
// NOTE(review): the per-object statement inside the loop is not visible, and
// the result of apply_transaction() is not checked.
317 ObjectStore::Sequencer osr(__func__);
318 ObjectStore::Transaction t;
319 for (const auto &oid : oids)
321 os->apply_transaction(&osr,std::move(t));