Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / objectstore_bench.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <chrono>
5 #include <cassert>
6 #include <condition_variable>
7 #include <memory>
8 #include <mutex>
9 #include <thread>
10
11 #include "os/ObjectStore.h"
12
13 #include "global/global_init.h"
14
15 #include "common/strtol.h"
16 #include "common/ceph_argparse.h"
17
18 #define dout_context g_ceph_context
19 #define dout_subsys ceph_subsys_filestore
20
21 static void usage()
22 {
23   derr << "usage: ceph_objectstore_bench [flags]\n"
24       "  --size\n"
25       "        total size in bytes\n"
26       "  --block-size\n"
27       "        block size in bytes for each write\n"
28       "  --repeats\n"
29       "        number of times to repeat the write cycle\n"
30       "  --threads\n"
31       "        number of threads to carry out this workload\n"
32       "  --multi-object\n"
33       "        have each thread write to a separate object\n" << dendl;
34   generic_server_usage();
35 }
36
37 // helper class for bytes with units
38 struct byte_units {
39   size_t v;
40   // cppcheck-suppress noExplicitConstructor
41   byte_units(size_t v) : v(v) {}
42
43   bool parse(const std::string &val, std::string *err);
44
45   operator size_t() const { return v; }
46 };
47
48 bool byte_units::parse(const std::string &val, std::string *err)
49 {
50   v = strict_sistrtoll(val.c_str(), err);
51   return err->empty();
52 }
53
54 std::ostream& operator<<(std::ostream &out, const byte_units &amount)
55 {
56   static const char* units[] = { "B", "KB", "MB", "GB", "TB", "PB", "EB" };
57   static const int max_units = sizeof(units)/sizeof(*units);
58
59   int unit = 0;
60   auto v = amount.v;
61   while (v >= 1024 && unit < max_units) {
62     // preserve significant bytes
63     if (v < 1048576 && (v % 1024 != 0))
64       break;
65     v >>= 10;
66     unit++;
67   }
68   return out << v << ' ' << units[unit];
69 }
70
71 struct Config {
72   byte_units size;
73   byte_units block_size;
74   int repeats;
75   int threads;
76   bool multi_object;
77   Config()
78     : size(1048576), block_size(4096),
79       repeats(1), threads(1),
80       multi_object(false) {}
81 };
82
83 class C_NotifyCond : public Context {
84   std::mutex *mutex;
85   std::condition_variable *cond;
86   bool *done;
87 public:
88   C_NotifyCond(std::mutex *mutex, std::condition_variable *cond, bool *done)
89     : mutex(mutex), cond(cond), done(done) {}
90   void finish(int r) override {
91     std::lock_guard<std::mutex> lock(*mutex);
92     *done = true;
93     cond->notify_one();
94   }
95 };
96
97 void osbench_worker(ObjectStore *os, const Config &cfg,
98                     const coll_t cid, const ghobject_t oid,
99                     uint64_t starting_offset)
100 {
101   bufferlist data;
102   data.append(buffer::create(cfg.block_size));
103
104   dout(0) << "Writing " << cfg.size
105       << " in blocks of " << cfg.block_size << dendl;
106
107   assert(starting_offset < cfg.size);
108   assert(starting_offset % cfg.block_size == 0);
109
110   ObjectStore::Sequencer sequencer("osbench");
111
112   for (int i = 0; i < cfg.repeats; ++i) {
113     uint64_t offset = starting_offset;
114     size_t len = cfg.size;
115
116     vector<ObjectStore::Transaction> tls;
117
118     std::cout << "Write cycle " << i << std::endl;
119     while (len) {
120       size_t count = len < cfg.block_size ? len : (size_t)cfg.block_size;
121
122       auto t = new ObjectStore::Transaction;
123       t->write(cid, oid, offset, count, data);
124       tls.push_back(std::move(*t));
125       delete t;
126
127       offset += count;
128       if (offset > cfg.size)
129         offset -= cfg.size;
130       len -= count;
131     }
132
133     // set up the finisher
134     std::mutex mutex;
135     std::condition_variable cond;
136     bool done = false;
137
138     os->queue_transactions(&sequencer, tls, nullptr,
139                            new C_NotifyCond(&mutex, &cond, &done));
140
141     std::unique_lock<std::mutex> lock(mutex);
142     cond.wait(lock, [&done](){ return done; });
143     lock.unlock();
144
145
146   }
147   sequencer.flush();
148 }
149
150 int main(int argc, const char *argv[])
151 {
152   Config cfg;
153
154   // command-line arguments
155   vector<const char*> args;
156   argv_to_vec(argc, argv, args);
157   env_to_vec(args);
158
159   auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
160                          CODE_ENVIRONMENT_UTILITY, 0);
161
162   std::string val;
163   vector<const char*>::iterator i = args.begin();
164   while (i != args.end()) {
165     if (ceph_argparse_double_dash(args, i))
166       break;
167
168     if (ceph_argparse_witharg(args, i, &val, "--size", (char*)nullptr)) {
169       std::string err;
170       if (!cfg.size.parse(val, &err)) {
171         derr << "error parsing size: " << err << dendl;
172         usage();
173       }
174     } else if (ceph_argparse_witharg(args, i, &val, "--block-size", (char*)nullptr)) {
175       std::string err;
176       if (!cfg.block_size.parse(val, &err)) {
177         derr << "error parsing block-size: " << err << dendl;
178         usage();
179       }
180     } else if (ceph_argparse_witharg(args, i, &val, "--repeats", (char*)nullptr)) {
181       cfg.repeats = atoi(val.c_str());
182     } else if (ceph_argparse_witharg(args, i, &val, "--threads", (char*)nullptr)) {
183       cfg.threads = atoi(val.c_str());
184     } else if (ceph_argparse_flag(args, i, "--multi-object", (char*)nullptr)) {
185       cfg.multi_object = true;
186     } else {
187       derr << "Error: can't understand argument: " << *i << "\n" << dendl;
188       usage();
189     }
190   }
191
192   common_init_finish(g_ceph_context);
193
194   // create object store
195   dout(0) << "objectstore " << g_conf->osd_objectstore << dendl;
196   dout(0) << "data " << g_conf->osd_data << dendl;
197   dout(0) << "journal " << g_conf->osd_journal << dendl;
198   dout(0) << "size " << cfg.size << dendl;
199   dout(0) << "block-size " << cfg.block_size << dendl;
200   dout(0) << "repeats " << cfg.repeats << dendl;
201   dout(0) << "threads " << cfg.threads << dendl;
202
203   auto os = std::unique_ptr<ObjectStore>(
204       ObjectStore::create(g_ceph_context,
205                           g_conf->osd_objectstore,
206                           g_conf->osd_data,
207                           g_conf->osd_journal));
208
209   //Checking data folder: create if needed or error if it's not empty
210   DIR *dir = ::opendir(g_conf->osd_data.c_str());
211   if (!dir) {
212     std::string cmd("mkdir -p ");
213     cmd+=g_conf->osd_data;
214     int r = ::system( cmd.c_str() );
215     if( r<0 ){
216       derr << "Failed to create data directory, ret = " << r << dendl;
217       return 1;
218     }
219   }
220   else {
221      bool non_empty = readdir(dir) != NULL && readdir(dir) != NULL && readdir(dir) != NULL;
222      if( non_empty ){
223        derr << "Data directory '"<<g_conf->osd_data<<"' isn't empty, please clean it first."<< dendl;
224        return 1;
225      }
226   }
227   ::closedir(dir);
228
229   //Create folders for journal if needed
230   string journal_base = g_conf->osd_journal.substr(0, g_conf->osd_journal.rfind('/'));
231   struct stat sb;
232   if (stat(journal_base.c_str(), &sb) != 0 ){
233     std::string cmd("mkdir -p ");
234     cmd+=journal_base;
235     int r = ::system( cmd.c_str() );
236     if( r<0 ){
237       derr << "Failed to create journal directory, ret = " << r << dendl;
238       return 1;
239     }
240   }
241
242   if (!os) {
243     derr << "bad objectstore type " << g_conf->osd_objectstore << dendl;
244     return 1;
245   }
246   if (os->mkfs() < 0) {
247     derr << "mkfs failed" << dendl;
248     return 1;
249   }
250   if (os->mount() < 0) {
251     derr << "mount failed" << dendl;
252     return 1;
253   }
254
255   dout(10) << "created objectstore " << os.get() << dendl;
256
257   // create a collection
258   spg_t pg;
259   const coll_t cid(pg);
260   {
261     ObjectStore::Sequencer osr(__func__);
262     ObjectStore::Transaction t;
263     t.create_collection(cid, 0);
264     os->apply_transaction(&osr, std::move(t));
265   }
266
267   // create the objects
268   std::vector<ghobject_t> oids;
269   if (cfg.multi_object) {
270     oids.reserve(cfg.threads);
271     for (int i = 0; i < cfg.threads; i++) {
272       std::stringstream oss;
273       oss << "osbench-thread-" << i;
274       oids.emplace_back(hobject_t(sobject_t(oss.str(), CEPH_NOSNAP)));
275
276       ObjectStore::Sequencer osr(__func__);
277       ObjectStore::Transaction t;
278       t.touch(cid, oids[i]);
279       int r = os->apply_transaction(&osr, std::move(t));
280       assert(r == 0);
281     }
282   } else {
283     oids.emplace_back(hobject_t(sobject_t("osbench", CEPH_NOSNAP)));
284
285     ObjectStore::Sequencer osr(__func__);
286     ObjectStore::Transaction t;
287     t.touch(cid, oids.back());
288     int r = os->apply_transaction(&osr, std::move(t));
289     assert(r == 0);
290   }
291
292   // run the worker threads
293   std::vector<std::thread> workers;
294   workers.reserve(cfg.threads);
295
296   using namespace std::chrono;
297   auto t1 = high_resolution_clock::now();
298   for (int i = 0; i < cfg.threads; i++) {
299     const auto &oid = cfg.multi_object ? oids[i] : oids[0];
300     workers.emplace_back(osbench_worker, os.get(), std::ref(cfg),
301                          cid, oid, i * cfg.size / cfg.threads);
302   }
303   for (auto &worker : workers)
304     worker.join();
305   auto t2 = high_resolution_clock::now();
306   workers.clear();
307
308   auto duration = duration_cast<microseconds>(t2 - t1);
309   byte_units total = cfg.size * cfg.repeats * cfg.threads;
310   byte_units rate = (1000000LL * total) / duration.count();
311   size_t iops = (1000000LL * total / cfg.block_size) / duration.count();
312   dout(0) << "Wrote " << total << " in "
313       << duration.count() << "us, at a rate of " << rate << "/s and "
314       << iops << " iops" << dendl;
315
316   // remove the objects
317   ObjectStore::Sequencer osr(__func__);
318   ObjectStore::Transaction t;
319   for (const auto &oid : oids)
320     t.remove(cid, oid);
321   os->apply_transaction(&osr,std::move(t));
322
323   os->umount();
324   return 0;
325 }