Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / osdc / object_cacher_stress.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <cstdlib>
5 #include <ctime>
6 #include <sstream>
7 #include <string>
8 #include <vector>
9 #include <boost/scoped_ptr.hpp>
10
11 #include "common/ceph_argparse.h"
12 #include "common/common_init.h"
13 #include "common/config.h"
14 #include "common/Mutex.h"
15 #include "common/snap_types.h"
16 #include "global/global_init.h"
17 #include "include/buffer.h"
18 #include "include/Context.h"
19 #include "include/stringify.h"
20 #include "osdc/ObjectCacher.h"
21
22 #include "FakeWriteback.h"
23 #include "MemWriteback.h"
24
25 #include <atomic>
26
27 // XXX: Only tests default namespace
28 struct op_data {
29   op_data(std::string oid, uint64_t offset, uint64_t len, bool read)
30     : extent(oid, 0, offset, len, 0), is_read(read)
31   {
32     extent.oloc.pool = 0;
33     extent.buffer_extents.push_back(make_pair(0, len));
34   }
35
36   ObjectExtent extent;
37   bool is_read;
38   ceph::bufferlist result;
39   std::atomic<unsigned> done = { 0 };
40 };
41
42 class C_Count : public Context {
43   op_data *m_op;
44   std::atomic<unsigned> *m_outstanding = nullptr;
45 public:
46   C_Count(op_data *op, std::atomic<unsigned> *outstanding)
47     : m_op(op), m_outstanding(outstanding) {}
48   void finish(int r) override {
49     m_op->done++;
50     assert(*m_outstanding > 0);
51     (*m_outstanding)--;
52   }
53 };
54
55 int stress_test(uint64_t num_ops, uint64_t num_objs,
56                 uint64_t max_obj_size, uint64_t delay_ns,
57                 uint64_t max_op_len, float percent_reads)
58 {
59   Mutex lock("object_cacher_stress::object_cacher");
60   FakeWriteback writeback(g_ceph_context, &lock, delay_ns);
61
62   ObjectCacher obc(g_ceph_context, "test", writeback, lock, NULL, NULL,
63                    g_conf->client_oc_size,
64                    g_conf->client_oc_max_objects,
65                    g_conf->client_oc_max_dirty,
66                    g_conf->client_oc_target_dirty,
67                    g_conf->client_oc_max_dirty_age,
68                    true);
69   obc.start();
70
71   std::atomic<unsigned> outstanding_reads = { 0 };
72   vector<ceph::shared_ptr<op_data> > ops;
73   ObjectCacher::ObjectSet object_set(NULL, 0, 0);
74   SnapContext snapc;
75   ceph::buffer::ptr bp(max_op_len);
76   ceph::bufferlist bl;
77   uint64_t journal_tid = 0;
78   bp.zero();
79   bl.append(bp);
80
81   // schedule ops
82   std::cout << "Test configuration:\n\n"
83             << setw(10) << "ops: " << num_ops << "\n"
84             << setw(10) << "objects: " << num_objs << "\n"
85             << setw(10) << "obj size: " << max_obj_size << "\n"
86             << setw(10) << "delay: " << delay_ns << "\n"
87             << setw(10) << "max op len: " << max_op_len << "\n"
88             << setw(10) << "percent reads: " << percent_reads << "\n\n";
89
90   for (uint64_t i = 0; i < num_ops; ++i) {
91     uint64_t offset = random() % max_obj_size;
92     uint64_t max_len = MIN(max_obj_size - offset, max_op_len);
93     // no zero-length operations
94     uint64_t length = random() % (MAX(max_len - 1, 1)) + 1;
95     std::string oid = "test" + stringify(random() % num_objs);
96     bool is_read = random() < percent_reads * RAND_MAX;
97     ceph::shared_ptr<op_data> op(new op_data(oid, offset, length, is_read));
98     ops.push_back(op);
99     std::cout << "op " << i << " " << (is_read ? "read" : "write")
100               << " " << op->extent << "\n";
101     if (op->is_read) {
102       ObjectCacher::OSDRead *rd = obc.prepare_read(CEPH_NOSNAP, &op->result, 0);
103       rd->extents.push_back(op->extent);
104       outstanding_reads++;
105       Context *completion = new C_Count(op.get(), &outstanding_reads);
106       lock.Lock();
107       int r = obc.readx(rd, &object_set, completion);
108       lock.Unlock();
109       assert(r >= 0);
110       if ((uint64_t)r == length)
111         completion->complete(r);
112       else
113         assert(r == 0);
114     } else {
115       ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl,
116                                                      ceph::real_time::min(), 0,
117                                                      ++journal_tid);
118       wr->extents.push_back(op->extent);
119       lock.Lock();
120       obc.writex(wr, &object_set, NULL);
121       lock.Unlock();
122     }
123   }
124
125   // check that all reads completed
126   for (uint64_t i = 0; i < num_ops; ++i) {
127     if (!ops[i]->is_read)
128       continue;
129     std::cout << "waiting for read " << i << ops[i]->extent << std::endl;
130     uint64_t done = 0;
131     while (done == 0) {
132       done = ops[i]->done;
133       if (!done) {
134         usleep(500);
135       }
136     }
137     if (done > 1) {
138       std::cout << "completion called more than once!\n" << std::endl;
139       return EXIT_FAILURE;
140     }
141   }
142
143   lock.Lock();
144   obc.release_set(&object_set);
145   lock.Unlock();
146
147   int r = 0;
148   Mutex mylock("librbd::ImageCtx::flush_cache");
149   Cond cond;
150   bool done;
151   Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
152   lock.Lock();
153   bool already_flushed = obc.flush_set(&object_set, onfinish);
154   std::cout << "already flushed = " << already_flushed << std::endl;
155   lock.Unlock();
156   mylock.Lock();
157   while (!done) {
158     cond.Wait(mylock);
159   }
160   mylock.Unlock();
161
162   lock.Lock();
163   bool unclean = obc.release_set(&object_set);
164   lock.Unlock();
165
166   if (unclean) {
167     std::cout << "unclean buffers left over!" << std::endl;
168     return EXIT_FAILURE;
169   }
170
171   obc.stop();
172
173   std::cout << "Test completed successfully." << std::endl;
174
175   return EXIT_SUCCESS;
176 }
177
178 int correctness_test(uint64_t delay_ns)
179 {
180   std::cerr << "starting correctness test" << std::endl;
181   Mutex lock("object_cacher_stress::object_cacher");
182   MemWriteback writeback(g_ceph_context, &lock, delay_ns);
183
184   ObjectCacher obc(g_ceph_context, "test", writeback, lock, NULL, NULL,
185                    1<<21, // max cache size, 2MB
186                    1, // max objects, just one
187                    1<<18, // max dirty, 256KB
188                    1<<17, // target dirty, 128KB
189                    g_conf->client_oc_max_dirty_age,
190                    true);
191   obc.start();
192   std::cerr << "just start()ed ObjectCacher" << std::endl;
193
194   SnapContext snapc;
195   ceph_tid_t journal_tid = 0;
196   std::string oid("correctness_test_obj");
197   ObjectCacher::ObjectSet object_set(NULL, 0, 0);
198   ceph::bufferlist zeroes_bl;
199   zeroes_bl.append_zero(1<<20);
200
201   // set up a 4MB all-zero object
202   std::cerr << "writing 4x1MB object" << std::endl;
203   std::map<int, C_SaferCond> create_finishers;
204   for (int i = 0; i < 4; ++i) {
205     ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, zeroes_bl,
206                                                    ceph::real_time::min(), 0,
207                                                    ++journal_tid);
208     ObjectExtent extent(oid, 0, zeroes_bl.length()*i, zeroes_bl.length(), 0);
209     extent.oloc.pool = 0;
210     extent.buffer_extents.push_back(make_pair(0, 1<<20));
211     wr->extents.push_back(extent);
212     lock.Lock();
213     obc.writex(wr, &object_set, &create_finishers[i]);
214     lock.Unlock();
215   }
216
217   // write some 1-valued bits at 256-KB intervals for checking consistency
218   std::cerr << "Writing some 0xff values" << std::endl;
219   ceph::buffer::ptr ones(1<<16);
220   memset(ones.c_str(), 0xff, ones.length());
221   ceph::bufferlist ones_bl;
222   ones_bl.append(ones);
223   for (int i = 1<<18; i < 1<<22; i+=1<<18) {
224     ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, ones_bl,
225                                                    ceph::real_time::min(), 0,
226                                                    ++journal_tid);
227     ObjectExtent extent(oid, 0, i, ones_bl.length(), 0);
228     extent.oloc.pool = 0;
229     extent.buffer_extents.push_back(make_pair(0, 1<<16));
230     wr->extents.push_back(extent);
231     lock.Lock();
232     obc.writex(wr, &object_set, &create_finishers[i]);
233     lock.Unlock();
234   }
235
236   for (auto i = create_finishers.begin(); i != create_finishers.end(); ++i) {
237     i->second.wait();
238   }
239   std::cout << "Finished setting up object" << std::endl;
240   lock.Lock();
241   C_SaferCond flushcond;
242   bool done = obc.flush_all(&flushcond);
243   if (!done) {
244     std::cout << "Waiting for flush" << std::endl;
245     lock.Unlock();
246     flushcond.wait();
247     lock.Lock();
248   }
249   lock.Unlock();
250
251   /* now read the back half of the object in, check consistency,
252    */
253   std::cout << "Reading back half of object (1<<21~1<<21)" << std::endl;
254   bufferlist readbl;
255   C_SaferCond backreadcond;
256   ObjectCacher::OSDRead *back_half_rd = obc.prepare_read(CEPH_NOSNAP, &readbl, 0);
257   ObjectExtent back_half_extent(oid, 0, 1<<21, 1<<21, 0);
258   back_half_extent.oloc.pool = 0;
259   back_half_extent.buffer_extents.push_back(make_pair(0, 1<<21));
260   back_half_rd->extents.push_back(back_half_extent);
261   lock.Lock();
262   int r = obc.readx(back_half_rd, &object_set, &backreadcond);
263   lock.Unlock();
264   assert(r >= 0);
265   if (r == 0) {
266     std::cout << "Waiting to read data into cache" << std::endl;
267     r = backreadcond.wait();
268   }
269
270   assert(r == 1<<21);
271
272   /* Read the whole object in,
273    * verify we have to wait for it to complete,
274    * overwrite a small piece, (http://tracker.ceph.com/issues/16002),
275    * and check consistency */
276
277   readbl.clear();
278   std::cout<< "Reading whole object (0~1<<22)" << std::endl;
279   C_SaferCond frontreadcond;
280   ObjectCacher::OSDRead *whole_rd = obc.prepare_read(CEPH_NOSNAP, &readbl, 0);
281   ObjectExtent whole_extent(oid, 0, 0, 1<<22, 0);
282   whole_extent.oloc.pool = 0;
283   whole_extent.buffer_extents.push_back(make_pair(0, 1<<22));
284   whole_rd->extents.push_back(whole_extent);
285   lock.Lock();
286   r = obc.readx(whole_rd, &object_set, &frontreadcond);
287   // we cleared out the cache by reading back half, it shouldn't pass immediately!
288   assert(r == 0);
289   std::cout << "Data (correctly) not available without fetching" << std::endl;
290
291   ObjectCacher::OSDWrite *verify_wr = obc.prepare_write(snapc, ones_bl,
292                                                         ceph::real_time::min(), 0,
293                                                         ++journal_tid);
294   ObjectExtent verify_extent(oid, 0, (1<<18)+(1<<16), ones_bl.length(), 0);
295   verify_extent.oloc.pool = 0;
296   verify_extent.buffer_extents.push_back(make_pair(0, 1<<16));
297   verify_wr->extents.push_back(verify_extent);
298   C_SaferCond verify_finisher;
299   obc.writex(verify_wr, &object_set, &verify_finisher);
300   lock.Unlock();
301   std::cout << "wrote dirtying data" << std::endl;
302
303   std::cout << "Waiting to read data into cache" << std::endl;
304   frontreadcond.wait();
305   verify_finisher.wait();
306
307   std::cout << "Validating data" << std::endl;
308
309   for (int i = 1<<18; i < 1<<22; i+=1<<18) {
310     bufferlist ones_maybe;
311     ones_maybe.substr_of(readbl, i, ones_bl.length());
312     assert(0 == memcmp(ones_maybe.c_str(), ones_bl.c_str(), ones_bl.length()));
313   }
314   bufferlist ones_maybe;
315   ones_maybe.substr_of(readbl, (1<<18)+(1<<16), ones_bl.length());
316   assert(0 == memcmp(ones_maybe.c_str(), ones_bl.c_str(), ones_bl.length()));
317
318   std::cout << "validated that data is 0xff where it should be" << std::endl;
319   
320   lock.Lock();
321   C_SaferCond flushcond2;
322   done = obc.flush_all(&flushcond2);
323   if (!done) {
324     std::cout << "Waiting for final write flush" << std::endl;
325     lock.Unlock();
326     flushcond2.wait();
327     lock.Lock();
328   }
329
330   bool unclean = obc.release_set(&object_set);
331   if (unclean) {
332     std::cout << "unclean buffers left over!" << std::endl;
333     vector<ObjectExtent> discard_extents;
334     int i = 0;
335     for (auto oi = object_set.objects.begin(); !oi.end(); ++oi) {
336       discard_extents.emplace_back(oid, i++, 0, 1<<22, 0);
337     }
338     obc.discard_set(&object_set, discard_extents);
339     lock.Unlock();
340     obc.stop();
341     goto fail;
342   }
343   lock.Unlock();
344
345   obc.stop();
346
347   std::cout << "Testing ObjectCacher correctness complete" << std::endl;
348   return EXIT_SUCCESS;
349
350  fail:
351   return EXIT_FAILURE;
352 }
353
354 int main(int argc, const char **argv)
355 {
356   std::vector<const char*> args;
357   argv_to_vec(argc, argv, args);
358   env_to_vec(args);
359   auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
360                          CODE_ENVIRONMENT_UTILITY, 0);
361
362   long long delay_ns = 0;
363   long long num_ops = 1000;
364   long long obj_bytes = 4 << 20;
365   long long max_len = 128 << 10;
366   long long num_objs = 10;
367   float percent_reads = 0.90;
368   int seed = time(0) % 100000;
369   bool stress = false;
370   bool correctness = false;
371   std::ostringstream err;
372   std::vector<const char*>::iterator i;
373   for (i = args.begin(); i != args.end();) {
374     if (ceph_argparse_witharg(args, i, &delay_ns, err, "--delay-ns", (char*)NULL)) {
375       if (!err.str().empty()) {
376         cerr << argv[0] << ": " << err.str() << std::endl;
377         return EXIT_FAILURE;
378       }
379     } else if (ceph_argparse_witharg(args, i, &num_ops, err, "--ops", (char*)NULL)) {
380       if (!err.str().empty()) {
381         cerr << argv[0] << ": " << err.str() << std::endl;
382         return EXIT_FAILURE;
383       }
384     } else if (ceph_argparse_witharg(args, i, &num_objs, err, "--objects", (char*)NULL)) {
385       if (!err.str().empty()) {
386         cerr << argv[0] << ": " << err.str() << std::endl;
387         return EXIT_FAILURE;
388       }
389     } else if (ceph_argparse_witharg(args, i, &obj_bytes, err, "--obj-size", (char*)NULL)) {
390       if (!err.str().empty()) {
391         cerr << argv[0] << ": " << err.str() << std::endl;
392         return EXIT_FAILURE;
393       }
394     } else if (ceph_argparse_witharg(args, i, &max_len, err, "--max-op-size", (char*)NULL)) {
395       if (!err.str().empty()) {
396         cerr << argv[0] << ": " << err.str() << std::endl;
397         return EXIT_FAILURE;
398       }
399     } else if (ceph_argparse_witharg(args, i, &percent_reads, err, "--percent-read", (char*)NULL)) {
400       if (!err.str().empty()) {
401         cerr << argv[0] << ": " << err.str() << std::endl;
402         return EXIT_FAILURE;
403       }
404     } else if (ceph_argparse_witharg(args, i, &seed, err, "--seed", (char*)NULL)) {
405       if (!err.str().empty()) {
406         cerr << argv[0] << ": " << err.str() << std::endl;
407         return EXIT_FAILURE;
408       }
409     } else if (ceph_argparse_flag(args, i, "--stress-test", NULL)) {
410       stress = true;
411     } else if (ceph_argparse_flag(args, i, "--correctness-test", NULL)) {
412       correctness = true;
413     } else {
414       cerr << "unknown option " << *i << std::endl;
415       return EXIT_FAILURE;
416     }
417   }
418
419   if (stress) {
420     srandom(seed);
421     return stress_test(num_ops, num_objs, obj_bytes, delay_ns, max_len, percent_reads);
422   }
423   if (correctness) {
424     return correctness_test(delay_ns);
425   }
426 }