Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / objectstore / workload_generator.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2012 New Dream Network
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  */
13 #include <stdio.h>
14 #include <string.h>
15 #include <iostream>
16 #include <assert.h>
17 #include <time.h>
18 #include <stdlib.h>
19 #include <signal.h>
20 #include <cctype>
21 #include <errno.h>
22 #include <sys/time.h>
23 #include "os/ObjectStore.h"
24 #include "common/ceph_argparse.h"
25 #include "global/global_init.h"
26 #include "common/debug.h"
27 #include <boost/scoped_ptr.hpp>
28 #include <boost/lexical_cast.hpp>
29 #include "workload_generator.h"
30 #include "include/assert.h"
31
32 #include "TestObjectStoreState.h"
33
34 #define dout_context g_ceph_context
35
36 static const char *our_name = NULL;
37 void usage();
38
39 boost::scoped_ptr<WorkloadGenerator> wrkldgen;
40
41 #define dout_subsys ceph_subsys_
42
43
44 WorkloadGenerator::WorkloadGenerator(vector<const char*> args)
45   : TestObjectStoreState(NULL),
46     m_max_in_flight(def_max_in_flight),
47     m_num_ops(-1),
48     m_destroy_coll_every_nr_runs(def_destroy_coll_every_nr_runs),
49     m_num_colls(def_num_colls),
50     m_write_data_bytes(0), m_write_xattr_obj_bytes(0),
51     m_write_xattr_coll_bytes(0), m_write_pglog_bytes(0),
52     m_suppress_write_data(false), m_suppress_write_xattr_obj(false),
53     m_suppress_write_xattr_coll(false), m_suppress_write_log(false),
54     m_do_stats(false),
55     m_stats_finished_txs(0),
56     m_stats_lock("WorldloadGenerator::m_stats_lock"),
57     m_stats_show_secs(5),
58     m_stats_total_written(0),
59     m_stats_begin()
60 {
61   int err = 0;
62
63   m_nr_runs = 0;
64
65   init_args(args);
66   dout(0) << "data            = " << g_conf->osd_data << dendl;
67   dout(0) << "journal         = " << g_conf->osd_journal << dendl;
68   dout(0) << "journal size    = " << g_conf->osd_journal_size << dendl;
69
70   err = ::mkdir(g_conf->osd_data.c_str(), 0755);
71   ceph_assert(err == 0 || (err < 0 && errno == EEXIST));
72   ObjectStore *store_ptr = ObjectStore::create(g_ceph_context,
73                                                g_conf->osd_objectstore,
74                                                g_conf->osd_data,
75                                                g_conf->osd_journal);
76   m_store.reset(store_ptr);
77   err = m_store->mkfs();
78   ceph_assert(err == 0);
79   err = m_store->mount();
80   ceph_assert(err == 0);
81
82   set_max_in_flight(m_max_in_flight);
83   set_num_objs_per_coll(def_num_obj_per_coll);
84
85   init(m_num_colls, 0);
86
87   dout(0) << "#colls          = " << m_num_colls << dendl;
88   dout(0) << "#objs per coll  = " << m_num_objs_per_coll << dendl;
89   dout(0) << "#txs per destr  = " << m_destroy_coll_every_nr_runs << dendl;
90
91 }
92
93 size_t WorkloadGenerator::_parse_size_or_die(std::string& val)
94 {
95   size_t s = 0;
96   int multiplier = 0;
97   size_t i = 0;
98
99   if (val.empty()) // this should never happen, but catch it anyway.
100     goto die;
101
102
103   for (i = 0; i < val.length(); i++) {
104     if (!isdigit(val[i])) {
105       if (isalpha(val[i])) {
106         val[i] = tolower(val[i]);
107         switch (val[i]) {
108         case 'b': break;
109         case 'k': multiplier = 10; break;
110         case 'm': multiplier = 20; break;
111         case 'g': multiplier = 30; break;
112         default:
113           goto die;
114         }
115         val[i] = '\0';
116         break;
117       } else {
118         goto die;
119       }
120     }
121   }
122
123   s = strtoll(val.c_str(), NULL, 10) * (1 << multiplier);
124   return s;
125
126 die:
127   usage();
128   exit(1);
129 }
130
131 void WorkloadGenerator::_suppress_ops_or_die(std::string& val)
132 {
133   for (size_t i = 0; i < val.length(); i++) {
134     switch (val[i]) {
135     case 'c': m_suppress_write_xattr_coll = true; break;
136     case 'o': m_suppress_write_xattr_obj = true; break;
137     case 'l': m_suppress_write_log = true; break;
138     case 'd': m_suppress_write_data = true; break;
139     default:
140       usage();
141       exit(1);
142     }
143   }
144 }
145
146 void WorkloadGenerator::init_args(vector<const char*> args)
147 {
148   for (std::vector<const char*>::iterator i = args.begin(); i != args.end();) {
149     string val;
150
151     if (ceph_argparse_double_dash(args, i)) {
152       break;
153     } else if (ceph_argparse_witharg(args, i, &val,
154         "--test-num-colls", (char*) NULL)) {
155       m_num_colls = strtoll(val.c_str(), NULL, 10);
156     } else if (ceph_argparse_witharg(args, i, &val,
157         "--test-objs-per-coll", (char*) NULL)) {
158       m_num_objs_per_coll = strtoll(val.c_str(), NULL, 10);
159     } else if (ceph_argparse_witharg(args, i, &val,
160         "--test-destroy-coll-per-N-trans", (char*) NULL)) {
161       m_destroy_coll_every_nr_runs = strtoll(val.c_str(), NULL, 10);
162     } else if (ceph_argparse_witharg(args, i, &val,
163         "--test-num-ops", (char*) NULL)) {
164       m_num_ops = strtoll(val.c_str(), NULL, 10);
165     } else if (ceph_argparse_witharg(args, i, &val,
166         "--test-max-in-flight", (char*) NULL)) {
167       m_max_in_flight = strtoll(val.c_str(), NULL, 10);
168     } else if (ceph_argparse_witharg(args, i, &val,
169         "--test-write-data-size", (char*) NULL)) {
170       m_write_data_bytes = _parse_size_or_die(val);
171     } else if (ceph_argparse_witharg(args, i, &val,
172         "--test-write-xattr-obj-size", (char*) NULL)) {
173       m_write_xattr_obj_bytes = _parse_size_or_die(val);
174     } else if (ceph_argparse_witharg(args, i, &val,
175         "--test-write-xattr-coll-size", (char*) NULL)) {
176       m_write_xattr_coll_bytes = _parse_size_or_die(val);
177     } else if (ceph_argparse_witharg(args, i, &val,
178         "--test-write-pglog-size", (char*) NULL)) {
179       m_write_pglog_bytes = _parse_size_or_die(val);
180     } else if (ceph_argparse_witharg(args, i, &val,
181         "--test-suppress-ops", (char*) NULL)) {
182       _suppress_ops_or_die(val);
183     } else if (ceph_argparse_witharg(args, i, &val,
184         "--test-show-stats-period", (char*) NULL)) {
185       m_stats_show_secs = strtoll(val.c_str(), NULL, 10);
186     } else if (ceph_argparse_flag(args, i, "--test-show-stats", (char*) NULL)) {
187       m_do_stats = true;
188     } else if (ceph_argparse_flag(args, i, "--help", (char*) NULL)) {
189       usage();
190       exit(0);
191     }
192   }
193 }
194
195 int WorkloadGenerator::get_uniform_random_value(int min, int max)
196 {
197   boost::uniform_int<> value(min, max);
198   return value(m_rng);
199 }
200
201 TestObjectStoreState::coll_entry_t *WorkloadGenerator::get_rnd_coll_entry(bool erase = false)
202 {
203   int index = get_uniform_random_value(0, m_collections_ids.size()-1);
204   coll_entry_t *entry = get_coll_at(index, erase);
205   return entry;
206 }
207
208 hobject_t *WorkloadGenerator::get_rnd_obj(coll_entry_t *entry)
209 {
210   assert(entry != NULL);
211
212   bool create =
213       (get_uniform_random_value(0,100) < 50 || !entry->m_objects.size());
214
215   if (create && ((int) entry->m_objects.size() < m_num_objs_per_coll)) {
216     return (entry->touch_obj(entry->m_next_object_id++));
217   }
218
219   int idx = get_uniform_random_value(0, entry->m_objects.size()-1);
220   return entry->get_obj_at(idx);
221 }
222
223 /**
224  * We'll generate a random amount of bytes, ranging from a single byte up to
225  * a couple of MB.
226  */
227 size_t WorkloadGenerator::get_random_byte_amount(size_t min, size_t max)
228 {
229   size_t diff = max - min;
230   return (size_t) (min + (rand() % diff));
231 }
232
233 void WorkloadGenerator::get_filled_byte_array(bufferlist& bl, size_t size)
234 {
235   static const char alphanum[] = "0123456789"
236     "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
237     "abcdefghijklmnopqrstuvwxyz";
238
239   bufferptr bp(size);
240   if (false) {
241     for (unsigned int i = 0; i < size - 1; i++) {
242       bp[i] = alphanum[rand() % sizeof(alphanum)];
243     }
244     bp[size - 1] = '\0';
245   } else {
246     bp.zero();
247   }
248   bl.append(bp);
249 }
250
251 void WorkloadGenerator::do_write_object(ObjectStore::Transaction *t,
252                                         coll_t coll, hobject_t obj,
253                                         C_StatState *stat)
254 {
255   if (m_suppress_write_data) {
256     dout(5) << __func__ << " suppressed" << dendl;
257     return;
258   }
259
260   size_t size = m_write_data_bytes;
261   if (!size)
262     size = get_random_byte_amount(min_write_bytes, max_write_bytes);
263
264   bufferlist bl;
265   get_filled_byte_array(bl, size);
266
267   dout(2) << __func__ << " " << coll << "/" << obj
268           << " size " << bl.length() << dendl;
269
270   if (m_do_stats && (stat != NULL))
271     stat->written_data += bl.length();
272
273   t->write(coll, ghobject_t(obj), 0, bl.length(), bl);
274 }
275
276 void WorkloadGenerator::do_setattr_object(ObjectStore::Transaction *t,
277                                           coll_t coll, hobject_t obj,
278                                           C_StatState *stat)
279 {
280   if (m_suppress_write_xattr_obj) {
281     dout(5) << __func__ << " suppressed" << dendl;
282     return;
283   }
284
285   size_t size = m_write_xattr_obj_bytes;
286   if (!size)
287     size = get_random_byte_amount(min_xattr_obj_bytes, max_xattr_obj_bytes);
288
289   bufferlist bl;
290   get_filled_byte_array(bl, size);
291
292   dout(2) << __func__ << " " << coll << "/" << obj << " size " << size << dendl;
293
294   if (m_do_stats && (stat != NULL))
295       stat->written_data += bl.length();
296
297   t->setattr(coll, ghobject_t(obj), "objxattr", bl);
298 }
299
300 void WorkloadGenerator::do_pgmeta_omap_set(ObjectStore::Transaction *t, spg_t pgid,
301                                            coll_t coll, C_StatState *stat)
302 {
303   if (m_suppress_write_xattr_coll) {
304     dout(5) << __func__ << " suppressed" << dendl;
305     return;
306   }
307
308   size_t size = m_write_xattr_coll_bytes;
309   if (!size)
310     size = get_random_byte_amount(min_xattr_coll_bytes, max_xattr_coll_bytes);
311
312   bufferlist bl;
313   get_filled_byte_array(bl, size);
314   dout(2) << __func__ << " coll " << coll << " size " << size << dendl;
315
316   if (m_do_stats && (stat != NULL))
317       stat->written_data += bl.length();
318
319   ghobject_t pgmeta(pgid.make_pgmeta_oid());
320   map<string,bufferlist> values;
321   values["_"].claim(bl);
322   t->omap_setkeys(coll, pgmeta, values);
323 }
324
325
326 void WorkloadGenerator::do_append_log(ObjectStore::Transaction *t,
327                                       coll_entry_t *entry, C_StatState *stat)
328 {
329   if (m_suppress_write_log) {
330     dout(5) << __func__ << " suppressed" << dendl;
331     return;
332   }
333
334   size_t size = (m_write_pglog_bytes ? m_write_pglog_bytes : log_append_bytes);
335
336   bufferlist bl;
337   get_filled_byte_array(bl, size);
338   ghobject_t log_obj = entry->m_meta_obj;
339
340   dout(2) << __func__ << " coll " << entry->m_coll << " "
341       << coll_t::meta() << " /" << log_obj << " (" << bl.length() << ")" << dendl;
342
343   if (m_do_stats && (stat != NULL))
344       stat->written_data += bl.length();
345
346   uint64_t s = pg_log_size[entry->m_coll];
347   t->write(coll_t::meta(), log_obj, s, bl.length(), bl);
348   pg_log_size[entry->m_coll] += bl.length();
349 }
350
351 void WorkloadGenerator::do_destroy_collection(ObjectStore::Transaction *t,
352                                               coll_entry_t *entry,
353                                               C_StatState *stat)
354 {  
355   m_nr_runs = 0;
356   entry->m_osr.flush();
357   vector<ghobject_t> ls;
358   m_store->collection_list(entry->m_coll, ghobject_t(), ghobject_t::get_max(),
359                            INT_MAX, &ls, NULL);
360   dout(2) << __func__ << " coll " << entry->m_coll
361       << " (" << ls.size() << " objects)" << dendl;
362
363   for (vector<ghobject_t>::iterator it = ls.begin(); it < ls.end(); ++it) {
364     t->remove(entry->m_coll, *it);
365   }
366
367   t->remove_collection(entry->m_coll);
368   t->remove(coll_t::meta(), entry->m_meta_obj);
369 }
370
371 TestObjectStoreState::coll_entry_t
372 *WorkloadGenerator::do_create_collection(ObjectStore::Transaction *t,
373                                          C_StatState *stat)
374 {
375   coll_entry_t *entry = coll_create(m_next_coll_nr++);
376   if (!entry) {
377     dout(0) << __func__ << " failed to create coll id "
378         << m_next_coll_nr << dendl;
379     return NULL;
380   }
381   m_collections.insert(make_pair(entry->m_id, entry));
382
383   dout(2) << __func__ << " id " << entry->m_id << " coll " << entry->m_coll << dendl;
384   t->create_collection(entry->m_coll, 32);
385   dout(2) << __func__ << " meta " << coll_t::meta() << "/" << entry->m_meta_obj << dendl;
386   t->touch(coll_t::meta(), entry->m_meta_obj);
387   return entry;
388 }
389
390 void WorkloadGenerator::do_stats()
391 {
392   utime_t now = ceph_clock_now();
393   m_stats_lock.Lock();
394
395   utime_t duration = (now - m_stats_begin);
396
397   // when cast to double, a utime_t behaves properly
398   double throughput = (m_stats_total_written / ((double) duration));
399   double tx_throughput (m_stats_finished_txs / ((double) duration));
400
401   dout(0) << __func__
402           << " written: " << m_stats_total_written
403           << " duration: " << duration << " sec"
404           << " bandwidth: " << prettybyte_t(throughput) << "/s"
405           << " iops: " << tx_throughput << "/s"
406           << dendl;
407
408   m_stats_lock.Unlock();
409 }
410
411 void WorkloadGenerator::run()
412 {
413   bool create_coll = false;
414   int ops_run = 0;
415
416   utime_t stats_interval(m_stats_show_secs, 0);
417   utime_t now = ceph_clock_now();
418   utime_t stats_time = now;
419   m_stats_begin = now;
420
421   do {
422     C_StatState *stat_state = NULL;
423
424     if (m_num_ops && (ops_run == m_num_ops))
425       break;
426
427     if (!create_coll && !m_collections.size()) {
428       dout(0) << "We ran out of collections!" << dendl;
429       break;
430     }
431
432     dout(5) << __func__
433         << " m_finished_lock is-locked: " << m_finished_lock.is_locked()
434         << " in-flight: " << m_in_flight.load()
435         << dendl;
436
437     wait_for_ready();
438
439     ObjectStore::Transaction *t = new ObjectStore::Transaction;
440     Context *c;
441     bool destroy_collection = false;
442     TestObjectStoreState::coll_entry_t *entry = NULL;
443
444
445     if (m_do_stats) {
446       utime_t now = ceph_clock_now();
447       utime_t elapsed = now - stats_time;
448       if (elapsed >= stats_interval) {
449         do_stats();
450         stats_time = now;
451       }
452       stat_state = new C_StatState(this, now);
453     }
454
455     if (create_coll) {
456       create_coll = false;
457
458       entry = do_create_collection(t, stat_state);
459       if (!entry) {
460         dout(0) << __func__ << " something went terribly wrong creating coll" << dendl;
461         break;
462       }
463
464       c = new C_OnReadable(this);
465       goto queue_tx;
466     }
467
468     destroy_collection = should_destroy_collection();
469     entry = get_rnd_coll_entry(destroy_collection);
470     assert(entry != NULL);
471
472     if (destroy_collection) {
473       do_destroy_collection(t, entry, stat_state);
474       c = new C_OnDestroyed(this, entry);
475       if (!m_num_ops)
476         create_coll = true;
477     } else {
478       hobject_t *obj = get_rnd_obj(entry);
479
480       do_write_object(t, entry->m_coll, *obj, stat_state);
481       do_setattr_object(t, entry->m_coll, *obj, stat_state);
482       do_pgmeta_omap_set(t, entry->m_pgid, entry->m_coll, stat_state);
483       do_append_log(t, entry, stat_state);
484
485       c = new C_OnReadable(this);
486     }
487
488 queue_tx:
489
490     if (m_do_stats) {
491       Context *tmp = c;
492       c = new C_StatWrapper(stat_state, tmp);
493     }
494
495     m_store->queue_transaction(&(entry->m_osr), std::move(*t), c);
496     delete t;
497
498     inc_in_flight();
499
500     ops_run ++;
501
502   } while (true);
503
504   dout(2) << __func__ << " waiting for "
505           << m_in_flight.load() << " in-flight transactions" << dendl;
506
507   wait_for_done();
508
509   do_stats();
510
511   dout(0) << __func__ << " finishing" << dendl;
512 }
513
514 void usage()
515 {
516   cout << "usage: " << our_name << "[options]" << std::endl;
517
518   cout << "\
519 \n\
520 Global Options:\n\
521   -c FILE                             Read configuration from FILE\n\
522   --osd-objectstore TYPE              Set OSD ObjectStore type\n\
523   --osd-data PATH                     Set OSD Data path\n\
524   --osd-journal PATH                  Set OSD Journal path\n\
525   --osd-journal-size VAL              Set Journal size\n\
526   --help                              This message\n\
527 \n\
528 Test-specific Options:\n\
529   --test-num-colls VAL                Set the number of collections\n\
530   --test-num-objs-per-coll VAL        Set the number of objects per collection\n\
531   --test-destroy-coll-per-N-trans VAL Set how many transactions to run before\n\
532                                       destroying a collection.\n\
533   --test-num-ops VAL                  Run a certain number of operations\n\
534                                       (a VAL of 0 runs the test forever)\n\
535    --test-max-in-flight VAL           Maximum number of in-flight transactions\n\
536                                       (default: 50)\n\
537    --test-suppress-ops OPS            Suppress ops specified in OPS\n\
538    --test-write-data-size SIZE        Specify SIZE for all data writes\n\
539    --test-write-xattr-obj-size SIZE   Specify SIZE for all xattrs on objects\n\
540    --test-write-xattr-coll-size SIZE  Specify SIZE for all xattrs on colls\n\
541    --test-write-pglog-size SIZE       Specify SIZE for all pglog writes\n\
542    --test-show-stats                  Show stats as we go\n\
543    --test-show-stats-period SECS      Show stats every SECS (default: 5)\n\
544 \n\
545    SIZE is a numeric value that can be assumed as being bytes, or may be any\n\
546    other unit if specified: B or b, K or k, M or m, G or g.\n\
547       e.g., 1G = 1024M = 1048576k = 1073741824\n\
548 \n\
549    OPS can be one or more of the following options:\n\
550       c    writes on collection's xattrs\n\
551       o    writes on object's xattr\n\
552       l    writes on pglog\n\
553       d    data writes on objects\n\
554 \n\
555 " << std::endl;
556 }
557
558 int main(int argc, const char *argv[])
559 {
560   vector<const char*> def_args;
561   vector<const char*> args;
562
563   our_name = argv[0];
564
565   def_args.push_back("--osd-journal-size");
566   def_args.push_back("400");
567 //  def_args.push_back("--osd-data");
568 //  def_args.push_back("workload_gen_dir");
569 //  def_args.push_back("--osd-journal");
570 //  def_args.push_back("workload_gen_dir/journal");
571   argv_to_vec(argc, argv, args);
572
573   auto cct = global_init(&def_args, args,
574                          CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY,
575                          CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
576   common_init_finish(g_ceph_context);
577   g_ceph_context->_conf->apply_changes(NULL);
578
579   WorkloadGenerator *wrkldgen_ptr = new WorkloadGenerator(args);
580   wrkldgen.reset(wrkldgen_ptr);
581   wrkldgen->run();
582   return 0;
583 }