Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / osd / RadosModel.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 #include "include/int_types.h"
4
5 #include "common/Mutex.h"
6 #include "common/Cond.h"
7 #include "include/rados/librados.hpp"
8
9 #include <iostream>
10 #include <sstream>
11 #include <map>
12 #include <set>
13 #include <list>
14 #include <string>
15 #include <string.h>
16 #include <stdlib.h>
17 #include <errno.h>
18 #include <time.h>
19 #include "Object.h"
20 #include "TestOpStat.h"
21 #include "test/librados/test.h"
22 #include "include/memory.h"
23 #include "common/sharedptr_registry.hpp"
24 #include "common/errno.h"
25 #include "osd/HitSet.h"
26
27 #ifndef RADOSMODEL_H
28 #define RADOSMODEL_H
29
30 using namespace std;
31
32 class RadosTestContext;
33 class TestOpStat;
34
35 template <typename T>
36 typename T::iterator rand_choose(T &cont) {
37   if (cont.size() == 0) {
38     return cont.end();
39   }
40   int index = rand() % cont.size();
41   typename T::iterator retval = cont.begin();
42
43   for (; index > 0; --index) ++retval;
44   return retval;
45 }
46
47 enum TestOpType {
48   TEST_OP_READ,
49   TEST_OP_WRITE,
50   TEST_OP_WRITE_EXCL,
51   TEST_OP_WRITESAME,
52   TEST_OP_DELETE,
53   TEST_OP_SNAP_CREATE,
54   TEST_OP_SNAP_REMOVE,
55   TEST_OP_ROLLBACK,
56   TEST_OP_SETATTR,
57   TEST_OP_RMATTR,
58   TEST_OP_WATCH,
59   TEST_OP_COPY_FROM,
60   TEST_OP_HIT_SET_LIST,
61   TEST_OP_UNDIRTY,
62   TEST_OP_IS_DIRTY,
63   TEST_OP_CACHE_FLUSH,
64   TEST_OP_CACHE_TRY_FLUSH,
65   TEST_OP_CACHE_EVICT,
66   TEST_OP_APPEND,
67   TEST_OP_APPEND_EXCL,
68   TEST_OP_SET_REDIRECT,
69   TEST_OP_UNSET_REDIRECT
70 };
71
72 class TestWatchContext : public librados::WatchCtx2 {
73   TestWatchContext(const TestWatchContext&);
74 public:
75   Cond cond;
76   uint64_t handle;
77   bool waiting;
78   Mutex lock;
79   TestWatchContext() : handle(0), waiting(false),
80                        lock("watch lock") {}
81   void handle_notify(uint64_t notify_id, uint64_t cookie,
82                      uint64_t notifier_id,
83                      bufferlist &bl) override {
84     Mutex::Locker l(lock);
85     waiting = false;
86     cond.SignalAll();
87   }
88   void handle_error(uint64_t cookie, int err) override {
89     Mutex::Locker l(lock);
90     cout << "watch handle_error " << err << std::endl;
91   }
92   void start() {
93     Mutex::Locker l(lock);
94     waiting = true;
95   }
96   void wait() {
97     Mutex::Locker l(lock);
98     while (waiting)
99       cond.Wait(lock);
100   }
101   uint64_t &get_handle() {
102     return handle;
103   }
104 };
105
106 class TestOp {
107 public:
108   int num;
109   RadosTestContext *context;
110   TestOpStat *stat;
111   bool done;
112   TestOp(int n, RadosTestContext *context,
113          TestOpStat *stat = 0)
114     : num(n),
115       context(context),
116       stat(stat),
117       done(false)
118   {}
119
120   virtual ~TestOp() {};
121
122   /**
123    * This struct holds data to be passed by a callback
124    * to a TestOp::finish method.
125    */
126   struct CallbackInfo {
127     uint64_t id;
128     explicit CallbackInfo(uint64_t id) : id(id) {}
129     virtual ~CallbackInfo() {};
130   };
131
132   virtual void _begin() = 0;
133
134   /**
135    * Called when the operation completes.
136    * This should be overridden by asynchronous operations.
137    *
138    * @param info information stored by a callback, or NULL -
139    *             useful for multi-operation TestOps
140    */
141   virtual void _finish(CallbackInfo *info)
142   {
143     return;
144   }
145   virtual string getType() = 0;
146   virtual bool finished()
147   {
148     return true;
149   }
150
151   void begin();
152   void finish(CallbackInfo *info);
153   virtual bool must_quiesce_other_ops() { return false; }
154 };
155
156 class TestOpGenerator {
157 public:
158   virtual ~TestOpGenerator() {};
159   virtual TestOp *next(RadosTestContext &context) = 0;
160 };
161
162 class RadosTestContext {
163 public:
164   Mutex state_lock;
165   Cond wait_cond;
166   map<int, map<string,ObjectDesc> > pool_obj_cont;
167   set<string> oid_in_use;
168   set<string> oid_not_in_use;
169   set<string> oid_flushing;
170   set<string> oid_not_flushing;
171   set<string> oid_redirect_not_in_use;
172   set<string> oid_redirect_in_use;
173   SharedPtrRegistry<int, int> snaps_in_use;
174   int current_snap;
175   string pool_name;
176   librados::IoCtx io_ctx;
177   librados::Rados rados;
178   int next_oid;
179   string prefix;
180   int errors;
181   int max_in_flight;
182   int seq_num;
183   map<int,uint64_t> snaps;
184   uint64_t seq;
185   const char *rados_id;
186   bool initialized;
187   map<string, TestWatchContext*> watches;
188   const uint64_t max_size;
189   const uint64_t min_stride_size;
190   const uint64_t max_stride_size;
191   AttrGenerator attr_gen;
192   const bool no_omap;
193   const bool no_sparse;
194   bool pool_snaps;
195   bool write_fadvise_dontneed;
196   int snapname_num;
197   map<string,string > redirect_objs;
198
199   RadosTestContext(const string &pool_name, 
200                    int max_in_flight,
201                    uint64_t max_size,
202                    uint64_t min_stride_size,
203                    uint64_t max_stride_size,
204                    bool no_omap,
205                    bool no_sparse,
206                    bool pool_snaps,
207                    bool write_fadvise_dontneed,
208                    const char *id = 0) :
209     state_lock("Context Lock"),
210     pool_obj_cont(),
211     current_snap(0),
212     pool_name(pool_name),
213     next_oid(0),
214     errors(0),
215     max_in_flight(max_in_flight),
216     seq_num(0), seq(0),
217     rados_id(id), initialized(false),
218     max_size(max_size), 
219     min_stride_size(min_stride_size), max_stride_size(max_stride_size),
220     attr_gen(2000, 20000),
221     no_omap(no_omap),
222     no_sparse(no_sparse),
223     pool_snaps(pool_snaps),
224     write_fadvise_dontneed(write_fadvise_dontneed),
225     snapname_num(0)
226   {
227   }
228
229   int init()
230   {
231     int r = rados.init(rados_id);
232     if (r < 0)
233       return r;
234     r = rados.conf_read_file(NULL);
235     if (r < 0)
236       return r;
237     r = rados.conf_parse_env(NULL);
238     if (r < 0)
239       return r;
240     r = rados.connect();
241     if (r < 0)
242       return r;
243     r = rados.ioctx_create(pool_name.c_str(), io_ctx);
244     if (r < 0) {
245       rados.shutdown();
246       return r;
247     }
248     bufferlist inbl;
249     r = rados.mon_command(
250       "{\"prefix\": \"osd pool set\", \"pool\": \"" + pool_name +
251       "\", \"var\": \"write_fadvise_dontneed\", \"val\": \"" + (write_fadvise_dontneed ? "true" : "false") + "\"}",
252       inbl, NULL, NULL);
253     if (r < 0) {
254       rados.shutdown();
255       return r;
256     }
257     char hostname_cstr[100];
258     gethostname(hostname_cstr, 100);
259     stringstream hostpid;
260     hostpid << hostname_cstr << getpid() << "-";
261     prefix = hostpid.str();
262     assert(!initialized);
263     initialized = true;
264     return 0;
265   }
266
267   void shutdown()
268   {
269     if (initialized) {
270       rados.shutdown();
271     }
272   }
273
274   void loop(TestOpGenerator *gen)
275   {
276     assert(initialized);
277     list<TestOp*> inflight;
278     state_lock.Lock();
279
280     TestOp *next = gen->next(*this);
281     TestOp *waiting = NULL;
282
283     while (next || !inflight.empty()) {
284       if (next && next->must_quiesce_other_ops() && !inflight.empty()) {
285         waiting = next;
286         next = NULL;   // Force to wait for inflight to drain
287       }
288       if (next) {
289         inflight.push_back(next);
290       }
291       state_lock.Unlock();
292       if (next) {
293         (*inflight.rbegin())->begin();
294       }
295       state_lock.Lock();
296       while (1) {
297         for (list<TestOp*>::iterator i = inflight.begin();
298              i != inflight.end();) {
299           if ((*i)->finished()) {
300             cout << (*i)->num << ": done (" << (inflight.size()-1) << " left)" << std::endl;
301             delete *i;
302             inflight.erase(i++);
303           } else {
304             ++i;
305           }
306         }
307         
308         if (inflight.size() >= (unsigned) max_in_flight || (!next && !inflight.empty())) {
309           cout << " waiting on " << inflight.size() << std::endl;
310           wait();
311         } else {
312           break;
313         }
314       }
315       if (waiting) {
316         next = waiting;
317         waiting = NULL;
318       } else {
319         next = gen->next(*this);
320       }
321     }
322     state_lock.Unlock();
323   }
324
325   void wait()
326   {
327     wait_cond.Wait(state_lock);
328   }
329
330   void kick()
331   {
332     wait_cond.Signal();
333   }
334
335   TestWatchContext *get_watch_context(const string &oid) {
336     return watches.count(oid) ? watches[oid] : 0;
337   }
338
339   TestWatchContext *watch(const string &oid) {
340     assert(!watches.count(oid));
341     return (watches[oid] = new TestWatchContext);
342   }
343
344   void unwatch(const string &oid) {
345     assert(watches.count(oid));
346     delete watches[oid];
347     watches.erase(oid);
348   }
349
350   ObjectDesc get_most_recent(const string &oid) {
351     ObjectDesc new_obj;
352     for (map<int, map<string,ObjectDesc> >::reverse_iterator i =
353            pool_obj_cont.rbegin();
354          i != pool_obj_cont.rend();
355          ++i) {
356       map<string,ObjectDesc>::iterator j = i->second.find(oid);
357       if (j != i->second.end()) {
358         new_obj = j->second;
359         break;
360       }
361     }
362     return new_obj;
363   }
364
365   void rm_object_attrs(const string &oid, const set<string> &attrs)
366   {
367     ObjectDesc new_obj = get_most_recent(oid);
368     for (set<string>::const_iterator i = attrs.begin();
369          i != attrs.end();
370          ++i) {
371       new_obj.attrs.erase(*i);
372     }
373     new_obj.dirty = true;
374     pool_obj_cont[current_snap].erase(oid);
375     pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
376   }
377
378   void remove_object_header(const string &oid)
379   {
380     ObjectDesc new_obj = get_most_recent(oid);
381     new_obj.header = bufferlist();
382     new_obj.dirty = true;
383     pool_obj_cont[current_snap].erase(oid);
384     pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
385   }
386
387
388   void update_object_header(const string &oid, const bufferlist &bl)
389   {
390     ObjectDesc new_obj = get_most_recent(oid);
391     new_obj.header = bl;
392     new_obj.exists = true;
393     new_obj.dirty = true;
394     pool_obj_cont[current_snap].erase(oid);
395     pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
396   }
397
398   void update_object_attrs(const string &oid, const map<string, ContDesc> &attrs)
399   {
400     ObjectDesc new_obj = get_most_recent(oid);
401     for (map<string, ContDesc>::const_iterator i = attrs.begin();
402          i != attrs.end();
403          ++i) {
404       new_obj.attrs[i->first] = i->second;
405     }
406     new_obj.exists = true;
407     new_obj.dirty = true;
408     pool_obj_cont[current_snap].erase(oid);
409     pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
410   }
411
412   void update_object(ContentsGenerator *cont_gen,
413                      const string &oid, const ContDesc &contents)
414   {
415     ObjectDesc new_obj = get_most_recent(oid);
416     new_obj.exists = true;
417     new_obj.dirty = true;
418     new_obj.update(cont_gen,
419                    contents);
420     pool_obj_cont[current_snap].erase(oid);
421     pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
422   }
423
424   void update_object_full(const string &oid, const ObjectDesc &contents)
425   {
426     pool_obj_cont[current_snap].erase(oid);
427     pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, contents));
428     pool_obj_cont[current_snap][oid].dirty = true;
429   }
430
431   void update_object_undirty(const string &oid)
432   {
433     ObjectDesc new_obj = get_most_recent(oid);
434     new_obj.dirty = false;
435     pool_obj_cont[current_snap].erase(oid);
436     pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
437   }
438
439   void update_object_version(const string &oid, uint64_t version,
440                              int snap = -1)
441   {
442     for (map<int, map<string,ObjectDesc> >::reverse_iterator i = 
443            pool_obj_cont.rbegin();
444          i != pool_obj_cont.rend();
445          ++i) {
446       if (snap != -1 && snap < i->first)
447         continue;
448       map<string,ObjectDesc>::iterator j = i->second.find(oid);
449       if (j != i->second.end()) {
450         if (version)
451           j->second.version = version;
452         cout << __func__ << " oid " << oid
453              << " v " << version << " " << j->second.most_recent()
454              << " " << (j->second.dirty ? "dirty" : "clean")
455              << " " << (j->second.exists ? "exists" : "dne")
456              << std::endl;
457         break;
458       }
459     }
460   }
461
462   void remove_object(const string &oid)
463   {
464     assert(!get_watch_context(oid));
465     ObjectDesc new_obj;
466     pool_obj_cont[current_snap].erase(oid);
467     pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
468   }
469
470   bool find_object(const string &oid, ObjectDesc *contents, int snap = -1) const
471   {
472     for (map<int, map<string,ObjectDesc> >::const_reverse_iterator i = 
473            pool_obj_cont.rbegin();
474          i != pool_obj_cont.rend();
475          ++i) {
476       if (snap != -1 && snap < i->first) continue;
477       if (i->second.count(oid) != 0) {
478         *contents = i->second.find(oid)->second;
479         return true;
480       }
481     }
482     return false;
483   }
484
485   void update_object_redirect_target(const string &oid, const string &target)
486   {
487     redirect_objs[oid] = target;
488   }
489
490   bool object_existed_at(const string &oid, int snap = -1) const
491   {
492     ObjectDesc contents;
493     bool found = find_object(oid, &contents, snap);
494     return found && contents.exists;
495   }
496
497   void remove_snap(int snap)
498   {
499     map<int, map<string,ObjectDesc> >::iterator next_iter = pool_obj_cont.find(snap);
500     assert(next_iter != pool_obj_cont.end());
501     map<int, map<string,ObjectDesc> >::iterator current_iter = next_iter++;
502     assert(current_iter != pool_obj_cont.end());
503     map<string,ObjectDesc> &current = current_iter->second;
504     map<string,ObjectDesc> &next = next_iter->second;
505     for (map<string,ObjectDesc>::iterator i = current.begin();
506          i != current.end();
507          ++i) {
508       if (next.count(i->first) == 0) {
509         next.insert(pair<string,ObjectDesc>(i->first, i->second));
510       }
511     }
512     pool_obj_cont.erase(current_iter);
513     snaps.erase(snap);
514   }
515
516   void add_snap(uint64_t snap)
517   {
518     snaps[current_snap] = snap;
519     current_snap++;
520     pool_obj_cont[current_snap];
521     seq = snap;
522   }
523
524   void roll_back(const string &oid, int snap)
525   {
526     assert(!get_watch_context(oid));
527     ObjectDesc contents;
528     find_object(oid, &contents, snap);
529     contents.dirty = true;
530     pool_obj_cont.rbegin()->second.erase(oid);
531     pool_obj_cont.rbegin()->second.insert(pair<string,ObjectDesc>(oid, contents));
532   }
533 };
534
535 void read_callback(librados::completion_t comp, void *arg);
536 void write_callback(librados::completion_t comp, void *arg);
537
538 class RemoveAttrsOp : public TestOp {
539 public:
540   string oid;
541   librados::ObjectWriteOperation op;
542   librados::AioCompletion *comp;
543   RemoveAttrsOp(int n, RadosTestContext *context,
544                const string &oid,
545                TestOpStat *stat)
546     : TestOp(n, context, stat), oid(oid), comp(NULL)
547   {}
548
549   void _begin() override
550   {
551     ContDesc cont;
552     set<string> to_remove;
553     {
554       Mutex::Locker l(context->state_lock);
555       ObjectDesc obj;
556       if (!context->find_object(oid, &obj)) {
557         context->kick();
558         done = true;
559         return;
560       }
561       cont = ContDesc(context->seq_num, context->current_snap,
562                       context->seq_num, "");
563       context->oid_in_use.insert(oid);
564       context->oid_not_in_use.erase(oid);
565
566       if (rand() % 30) {
567         ContentsGenerator::iterator iter = context->attr_gen.get_iterator(cont);
568         for (map<string, ContDesc>::iterator i = obj.attrs.begin();
569              i != obj.attrs.end();
570              ++i, ++iter) {
571           if (!(*iter % 3)) {
572             to_remove.insert(i->first);
573             op.rmxattr(i->first.c_str());
574           }
575         }
576         if (to_remove.empty()) {
577           context->kick();
578           context->oid_in_use.erase(oid);
579           context->oid_not_in_use.insert(oid);
580           done = true;
581           return;
582         }
583         if (!context->no_omap) {
584           op.omap_rm_keys(to_remove);
585         }
586       } else {
587         if (!context->no_omap) {
588           op.omap_clear();
589         }
590         for (map<string, ContDesc>::iterator i = obj.attrs.begin();
591              i != obj.attrs.end();
592              ++i) {
593           op.rmxattr(i->first.c_str());
594           to_remove.insert(i->first);
595         }
596         context->remove_object_header(oid);
597       }
598       context->rm_object_attrs(oid, to_remove);
599     }
600
601     pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
602       new pair<TestOp*, TestOp::CallbackInfo*>(this,
603                                                new TestOp::CallbackInfo(0));
604     comp = context->rados.aio_create_completion((void*) cb_arg, NULL,
605                                                 &write_callback);
606     context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
607   }
608
609   void _finish(CallbackInfo *info) override
610   {
611     Mutex::Locker l(context->state_lock);
612     done = true;
613     context->update_object_version(oid, comp->get_version64());
614     context->oid_in_use.erase(oid);
615     context->oid_not_in_use.insert(oid);
616     context->kick();
617   }
618
619   bool finished() override
620   {
621     return done;
622   }
623
624   string getType() override
625   {
626     return "RemoveAttrsOp";
627   }
628 };
629
630 class SetAttrsOp : public TestOp {
631 public:
632   string oid;
633   librados::ObjectWriteOperation op;
634   librados::AioCompletion *comp;
635   SetAttrsOp(int n,
636              RadosTestContext *context,
637              const string &oid,
638              TestOpStat *stat)
639     : TestOp(n, context, stat),
640       oid(oid), comp(NULL)
641   {}
642
643   void _begin() override
644   {
645     ContDesc cont;
646     {
647       Mutex::Locker l(context->state_lock);
648       cont = ContDesc(context->seq_num, context->current_snap,
649                       context->seq_num, "");
650       context->oid_in_use.insert(oid);
651       context->oid_not_in_use.erase(oid);
652     }
653
654     map<string, bufferlist> omap_contents;
655     map<string, ContDesc> omap;
656     bufferlist header;
657     ContentsGenerator::iterator keygen = context->attr_gen.get_iterator(cont);
658     op.create(false);
659     while (!*keygen) ++keygen;
660     while (*keygen) {
661       if (*keygen != '_')
662         header.append(*keygen);
663       ++keygen;
664     }
665     for (int i = 0; i < 20; ++i) {
666       string key;
667       while (!*keygen) ++keygen;
668       while (*keygen && key.size() < 40) {
669         key.push_back((*keygen % 20) + 'a');
670         ++keygen;
671       }
672       ContDesc val(cont);
673       val.seqnum += (unsigned)(*keygen);
674       val.prefix = ("oid: " + oid);
675       omap[key] = val;
676       bufferlist val_buffer = context->attr_gen.gen_bl(val);
677       omap_contents[key] = val_buffer;
678       op.setxattr(key.c_str(), val_buffer);
679     }
680     if (!context->no_omap) {
681       op.omap_set_header(header);
682       op.omap_set(omap_contents);
683     }
684
685     {
686       Mutex::Locker l(context->state_lock);
687       context->update_object_header(oid, header);
688       context->update_object_attrs(oid, omap);
689     }
690
691     pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
692       new pair<TestOp*, TestOp::CallbackInfo*>(this,
693                                                new TestOp::CallbackInfo(0));
694     comp = context->rados.aio_create_completion((void*) cb_arg, NULL,
695                                                 &write_callback);
696     context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
697   }
698
699   void _finish(CallbackInfo *info) override
700   {
701     Mutex::Locker l(context->state_lock);
702     int r;
703     if ((r = comp->get_return_value())) {
704       cerr << "err " << r << std::endl;
705       ceph_abort();
706     }
707     done = true;
708     context->update_object_version(oid, comp->get_version64());
709     context->oid_in_use.erase(oid);
710     context->oid_not_in_use.insert(oid);
711     context->kick();
712   }
713
714   bool finished() override
715   {
716     return done;
717   }
718
719   string getType() override
720   {
721     return "SetAttrsOp";
722   }
723 };
724
725 class WriteOp : public TestOp {
726 public:
727   string oid;
728   ContDesc cont;
729   set<librados::AioCompletion *> waiting;
730   librados::AioCompletion *rcompletion;
731   uint64_t waiting_on;
732   uint64_t last_acked_tid;
733
734   librados::ObjectReadOperation read_op;
735   librados::ObjectWriteOperation write_op;
736   bufferlist rbuffer;
737
738   bool do_append;
739   bool do_excl;
740
741   WriteOp(int n,
742           RadosTestContext *context,
743           const string &oid,
744           bool do_append,
745           bool do_excl,
746           TestOpStat *stat = 0)
747     : TestOp(n, context, stat),
748       oid(oid), rcompletion(NULL), waiting_on(0), 
749       last_acked_tid(0), do_append(do_append),
750       do_excl(do_excl)
751   {}
752                 
753   void _begin() override
754   {
755     context->state_lock.Lock();
756     done = 0;
757     stringstream acc;
758     acc << context->prefix << "OID: " << oid << " snap " << context->current_snap << std::endl;
759     string prefix = acc.str();
760
761     cont = ContDesc(context->seq_num, context->current_snap, context->seq_num, prefix);
762
763     ContentsGenerator *cont_gen;
764     if (do_append) {
765       ObjectDesc old_value;
766       bool found = context->find_object(oid, &old_value);
767       uint64_t prev_length = found && old_value.has_contents() ?
768         old_value.most_recent_gen()->get_length(old_value.most_recent()) :
769         0;
770       bool requires;
771       int r = context->io_ctx.pool_requires_alignment2(&requires);
772       assert(r == 0);
773       uint64_t alignment = 0;
774       if (requires) {
775         r = context->io_ctx.pool_required_alignment2(&alignment);
776         assert(r == 0);
777         assert(alignment != 0);
778       }
779       cont_gen = new AppendGenerator(
780         prev_length,
781         alignment,
782         context->min_stride_size,
783         context->max_stride_size,
784         3);
785     } else {
786       cont_gen = new VarLenGenerator(
787         context->max_size, context->min_stride_size, context->max_stride_size);
788     }
789     context->update_object(cont_gen, oid, cont);
790
791     context->oid_in_use.insert(oid);
792     context->oid_not_in_use.erase(oid);
793
794     map<uint64_t, uint64_t> ranges;
795
796     cont_gen->get_ranges_map(cont, ranges);
797     std::cout << num << ":  seq_num " << context->seq_num << " ranges " << ranges << std::endl;
798     context->seq_num++;
799
800     waiting_on = ranges.size();
801     ContentsGenerator::iterator gen_pos = cont_gen->get_iterator(cont);
802     uint64_t tid = 1;
803     for (map<uint64_t, uint64_t>::iterator i = ranges.begin(); 
804          i != ranges.end();
805          ++i, ++tid) {
806       gen_pos.seek(i->first);
807       bufferlist to_write = gen_pos.gen_bl_advance(i->second);
808       assert(to_write.length() == i->second);
809       assert(to_write.length() > 0);
810       std::cout << num << ":  writing " << context->prefix+oid
811                 << " from " << i->first
812                 << " to " << i->first + i->second << " tid " << tid << std::endl;
813       pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
814         new pair<TestOp*, TestOp::CallbackInfo*>(this,
815                                                  new TestOp::CallbackInfo(tid));
816       librados::AioCompletion *completion =
817         context->rados.aio_create_completion((void*) cb_arg, NULL,
818                                              &write_callback);
819       waiting.insert(completion);
820       librados::ObjectWriteOperation op;
821       if (do_append) {
822         op.append(to_write);
823       } else {
824         op.write(i->first, to_write);
825       }
826       if (do_excl && tid == 1)
827         op.assert_exists();
828       context->io_ctx.aio_operate(
829         context->prefix+oid, completion,
830         &op);
831     }
832
833     bufferlist contbl;
834     ::encode(cont, contbl);
835     pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
836       new pair<TestOp*, TestOp::CallbackInfo*>(
837         this,
838         new TestOp::CallbackInfo(++tid));
839     librados::AioCompletion *completion = context->rados.aio_create_completion(
840       (void*) cb_arg, NULL, &write_callback);
841     waiting.insert(completion);
842     waiting_on++;
843     write_op.setxattr("_header", contbl);
844     if (!do_append) {
845       write_op.truncate(cont_gen->get_length(cont));
846     }
847     context->io_ctx.aio_operate(
848       context->prefix+oid, completion, &write_op);
849
850     cb_arg =
851       new pair<TestOp*, TestOp::CallbackInfo*>(
852         this,
853         new TestOp::CallbackInfo(++tid));
854     rcompletion = context->rados.aio_create_completion(
855          (void*) cb_arg, NULL, &write_callback);
856     waiting_on++;
857     read_op.read(0, 1, &rbuffer, 0);
858     context->io_ctx.aio_operate(
859       context->prefix+oid, rcompletion,
860       &read_op,
861       librados::OPERATION_ORDER_READS_WRITES,  // order wrt previous write/update
862       0);
863     context->state_lock.Unlock();
864   }
865
866   void _finish(CallbackInfo *info) override
867   {
868     assert(info);
869     context->state_lock.Lock();
870     uint64_t tid = info->id;
871
872     cout << num << ":  finishing write tid " << tid << " to " << context->prefix + oid << std::endl;
873
874     if (tid <= last_acked_tid) {
875       cerr << "Error: finished tid " << tid
876            << " when last_acked_tid was " << last_acked_tid << std::endl;
877       ceph_abort();
878     }
879     last_acked_tid = tid;
880
881     assert(!done);
882     waiting_on--;
883     if (waiting_on == 0) {
884       uint64_t version = 0;
885       for (set<librados::AioCompletion *>::iterator i = waiting.begin();
886            i != waiting.end();
887            ) {
888         assert((*i)->is_complete());
889         if (int err = (*i)->get_return_value()) {
890           cerr << "Error: oid " << oid << " write returned error code "
891                << err << std::endl;
892         }
893         if ((*i)->get_version64() > version)
894           version = (*i)->get_version64();
895         (*i)->release();
896         waiting.erase(i++);
897       }
898       
899       context->update_object_version(oid, version);
900       if (rcompletion->get_version64() != version) {
901         cerr << "Error: racing read on " << oid << " returned version "
902              << rcompletion->get_version64() << " rather than version "
903              << version << std::endl;
904         assert(0 == "racing read got wrong version");
905       }
906
907       {
908         ObjectDesc old_value;
909         assert(context->find_object(oid, &old_value, -1));
910         if (old_value.deleted())
911           std::cout << num << ":  left oid " << oid << " deleted" << std::endl;
912         else
913           std::cout << num << ":  left oid " << oid << " "
914                     << old_value.most_recent() << std::endl;
915       }
916
917       rcompletion->release();
918       context->oid_in_use.erase(oid);
919       context->oid_not_in_use.insert(oid);
920       context->kick();
921       done = true;
922     }
923     context->state_lock.Unlock();
924   }
925
926   bool finished() override
927   {
928     return done;
929   }
930
931   string getType() override
932   {
933     return "WriteOp";
934   }
935 };
936
937 class WriteSameOp : public TestOp {
938 public:
939   string oid;
940   ContDesc cont;
941   set<librados::AioCompletion *> waiting;
942   librados::AioCompletion *rcompletion;
943   uint64_t waiting_on;
944   uint64_t last_acked_tid;
945
946   librados::ObjectReadOperation read_op;
947   librados::ObjectWriteOperation write_op;
948   bufferlist rbuffer;
949
950   WriteSameOp(int n,
951           RadosTestContext *context,
952           const string &oid,
953           TestOpStat *stat = 0)
954     : TestOp(n, context, stat),
955       oid(oid), rcompletion(NULL), waiting_on(0),
956       last_acked_tid(0)
957   {}
958
959   void _begin() override
960   {
961     context->state_lock.Lock();
962     done = 0;
963     stringstream acc;
964     acc << context->prefix << "OID: " << oid << " snap " << context->current_snap << std::endl;
965     string prefix = acc.str();
966
967     cont = ContDesc(context->seq_num, context->current_snap, context->seq_num, prefix);
968
969     ContentsGenerator *cont_gen;
970     cont_gen = new VarLenGenerator(
971         context->max_size, context->min_stride_size, context->max_stride_size);
972     context->update_object(cont_gen, oid, cont);
973
974     context->oid_in_use.insert(oid);
975     context->oid_not_in_use.erase(oid);
976
977     map<uint64_t, uint64_t> ranges;
978
979     cont_gen->get_ranges_map(cont, ranges);
980     std::cout << num << ":  seq_num " << context->seq_num << " ranges " << ranges << std::endl;
981     context->seq_num++;
982
983     waiting_on = ranges.size();
984     ContentsGenerator::iterator gen_pos = cont_gen->get_iterator(cont);
985     uint64_t tid = 1;
986     for (map<uint64_t, uint64_t>::iterator i = ranges.begin();
987          i != ranges.end();
988          ++i, ++tid) {
989       gen_pos.seek(i->first);
990       bufferlist to_write = gen_pos.gen_bl_advance(i->second);
991       assert(to_write.length() == i->second);
992       assert(to_write.length() > 0);
993       std::cout << num << ":  writing " << context->prefix+oid
994                 << " from " << i->first
995                 << " to " << i->first + i->second << " tid " << tid << std::endl;
996       pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
997         new pair<TestOp*, TestOp::CallbackInfo*>(this,
998                                                  new TestOp::CallbackInfo(tid));
999       librados::AioCompletion *completion =
1000         context->rados.aio_create_completion((void*) cb_arg, NULL,
1001                                              &write_callback);
1002       waiting.insert(completion);
1003       librados::ObjectWriteOperation op;
1004       /* no writesame multiplication factor for now */
1005       op.writesame(i->first, to_write.length(), to_write);
1006
1007       context->io_ctx.aio_operate(
1008         context->prefix+oid, completion,
1009         &op);
1010     }
1011
1012     bufferlist contbl;
1013     ::encode(cont, contbl);
1014     pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
1015       new pair<TestOp*, TestOp::CallbackInfo*>(
1016         this,
1017         new TestOp::CallbackInfo(++tid));
1018     librados::AioCompletion *completion = context->rados.aio_create_completion(
1019       (void*) cb_arg, NULL, &write_callback);
1020     waiting.insert(completion);
1021     waiting_on++;
1022     write_op.setxattr("_header", contbl);
1023     write_op.truncate(cont_gen->get_length(cont));
1024     context->io_ctx.aio_operate(
1025       context->prefix+oid, completion, &write_op);
1026
1027     cb_arg =
1028       new pair<TestOp*, TestOp::CallbackInfo*>(
1029         this,
1030         new TestOp::CallbackInfo(++tid));
1031     rcompletion = context->rados.aio_create_completion(
1032          (void*) cb_arg, NULL, &write_callback);
1033     waiting_on++;
1034     read_op.read(0, 1, &rbuffer, 0);
1035     context->io_ctx.aio_operate(
1036       context->prefix+oid, rcompletion,
1037       &read_op,
1038       librados::OPERATION_ORDER_READS_WRITES,  // order wrt previous write/update
1039       0);
1040     context->state_lock.Unlock();
1041   }
1042
1043   void _finish(CallbackInfo *info) override
1044   {
1045     assert(info);
1046     context->state_lock.Lock();
1047     uint64_t tid = info->id;
1048
1049     cout << num << ":  finishing writesame tid " << tid << " to " << context->prefix + oid << std::endl;
1050
1051     if (tid <= last_acked_tid) {
1052       cerr << "Error: finished tid " << tid
1053            << " when last_acked_tid was " << last_acked_tid << std::endl;
1054       ceph_abort();
1055     }
1056     last_acked_tid = tid;
1057
1058     assert(!done);
1059     waiting_on--;
1060     if (waiting_on == 0) {
1061       uint64_t version = 0;
1062       for (set<librados::AioCompletion *>::iterator i = waiting.begin();
1063            i != waiting.end();
1064            ) {
1065         assert((*i)->is_complete());
1066         if (int err = (*i)->get_return_value()) {
1067           cerr << "Error: oid " << oid << " writesame returned error code "
1068                << err << std::endl;
1069         }
1070         if ((*i)->get_version64() > version)
1071           version = (*i)->get_version64();
1072         (*i)->release();
1073         waiting.erase(i++);
1074       }
1075
1076       context->update_object_version(oid, version);
1077       if (rcompletion->get_version64() != version) {
1078         cerr << "Error: racing read on " << oid << " returned version "
1079              << rcompletion->get_version64() << " rather than version "
1080              << version << std::endl;
1081         assert(0 == "racing read got wrong version");
1082       }
1083
1084       {
1085         ObjectDesc old_value;
1086         assert(context->find_object(oid, &old_value, -1));
1087         if (old_value.deleted())
1088           std::cout << num << ":  left oid " << oid << " deleted" << std::endl;
1089         else
1090           std::cout << num << ":  left oid " << oid << " "
1091                     << old_value.most_recent() << std::endl;
1092       }
1093
1094       rcompletion->release();
1095       context->oid_in_use.erase(oid);
1096       context->oid_not_in_use.insert(oid);
1097       context->kick();
1098       done = true;
1099     }
1100     context->state_lock.Unlock();
1101   }
1102
1103   bool finished() override
1104   {
1105     return done;
1106   }
1107
1108   string getType() override
1109   {
1110     return "WriteSameOp";
1111   }
1112 };
1113
1114 class DeleteOp : public TestOp {
1115 public:
1116   string oid;
1117
1118   DeleteOp(int n,
1119            RadosTestContext *context,
1120            const string &oid,
1121            TestOpStat *stat = 0)
1122     : TestOp(n, context, stat), oid(oid)
1123   {}
1124
1125   void _begin() override
1126   {
1127     context->state_lock.Lock();
1128     if (context->get_watch_context(oid)) {
1129       context->kick();
1130       context->state_lock.Unlock();
1131       return;
1132     }
1133
1134     ObjectDesc contents;
1135     context->find_object(oid, &contents);
1136     bool present = !contents.deleted();
1137
1138     context->oid_in_use.insert(oid);
1139     context->oid_not_in_use.erase(oid);
1140     context->seq_num++;
1141
1142     context->remove_object(oid);
1143
1144     interval_set<uint64_t> ranges;
1145     context->state_lock.Unlock();
1146
1147     int r = 0;
1148     if (rand() % 2) {
1149       librados::ObjectWriteOperation op;
1150       op.assert_exists();
1151       op.remove();
1152       r = context->io_ctx.operate(context->prefix+oid, &op);
1153     } else {
1154       r = context->io_ctx.remove(context->prefix+oid);
1155     }
1156     if (r && !(r == -ENOENT && !present)) {
1157       cerr << "r is " << r << " while deleting " << oid << " and present is " << present << std::endl;
1158       ceph_abort();
1159     }
1160
1161     context->state_lock.Lock();
1162     context->oid_in_use.erase(oid);
1163     context->oid_not_in_use.insert(oid);
1164     context->kick();
1165     context->state_lock.Unlock();
1166   }
1167
1168   string getType() override
1169   {
1170     return "DeleteOp";
1171   }
1172 };
1173
1174 class ReadOp : public TestOp {
1175 public:
1176   vector<librados::AioCompletion *> completions;
1177   librados::ObjectReadOperation op;
1178   string oid;
1179   ObjectDesc old_value;
1180   int snap;
1181   bool balance_reads;
1182
1183   ceph::shared_ptr<int> in_use;
1184
1185   vector<bufferlist> results;
1186   vector<int> retvals;
1187   vector<std::map<uint64_t, uint64_t>> extent_results;
1188   vector<bool> is_sparse_read;
1189   uint64_t waiting_on;
1190
1191   vector<bufferlist> checksums;
1192   vector<int> checksum_retvals;
1193
1194   map<string, bufferlist> attrs;
1195   int attrretval;
1196
1197   set<string> omap_requested_keys;
1198   map<string, bufferlist> omap_returned_values;
1199   set<string> omap_keys;
1200   map<string, bufferlist> omap;
1201   bufferlist header;
1202
1203   map<string, bufferlist> xattrs;
1204   ReadOp(int n,
1205          RadosTestContext *context,
1206          const string &oid,
1207          bool balance_reads,
1208          TestOpStat *stat = 0)
1209     : TestOp(n, context, stat),
1210       completions(3),
1211       oid(oid),
1212       snap(0),
1213       balance_reads(balance_reads),
1214       results(3),
1215       retvals(3),
1216       extent_results(3),
1217       is_sparse_read(3, false),
1218       waiting_on(0),
1219       checksums(3),
1220       checksum_retvals(3),
1221       attrretval(0)
1222   {}
1223
1224   void _do_read(librados::ObjectReadOperation& read_op, int index) {
1225     uint64_t len = 0;
1226     if (old_value.has_contents())
1227       len = old_value.most_recent_gen()->get_length(old_value.most_recent());
1228     if (context->no_sparse || rand() % 2) {
1229       is_sparse_read[index] = false;
1230       read_op.read(0,
1231                    len,
1232                    &results[index],
1233                    &retvals[index]);
1234       bufferlist init_value_bl;
1235       ::encode(static_cast<uint32_t>(-1), init_value_bl);
1236       read_op.checksum(LIBRADOS_CHECKSUM_TYPE_CRC32C, init_value_bl, 0, len,
1237                        0, &checksums[index], &checksum_retvals[index]);
1238     } else {
1239       is_sparse_read[index] = true;
1240       read_op.sparse_read(0,
1241                           len,
1242                           &extent_results[index],
1243                           &results[index],
1244                           &retvals[index]);
1245     }
1246   }
1247
1248   void _begin() override
1249   {
1250     context->state_lock.Lock();
1251     if (!(rand() % 4) && !context->snaps.empty()) {
1252       snap = rand_choose(context->snaps)->first;
1253       in_use = context->snaps_in_use.lookup_or_create(snap, snap);
1254     } else {
1255       snap = -1;
1256     }
1257     std::cout << num << ": read oid " << oid << " snap " << snap << std::endl;
1258     done = 0;
1259     for (uint32_t i = 0; i < 3; i++) {
1260       completions[i] = context->rados.aio_create_completion((void *) this, &read_callback, 0);
1261     }
1262
1263     context->oid_in_use.insert(oid);
1264     context->oid_not_in_use.erase(oid);
1265     assert(context->find_object(oid, &old_value, snap));
1266     if (old_value.deleted())
1267       std::cout << num << ":  expect deleted" << std::endl;
1268     else
1269       std::cout << num << ":  expect " << old_value.most_recent() << std::endl;
1270
1271     TestWatchContext *ctx = context->get_watch_context(oid);
1272     context->state_lock.Unlock();
1273     if (ctx) {
1274       assert(old_value.exists);
1275       TestAlarm alarm;
1276       std::cerr << num << ":  about to start" << std::endl;
1277       ctx->start();
1278       std::cerr << num << ":  started" << std::endl;
1279       bufferlist bl;
1280       context->io_ctx.set_notify_timeout(600);
1281       int r = context->io_ctx.notify2(context->prefix+oid, bl, 0, NULL);
1282       if (r < 0) {
1283         std::cerr << "r is " << r << std::endl;
1284         ceph_abort();
1285       }
1286       std::cerr << num << ":  notified, waiting" << std::endl;
1287       ctx->wait();
1288     }
1289     context->state_lock.Lock();
1290     if (snap >= 0) {
1291       context->io_ctx.snap_set_read(context->snaps[snap]);
1292     }
1293     _do_read(op, 0);
1294     for (map<string, ContDesc>::iterator i = old_value.attrs.begin();
1295          i != old_value.attrs.end();
1296          ++i) {
1297       if (rand() % 2) {
1298         string key = i->first;
1299         if (rand() % 2)
1300           key.push_back((rand() % 26) + 'a');
1301         omap_requested_keys.insert(key);
1302       }
1303     }
1304     if (!context->no_omap) {
1305       op.omap_get_vals_by_keys(omap_requested_keys, &omap_returned_values, 0);
1306       // NOTE: we're ignore pmore here, which assumes the OSD limit is high
1307       // enough for us.
1308       op.omap_get_keys2("", -1, &omap_keys, nullptr, nullptr);
1309       op.omap_get_vals2("", -1, &omap, nullptr, nullptr);
1310       op.omap_get_header(&header, 0);
1311     }
1312     op.getxattrs(&xattrs, 0);
1313
1314     unsigned flags = 0;
1315     if (balance_reads)
1316       flags |= librados::OPERATION_BALANCE_READS;
1317
1318     assert(!context->io_ctx.aio_operate(context->prefix+oid, completions[0], &op,
1319                                         flags, NULL));
1320     waiting_on++;
1321  
1322     // send 2 pipelined reads on the same object/snap. This can help testing
1323     // OSD's read behavior in some scenarios
1324     for (uint32_t i = 1; i < 3; ++i) {
1325       librados::ObjectReadOperation pipeline_op;
1326       _do_read(pipeline_op, i);
1327       assert(!context->io_ctx.aio_operate(context->prefix+oid, completions[i], &pipeline_op, 0));
1328       waiting_on++;
1329     }
1330
1331     if (snap >= 0) {
1332       context->io_ctx.snap_set_read(0);
1333     }
1334     context->state_lock.Unlock();
1335   }
1336
1337   void _finish(CallbackInfo *info) override
1338   {
1339     Mutex::Locker l(context->state_lock);
1340     assert(!done);
1341     assert(waiting_on > 0);
1342     if (--waiting_on) {
1343       return;
1344     }
1345
1346     context->oid_in_use.erase(oid);
1347     context->oid_not_in_use.insert(oid);
1348     int retval = completions[0]->get_return_value();
1349     for (vector<librados::AioCompletion *>::iterator it = completions.begin();
1350          it != completions.end(); ++it) {
1351       assert((*it)->is_complete());
1352       uint64_t version = (*it)->get_version64();
1353       int err = (*it)->get_return_value();
1354       if (err != retval) {
1355         cerr << num << ": Error: oid " << oid << " read returned different error codes: "
1356              << retval << " and " << err << std::endl;
1357         ceph_abort();
1358       }
1359       if (err) {
1360         if (!(err == -ENOENT && old_value.deleted())) {
1361           cerr << num << ": Error: oid " << oid << " read returned error code "
1362                << err << std::endl;
1363           ceph_abort();
1364         }
1365       } else if (version != old_value.version) {
1366         cerr << num << ": oid " << oid << " version is " << version
1367              << " and expected " << old_value.version << std::endl;
1368         assert(version == old_value.version);
1369       }
1370     }
1371     if (!retval) {
1372       map<string, bufferlist>::iterator iter = xattrs.find("_header");
1373       bufferlist headerbl;
1374       if (iter == xattrs.end()) {
1375         if (old_value.has_contents()) {
1376           cerr << num << ": Error: did not find header attr, has_contents: "
1377                << old_value.has_contents()
1378                << std::endl;
1379           assert(!old_value.has_contents());
1380         }
1381       } else {
1382         headerbl = iter->second;
1383         xattrs.erase(iter);
1384       }
1385       if (old_value.deleted()) {
1386         std::cout << num << ":  expect deleted" << std::endl;
1387         assert(0 == "expected deleted");
1388       } else {
1389         std::cout << num << ":  expect " << old_value.most_recent() << std::endl;
1390       }
1391       if (old_value.has_contents()) {
1392         ContDesc to_check;
1393         bufferlist::iterator p = headerbl.begin();
1394         ::decode(to_check, p);
1395         if (to_check != old_value.most_recent()) {
1396           cerr << num << ": oid " << oid << " found incorrect object contents " << to_check
1397                << ", expected " << old_value.most_recent() << std::endl;
1398           context->errors++;
1399         }
1400         for (unsigned i = 0; i < results.size(); i++) {
1401           if (is_sparse_read[i]) {
1402             if (!old_value.check_sparse(extent_results[i], results[i])) {
1403               cerr << num << ": oid " << oid << " contents " << to_check << " corrupt" << std::endl;
1404               context->errors++;
1405             }
1406           } else {
1407             if (!old_value.check(results[i])) {
1408               cerr << num << ": oid " << oid << " contents " << to_check << " corrupt" << std::endl;
1409               context->errors++;
1410             }
1411
1412             uint32_t checksum = 0;
1413             if (checksum_retvals[i] == 0) {
1414               try {
1415                 auto bl_it = checksums[i].begin();
1416                 uint32_t csum_count;
1417                 ::decode(csum_count, bl_it);
1418                 ::decode(checksum, bl_it);
1419               } catch (const buffer::error &err) {
1420                 checksum_retvals[i] = -EBADMSG;
1421               }
1422             }
1423             if (checksum_retvals[i] != 0 || checksum != results[i].crc32c(-1)) {
1424               cerr << num << ": oid " << oid << " checksum " << checksums[i]
1425                    << " incorrect, expecting " << results[i].crc32c(-1)
1426                    << std::endl;
1427               context->errors++;
1428             }
1429           }
1430         }
1431         if (context->errors) ceph_abort();
1432       }
1433
1434       // Attributes
1435       if (!context->no_omap) {
1436         if (!(old_value.header == header)) {
1437           cerr << num << ": oid " << oid << " header does not match, old size: "
1438                << old_value.header.length() << " new size " << header.length()
1439                << std::endl;
1440           assert(old_value.header == header);
1441         }
1442         if (omap.size() != old_value.attrs.size()) {
1443           cerr << num << ": oid " << oid << " omap.size() is " << omap.size()
1444                << " and old is " << old_value.attrs.size() << std::endl;
1445           assert(omap.size() == old_value.attrs.size());
1446         }
1447         if (omap_keys.size() != old_value.attrs.size()) {
1448           cerr << num << ": oid " << oid << " omap.size() is " << omap_keys.size()
1449                << " and old is " << old_value.attrs.size() << std::endl;
1450           assert(omap_keys.size() == old_value.attrs.size());
1451         }
1452       }
1453       if (xattrs.size() != old_value.attrs.size()) {
1454         cerr << num << ": oid " << oid << " xattrs.size() is " << xattrs.size()
1455              << " and old is " << old_value.attrs.size() << std::endl;
1456         assert(xattrs.size() == old_value.attrs.size());
1457       }
1458       for (map<string, ContDesc>::iterator iter = old_value.attrs.begin();
1459            iter != old_value.attrs.end();
1460            ++iter) {
1461         bufferlist bl = context->attr_gen.gen_bl(
1462           iter->second);
1463         if (!context->no_omap) {
1464           map<string, bufferlist>::iterator omap_iter = omap.find(iter->first);
1465           assert(omap_iter != omap.end());
1466           assert(bl.length() == omap_iter->second.length());
1467           bufferlist::iterator k = bl.begin();
1468           for(bufferlist::iterator l = omap_iter->second.begin();
1469               !k.end() && !l.end();
1470               ++k, ++l) {
1471             assert(*l == *k);
1472           }
1473         }
1474         map<string, bufferlist>::iterator xattr_iter = xattrs.find(iter->first);
1475         assert(xattr_iter != xattrs.end());
1476         assert(bl.length() == xattr_iter->second.length());
1477         bufferlist::iterator k = bl.begin();
1478         for (bufferlist::iterator j = xattr_iter->second.begin();
1479              !k.end() && !j.end();
1480              ++j, ++k) {
1481           assert(*j == *k);
1482         }
1483       }
1484       if (!context->no_omap) {
1485         for (set<string>::iterator i = omap_requested_keys.begin();
1486              i != omap_requested_keys.end();
1487              ++i) {
1488           if (!omap_returned_values.count(*i))
1489             assert(!old_value.attrs.count(*i));
1490           if (!old_value.attrs.count(*i))
1491             assert(!omap_returned_values.count(*i));
1492         }
1493         for (map<string, bufferlist>::iterator i = omap_returned_values.begin();
1494              i != omap_returned_values.end();
1495              ++i) {
1496           assert(omap_requested_keys.count(i->first));
1497           assert(omap.count(i->first));
1498           assert(old_value.attrs.count(i->first));
1499           assert(i->second == omap[i->first]);
1500         }
1501       }
1502     }
1503     for (vector<librados::AioCompletion *>::iterator it = completions.begin();
1504          it != completions.end(); ++it) {
1505       (*it)->release();
1506     }
1507     context->kick();
1508     done = true;
1509   }
1510
1511   bool finished() override
1512   {
1513     return done;
1514   }
1515
1516   string getType() override
1517   {
1518     return "ReadOp";
1519   }
1520 };
1521
1522 class SnapCreateOp : public TestOp {
1523 public:
1524   SnapCreateOp(int n,
1525                RadosTestContext *context,
1526                TestOpStat *stat = 0)
1527     : TestOp(n, context, stat)
1528   {}
1529
1530   void _begin() override
1531   {
1532     uint64_t snap;
1533     string snapname;
1534
1535     if (context->pool_snaps) {
1536       stringstream ss;
1537
1538       ss << context->prefix << "snap" << ++context->snapname_num;
1539       snapname = ss.str();
1540
1541       int ret = context->io_ctx.snap_create(snapname.c_str());
1542       if (ret) {
1543         cerr << "snap_create returned " << ret << std::endl;
1544         ceph_abort();
1545       }
1546       assert(!context->io_ctx.snap_lookup(snapname.c_str(), &snap));
1547
1548     } else {
1549       assert(!context->io_ctx.selfmanaged_snap_create(&snap));
1550     }
1551
1552     context->state_lock.Lock();
1553     context->add_snap(snap);
1554
1555     if (context->pool_snaps) {
1556       context->state_lock.Unlock();
1557     } else {
1558       vector<uint64_t> snapset(context->snaps.size());
1559
1560       int j = 0;
1561       for (map<int,uint64_t>::reverse_iterator i = context->snaps.rbegin();
1562            i != context->snaps.rend();
1563            ++i, ++j) {
1564         snapset[j] = i->second;
1565       }
1566
1567       context->state_lock.Unlock();
1568
1569       int r = context->io_ctx.selfmanaged_snap_set_write_ctx(context->seq, snapset);
1570       if (r) {
1571         cerr << "r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl;
1572         ceph_abort();
1573       }
1574     }
1575   }
1576
1577   string getType() override
1578   {
1579     return "SnapCreateOp";
1580   }
1581   bool must_quiesce_other_ops() override { return context->pool_snaps; }
1582 };
1583
1584 class SnapRemoveOp : public TestOp {
1585 public:
1586   int to_remove;
1587   SnapRemoveOp(int n, RadosTestContext *context,
1588                int snap,
1589                TestOpStat *stat = 0)
1590     : TestOp(n, context, stat),
1591       to_remove(snap)
1592   {}
1593
1594   void _begin() override
1595   {
1596     context->state_lock.Lock();
1597     uint64_t snap = context->snaps[to_remove];
1598     context->remove_snap(to_remove);
1599
1600     if (context->pool_snaps) {
1601       string snapname;
1602
1603       assert(!context->io_ctx.snap_get_name(snap, &snapname));
1604       assert(!context->io_ctx.snap_remove(snapname.c_str()));
1605      } else {
1606       assert(!context->io_ctx.selfmanaged_snap_remove(snap));
1607
1608       vector<uint64_t> snapset(context->snaps.size());
1609       int j = 0;
1610       for (map<int,uint64_t>::reverse_iterator i = context->snaps.rbegin();
1611            i != context->snaps.rend();
1612            ++i, ++j) {
1613         snapset[j] = i->second;
1614       }
1615
1616       int r = context->io_ctx.selfmanaged_snap_set_write_ctx(context->seq, snapset);
1617       if (r) {
1618         cerr << "r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl;
1619         ceph_abort();
1620       }
1621     }
1622     context->state_lock.Unlock();
1623   }
1624
1625   string getType() override
1626   {
1627     return "SnapRemoveOp";
1628   }
1629 };
1630
1631 class WatchOp : public TestOp {
1632   string oid;
1633 public:
1634   WatchOp(int n,
1635           RadosTestContext *context,
1636           const string &_oid,
1637           TestOpStat *stat = 0)
1638     : TestOp(n, context, stat),
1639       oid(_oid)
1640   {}
1641
1642   void _begin() override
1643   {
1644     context->state_lock.Lock();
1645     ObjectDesc contents;
1646     context->find_object(oid, &contents);
1647     if (contents.deleted()) {
1648       context->kick();
1649       context->state_lock.Unlock();
1650       return;
1651     }
1652     context->oid_in_use.insert(oid);
1653     context->oid_not_in_use.erase(oid);
1654
1655     TestWatchContext *ctx = context->get_watch_context(oid);
1656     context->state_lock.Unlock();
1657     int r;
1658     if (!ctx) {
1659       {
1660         Mutex::Locker l(context->state_lock);
1661         ctx = context->watch(oid);
1662       }
1663
1664       r = context->io_ctx.watch2(context->prefix+oid,
1665                                  &ctx->get_handle(),
1666                                  ctx);
1667     } else {
1668       r = context->io_ctx.unwatch2(ctx->get_handle());
1669       {
1670         Mutex::Locker l(context->state_lock);
1671         context->unwatch(oid);
1672       }
1673     }
1674
1675     if (r) {
1676       cerr << "r is " << r << std::endl;
1677       ceph_abort();
1678     }
1679
1680     {
1681       Mutex::Locker l(context->state_lock);
1682       context->oid_in_use.erase(oid);
1683       context->oid_not_in_use.insert(oid);
1684     }
1685   }
1686
1687   string getType() override
1688   {
1689     return "WatchOp";
1690   }
1691 };
1692
1693 class RollbackOp : public TestOp {
1694 public:
1695   string oid;
1696   int roll_back_to;
1697   librados::ObjectWriteOperation zero_write_op1;
1698   librados::ObjectWriteOperation zero_write_op2;
1699   librados::ObjectWriteOperation op;
1700   vector<librados::AioCompletion *> comps;
1701   ceph::shared_ptr<int> in_use;
1702   int last_finished;
1703   int outstanding;
1704
1705   RollbackOp(int n,
1706              RadosTestContext *context,
1707              const string &_oid,
1708              TestOpStat *stat = 0)
1709     : TestOp(n, context, stat),
1710       oid(_oid), roll_back_to(-1), 
1711       comps(3, NULL),
1712       last_finished(-1), outstanding(3)
1713   {}
1714
1715   void _begin() override
1716   {
1717     context->state_lock.Lock();
1718     if (context->get_watch_context(oid)) {
1719       context->kick();
1720       context->state_lock.Unlock();
1721       return;
1722     }
1723
1724     if (context->snaps.empty()) {
1725       context->kick();
1726       context->state_lock.Unlock();
1727       done = true;
1728       return;
1729     }
1730
1731     context->oid_in_use.insert(oid);
1732     context->oid_not_in_use.erase(oid);
1733
1734     roll_back_to = rand_choose(context->snaps)->first;
1735     in_use = context->snaps_in_use.lookup_or_create(
1736       roll_back_to,
1737       roll_back_to);
1738
1739
1740     cout << "rollback oid " << oid << " to " << roll_back_to << std::endl;
1741
1742     bool existed_before = context->object_existed_at(oid);
1743     bool existed_after = context->object_existed_at(oid, roll_back_to);
1744
1745     context->roll_back(oid, roll_back_to);
1746     uint64_t snap = context->snaps[roll_back_to];
1747
1748     outstanding -= (!existed_before) + (!existed_after);
1749
1750     context->state_lock.Unlock();
1751
1752     bufferlist bl, bl2;
1753     zero_write_op1.append(bl);
1754     zero_write_op2.append(bl2);
1755
1756     if (context->pool_snaps) {
1757       op.snap_rollback(snap);
1758     } else {
1759       op.selfmanaged_snap_rollback(snap);
1760     }
1761
1762     if (existed_before) {
1763       pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
1764         new pair<TestOp*, TestOp::CallbackInfo*>(this,
1765                                                  new TestOp::CallbackInfo(0));
1766       comps[0] = 
1767         context->rados.aio_create_completion((void*) cb_arg, NULL,
1768                                              &write_callback);
1769       context->io_ctx.aio_operate(
1770         context->prefix+oid, comps[0], &zero_write_op1);
1771     }
1772     {
1773       pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
1774         new pair<TestOp*, TestOp::CallbackInfo*>(this,
1775                                                  new TestOp::CallbackInfo(1));
1776       comps[1] =
1777         context->rados.aio_create_completion((void*) cb_arg, NULL,
1778                                              &write_callback);
1779       context->io_ctx.aio_operate(
1780         context->prefix+oid, comps[1], &op);
1781     }
1782     if (existed_after) {
1783       pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
1784         new pair<TestOp*, TestOp::CallbackInfo*>(this,
1785                                                  new TestOp::CallbackInfo(2));
1786       comps[2] =
1787         context->rados.aio_create_completion((void*) cb_arg, NULL,
1788                                              &write_callback);
1789       context->io_ctx.aio_operate(
1790         context->prefix+oid, comps[2], &zero_write_op2);
1791     }
1792   }
1793
1794   void _finish(CallbackInfo *info) override
1795   {
1796     Mutex::Locker l(context->state_lock);
1797     uint64_t tid = info->id;
1798     cout << num << ":  finishing rollback tid " << tid
1799          << " to " << context->prefix + oid << std::endl;
1800     assert((int)(info->id) > last_finished);
1801     last_finished = info->id;
1802
1803     int r;
1804     if ((r = comps[last_finished]->get_return_value()) != 0) {
1805       cerr << "err " << r << std::endl;
1806       ceph_abort();
1807     }
1808     if (--outstanding == 0) {
1809       done = true;
1810       context->update_object_version(oid, comps[tid]->get_version64());
1811       context->oid_in_use.erase(oid);
1812       context->oid_not_in_use.insert(oid);
1813       in_use = ceph::shared_ptr<int>();
1814       context->kick();
1815     }
1816   }
1817
1818   bool finished() override
1819   {
1820     return done;
1821   }
1822
1823   string getType() override
1824   {
1825     return "RollBackOp";
1826   }
1827 };
1828
1829 class CopyFromOp : public TestOp {
1830 public:
1831   string oid, oid_src;
1832   ObjectDesc src_value;
1833   librados::ObjectWriteOperation op;
1834   librados::ObjectReadOperation rd_op;
1835   librados::AioCompletion *comp;
1836   librados::AioCompletion *comp_racing_read;
1837   ceph::shared_ptr<int> in_use;
1838   int snap;
1839   int done;
1840   uint64_t version;
1841   int r;
1842   CopyFromOp(int n,
1843              RadosTestContext *context,
1844              const string &oid,
1845              const string &oid_src,
1846              TestOpStat *stat)
1847     : TestOp(n, context, stat),
1848       oid(oid), oid_src(oid_src),
1849       comp(NULL), snap(-1), done(0), 
1850       version(0), r(0)
1851   {}
1852
1853   void _begin() override
1854   {
1855     ContDesc cont;
1856     {
1857       Mutex::Locker l(context->state_lock);
1858       cont = ContDesc(context->seq_num, context->current_snap,
1859                       context->seq_num, "");
1860       context->oid_in_use.insert(oid);
1861       context->oid_not_in_use.erase(oid);
1862       context->oid_in_use.insert(oid_src);
1863       context->oid_not_in_use.erase(oid_src);
1864
1865       // choose source snap
1866       if (0 && !(rand() % 4) && !context->snaps.empty()) {
1867         snap = rand_choose(context->snaps)->first;
1868         in_use = context->snaps_in_use.lookup_or_create(snap, snap);
1869       } else {
1870         snap = -1;
1871       }
1872       context->find_object(oid_src, &src_value, snap);
1873       if (!src_value.deleted())
1874         context->update_object_full(oid, src_value);
1875     }
1876
1877     string src = context->prefix+oid_src;
1878     op.copy_from(src.c_str(), context->io_ctx, src_value.version);
1879
1880     pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
1881       new pair<TestOp*, TestOp::CallbackInfo*>(this,
1882                                                new TestOp::CallbackInfo(0));
1883     comp = context->rados.aio_create_completion((void*) cb_arg, NULL,
1884                                                 &write_callback);
1885     context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
1886
1887     // queue up a racing read, too.
1888     pair<TestOp*, TestOp::CallbackInfo*> *read_cb_arg =
1889       new pair<TestOp*, TestOp::CallbackInfo*>(this,
1890                                                new TestOp::CallbackInfo(1));
1891     comp_racing_read = context->rados.aio_create_completion((void*) read_cb_arg, NULL, &write_callback);
1892     rd_op.stat(NULL, NULL, NULL);
1893     context->io_ctx.aio_operate(context->prefix+oid, comp_racing_read, &rd_op,
1894                                 librados::OPERATION_ORDER_READS_WRITES,  // order wrt previous write/update
1895                                 NULL);
1896
1897   }
1898
1899   void _finish(CallbackInfo *info) override
1900   {
1901     Mutex::Locker l(context->state_lock);
1902
1903     // note that the read can (and atm will) come back before the
1904     // write reply, but will reflect the update and the versions will
1905     // match.
1906
1907     if (info->id == 0) {
1908       // copy_from
1909       assert(comp->is_complete());
1910       cout << num << ":  finishing copy_from to " << context->prefix + oid << std::endl;
1911       if ((r = comp->get_return_value())) {
1912         if (r == -ENOENT && src_value.deleted()) {
1913           cout << num << ":  got expected ENOENT (src dne)" << std::endl;
1914         } else {
1915           cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code "
1916                << r << std::endl;
1917           ceph_abort();
1918         }
1919       } else {
1920         assert(!version || comp->get_version64() == version);
1921         version = comp->get_version64();
1922         context->update_object_version(oid, comp->get_version64());
1923       }
1924     } else if (info->id == 1) {
1925       // racing read
1926       assert(comp_racing_read->is_complete());
1927       cout << num << ":  finishing copy_from racing read to " << context->prefix + oid << std::endl;
1928       if ((r = comp_racing_read->get_return_value())) {
1929         if (!(r == -ENOENT && src_value.deleted())) {
1930           cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code "
1931                << r << std::endl;
1932         }
1933       } else {
1934         assert(comp_racing_read->get_return_value() == 0);
1935         assert(!version || comp_racing_read->get_version64() == version);
1936         version = comp_racing_read->get_version64();
1937       }
1938     }
1939     if (++done == 2) {
1940       context->oid_in_use.erase(oid);
1941       context->oid_not_in_use.insert(oid);
1942       context->oid_in_use.erase(oid_src);
1943       context->oid_not_in_use.insert(oid_src);
1944       context->kick();
1945     }
1946   }
1947
1948   bool finished() override
1949   {
1950     return done == 2;
1951   }
1952
1953   string getType() override
1954   {
1955     return "CopyFromOp";
1956   }
1957 };
1958
1959 class SetRedirectOp : public TestOp {
1960 public:
1961   string oid, oid_tgt, tgt_pool_name;
1962   ObjectDesc src_value, tgt_value;
1963   librados::ObjectWriteOperation op;
1964   librados::ObjectReadOperation rd_op;
1965   librados::AioCompletion *comp;
1966   ceph::shared_ptr<int> in_use;
1967   int done;
1968   int r;
1969   SetRedirectOp(int n,
1970              RadosTestContext *context,
1971              const string &oid,
1972              const string &oid_tgt,
1973              const string &tgt_pool_name,
1974              TestOpStat *stat = 0)
1975     : TestOp(n, context, stat),
1976       oid(oid), oid_tgt(oid_tgt), tgt_pool_name(tgt_pool_name),
1977       comp(NULL), done(0), 
1978       r(0)
1979   {}
1980
1981   void _begin() override
1982   {
1983     Mutex::Locker l(context->state_lock);
1984     context->oid_in_use.insert(oid);
1985     context->oid_not_in_use.erase(oid);
1986     context->oid_redirect_in_use.insert(oid_tgt);
1987     context->oid_redirect_not_in_use.erase(oid_tgt);
1988
1989     context->find_object(oid, &src_value); 
1990     if(!context->redirect_objs[oid].empty()) {
1991       /* update target's user_version */
1992       rd_op.stat(NULL, NULL, NULL);
1993       comp = context->rados.aio_create_completion();
1994       context->io_ctx.aio_operate(context->prefix+oid_tgt, comp, &rd_op,
1995                             librados::OPERATION_ORDER_READS_WRITES,
1996                             NULL);
1997       comp->wait_for_safe();
1998       context->update_object_version(oid_tgt, comp->get_version64());
1999       comp->release();
2000
2001       /* unset redirect target */
2002       comp = context->rados.aio_create_completion();
2003       bool present = !src_value.deleted();
2004       context->remove_object(oid);
2005       op.remove();
2006       context->io_ctx.aio_operate(context->prefix+oid, comp, &op,
2007                                   librados::OPERATION_ORDER_READS_WRITES |
2008                                   librados::OPERATION_IGNORE_REDIRECT);
2009       comp->wait_for_safe();
2010       if ((r = comp->get_return_value())) {
2011         if (!(r == -ENOENT && !present)) {
2012           cerr << "r is " << r << " while deleting " << oid << " and present is " << present << std::endl;
2013           ceph_abort();
2014         }
2015       }
2016       comp->release();
2017
2018       context->oid_redirect_not_in_use.insert(context->redirect_objs[oid]);
2019       context->oid_redirect_in_use.erase(context->redirect_objs[oid]);
2020
2021       /* copy_from oid_tgt --> oid */
2022       comp = context->rados.aio_create_completion();
2023       context->find_object(oid_tgt, &tgt_value);
2024       string src = context->prefix+oid_tgt;
2025       op.copy_from(src.c_str(), context->io_ctx, tgt_value.version);
2026       context->io_ctx.aio_operate(context->prefix+oid, comp, &op,
2027                                   librados::OPERATION_ORDER_READS_WRITES);
2028       comp->wait_for_safe();
2029       if ((r = comp->get_return_value())) {
2030         cerr << "Error: oid " << oid << " copy_from " << oid_tgt << " returned error code "
2031              << r << std::endl;
2032         ceph_abort();
2033       }
2034       context->update_object_full(oid, tgt_value);
2035       context->update_object_version(oid, comp->get_version64());
2036       comp->release();
2037     }
2038
2039     comp = context->rados.aio_create_completion();
2040     rd_op.stat(NULL, NULL, NULL);
2041     context->io_ctx.aio_operate(context->prefix+oid, comp, &rd_op,
2042                           librados::OPERATION_ORDER_READS_WRITES |
2043                           librados::OPERATION_IGNORE_REDIRECT,
2044                           NULL);
2045     comp->wait_for_safe();
2046     if ((r = comp->get_return_value()) && !src_value.deleted()) {
2047       cerr << "Error: oid " << oid << " stat returned error code "
2048            << r << std::endl;
2049       ceph_abort();
2050     }
2051     context->update_object_version(oid, comp->get_version64());
2052     comp->release();
2053
2054     context->find_object(oid, &src_value); 
2055     context->find_object(oid_tgt, &tgt_value);
2056
2057     if (!src_value.deleted() && !tgt_value.deleted())
2058       context->update_object_full(oid, tgt_value);
2059
2060     if (src_value.version != 0 && !src_value.deleted())
2061       op.assert_version(src_value.version);
2062     op.set_redirect(context->prefix+oid_tgt, context->io_ctx, tgt_value.version);
2063
2064     pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2065       new pair<TestOp*, TestOp::CallbackInfo*>(this,
2066                                                new TestOp::CallbackInfo(0));
2067     comp = context->rados.aio_create_completion((void*) cb_arg, NULL,
2068                                                 &write_callback);
2069     context->io_ctx.aio_operate(context->prefix+oid, comp, &op,
2070                                 librados::OPERATION_ORDER_READS_WRITES);
2071   }
2072
2073   void _finish(CallbackInfo *info) override
2074   {
2075     Mutex::Locker l(context->state_lock);
2076
2077     if (info->id == 0) {
2078       assert(comp->is_complete());
2079       cout << num << ":  finishing set_redirect to oid " << oid << std::endl;
2080       if ((r = comp->get_return_value())) {
2081         if (r == -ENOENT && src_value.deleted()) {
2082           cout << num << ":  got expected ENOENT (src dne)" << std::endl;
2083         } else {
2084           cerr << "Error: oid " << oid << " set_redirect " << oid_tgt << " returned error code "
2085                << r << std::endl;
2086           ceph_abort();
2087         }
2088       } else {
2089         context->update_object_redirect_target(oid, oid_tgt);
2090         context->update_object_version(oid, comp->get_version64());
2091       }
2092     }
2093
2094     if (++done == 1) {
2095       context->oid_in_use.erase(oid);
2096       context->oid_not_in_use.insert(oid);
2097       context->kick();
2098     }
2099   }
2100
2101   bool finished() override
2102   {
2103     return done == 1;
2104   }
2105
2106   string getType() override
2107   {
2108     return "SetRedirectOp";
2109   }
2110 };
2111
2112 class UnsetRedirectOp : public TestOp {
2113 public:
2114   string oid;
2115   librados::ObjectWriteOperation op;
2116   librados::AioCompletion *completion;
2117   librados::AioCompletion *comp;
2118
2119   UnsetRedirectOp(int n,
2120            RadosTestContext *context,
2121            const string &oid,
2122            TestOpStat *stat = 0)
2123     : TestOp(n, context, stat), oid(oid)
2124   {}
2125
2126   void _begin() override
2127   {
2128     context->state_lock.Lock();
2129     if (context->get_watch_context(oid)) {
2130       context->kick();
2131       context->state_lock.Unlock();
2132       return;
2133     }
2134
2135     ObjectDesc contents;
2136     context->find_object(oid, &contents);
2137     bool present = !contents.deleted();
2138
2139     context->oid_in_use.insert(oid);
2140     context->oid_not_in_use.erase(oid);
2141     context->seq_num++;
2142
2143     context->remove_object(oid);
2144
2145     context->state_lock.Unlock();
2146
2147     comp = context->rados.aio_create_completion();
2148     op.remove();
2149     context->io_ctx.aio_operate(context->prefix+oid, comp, &op,
2150                                 librados::OPERATION_ORDER_READS_WRITES |
2151                                 librados::OPERATION_IGNORE_REDIRECT);
2152     comp->wait_for_safe();
2153     int r = comp->get_return_value();
2154     if (r && !(r == -ENOENT && !present)) {
2155       cerr << "r is " << r << " while deleting " << oid << " and present is " << present << std::endl;
2156       ceph_abort();
2157     }
2158
2159     context->state_lock.Lock();
2160     context->oid_in_use.erase(oid);
2161     context->oid_not_in_use.insert(oid);
2162     if(!context->redirect_objs[oid].empty()) {
2163       context->oid_redirect_not_in_use.insert(context->redirect_objs[oid]);
2164       context->oid_redirect_in_use.erase(context->redirect_objs[oid]);
2165       context->update_object_redirect_target(oid, string());
2166     }
2167     context->kick();
2168     context->state_lock.Unlock();
2169   }
2170
2171   string getType() override
2172   {
2173     return "UnsetRedirectOp";
2174   }
2175 };
2176
2177 class HitSetListOp : public TestOp {
2178   librados::AioCompletion *comp1, *comp2;
2179   uint32_t hash;
2180   std::list< std::pair<time_t, time_t> > ls;
2181   bufferlist bl;
2182
2183 public:
2184   HitSetListOp(int n,
2185                RadosTestContext *context,
2186                uint32_t hash,
2187                TestOpStat *stat = 0)
2188     : TestOp(n, context, stat),
2189       comp1(NULL), comp2(NULL),
2190       hash(hash)
2191   {}
2192
2193   void _begin() override
2194   {
2195     pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2196       new pair<TestOp*, TestOp::CallbackInfo*>(this,
2197                                                new TestOp::CallbackInfo(0));
2198     comp1 = context->rados.aio_create_completion((void*) cb_arg, NULL,
2199                                                  &write_callback);
2200     int r = context->io_ctx.hit_set_list(hash, comp1, &ls);
2201     assert(r == 0);
2202   }
2203
2204   void _finish(CallbackInfo *info) override {
2205     Mutex::Locker l(context->state_lock);
2206     if (!comp2) {
2207       if (ls.empty()) {
2208         cerr << num << ": no hitsets" << std::endl;
2209         done = true;
2210       } else {
2211         cerr << num << ": hitsets are " << ls << std::endl;
2212         int r = rand() % ls.size();
2213         std::list<pair<time_t,time_t> >::iterator p = ls.begin();
2214         while (r--)
2215           ++p;
2216         pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2217           new pair<TestOp*, TestOp::CallbackInfo*>(this,
2218                                                    new TestOp::CallbackInfo(0));
2219         comp2 = context->rados.aio_create_completion((void*) cb_arg, NULL,
2220                                                      &write_callback);
2221         r = context->io_ctx.hit_set_get(hash, comp2, p->second, &bl);
2222         assert(r == 0);
2223       }
2224     } else {
2225       int r = comp2->get_return_value();
2226       if (r == 0) {
2227         HitSet hitset;
2228         bufferlist::iterator p = bl.begin();
2229         ::decode(hitset, p);
2230         cout << num << ": got hitset of type " << hitset.get_type_name()
2231              << " size " << bl.length()
2232              << std::endl;
2233       } else {
2234         // FIXME: we could verify that we did in fact race with a trim...
2235         assert(r == -ENOENT);
2236       }
2237       done = true;
2238     }
2239
2240     context->kick();
2241   }
2242
2243   bool finished() override {
2244     return done;
2245   }
2246
2247   string getType() override {
2248     return "HitSetListOp";
2249   }
2250 };
2251
2252 class UndirtyOp : public TestOp {
2253 public:
2254   librados::AioCompletion *completion;
2255   librados::ObjectWriteOperation op;
2256   string oid;
2257
2258   UndirtyOp(int n,
2259             RadosTestContext *context,
2260             const string &oid,
2261             TestOpStat *stat = 0)
2262     : TestOp(n, context, stat),
2263       completion(NULL),
2264       oid(oid)
2265   {}
2266
2267   void _begin() override
2268   {
2269     context->state_lock.Lock();
2270     pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2271       new pair<TestOp*, TestOp::CallbackInfo*>(this,
2272                                                new TestOp::CallbackInfo(0));
2273     completion = context->rados.aio_create_completion((void *) cb_arg, NULL,
2274                                                       &write_callback);
2275
2276     context->oid_in_use.insert(oid);
2277     context->oid_not_in_use.erase(oid);
2278     context->update_object_undirty(oid);
2279     context->state_lock.Unlock();
2280
2281     op.undirty();
2282     int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
2283                                         &op, 0);
2284     assert(!r);
2285   }
2286
2287   void _finish(CallbackInfo *info) override
2288   {
2289     context->state_lock.Lock();
2290     assert(!done);
2291     assert(completion->is_complete());
2292     context->oid_in_use.erase(oid);
2293     context->oid_not_in_use.insert(oid);
2294     context->update_object_version(oid, completion->get_version64());
2295     context->kick();
2296     done = true;
2297     context->state_lock.Unlock();
2298   }
2299
2300   bool finished() override
2301   {
2302     return done;
2303   }
2304
2305   string getType() override
2306   {
2307     return "UndirtyOp";
2308   }
2309 };
2310
2311 class IsDirtyOp : public TestOp {
2312 public:
2313   librados::AioCompletion *completion;
2314   librados::ObjectReadOperation op;
2315   string oid;
2316   bool dirty;
2317   ObjectDesc old_value;
2318   int snap;
2319   ceph::shared_ptr<int> in_use;
2320
2321   IsDirtyOp(int n,
2322             RadosTestContext *context,
2323             const string &oid,
2324             TestOpStat *stat = 0)
2325     : TestOp(n, context, stat),
2326       completion(NULL),
2327       oid(oid),
2328       dirty(false)
2329   {}
2330
2331   void _begin() override
2332   {
2333     context->state_lock.Lock();
2334
2335     if (!(rand() % 4) && !context->snaps.empty()) {
2336       snap = rand_choose(context->snaps)->first;
2337       in_use = context->snaps_in_use.lookup_or_create(snap, snap);
2338     } else {
2339       snap = -1;
2340     }
2341     std::cout << num << ": is_dirty oid " << oid << " snap " << snap
2342               << std::endl;
2343
2344     pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2345       new pair<TestOp*, TestOp::CallbackInfo*>(this,
2346                                                new TestOp::CallbackInfo(0));
2347     completion = context->rados.aio_create_completion((void *) cb_arg, NULL,
2348                                                       &write_callback);
2349
2350     context->oid_in_use.insert(oid);
2351     context->oid_not_in_use.erase(oid);
2352     context->state_lock.Unlock();
2353
2354     if (snap >= 0) {
2355       context->io_ctx.snap_set_read(context->snaps[snap]);
2356     }
2357
2358     op.is_dirty(&dirty, NULL);
2359     int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
2360                                         &op, 0);
2361     assert(!r);
2362
2363     if (snap >= 0) {
2364       context->io_ctx.snap_set_read(0);
2365     }
2366   }
2367
2368   void _finish(CallbackInfo *info) override
2369   {
2370     context->state_lock.Lock();
2371     assert(!done);
2372     assert(completion->is_complete());
2373     context->oid_in_use.erase(oid);
2374     context->oid_not_in_use.insert(oid);
2375
2376     assert(context->find_object(oid, &old_value, snap));
2377
2378     int r = completion->get_return_value();
2379     if (r == 0) {
2380       cout << num << ":  " << (dirty ? "dirty" : "clean") << std::endl;
2381       assert(!old_value.deleted());
2382       assert(dirty == old_value.dirty);
2383     } else {
2384       cout << num << ":  got " << r << std::endl;
2385       assert(r == -ENOENT);
2386       assert(old_value.deleted());
2387     }
2388     context->kick();
2389     done = true;
2390     context->state_lock.Unlock();
2391   }
2392
2393   bool finished() override
2394   {
2395     return done;
2396   }
2397
2398   string getType() override
2399   {
2400     return "IsDirtyOp";
2401   }
2402 };
2403
2404
2405
2406 class CacheFlushOp : public TestOp {
2407 public:
2408   librados::AioCompletion *completion;
2409   librados::ObjectReadOperation op;
2410   string oid;
2411   bool blocking;
2412   int snap;
2413   bool can_fail;
2414   ceph::shared_ptr<int> in_use;
2415
2416   CacheFlushOp(int n,
2417                RadosTestContext *context,
2418                const string &oid,
2419                TestOpStat *stat,
2420                bool b)
2421     : TestOp(n, context, stat),
2422       completion(NULL),
2423       oid(oid),
2424       blocking(b),
2425       snap(0),
2426       can_fail(false)
2427   {}
2428
2429   void _begin() override
2430   {
2431     context->state_lock.Lock();
2432
2433     if (!(rand() % 4) && !context->snaps.empty()) {
2434       snap = rand_choose(context->snaps)->first;
2435       in_use = context->snaps_in_use.lookup_or_create(snap, snap);
2436     } else {
2437       snap = -1;
2438     }
2439     // not being particularly specific here about knowing which
2440     // flushes are on the oldest clean snap and which ones are not.
2441     can_fail = !blocking || !context->snaps.empty();
2442     // FIXME: we could fail if we've ever removed a snap due to
2443     // the async snap trimming.
2444     can_fail = true;
2445     cout << num << ": " << (blocking ? "cache_flush" : "cache_try_flush")
2446          << " oid " << oid << " snap " << snap << std::endl;
2447
2448     if (snap >= 0) {
2449       context->io_ctx.snap_set_read(context->snaps[snap]);
2450     }
2451
2452     pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2453       new pair<TestOp*, TestOp::CallbackInfo*>(this,
2454                                                new TestOp::CallbackInfo(0));
2455     completion = context->rados.aio_create_completion((void *) cb_arg, NULL,
2456                                                       &write_callback);
2457     context->oid_flushing.insert(oid);
2458     context->oid_not_flushing.erase(oid);
2459     context->state_lock.Unlock();
2460
2461     unsigned flags = librados::OPERATION_IGNORE_CACHE;
2462     if (blocking) {
2463       op.cache_flush();
2464     } else {
2465       op.cache_try_flush();
2466       flags = librados::OPERATION_SKIPRWLOCKS;
2467     }
2468     int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
2469                                         &op, flags, NULL);
2470     assert(!r);
2471
2472     if (snap >= 0) {
2473       context->io_ctx.snap_set_read(0);
2474     }
2475   }
2476
2477   void _finish(CallbackInfo *info) override
2478   {
2479     context->state_lock.Lock();
2480     assert(!done);
2481     assert(completion->is_complete());
2482     context->oid_flushing.erase(oid);
2483     context->oid_not_flushing.insert(oid);
2484     int r = completion->get_return_value();
2485     cout << num << ":  got " << cpp_strerror(r) << std::endl;
2486     if (r == 0) {
2487       context->update_object_version(oid, 0, snap);
2488     } else if (r == -EBUSY) {
2489       assert(can_fail);
2490     } else if (r == -EINVAL) {
2491       // caching not enabled?
2492     } else if (r == -ENOENT) {
2493       // may have raced with a remove?
2494     } else {
2495       assert(0 == "shouldn't happen");
2496     }
2497     context->kick();
2498     done = true;
2499     context->state_lock.Unlock();
2500   }
2501
2502   bool finished() override
2503   {
2504     return done;
2505   }
2506
2507   string getType() override
2508   {
2509     return "CacheFlushOp";
2510   }
2511 };
2512
2513 class CacheEvictOp : public TestOp {
2514 public:
2515   librados::AioCompletion *completion;
2516   librados::ObjectReadOperation op;
2517   string oid;
2518   ceph::shared_ptr<int> in_use;
2519
2520   CacheEvictOp(int n,
2521                RadosTestContext *context,
2522                const string &oid,
2523                TestOpStat *stat)
2524     : TestOp(n, context, stat),
2525       completion(NULL),
2526       oid(oid)
2527   {}
2528
2529   void _begin() override
2530   {
2531     context->state_lock.Lock();
2532
2533     int snap;
2534     if (!(rand() % 4) && !context->snaps.empty()) {
2535       snap = rand_choose(context->snaps)->first;
2536       in_use = context->snaps_in_use.lookup_or_create(snap, snap);
2537     } else {
2538       snap = -1;
2539     }
2540     cout << num << ": cache_evict oid " << oid << " snap " << snap << std::endl;
2541
2542     if (snap >= 0) {
2543       context->io_ctx.snap_set_read(context->snaps[snap]);
2544     }
2545
2546     pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2547       new pair<TestOp*, TestOp::CallbackInfo*>(this,
2548                                                new TestOp::CallbackInfo(0));
2549     completion = context->rados.aio_create_completion((void *) cb_arg, NULL,
2550                                                       &write_callback);
2551     context->state_lock.Unlock();
2552
2553     op.cache_evict();
2554     int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
2555                                         &op, librados::OPERATION_IGNORE_CACHE,
2556                                         NULL);
2557     assert(!r);
2558
2559     if (snap >= 0) {
2560       context->io_ctx.snap_set_read(0);
2561     }
2562   }
2563
2564   void _finish(CallbackInfo *info) override
2565   {
2566     context->state_lock.Lock();
2567     assert(!done);
2568     assert(completion->is_complete());
2569
2570     int r = completion->get_return_value();
2571     cout << num << ":  got " << cpp_strerror(r) << std::endl;
2572     if (r == 0) {
2573       // yay!
2574     } else if (r == -EBUSY) {
2575       // raced with something that dirtied the object
2576     } else if (r == -EINVAL) {
2577       // caching not enabled?
2578     } else if (r == -ENOENT) {
2579       // may have raced with a remove?
2580     } else {
2581       assert(0 == "shouldn't happen");
2582     }
2583     context->kick();
2584     done = true;
2585     context->state_lock.Unlock();
2586   }
2587
2588   bool finished() override
2589   {
2590     return done;
2591   }
2592
2593   string getType() override
2594   {
2595     return "CacheEvictOp";
2596   }
2597 };
2598
2599
2600 #endif