initial code repo
[stor4nfv.git] / src / ceph / src / test / osd / RadosModel.h
diff --git a/src/ceph/src/test/osd/RadosModel.h b/src/ceph/src/test/osd/RadosModel.h
new file mode 100644 (file)
index 0000000..0de5e35
--- /dev/null
@@ -0,0 +1,2600 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+#include "include/int_types.h"
+
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "include/rados/librados.hpp"
+
+#include <iostream>
+#include <sstream>
+#include <map>
+#include <set>
+#include <list>
+#include <string>
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <time.h>
+#include "Object.h"
+#include "TestOpStat.h"
+#include "test/librados/test.h"
+#include "include/memory.h"
+#include "common/sharedptr_registry.hpp"
+#include "common/errno.h"
+#include "osd/HitSet.h"
+
+#ifndef RADOSMODEL_H
+#define RADOSMODEL_H
+
+using namespace std;
+
+class RadosTestContext;
+class TestOpStat;
+
+template <typename T>
+typename T::iterator rand_choose(T &cont) {
+  if (cont.size() == 0) {
+    return cont.end();
+  }
+  int index = rand() % cont.size();
+  typename T::iterator retval = cont.begin();
+
+  for (; index > 0; --index) ++retval;
+  return retval;
+}
+
+enum TestOpType {
+  TEST_OP_READ,
+  TEST_OP_WRITE,
+  TEST_OP_WRITE_EXCL,
+  TEST_OP_WRITESAME,
+  TEST_OP_DELETE,
+  TEST_OP_SNAP_CREATE,
+  TEST_OP_SNAP_REMOVE,
+  TEST_OP_ROLLBACK,
+  TEST_OP_SETATTR,
+  TEST_OP_RMATTR,
+  TEST_OP_WATCH,
+  TEST_OP_COPY_FROM,
+  TEST_OP_HIT_SET_LIST,
+  TEST_OP_UNDIRTY,
+  TEST_OP_IS_DIRTY,
+  TEST_OP_CACHE_FLUSH,
+  TEST_OP_CACHE_TRY_FLUSH,
+  TEST_OP_CACHE_EVICT,
+  TEST_OP_APPEND,
+  TEST_OP_APPEND_EXCL,
+  TEST_OP_SET_REDIRECT,
+  TEST_OP_UNSET_REDIRECT
+};
+
+class TestWatchContext : public librados::WatchCtx2 {
+  TestWatchContext(const TestWatchContext&);
+public:
+  Cond cond;
+  uint64_t handle;
+  bool waiting;
+  Mutex lock;
+  TestWatchContext() : handle(0), waiting(false),
+                      lock("watch lock") {}
+  void handle_notify(uint64_t notify_id, uint64_t cookie,
+                    uint64_t notifier_id,
+                    bufferlist &bl) override {
+    Mutex::Locker l(lock);
+    waiting = false;
+    cond.SignalAll();
+  }
+  void handle_error(uint64_t cookie, int err) override {
+    Mutex::Locker l(lock);
+    cout << "watch handle_error " << err << std::endl;
+  }
+  void start() {
+    Mutex::Locker l(lock);
+    waiting = true;
+  }
+  void wait() {
+    Mutex::Locker l(lock);
+    while (waiting)
+      cond.Wait(lock);
+  }
+  uint64_t &get_handle() {
+    return handle;
+  }
+};
+
+class TestOp {
+public:
+  int num;
+  RadosTestContext *context;
+  TestOpStat *stat;
+  bool done;
+  TestOp(int n, RadosTestContext *context,
+        TestOpStat *stat = 0)
+    : num(n),
+      context(context),
+      stat(stat),
+      done(false)
+  {}
+
+  virtual ~TestOp() {};
+
+  /**
+   * This struct holds data to be passed by a callback
+   * to a TestOp::finish method.
+   */
+  struct CallbackInfo {
+    uint64_t id;
+    explicit CallbackInfo(uint64_t id) : id(id) {}
+    virtual ~CallbackInfo() {};
+  };
+
+  virtual void _begin() = 0;
+
+  /**
+   * Called when the operation completes.
+   * This should be overridden by asynchronous operations.
+   *
+   * @param info information stored by a callback, or NULL -
+   *             useful for multi-operation TestOps
+   */
+  virtual void _finish(CallbackInfo *info)
+  {
+    return;
+  }
+  virtual string getType() = 0;
+  virtual bool finished()
+  {
+    return true;
+  }
+
+  void begin();
+  void finish(CallbackInfo *info);
+  virtual bool must_quiesce_other_ops() { return false; }
+};
+
+class TestOpGenerator {
+public:
+  virtual ~TestOpGenerator() {};
+  virtual TestOp *next(RadosTestContext &context) = 0;
+};
+
+class RadosTestContext {
+public:
+  Mutex state_lock;
+  Cond wait_cond;
+  map<int, map<string,ObjectDesc> > pool_obj_cont;
+  set<string> oid_in_use;
+  set<string> oid_not_in_use;
+  set<string> oid_flushing;
+  set<string> oid_not_flushing;
+  set<string> oid_redirect_not_in_use;
+  set<string> oid_redirect_in_use;
+  SharedPtrRegistry<int, int> snaps_in_use;
+  int current_snap;
+  string pool_name;
+  librados::IoCtx io_ctx;
+  librados::Rados rados;
+  int next_oid;
+  string prefix;
+  int errors;
+  int max_in_flight;
+  int seq_num;
+  map<int,uint64_t> snaps;
+  uint64_t seq;
+  const char *rados_id;
+  bool initialized;
+  map<string, TestWatchContext*> watches;
+  const uint64_t max_size;
+  const uint64_t min_stride_size;
+  const uint64_t max_stride_size;
+  AttrGenerator attr_gen;
+  const bool no_omap;
+  const bool no_sparse;
+  bool pool_snaps;
+  bool write_fadvise_dontneed;
+  int snapname_num;
+  map<string,string > redirect_objs;
+
+  RadosTestContext(const string &pool_name, 
+                  int max_in_flight,
+                  uint64_t max_size,
+                  uint64_t min_stride_size,
+                  uint64_t max_stride_size,
+                  bool no_omap,
+                  bool no_sparse,
+                  bool pool_snaps,
+                  bool write_fadvise_dontneed,
+                  const char *id = 0) :
+    state_lock("Context Lock"),
+    pool_obj_cont(),
+    current_snap(0),
+    pool_name(pool_name),
+    next_oid(0),
+    errors(0),
+    max_in_flight(max_in_flight),
+    seq_num(0), seq(0),
+    rados_id(id), initialized(false),
+    max_size(max_size), 
+    min_stride_size(min_stride_size), max_stride_size(max_stride_size),
+    attr_gen(2000, 20000),
+    no_omap(no_omap),
+    no_sparse(no_sparse),
+    pool_snaps(pool_snaps),
+    write_fadvise_dontneed(write_fadvise_dontneed),
+    snapname_num(0)
+  {
+  }
+
+  int init()
+  {
+    int r = rados.init(rados_id);
+    if (r < 0)
+      return r;
+    r = rados.conf_read_file(NULL);
+    if (r < 0)
+      return r;
+    r = rados.conf_parse_env(NULL);
+    if (r < 0)
+      return r;
+    r = rados.connect();
+    if (r < 0)
+      return r;
+    r = rados.ioctx_create(pool_name.c_str(), io_ctx);
+    if (r < 0) {
+      rados.shutdown();
+      return r;
+    }
+    bufferlist inbl;
+    r = rados.mon_command(
+      "{\"prefix\": \"osd pool set\", \"pool\": \"" + pool_name +
+      "\", \"var\": \"write_fadvise_dontneed\", \"val\": \"" + (write_fadvise_dontneed ? "true" : "false") + "\"}",
+      inbl, NULL, NULL);
+    if (r < 0) {
+      rados.shutdown();
+      return r;
+    }
+    char hostname_cstr[100];
+    gethostname(hostname_cstr, 100);
+    stringstream hostpid;
+    hostpid << hostname_cstr << getpid() << "-";
+    prefix = hostpid.str();
+    assert(!initialized);
+    initialized = true;
+    return 0;
+  }
+
+  void shutdown()
+  {
+    if (initialized) {
+      rados.shutdown();
+    }
+  }
+
+  void loop(TestOpGenerator *gen)
+  {
+    assert(initialized);
+    list<TestOp*> inflight;
+    state_lock.Lock();
+
+    TestOp *next = gen->next(*this);
+    TestOp *waiting = NULL;
+
+    while (next || !inflight.empty()) {
+      if (next && next->must_quiesce_other_ops() && !inflight.empty()) {
+       waiting = next;
+       next = NULL;   // Force to wait for inflight to drain
+      }
+      if (next) {
+       inflight.push_back(next);
+      }
+      state_lock.Unlock();
+      if (next) {
+       (*inflight.rbegin())->begin();
+      }
+      state_lock.Lock();
+      while (1) {
+       for (list<TestOp*>::iterator i = inflight.begin();
+            i != inflight.end();) {
+         if ((*i)->finished()) {
+           cout << (*i)->num << ": done (" << (inflight.size()-1) << " left)" << std::endl;
+           delete *i;
+           inflight.erase(i++);
+         } else {
+           ++i;
+         }
+       }
+       
+       if (inflight.size() >= (unsigned) max_in_flight || (!next && !inflight.empty())) {
+         cout << " waiting on " << inflight.size() << std::endl;
+         wait();
+       } else {
+         break;
+       }
+      }
+      if (waiting) {
+       next = waiting;
+       waiting = NULL;
+      } else {
+       next = gen->next(*this);
+      }
+    }
+    state_lock.Unlock();
+  }
+
+  void wait()
+  {
+    wait_cond.Wait(state_lock);
+  }
+
+  void kick()
+  {
+    wait_cond.Signal();
+  }
+
+  TestWatchContext *get_watch_context(const string &oid) {
+    return watches.count(oid) ? watches[oid] : 0;
+  }
+
+  TestWatchContext *watch(const string &oid) {
+    assert(!watches.count(oid));
+    return (watches[oid] = new TestWatchContext);
+  }
+
+  void unwatch(const string &oid) {
+    assert(watches.count(oid));
+    delete watches[oid];
+    watches.erase(oid);
+  }
+
+  ObjectDesc get_most_recent(const string &oid) {
+    ObjectDesc new_obj;
+    for (map<int, map<string,ObjectDesc> >::reverse_iterator i =
+          pool_obj_cont.rbegin();
+        i != pool_obj_cont.rend();
+        ++i) {
+      map<string,ObjectDesc>::iterator j = i->second.find(oid);
+      if (j != i->second.end()) {
+       new_obj = j->second;
+       break;
+      }
+    }
+    return new_obj;
+  }
+
+  void rm_object_attrs(const string &oid, const set<string> &attrs)
+  {
+    ObjectDesc new_obj = get_most_recent(oid);
+    for (set<string>::const_iterator i = attrs.begin();
+        i != attrs.end();
+        ++i) {
+      new_obj.attrs.erase(*i);
+    }
+    new_obj.dirty = true;
+    pool_obj_cont[current_snap].erase(oid);
+    pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
+  }
+
+  void remove_object_header(const string &oid)
+  {
+    ObjectDesc new_obj = get_most_recent(oid);
+    new_obj.header = bufferlist();
+    new_obj.dirty = true;
+    pool_obj_cont[current_snap].erase(oid);
+    pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
+  }
+
+
+  void update_object_header(const string &oid, const bufferlist &bl)
+  {
+    ObjectDesc new_obj = get_most_recent(oid);
+    new_obj.header = bl;
+    new_obj.exists = true;
+    new_obj.dirty = true;
+    pool_obj_cont[current_snap].erase(oid);
+    pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
+  }
+
+  void update_object_attrs(const string &oid, const map<string, ContDesc> &attrs)
+  {
+    ObjectDesc new_obj = get_most_recent(oid);
+    for (map<string, ContDesc>::const_iterator i = attrs.begin();
+        i != attrs.end();
+        ++i) {
+      new_obj.attrs[i->first] = i->second;
+    }
+    new_obj.exists = true;
+    new_obj.dirty = true;
+    pool_obj_cont[current_snap].erase(oid);
+    pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
+  }
+
+  void update_object(ContentsGenerator *cont_gen,
+                    const string &oid, const ContDesc &contents)
+  {
+    ObjectDesc new_obj = get_most_recent(oid);
+    new_obj.exists = true;
+    new_obj.dirty = true;
+    new_obj.update(cont_gen,
+                  contents);
+    pool_obj_cont[current_snap].erase(oid);
+    pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
+  }
+
+  void update_object_full(const string &oid, const ObjectDesc &contents)
+  {
+    pool_obj_cont[current_snap].erase(oid);
+    pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, contents));
+    pool_obj_cont[current_snap][oid].dirty = true;
+  }
+
+  void update_object_undirty(const string &oid)
+  {
+    ObjectDesc new_obj = get_most_recent(oid);
+    new_obj.dirty = false;
+    pool_obj_cont[current_snap].erase(oid);
+    pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
+  }
+
+  void update_object_version(const string &oid, uint64_t version,
+                            int snap = -1)
+  {
+    for (map<int, map<string,ObjectDesc> >::reverse_iterator i = 
+          pool_obj_cont.rbegin();
+        i != pool_obj_cont.rend();
+        ++i) {
+      if (snap != -1 && snap < i->first)
+       continue;
+      map<string,ObjectDesc>::iterator j = i->second.find(oid);
+      if (j != i->second.end()) {
+       if (version)
+         j->second.version = version;
+       cout << __func__ << " oid " << oid
+            << " v " << version << " " << j->second.most_recent()
+            << " " << (j->second.dirty ? "dirty" : "clean")
+            << " " << (j->second.exists ? "exists" : "dne")
+            << std::endl;
+       break;
+      }
+    }
+  }
+
+  void remove_object(const string &oid)
+  {
+    assert(!get_watch_context(oid));
+    ObjectDesc new_obj;
+    pool_obj_cont[current_snap].erase(oid);
+    pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj));
+  }
+
+  bool find_object(const string &oid, ObjectDesc *contents, int snap = -1) const
+  {
+    for (map<int, map<string,ObjectDesc> >::const_reverse_iterator i = 
+          pool_obj_cont.rbegin();
+        i != pool_obj_cont.rend();
+        ++i) {
+      if (snap != -1 && snap < i->first) continue;
+      if (i->second.count(oid) != 0) {
+       *contents = i->second.find(oid)->second;
+       return true;
+      }
+    }
+    return false;
+  }
+
+  void update_object_redirect_target(const string &oid, const string &target)
+  {
+    redirect_objs[oid] = target;
+  }
+
+  bool object_existed_at(const string &oid, int snap = -1) const
+  {
+    ObjectDesc contents;
+    bool found = find_object(oid, &contents, snap);
+    return found && contents.exists;
+  }
+
+  void remove_snap(int snap)
+  {
+    map<int, map<string,ObjectDesc> >::iterator next_iter = pool_obj_cont.find(snap);
+    assert(next_iter != pool_obj_cont.end());
+    map<int, map<string,ObjectDesc> >::iterator current_iter = next_iter++;
+    assert(current_iter != pool_obj_cont.end());
+    map<string,ObjectDesc> &current = current_iter->second;
+    map<string,ObjectDesc> &next = next_iter->second;
+    for (map<string,ObjectDesc>::iterator i = current.begin();
+        i != current.end();
+        ++i) {
+      if (next.count(i->first) == 0) {
+       next.insert(pair<string,ObjectDesc>(i->first, i->second));
+      }
+    }
+    pool_obj_cont.erase(current_iter);
+    snaps.erase(snap);
+  }
+
+  void add_snap(uint64_t snap)
+  {
+    snaps[current_snap] = snap;
+    current_snap++;
+    pool_obj_cont[current_snap];
+    seq = snap;
+  }
+
+  void roll_back(const string &oid, int snap)
+  {
+    assert(!get_watch_context(oid));
+    ObjectDesc contents;
+    find_object(oid, &contents, snap);
+    contents.dirty = true;
+    pool_obj_cont.rbegin()->second.erase(oid);
+    pool_obj_cont.rbegin()->second.insert(pair<string,ObjectDesc>(oid, contents));
+  }
+};
+
+void read_callback(librados::completion_t comp, void *arg);
+void write_callback(librados::completion_t comp, void *arg);
+
+class RemoveAttrsOp : public TestOp {
+public:
+  string oid;
+  librados::ObjectWriteOperation op;
+  librados::AioCompletion *comp;
+  RemoveAttrsOp(int n, RadosTestContext *context,
+              const string &oid,
+              TestOpStat *stat)
+    : TestOp(n, context, stat), oid(oid), comp(NULL)
+  {}
+
+  void _begin() override
+  {
+    ContDesc cont;
+    set<string> to_remove;
+    {
+      Mutex::Locker l(context->state_lock);
+      ObjectDesc obj;
+      if (!context->find_object(oid, &obj)) {
+       context->kick();
+       done = true;
+       return;
+      }
+      cont = ContDesc(context->seq_num, context->current_snap,
+                     context->seq_num, "");
+      context->oid_in_use.insert(oid);
+      context->oid_not_in_use.erase(oid);
+
+      if (rand() % 30) {
+       ContentsGenerator::iterator iter = context->attr_gen.get_iterator(cont);
+       for (map<string, ContDesc>::iterator i = obj.attrs.begin();
+            i != obj.attrs.end();
+            ++i, ++iter) {
+         if (!(*iter % 3)) {
+           to_remove.insert(i->first);
+           op.rmxattr(i->first.c_str());
+         }
+       }
+       if (to_remove.empty()) {
+         context->kick();
+         context->oid_in_use.erase(oid);
+         context->oid_not_in_use.insert(oid);
+         done = true;
+         return;
+       }
+       if (!context->no_omap) {
+         op.omap_rm_keys(to_remove);
+       }
+      } else {
+       if (!context->no_omap) {
+         op.omap_clear();
+       }
+       for (map<string, ContDesc>::iterator i = obj.attrs.begin();
+            i != obj.attrs.end();
+            ++i) {
+         op.rmxattr(i->first.c_str());
+         to_remove.insert(i->first);
+       }
+       context->remove_object_header(oid);
+      }
+      context->rm_object_attrs(oid, to_remove);
+    }
+
+    pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                              new TestOp::CallbackInfo(0));
+    comp = context->rados.aio_create_completion((void*) cb_arg, NULL,
+                                               &write_callback);
+    context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
+  }
+
+  void _finish(CallbackInfo *info) override
+  {
+    Mutex::Locker l(context->state_lock);
+    done = true;
+    context->update_object_version(oid, comp->get_version64());
+    context->oid_in_use.erase(oid);
+    context->oid_not_in_use.insert(oid);
+    context->kick();
+  }
+
+  bool finished() override
+  {
+    return done;
+  }
+
+  string getType() override
+  {
+    return "RemoveAttrsOp";
+  }
+};
+
+class SetAttrsOp : public TestOp {
+public:
+  string oid;
+  librados::ObjectWriteOperation op;
+  librados::AioCompletion *comp;
+  SetAttrsOp(int n,
+            RadosTestContext *context,
+            const string &oid,
+            TestOpStat *stat)
+    : TestOp(n, context, stat),
+      oid(oid), comp(NULL)
+  {}
+
+  void _begin() override
+  {
+    ContDesc cont;
+    {
+      Mutex::Locker l(context->state_lock);
+      cont = ContDesc(context->seq_num, context->current_snap,
+                     context->seq_num, "");
+      context->oid_in_use.insert(oid);
+      context->oid_not_in_use.erase(oid);
+    }
+
+    map<string, bufferlist> omap_contents;
+    map<string, ContDesc> omap;
+    bufferlist header;
+    ContentsGenerator::iterator keygen = context->attr_gen.get_iterator(cont);
+    op.create(false);
+    while (!*keygen) ++keygen;
+    while (*keygen) {
+      if (*keygen != '_')
+       header.append(*keygen);
+      ++keygen;
+    }
+    for (int i = 0; i < 20; ++i) {
+      string key;
+      while (!*keygen) ++keygen;
+      while (*keygen && key.size() < 40) {
+       key.push_back((*keygen % 20) + 'a');
+       ++keygen;
+      }
+      ContDesc val(cont);
+      val.seqnum += (unsigned)(*keygen);
+      val.prefix = ("oid: " + oid);
+      omap[key] = val;
+      bufferlist val_buffer = context->attr_gen.gen_bl(val);
+      omap_contents[key] = val_buffer;
+      op.setxattr(key.c_str(), val_buffer);
+    }
+    if (!context->no_omap) {
+      op.omap_set_header(header);
+      op.omap_set(omap_contents);
+    }
+
+    {
+      Mutex::Locker l(context->state_lock);
+      context->update_object_header(oid, header);
+      context->update_object_attrs(oid, omap);
+    }
+
+    pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                              new TestOp::CallbackInfo(0));
+    comp = context->rados.aio_create_completion((void*) cb_arg, NULL,
+                                               &write_callback);
+    context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
+  }
+
+  void _finish(CallbackInfo *info) override
+  {
+    Mutex::Locker l(context->state_lock);
+    int r;
+    if ((r = comp->get_return_value())) {
+      cerr << "err " << r << std::endl;
+      ceph_abort();
+    }
+    done = true;
+    context->update_object_version(oid, comp->get_version64());
+    context->oid_in_use.erase(oid);
+    context->oid_not_in_use.insert(oid);
+    context->kick();
+  }
+
+  bool finished() override
+  {
+    return done;
+  }
+
+  string getType() override
+  {
+    return "SetAttrsOp";
+  }
+};
+
+class WriteOp : public TestOp {
+public:
+  string oid;
+  ContDesc cont;
+  set<librados::AioCompletion *> waiting;
+  librados::AioCompletion *rcompletion;
+  uint64_t waiting_on;
+  uint64_t last_acked_tid;
+
+  librados::ObjectReadOperation read_op;
+  librados::ObjectWriteOperation write_op;
+  bufferlist rbuffer;
+
+  bool do_append;
+  bool do_excl;
+
+  WriteOp(int n,
+         RadosTestContext *context,
+         const string &oid,
+         bool do_append,
+         bool do_excl,
+         TestOpStat *stat = 0)
+    : TestOp(n, context, stat),
+      oid(oid), rcompletion(NULL), waiting_on(0), 
+      last_acked_tid(0), do_append(do_append),
+      do_excl(do_excl)
+  {}
+               
+  void _begin() override
+  {
+    context->state_lock.Lock();
+    done = 0;
+    stringstream acc;
+    acc << context->prefix << "OID: " << oid << " snap " << context->current_snap << std::endl;
+    string prefix = acc.str();
+
+    cont = ContDesc(context->seq_num, context->current_snap, context->seq_num, prefix);
+
+    ContentsGenerator *cont_gen;
+    if (do_append) {
+      ObjectDesc old_value;
+      bool found = context->find_object(oid, &old_value);
+      uint64_t prev_length = found && old_value.has_contents() ?
+       old_value.most_recent_gen()->get_length(old_value.most_recent()) :
+       0;
+      bool requires;
+      int r = context->io_ctx.pool_requires_alignment2(&requires);
+      assert(r == 0);
+      uint64_t alignment = 0;
+      if (requires) {
+        r = context->io_ctx.pool_required_alignment2(&alignment);
+        assert(r == 0);
+        assert(alignment != 0);
+      }
+      cont_gen = new AppendGenerator(
+       prev_length,
+       alignment,
+       context->min_stride_size,
+       context->max_stride_size,
+       3);
+    } else {
+      cont_gen = new VarLenGenerator(
+       context->max_size, context->min_stride_size, context->max_stride_size);
+    }
+    context->update_object(cont_gen, oid, cont);
+
+    context->oid_in_use.insert(oid);
+    context->oid_not_in_use.erase(oid);
+
+    map<uint64_t, uint64_t> ranges;
+
+    cont_gen->get_ranges_map(cont, ranges);
+    std::cout << num << ":  seq_num " << context->seq_num << " ranges " << ranges << std::endl;
+    context->seq_num++;
+
+    waiting_on = ranges.size();
+    ContentsGenerator::iterator gen_pos = cont_gen->get_iterator(cont);
+    uint64_t tid = 1;
+    for (map<uint64_t, uint64_t>::iterator i = ranges.begin(); 
+        i != ranges.end();
+        ++i, ++tid) {
+      gen_pos.seek(i->first);
+      bufferlist to_write = gen_pos.gen_bl_advance(i->second);
+      assert(to_write.length() == i->second);
+      assert(to_write.length() > 0);
+      std::cout << num << ":  writing " << context->prefix+oid
+               << " from " << i->first
+               << " to " << i->first + i->second << " tid " << tid << std::endl;
+      pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+       new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                                new TestOp::CallbackInfo(tid));
+      librados::AioCompletion *completion =
+       context->rados.aio_create_completion((void*) cb_arg, NULL,
+                                            &write_callback);
+      waiting.insert(completion);
+      librados::ObjectWriteOperation op;
+      if (do_append) {
+       op.append(to_write);
+      } else {
+       op.write(i->first, to_write);
+      }
+      if (do_excl && tid == 1)
+       op.assert_exists();
+      context->io_ctx.aio_operate(
+       context->prefix+oid, completion,
+       &op);
+    }
+
+    bufferlist contbl;
+    ::encode(cont, contbl);
+    pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(
+       this,
+       new TestOp::CallbackInfo(++tid));
+    librados::AioCompletion *completion = context->rados.aio_create_completion(
+      (void*) cb_arg, NULL, &write_callback);
+    waiting.insert(completion);
+    waiting_on++;
+    write_op.setxattr("_header", contbl);
+    if (!do_append) {
+      write_op.truncate(cont_gen->get_length(cont));
+    }
+    context->io_ctx.aio_operate(
+      context->prefix+oid, completion, &write_op);
+
+    cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(
+       this,
+       new TestOp::CallbackInfo(++tid));
+    rcompletion = context->rados.aio_create_completion(
+         (void*) cb_arg, NULL, &write_callback);
+    waiting_on++;
+    read_op.read(0, 1, &rbuffer, 0);
+    context->io_ctx.aio_operate(
+      context->prefix+oid, rcompletion,
+      &read_op,
+      librados::OPERATION_ORDER_READS_WRITES,  // order wrt previous write/update
+      0);
+    context->state_lock.Unlock();
+  }
+
+  void _finish(CallbackInfo *info) override
+  {
+    assert(info);
+    context->state_lock.Lock();
+    uint64_t tid = info->id;
+
+    cout << num << ":  finishing write tid " << tid << " to " << context->prefix + oid << std::endl;
+
+    if (tid <= last_acked_tid) {
+      cerr << "Error: finished tid " << tid
+          << " when last_acked_tid was " << last_acked_tid << std::endl;
+      ceph_abort();
+    }
+    last_acked_tid = tid;
+
+    assert(!done);
+    waiting_on--;
+    if (waiting_on == 0) {
+      uint64_t version = 0;
+      for (set<librados::AioCompletion *>::iterator i = waiting.begin();
+          i != waiting.end();
+          ) {
+       assert((*i)->is_complete());
+       if (int err = (*i)->get_return_value()) {
+         cerr << "Error: oid " << oid << " write returned error code "
+              << err << std::endl;
+       }
+       if ((*i)->get_version64() > version)
+         version = (*i)->get_version64();
+       (*i)->release();
+       waiting.erase(i++);
+      }
+      
+      context->update_object_version(oid, version);
+      if (rcompletion->get_version64() != version) {
+       cerr << "Error: racing read on " << oid << " returned version "
+            << rcompletion->get_version64() << " rather than version "
+            << version << std::endl;
+       assert(0 == "racing read got wrong version");
+      }
+
+      {
+       ObjectDesc old_value;
+       assert(context->find_object(oid, &old_value, -1));
+       if (old_value.deleted())
+         std::cout << num << ":  left oid " << oid << " deleted" << std::endl;
+       else
+         std::cout << num << ":  left oid " << oid << " "
+                   << old_value.most_recent() << std::endl;
+      }
+
+      rcompletion->release();
+      context->oid_in_use.erase(oid);
+      context->oid_not_in_use.insert(oid);
+      context->kick();
+      done = true;
+    }
+    context->state_lock.Unlock();
+  }
+
+  bool finished() override
+  {
+    return done;
+  }
+
+  string getType() override
+  {
+    return "WriteOp";
+  }
+};
+
+class WriteSameOp : public TestOp {
+public:
+  string oid;
+  ContDesc cont;
+  set<librados::AioCompletion *> waiting;
+  librados::AioCompletion *rcompletion;
+  uint64_t waiting_on;
+  uint64_t last_acked_tid;
+
+  librados::ObjectReadOperation read_op;
+  librados::ObjectWriteOperation write_op;
+  bufferlist rbuffer;
+
+  WriteSameOp(int n,
+         RadosTestContext *context,
+         const string &oid,
+         TestOpStat *stat = 0)
+    : TestOp(n, context, stat),
+      oid(oid), rcompletion(NULL), waiting_on(0),
+      last_acked_tid(0)
+  {}
+
+  void _begin() override
+  {
+    context->state_lock.Lock();
+    done = 0;
+    stringstream acc;
+    acc << context->prefix << "OID: " << oid << " snap " << context->current_snap << std::endl;
+    string prefix = acc.str();
+
+    cont = ContDesc(context->seq_num, context->current_snap, context->seq_num, prefix);
+
+    ContentsGenerator *cont_gen;
+    cont_gen = new VarLenGenerator(
+       context->max_size, context->min_stride_size, context->max_stride_size);
+    context->update_object(cont_gen, oid, cont);
+
+    context->oid_in_use.insert(oid);
+    context->oid_not_in_use.erase(oid);
+
+    map<uint64_t, uint64_t> ranges;
+
+    cont_gen->get_ranges_map(cont, ranges);
+    std::cout << num << ":  seq_num " << context->seq_num << " ranges " << ranges << std::endl;
+    context->seq_num++;
+
+    waiting_on = ranges.size();
+    ContentsGenerator::iterator gen_pos = cont_gen->get_iterator(cont);
+    uint64_t tid = 1;
+    for (map<uint64_t, uint64_t>::iterator i = ranges.begin();
+        i != ranges.end();
+        ++i, ++tid) {
+      gen_pos.seek(i->first);
+      bufferlist to_write = gen_pos.gen_bl_advance(i->second);
+      assert(to_write.length() == i->second);
+      assert(to_write.length() > 0);
+      std::cout << num << ":  writing " << context->prefix+oid
+               << " from " << i->first
+               << " to " << i->first + i->second << " tid " << tid << std::endl;
+      pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+       new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                                new TestOp::CallbackInfo(tid));
+      librados::AioCompletion *completion =
+       context->rados.aio_create_completion((void*) cb_arg, NULL,
+                                            &write_callback);
+      waiting.insert(completion);
+      librados::ObjectWriteOperation op;
+      /* no writesame multiplication factor for now */
+      op.writesame(i->first, to_write.length(), to_write);
+
+      context->io_ctx.aio_operate(
+       context->prefix+oid, completion,
+       &op);
+    }
+
+    bufferlist contbl;
+    ::encode(cont, contbl);
+    pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(
+       this,
+       new TestOp::CallbackInfo(++tid));
+    librados::AioCompletion *completion = context->rados.aio_create_completion(
+      (void*) cb_arg, NULL, &write_callback);
+    waiting.insert(completion);
+    waiting_on++;
+    write_op.setxattr("_header", contbl);
+    write_op.truncate(cont_gen->get_length(cont));
+    context->io_ctx.aio_operate(
+      context->prefix+oid, completion, &write_op);
+
+    cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(
+       this,
+       new TestOp::CallbackInfo(++tid));
+    rcompletion = context->rados.aio_create_completion(
+         (void*) cb_arg, NULL, &write_callback);
+    waiting_on++;
+    read_op.read(0, 1, &rbuffer, 0);
+    context->io_ctx.aio_operate(
+      context->prefix+oid, rcompletion,
+      &read_op,
+      librados::OPERATION_ORDER_READS_WRITES,  // order wrt previous write/update
+      0);
+    context->state_lock.Unlock();
+  }
+
+  void _finish(CallbackInfo *info) override
+  {
+    assert(info);
+    context->state_lock.Lock();
+    uint64_t tid = info->id;
+
+    cout << num << ":  finishing writesame tid " << tid << " to " << context->prefix + oid << std::endl;
+
+    if (tid <= last_acked_tid) {
+      cerr << "Error: finished tid " << tid
+          << " when last_acked_tid was " << last_acked_tid << std::endl;
+      ceph_abort();
+    }
+    last_acked_tid = tid;
+
+    assert(!done);
+    waiting_on--;
+    if (waiting_on == 0) {
+      uint64_t version = 0;
+      for (set<librados::AioCompletion *>::iterator i = waiting.begin();
+          i != waiting.end();
+          ) {
+       assert((*i)->is_complete());
+       if (int err = (*i)->get_return_value()) {
+         cerr << "Error: oid " << oid << " writesame returned error code "
+              << err << std::endl;
+       }
+       if ((*i)->get_version64() > version)
+         version = (*i)->get_version64();
+       (*i)->release();
+       waiting.erase(i++);
+      }
+
+      context->update_object_version(oid, version);
+      if (rcompletion->get_version64() != version) {
+       cerr << "Error: racing read on " << oid << " returned version "
+            << rcompletion->get_version64() << " rather than version "
+            << version << std::endl;
+       assert(0 == "racing read got wrong version");
+      }
+
+      {
+       ObjectDesc old_value;
+       assert(context->find_object(oid, &old_value, -1));
+       if (old_value.deleted())
+         std::cout << num << ":  left oid " << oid << " deleted" << std::endl;
+       else
+         std::cout << num << ":  left oid " << oid << " "
+                   << old_value.most_recent() << std::endl;
+      }
+
+      rcompletion->release();
+      context->oid_in_use.erase(oid);
+      context->oid_not_in_use.insert(oid);
+      context->kick();
+      done = true;
+    }
+    context->state_lock.Unlock();
+  }
+
+  bool finished() override
+  {
+    return done;
+  }
+
+  string getType() override
+  {
+    return "WriteSameOp";
+  }
+};
+
+class DeleteOp : public TestOp {
+public:
+  string oid;
+
+  DeleteOp(int n,
+          RadosTestContext *context,
+          const string &oid,
+          TestOpStat *stat = 0)
+    : TestOp(n, context, stat), oid(oid)
+  {}
+
+  void _begin() override
+  {
+    context->state_lock.Lock();
+    if (context->get_watch_context(oid)) {
+      context->kick();
+      context->state_lock.Unlock();
+      return;
+    }
+
+    ObjectDesc contents;
+    context->find_object(oid, &contents);
+    bool present = !contents.deleted();
+
+    context->oid_in_use.insert(oid);
+    context->oid_not_in_use.erase(oid);
+    context->seq_num++;
+
+    context->remove_object(oid);
+
+    interval_set<uint64_t> ranges;
+    context->state_lock.Unlock();
+
+    int r = 0;
+    if (rand() % 2) {
+      librados::ObjectWriteOperation op;
+      op.assert_exists();
+      op.remove();
+      r = context->io_ctx.operate(context->prefix+oid, &op);
+    } else {
+      r = context->io_ctx.remove(context->prefix+oid);
+    }
+    if (r && !(r == -ENOENT && !present)) {
+      cerr << "r is " << r << " while deleting " << oid << " and present is " << present << std::endl;
+      ceph_abort();
+    }
+
+    context->state_lock.Lock();
+    context->oid_in_use.erase(oid);
+    context->oid_not_in_use.insert(oid);
+    context->kick();
+    context->state_lock.Unlock();
+  }
+
+  string getType() override
+  {
+    return "DeleteOp";
+  }
+};
+
+class ReadOp : public TestOp {
+public:
+  vector<librados::AioCompletion *> completions;
+  librados::ObjectReadOperation op;
+  string oid;
+  ObjectDesc old_value;
+  int snap;
+  bool balance_reads;
+
+  ceph::shared_ptr<int> in_use;
+
+  vector<bufferlist> results;
+  vector<int> retvals;
+  vector<std::map<uint64_t, uint64_t>> extent_results;
+  vector<bool> is_sparse_read;
+  uint64_t waiting_on;
+
+  vector<bufferlist> checksums;
+  vector<int> checksum_retvals;
+
+  map<string, bufferlist> attrs;
+  int attrretval;
+
+  set<string> omap_requested_keys;
+  map<string, bufferlist> omap_returned_values;
+  set<string> omap_keys;
+  map<string, bufferlist> omap;
+  bufferlist header;
+
+  map<string, bufferlist> xattrs;
+  ReadOp(int n,
+        RadosTestContext *context,
+        const string &oid,
+        bool balance_reads,
+        TestOpStat *stat = 0)
+    : TestOp(n, context, stat),
+      completions(3),
+      oid(oid),
+      snap(0),
+      balance_reads(balance_reads),
+      results(3),
+      retvals(3),
+      extent_results(3),
+      is_sparse_read(3, false),
+      waiting_on(0),
+      checksums(3),
+      checksum_retvals(3),
+      attrretval(0)
+  {}
+
+  void _do_read(librados::ObjectReadOperation& read_op, int index) {
+    uint64_t len = 0;
+    if (old_value.has_contents())
+      len = old_value.most_recent_gen()->get_length(old_value.most_recent());
+    if (context->no_sparse || rand() % 2) {
+      is_sparse_read[index] = false;
+      read_op.read(0,
+                  len,
+                  &results[index],
+                  &retvals[index]);
+      bufferlist init_value_bl;
+      ::encode(static_cast<uint32_t>(-1), init_value_bl);
+      read_op.checksum(LIBRADOS_CHECKSUM_TYPE_CRC32C, init_value_bl, 0, len,
+                      0, &checksums[index], &checksum_retvals[index]);
+    } else {
+      is_sparse_read[index] = true;
+      read_op.sparse_read(0,
+                         len,
+                         &extent_results[index],
+                         &results[index],
+                         &retvals[index]);
+    }
+  }
+
+  void _begin() override
+  {
+    context->state_lock.Lock();
+    if (!(rand() % 4) && !context->snaps.empty()) {
+      snap = rand_choose(context->snaps)->first;
+      in_use = context->snaps_in_use.lookup_or_create(snap, snap);
+    } else {
+      snap = -1;
+    }
+    std::cout << num << ": read oid " << oid << " snap " << snap << std::endl;
+    done = 0;
+    for (uint32_t i = 0; i < 3; i++) {
+      completions[i] = context->rados.aio_create_completion((void *) this, &read_callback, 0);
+    }
+
+    context->oid_in_use.insert(oid);
+    context->oid_not_in_use.erase(oid);
+    assert(context->find_object(oid, &old_value, snap));
+    if (old_value.deleted())
+      std::cout << num << ":  expect deleted" << std::endl;
+    else
+      std::cout << num << ":  expect " << old_value.most_recent() << std::endl;
+
+    TestWatchContext *ctx = context->get_watch_context(oid);
+    context->state_lock.Unlock();
+    if (ctx) {
+      assert(old_value.exists);
+      TestAlarm alarm;
+      std::cerr << num << ":  about to start" << std::endl;
+      ctx->start();
+      std::cerr << num << ":  started" << std::endl;
+      bufferlist bl;
+      context->io_ctx.set_notify_timeout(600);
+      int r = context->io_ctx.notify2(context->prefix+oid, bl, 0, NULL);
+      if (r < 0) {
+       std::cerr << "r is " << r << std::endl;
+       ceph_abort();
+      }
+      std::cerr << num << ":  notified, waiting" << std::endl;
+      ctx->wait();
+    }
+    context->state_lock.Lock();
+    if (snap >= 0) {
+      context->io_ctx.snap_set_read(context->snaps[snap]);
+    }
+    _do_read(op, 0);
+    for (map<string, ContDesc>::iterator i = old_value.attrs.begin();
+        i != old_value.attrs.end();
+        ++i) {
+      if (rand() % 2) {
+       string key = i->first;
+       if (rand() % 2)
+         key.push_back((rand() % 26) + 'a');
+       omap_requested_keys.insert(key);
+      }
+    }
+    if (!context->no_omap) {
+      op.omap_get_vals_by_keys(omap_requested_keys, &omap_returned_values, 0);
+      // NOTE: we're ignore pmore here, which assumes the OSD limit is high
+      // enough for us.
+      op.omap_get_keys2("", -1, &omap_keys, nullptr, nullptr);
+      op.omap_get_vals2("", -1, &omap, nullptr, nullptr);
+      op.omap_get_header(&header, 0);
+    }
+    op.getxattrs(&xattrs, 0);
+
+    unsigned flags = 0;
+    if (balance_reads)
+      flags |= librados::OPERATION_BALANCE_READS;
+
+    assert(!context->io_ctx.aio_operate(context->prefix+oid, completions[0], &op,
+                                       flags, NULL));
+    waiting_on++;
+    // send 2 pipelined reads on the same object/snap. This can help testing
+    // OSD's read behavior in some scenarios
+    for (uint32_t i = 1; i < 3; ++i) {
+      librados::ObjectReadOperation pipeline_op;
+      _do_read(pipeline_op, i);
+      assert(!context->io_ctx.aio_operate(context->prefix+oid, completions[i], &pipeline_op, 0));
+      waiting_on++;
+    }
+
+    if (snap >= 0) {
+      context->io_ctx.snap_set_read(0);
+    }
+    context->state_lock.Unlock();
+  }
+
+  void _finish(CallbackInfo *info) override
+  {
+    Mutex::Locker l(context->state_lock);
+    assert(!done);
+    assert(waiting_on > 0);
+    if (--waiting_on) {
+      return;
+    }
+
+    context->oid_in_use.erase(oid);
+    context->oid_not_in_use.insert(oid);
+    int retval = completions[0]->get_return_value();
+    for (vector<librados::AioCompletion *>::iterator it = completions.begin();
+         it != completions.end(); ++it) {
+      assert((*it)->is_complete());
+      uint64_t version = (*it)->get_version64();
+      int err = (*it)->get_return_value();
+      if (err != retval) {
+        cerr << num << ": Error: oid " << oid << " read returned different error codes: "
+             << retval << " and " << err << std::endl;
+       ceph_abort();
+      }
+      if (err) {
+        if (!(err == -ENOENT && old_value.deleted())) {
+          cerr << num << ": Error: oid " << oid << " read returned error code "
+               << err << std::endl;
+          ceph_abort();
+        }
+      } else if (version != old_value.version) {
+       cerr << num << ": oid " << oid << " version is " << version
+            << " and expected " << old_value.version << std::endl;
+       assert(version == old_value.version);
+      }
+    }
+    if (!retval) {
+      map<string, bufferlist>::iterator iter = xattrs.find("_header");
+      bufferlist headerbl;
+      if (iter == xattrs.end()) {
+       if (old_value.has_contents()) {
+         cerr << num << ": Error: did not find header attr, has_contents: "
+              << old_value.has_contents()
+              << std::endl;
+         assert(!old_value.has_contents());
+       }
+      } else {
+       headerbl = iter->second;
+       xattrs.erase(iter);
+      }
+      if (old_value.deleted()) {
+       std::cout << num << ":  expect deleted" << std::endl;
+       assert(0 == "expected deleted");
+      } else {
+       std::cout << num << ":  expect " << old_value.most_recent() << std::endl;
+      }
+      if (old_value.has_contents()) {
+       ContDesc to_check;
+       bufferlist::iterator p = headerbl.begin();
+       ::decode(to_check, p);
+       if (to_check != old_value.most_recent()) {
+         cerr << num << ": oid " << oid << " found incorrect object contents " << to_check
+              << ", expected " << old_value.most_recent() << std::endl;
+         context->errors++;
+       }
+        for (unsigned i = 0; i < results.size(); i++) {
+         if (is_sparse_read[i]) {
+           if (!old_value.check_sparse(extent_results[i], results[i])) {
+             cerr << num << ": oid " << oid << " contents " << to_check << " corrupt" << std::endl;
+             context->errors++;
+           }
+         } else {
+           if (!old_value.check(results[i])) {
+             cerr << num << ": oid " << oid << " contents " << to_check << " corrupt" << std::endl;
+             context->errors++;
+           }
+
+           uint32_t checksum = 0;
+           if (checksum_retvals[i] == 0) {
+             try {
+               auto bl_it = checksums[i].begin();
+               uint32_t csum_count;
+               ::decode(csum_count, bl_it);
+               ::decode(checksum, bl_it);
+             } catch (const buffer::error &err) {
+               checksum_retvals[i] = -EBADMSG;
+             }
+           }
+           if (checksum_retvals[i] != 0 || checksum != results[i].crc32c(-1)) {
+             cerr << num << ": oid " << oid << " checksum " << checksums[i]
+                  << " incorrect, expecting " << results[i].crc32c(-1)
+                   << std::endl;
+             context->errors++;
+           }
+         }
+       }
+       if (context->errors) ceph_abort();
+      }
+
+      // Attributes
+      if (!context->no_omap) {
+       if (!(old_value.header == header)) {
+         cerr << num << ": oid " << oid << " header does not match, old size: "
+              << old_value.header.length() << " new size " << header.length()
+              << std::endl;
+         assert(old_value.header == header);
+       }
+       if (omap.size() != old_value.attrs.size()) {
+         cerr << num << ": oid " << oid << " omap.size() is " << omap.size()
+              << " and old is " << old_value.attrs.size() << std::endl;
+         assert(omap.size() == old_value.attrs.size());
+       }
+       if (omap_keys.size() != old_value.attrs.size()) {
+         cerr << num << ": oid " << oid << " omap.size() is " << omap_keys.size()
+              << " and old is " << old_value.attrs.size() << std::endl;
+         assert(omap_keys.size() == old_value.attrs.size());
+       }
+      }
+      if (xattrs.size() != old_value.attrs.size()) {
+       cerr << num << ": oid " << oid << " xattrs.size() is " << xattrs.size()
+            << " and old is " << old_value.attrs.size() << std::endl;
+       assert(xattrs.size() == old_value.attrs.size());
+      }
+      for (map<string, ContDesc>::iterator iter = old_value.attrs.begin();
+          iter != old_value.attrs.end();
+          ++iter) {
+       bufferlist bl = context->attr_gen.gen_bl(
+         iter->second);
+       if (!context->no_omap) {
+         map<string, bufferlist>::iterator omap_iter = omap.find(iter->first);
+         assert(omap_iter != omap.end());
+         assert(bl.length() == omap_iter->second.length());
+         bufferlist::iterator k = bl.begin();
+         for(bufferlist::iterator l = omap_iter->second.begin();
+             !k.end() && !l.end();
+             ++k, ++l) {
+           assert(*l == *k);
+         }
+       }
+       map<string, bufferlist>::iterator xattr_iter = xattrs.find(iter->first);
+       assert(xattr_iter != xattrs.end());
+       assert(bl.length() == xattr_iter->second.length());
+       bufferlist::iterator k = bl.begin();
+       for (bufferlist::iterator j = xattr_iter->second.begin();
+            !k.end() && !j.end();
+            ++j, ++k) {
+         assert(*j == *k);
+       }
+      }
+      if (!context->no_omap) {
+       for (set<string>::iterator i = omap_requested_keys.begin();
+            i != omap_requested_keys.end();
+            ++i) {
+         if (!omap_returned_values.count(*i))
+           assert(!old_value.attrs.count(*i));
+         if (!old_value.attrs.count(*i))
+           assert(!omap_returned_values.count(*i));
+       }
+       for (map<string, bufferlist>::iterator i = omap_returned_values.begin();
+            i != omap_returned_values.end();
+            ++i) {
+         assert(omap_requested_keys.count(i->first));
+         assert(omap.count(i->first));
+         assert(old_value.attrs.count(i->first));
+         assert(i->second == omap[i->first]);
+       }
+      }
+    }
+    for (vector<librados::AioCompletion *>::iterator it = completions.begin();
+         it != completions.end(); ++it) {
+      (*it)->release();
+    }
+    context->kick();
+    done = true;
+  }
+
+  bool finished() override
+  {
+    return done;
+  }
+
+  string getType() override
+  {
+    return "ReadOp";
+  }
+};
+
+class SnapCreateOp : public TestOp {
+public:
+  SnapCreateOp(int n,
+              RadosTestContext *context,
+              TestOpStat *stat = 0)
+    : TestOp(n, context, stat)
+  {}
+
+  void _begin() override
+  {
+    uint64_t snap;
+    string snapname;
+
+    if (context->pool_snaps) {
+      stringstream ss;
+
+      ss << context->prefix << "snap" << ++context->snapname_num;
+      snapname = ss.str();
+
+      int ret = context->io_ctx.snap_create(snapname.c_str());
+      if (ret) {
+       cerr << "snap_create returned " << ret << std::endl;
+       ceph_abort();
+      }
+      assert(!context->io_ctx.snap_lookup(snapname.c_str(), &snap));
+
+    } else {
+      assert(!context->io_ctx.selfmanaged_snap_create(&snap));
+    }
+
+    context->state_lock.Lock();
+    context->add_snap(snap);
+
+    if (context->pool_snaps) {
+      context->state_lock.Unlock();
+    } else {
+      vector<uint64_t> snapset(context->snaps.size());
+
+      int j = 0;
+      for (map<int,uint64_t>::reverse_iterator i = context->snaps.rbegin();
+          i != context->snaps.rend();
+          ++i, ++j) {
+       snapset[j] = i->second;
+      }
+
+      context->state_lock.Unlock();
+
+      int r = context->io_ctx.selfmanaged_snap_set_write_ctx(context->seq, snapset);
+      if (r) {
+       cerr << "r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl;
+       ceph_abort();
+      }
+    }
+  }
+
+  string getType() override
+  {
+    return "SnapCreateOp";
+  }
+  bool must_quiesce_other_ops() override { return context->pool_snaps; }
+};
+
+class SnapRemoveOp : public TestOp {
+public:
+  int to_remove;
+  SnapRemoveOp(int n, RadosTestContext *context,
+              int snap,
+              TestOpStat *stat = 0)
+    : TestOp(n, context, stat),
+      to_remove(snap)
+  {}
+
+  void _begin() override
+  {
+    context->state_lock.Lock();
+    uint64_t snap = context->snaps[to_remove];
+    context->remove_snap(to_remove);
+
+    if (context->pool_snaps) {
+      string snapname;
+
+      assert(!context->io_ctx.snap_get_name(snap, &snapname));
+      assert(!context->io_ctx.snap_remove(snapname.c_str()));
+     } else {
+      assert(!context->io_ctx.selfmanaged_snap_remove(snap));
+
+      vector<uint64_t> snapset(context->snaps.size());
+      int j = 0;
+      for (map<int,uint64_t>::reverse_iterator i = context->snaps.rbegin();
+          i != context->snaps.rend();
+          ++i, ++j) {
+       snapset[j] = i->second;
+      }
+
+      int r = context->io_ctx.selfmanaged_snap_set_write_ctx(context->seq, snapset);
+      if (r) {
+       cerr << "r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl;
+       ceph_abort();
+      }
+    }
+    context->state_lock.Unlock();
+  }
+
+  string getType() override
+  {
+    return "SnapRemoveOp";
+  }
+};
+
+class WatchOp : public TestOp {
+  string oid;
+public:
+  WatchOp(int n,
+         RadosTestContext *context,
+         const string &_oid,
+         TestOpStat *stat = 0)
+    : TestOp(n, context, stat),
+      oid(_oid)
+  {}
+
+  void _begin() override
+  {
+    context->state_lock.Lock();
+    ObjectDesc contents;
+    context->find_object(oid, &contents);
+    if (contents.deleted()) {
+      context->kick();
+      context->state_lock.Unlock();
+      return;
+    }
+    context->oid_in_use.insert(oid);
+    context->oid_not_in_use.erase(oid);
+
+    TestWatchContext *ctx = context->get_watch_context(oid);
+    context->state_lock.Unlock();
+    int r;
+    if (!ctx) {
+      {
+       Mutex::Locker l(context->state_lock);
+       ctx = context->watch(oid);
+      }
+
+      r = context->io_ctx.watch2(context->prefix+oid,
+                                &ctx->get_handle(),
+                                ctx);
+    } else {
+      r = context->io_ctx.unwatch2(ctx->get_handle());
+      {
+       Mutex::Locker l(context->state_lock);
+       context->unwatch(oid);
+      }
+    }
+
+    if (r) {
+      cerr << "r is " << r << std::endl;
+      ceph_abort();
+    }
+
+    {
+      Mutex::Locker l(context->state_lock);
+      context->oid_in_use.erase(oid);
+      context->oid_not_in_use.insert(oid);
+    }
+  }
+
+  string getType() override
+  {
+    return "WatchOp";
+  }
+};
+
+class RollbackOp : public TestOp {
+public:
+  string oid;
+  int roll_back_to;
+  librados::ObjectWriteOperation zero_write_op1;
+  librados::ObjectWriteOperation zero_write_op2;
+  librados::ObjectWriteOperation op;
+  vector<librados::AioCompletion *> comps;
+  ceph::shared_ptr<int> in_use;
+  int last_finished;
+  int outstanding;
+
+  RollbackOp(int n,
+            RadosTestContext *context,
+            const string &_oid,
+            TestOpStat *stat = 0)
+    : TestOp(n, context, stat),
+      oid(_oid), roll_back_to(-1), 
+      comps(3, NULL),
+      last_finished(-1), outstanding(3)
+  {}
+
+  void _begin() override
+  {
+    context->state_lock.Lock();
+    if (context->get_watch_context(oid)) {
+      context->kick();
+      context->state_lock.Unlock();
+      return;
+    }
+
+    if (context->snaps.empty()) {
+      context->kick();
+      context->state_lock.Unlock();
+      done = true;
+      return;
+    }
+
+    context->oid_in_use.insert(oid);
+    context->oid_not_in_use.erase(oid);
+
+    roll_back_to = rand_choose(context->snaps)->first;
+    in_use = context->snaps_in_use.lookup_or_create(
+      roll_back_to,
+      roll_back_to);
+
+
+    cout << "rollback oid " << oid << " to " << roll_back_to << std::endl;
+
+    bool existed_before = context->object_existed_at(oid);
+    bool existed_after = context->object_existed_at(oid, roll_back_to);
+
+    context->roll_back(oid, roll_back_to);
+    uint64_t snap = context->snaps[roll_back_to];
+
+    outstanding -= (!existed_before) + (!existed_after);
+
+    context->state_lock.Unlock();
+
+    bufferlist bl, bl2;
+    zero_write_op1.append(bl);
+    zero_write_op2.append(bl2);
+
+    if (context->pool_snaps) {
+      op.snap_rollback(snap);
+    } else {
+      op.selfmanaged_snap_rollback(snap);
+    }
+
+    if (existed_before) {
+      pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+       new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                                new TestOp::CallbackInfo(0));
+      comps[0] = 
+       context->rados.aio_create_completion((void*) cb_arg, NULL,
+                                            &write_callback);
+      context->io_ctx.aio_operate(
+       context->prefix+oid, comps[0], &zero_write_op1);
+    }
+    {
+      pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+       new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                                new TestOp::CallbackInfo(1));
+      comps[1] =
+       context->rados.aio_create_completion((void*) cb_arg, NULL,
+                                            &write_callback);
+      context->io_ctx.aio_operate(
+       context->prefix+oid, comps[1], &op);
+    }
+    if (existed_after) {
+      pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+       new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                                new TestOp::CallbackInfo(2));
+      comps[2] =
+       context->rados.aio_create_completion((void*) cb_arg, NULL,
+                                            &write_callback);
+      context->io_ctx.aio_operate(
+       context->prefix+oid, comps[2], &zero_write_op2);
+    }
+  }
+
+  void _finish(CallbackInfo *info) override
+  {
+    Mutex::Locker l(context->state_lock);
+    uint64_t tid = info->id;
+    cout << num << ":  finishing rollback tid " << tid
+        << " to " << context->prefix + oid << std::endl;
+    assert((int)(info->id) > last_finished);
+    last_finished = info->id;
+
+    int r;
+    if ((r = comps[last_finished]->get_return_value()) != 0) {
+      cerr << "err " << r << std::endl;
+      ceph_abort();
+    }
+    if (--outstanding == 0) {
+      done = true;
+      context->update_object_version(oid, comps[tid]->get_version64());
+      context->oid_in_use.erase(oid);
+      context->oid_not_in_use.insert(oid);
+      in_use = ceph::shared_ptr<int>();
+      context->kick();
+    }
+  }
+
+  bool finished() override
+  {
+    return done;
+  }
+
+  string getType() override
+  {
+    return "RollBackOp";
+  }
+};
+
+class CopyFromOp : public TestOp {
+public:
+  string oid, oid_src;
+  ObjectDesc src_value;
+  librados::ObjectWriteOperation op;
+  librados::ObjectReadOperation rd_op;
+  librados::AioCompletion *comp;
+  librados::AioCompletion *comp_racing_read;
+  ceph::shared_ptr<int> in_use;
+  int snap;
+  int done;
+  uint64_t version;
+  int r;
+  CopyFromOp(int n,
+            RadosTestContext *context,
+            const string &oid,
+            const string &oid_src,
+            TestOpStat *stat)
+    : TestOp(n, context, stat),
+      oid(oid), oid_src(oid_src),
+      comp(NULL), snap(-1), done(0), 
+      version(0), r(0)
+  {}
+
+  void _begin() override
+  {
+    ContDesc cont;
+    {
+      Mutex::Locker l(context->state_lock);
+      cont = ContDesc(context->seq_num, context->current_snap,
+                     context->seq_num, "");
+      context->oid_in_use.insert(oid);
+      context->oid_not_in_use.erase(oid);
+      context->oid_in_use.insert(oid_src);
+      context->oid_not_in_use.erase(oid_src);
+
+      // choose source snap
+      if (0 && !(rand() % 4) && !context->snaps.empty()) {
+       snap = rand_choose(context->snaps)->first;
+       in_use = context->snaps_in_use.lookup_or_create(snap, snap);
+      } else {
+       snap = -1;
+      }
+      context->find_object(oid_src, &src_value, snap);
+      if (!src_value.deleted())
+       context->update_object_full(oid, src_value);
+    }
+
+    string src = context->prefix+oid_src;
+    op.copy_from(src.c_str(), context->io_ctx, src_value.version);
+
+    pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                              new TestOp::CallbackInfo(0));
+    comp = context->rados.aio_create_completion((void*) cb_arg, NULL,
+                                               &write_callback);
+    context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
+
+    // queue up a racing read, too.
+    pair<TestOp*, TestOp::CallbackInfo*> *read_cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                              new TestOp::CallbackInfo(1));
+    comp_racing_read = context->rados.aio_create_completion((void*) read_cb_arg, NULL, &write_callback);
+    rd_op.stat(NULL, NULL, NULL);
+    context->io_ctx.aio_operate(context->prefix+oid, comp_racing_read, &rd_op,
+                               librados::OPERATION_ORDER_READS_WRITES,  // order wrt previous write/update
+                               NULL);
+
+  }
+
+  void _finish(CallbackInfo *info) override
+  {
+    Mutex::Locker l(context->state_lock);
+
+    // note that the read can (and atm will) come back before the
+    // write reply, but will reflect the update and the versions will
+    // match.
+
+    if (info->id == 0) {
+      // copy_from
+      assert(comp->is_complete());
+      cout << num << ":  finishing copy_from to " << context->prefix + oid << std::endl;
+      if ((r = comp->get_return_value())) {
+       if (r == -ENOENT && src_value.deleted()) {
+         cout << num << ":  got expected ENOENT (src dne)" << std::endl;
+       } else {
+         cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code "
+              << r << std::endl;
+         ceph_abort();
+       }
+      } else {
+       assert(!version || comp->get_version64() == version);
+       version = comp->get_version64();
+       context->update_object_version(oid, comp->get_version64());
+      }
+    } else if (info->id == 1) {
+      // racing read
+      assert(comp_racing_read->is_complete());
+      cout << num << ":  finishing copy_from racing read to " << context->prefix + oid << std::endl;
+      if ((r = comp_racing_read->get_return_value())) {
+       if (!(r == -ENOENT && src_value.deleted())) {
+         cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code "
+              << r << std::endl;
+       }
+      } else {
+       assert(comp_racing_read->get_return_value() == 0);
+       assert(!version || comp_racing_read->get_version64() == version);
+       version = comp_racing_read->get_version64();
+      }
+    }
+    if (++done == 2) {
+      context->oid_in_use.erase(oid);
+      context->oid_not_in_use.insert(oid);
+      context->oid_in_use.erase(oid_src);
+      context->oid_not_in_use.insert(oid_src);
+      context->kick();
+    }
+  }
+
+  bool finished() override
+  {
+    return done == 2;
+  }
+
+  string getType() override
+  {
+    return "CopyFromOp";
+  }
+};
+
+class SetRedirectOp : public TestOp {
+public:
+  string oid, oid_tgt, tgt_pool_name;
+  ObjectDesc src_value, tgt_value;
+  librados::ObjectWriteOperation op;
+  librados::ObjectReadOperation rd_op;
+  librados::AioCompletion *comp;
+  ceph::shared_ptr<int> in_use;
+  int done;
+  int r;
+  SetRedirectOp(int n,
+            RadosTestContext *context,
+            const string &oid,
+            const string &oid_tgt,
+            const string &tgt_pool_name,
+            TestOpStat *stat = 0)
+    : TestOp(n, context, stat),
+      oid(oid), oid_tgt(oid_tgt), tgt_pool_name(tgt_pool_name),
+      comp(NULL), done(0), 
+      r(0)
+  {}
+
+  void _begin() override
+  {
+    Mutex::Locker l(context->state_lock);
+    context->oid_in_use.insert(oid);
+    context->oid_not_in_use.erase(oid);
+    context->oid_redirect_in_use.insert(oid_tgt);
+    context->oid_redirect_not_in_use.erase(oid_tgt);
+
+    context->find_object(oid, &src_value); 
+    if(!context->redirect_objs[oid].empty()) {
+      /* update target's user_version */
+      rd_op.stat(NULL, NULL, NULL);
+      comp = context->rados.aio_create_completion();
+      context->io_ctx.aio_operate(context->prefix+oid_tgt, comp, &rd_op,
+                           librados::OPERATION_ORDER_READS_WRITES,
+                           NULL);
+      comp->wait_for_safe();
+      context->update_object_version(oid_tgt, comp->get_version64());
+      comp->release();
+
+      /* unset redirect target */
+      comp = context->rados.aio_create_completion();
+      bool present = !src_value.deleted();
+      context->remove_object(oid);
+      op.remove();
+      context->io_ctx.aio_operate(context->prefix+oid, comp, &op,
+                                 librados::OPERATION_ORDER_READS_WRITES |
+                                 librados::OPERATION_IGNORE_REDIRECT);
+      comp->wait_for_safe();
+      if ((r = comp->get_return_value())) {
+       if (!(r == -ENOENT && !present)) {
+         cerr << "r is " << r << " while deleting " << oid << " and present is " << present << std::endl;
+         ceph_abort();
+       }
+      }
+      comp->release();
+
+      context->oid_redirect_not_in_use.insert(context->redirect_objs[oid]);
+      context->oid_redirect_in_use.erase(context->redirect_objs[oid]);
+
+      /* copy_from oid_tgt --> oid */
+      comp = context->rados.aio_create_completion();
+      context->find_object(oid_tgt, &tgt_value);
+      string src = context->prefix+oid_tgt;
+      op.copy_from(src.c_str(), context->io_ctx, tgt_value.version);
+      context->io_ctx.aio_operate(context->prefix+oid, comp, &op,
+                                 librados::OPERATION_ORDER_READS_WRITES);
+      comp->wait_for_safe();
+      if ((r = comp->get_return_value())) {
+       cerr << "Error: oid " << oid << " copy_from " << oid_tgt << " returned error code "
+            << r << std::endl;
+       ceph_abort();
+      }
+      context->update_object_full(oid, tgt_value);
+      context->update_object_version(oid, comp->get_version64());
+      comp->release();
+    }
+
+    comp = context->rados.aio_create_completion();
+    rd_op.stat(NULL, NULL, NULL);
+    context->io_ctx.aio_operate(context->prefix+oid, comp, &rd_op,
+                         librados::OPERATION_ORDER_READS_WRITES |
+                         librados::OPERATION_IGNORE_REDIRECT,
+                         NULL);
+    comp->wait_for_safe();
+    if ((r = comp->get_return_value()) && !src_value.deleted()) {
+      cerr << "Error: oid " << oid << " stat returned error code "
+          << r << std::endl;
+      ceph_abort();
+    }
+    context->update_object_version(oid, comp->get_version64());
+    comp->release();
+
+    context->find_object(oid, &src_value); 
+    context->find_object(oid_tgt, &tgt_value);
+
+    if (!src_value.deleted() && !tgt_value.deleted())
+      context->update_object_full(oid, tgt_value);
+
+    if (src_value.version != 0 && !src_value.deleted())
+      op.assert_version(src_value.version);
+    op.set_redirect(context->prefix+oid_tgt, context->io_ctx, tgt_value.version);
+
+    pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                              new TestOp::CallbackInfo(0));
+    comp = context->rados.aio_create_completion((void*) cb_arg, NULL,
+                                               &write_callback);
+    context->io_ctx.aio_operate(context->prefix+oid, comp, &op,
+                               librados::OPERATION_ORDER_READS_WRITES);
+  }
+
+  void _finish(CallbackInfo *info) override
+  {
+    Mutex::Locker l(context->state_lock);
+
+    if (info->id == 0) {
+      assert(comp->is_complete());
+      cout << num << ":  finishing set_redirect to oid " << oid << std::endl;
+      if ((r = comp->get_return_value())) {
+       if (r == -ENOENT && src_value.deleted()) {
+         cout << num << ":  got expected ENOENT (src dne)" << std::endl;
+       } else {
+         cerr << "Error: oid " << oid << " set_redirect " << oid_tgt << " returned error code "
+              << r << std::endl;
+         ceph_abort();
+       }
+      } else {
+       context->update_object_redirect_target(oid, oid_tgt);
+       context->update_object_version(oid, comp->get_version64());
+      }
+    }
+
+    if (++done == 1) {
+      context->oid_in_use.erase(oid);
+      context->oid_not_in_use.insert(oid);
+      context->kick();
+    }
+  }
+
+  bool finished() override
+  {
+    return done == 1;
+  }
+
+  string getType() override
+  {
+    return "SetRedirectOp";
+  }
+};
+
+class UnsetRedirectOp : public TestOp {
+public:
+  string oid;
+  librados::ObjectWriteOperation op;
+  librados::AioCompletion *completion;
+  librados::AioCompletion *comp;
+
+  UnsetRedirectOp(int n,
+          RadosTestContext *context,
+          const string &oid,
+          TestOpStat *stat = 0)
+    : TestOp(n, context, stat), oid(oid)
+  {}
+
+  void _begin() override
+  {
+    context->state_lock.Lock();
+    if (context->get_watch_context(oid)) {
+      context->kick();
+      context->state_lock.Unlock();
+      return;
+    }
+
+    ObjectDesc contents;
+    context->find_object(oid, &contents);
+    bool present = !contents.deleted();
+
+    context->oid_in_use.insert(oid);
+    context->oid_not_in_use.erase(oid);
+    context->seq_num++;
+
+    context->remove_object(oid);
+
+    context->state_lock.Unlock();
+
+    comp = context->rados.aio_create_completion();
+    op.remove();
+    context->io_ctx.aio_operate(context->prefix+oid, comp, &op,
+                               librados::OPERATION_ORDER_READS_WRITES |
+                               librados::OPERATION_IGNORE_REDIRECT);
+    comp->wait_for_safe();
+    int r = comp->get_return_value();
+    if (r && !(r == -ENOENT && !present)) {
+      cerr << "r is " << r << " while deleting " << oid << " and present is " << present << std::endl;
+      ceph_abort();
+    }
+
+    context->state_lock.Lock();
+    context->oid_in_use.erase(oid);
+    context->oid_not_in_use.insert(oid);
+    if(!context->redirect_objs[oid].empty()) {
+      context->oid_redirect_not_in_use.insert(context->redirect_objs[oid]);
+      context->oid_redirect_in_use.erase(context->redirect_objs[oid]);
+      context->update_object_redirect_target(oid, string());
+    }
+    context->kick();
+    context->state_lock.Unlock();
+  }
+
+  string getType() override
+  {
+    return "UnsetRedirectOp";
+  }
+};
+
+class HitSetListOp : public TestOp {
+  librados::AioCompletion *comp1, *comp2;
+  uint32_t hash;
+  std::list< std::pair<time_t, time_t> > ls;
+  bufferlist bl;
+
+public:
+  HitSetListOp(int n,
+              RadosTestContext *context,
+              uint32_t hash,
+              TestOpStat *stat = 0)
+    : TestOp(n, context, stat),
+      comp1(NULL), comp2(NULL),
+      hash(hash)
+  {}
+
+  void _begin() override
+  {
+    pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                              new TestOp::CallbackInfo(0));
+    comp1 = context->rados.aio_create_completion((void*) cb_arg, NULL,
+                                                &write_callback);
+    int r = context->io_ctx.hit_set_list(hash, comp1, &ls);
+    assert(r == 0);
+  }
+
+  void _finish(CallbackInfo *info) override {
+    Mutex::Locker l(context->state_lock);
+    if (!comp2) {
+      if (ls.empty()) {
+       cerr << num << ": no hitsets" << std::endl;
+       done = true;
+      } else {
+       cerr << num << ": hitsets are " << ls << std::endl;
+       int r = rand() % ls.size();
+       std::list<pair<time_t,time_t> >::iterator p = ls.begin();
+       while (r--)
+         ++p;
+       pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+         new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                                  new TestOp::CallbackInfo(0));
+       comp2 = context->rados.aio_create_completion((void*) cb_arg, NULL,
+                                                    &write_callback);
+       r = context->io_ctx.hit_set_get(hash, comp2, p->second, &bl);
+       assert(r == 0);
+      }
+    } else {
+      int r = comp2->get_return_value();
+      if (r == 0) {
+       HitSet hitset;
+       bufferlist::iterator p = bl.begin();
+       ::decode(hitset, p);
+       cout << num << ": got hitset of type " << hitset.get_type_name()
+            << " size " << bl.length()
+            << std::endl;
+      } else {
+       // FIXME: we could verify that we did in fact race with a trim...
+       assert(r == -ENOENT);
+      }
+      done = true;
+    }
+
+    context->kick();
+  }
+
+  bool finished() override {
+    return done;
+  }
+
+  string getType() override {
+    return "HitSetListOp";
+  }
+};
+
+class UndirtyOp : public TestOp {
+public:
+  librados::AioCompletion *completion;
+  librados::ObjectWriteOperation op;
+  string oid;
+
+  UndirtyOp(int n,
+           RadosTestContext *context,
+           const string &oid,
+           TestOpStat *stat = 0)
+    : TestOp(n, context, stat),
+      completion(NULL),
+      oid(oid)
+  {}
+
+  void _begin() override
+  {
+    context->state_lock.Lock();
+    pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                              new TestOp::CallbackInfo(0));
+    completion = context->rados.aio_create_completion((void *) cb_arg, NULL,
+                                                     &write_callback);
+
+    context->oid_in_use.insert(oid);
+    context->oid_not_in_use.erase(oid);
+    context->update_object_undirty(oid);
+    context->state_lock.Unlock();
+
+    op.undirty();
+    int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
+                                       &op, 0);
+    assert(!r);
+  }
+
+  void _finish(CallbackInfo *info) override
+  {
+    context->state_lock.Lock();
+    assert(!done);
+    assert(completion->is_complete());
+    context->oid_in_use.erase(oid);
+    context->oid_not_in_use.insert(oid);
+    context->update_object_version(oid, completion->get_version64());
+    context->kick();
+    done = true;
+    context->state_lock.Unlock();
+  }
+
+  bool finished() override
+  {
+    return done;
+  }
+
+  string getType() override
+  {
+    return "UndirtyOp";
+  }
+};
+
+class IsDirtyOp : public TestOp {
+public:
+  librados::AioCompletion *completion;
+  librados::ObjectReadOperation op;
+  string oid;
+  bool dirty;
+  ObjectDesc old_value;
+  int snap;
+  ceph::shared_ptr<int> in_use;
+
+  IsDirtyOp(int n,
+           RadosTestContext *context,
+           const string &oid,
+           TestOpStat *stat = 0)
+    : TestOp(n, context, stat),
+      completion(NULL),
+      oid(oid),
+      dirty(false)
+  {}
+
+  void _begin() override
+  {
+    context->state_lock.Lock();
+
+    if (!(rand() % 4) && !context->snaps.empty()) {
+      snap = rand_choose(context->snaps)->first;
+      in_use = context->snaps_in_use.lookup_or_create(snap, snap);
+    } else {
+      snap = -1;
+    }
+    std::cout << num << ": is_dirty oid " << oid << " snap " << snap
+             << std::endl;
+
+    pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                              new TestOp::CallbackInfo(0));
+    completion = context->rados.aio_create_completion((void *) cb_arg, NULL,
+                                                     &write_callback);
+
+    context->oid_in_use.insert(oid);
+    context->oid_not_in_use.erase(oid);
+    context->state_lock.Unlock();
+
+    if (snap >= 0) {
+      context->io_ctx.snap_set_read(context->snaps[snap]);
+    }
+
+    op.is_dirty(&dirty, NULL);
+    int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
+                                       &op, 0);
+    assert(!r);
+
+    if (snap >= 0) {
+      context->io_ctx.snap_set_read(0);
+    }
+  }
+
+  void _finish(CallbackInfo *info) override
+  {
+    context->state_lock.Lock();
+    assert(!done);
+    assert(completion->is_complete());
+    context->oid_in_use.erase(oid);
+    context->oid_not_in_use.insert(oid);
+
+    assert(context->find_object(oid, &old_value, snap));
+
+    int r = completion->get_return_value();
+    if (r == 0) {
+      cout << num << ":  " << (dirty ? "dirty" : "clean") << std::endl;
+      assert(!old_value.deleted());
+      assert(dirty == old_value.dirty);
+    } else {
+      cout << num << ":  got " << r << std::endl;
+      assert(r == -ENOENT);
+      assert(old_value.deleted());
+    }
+    context->kick();
+    done = true;
+    context->state_lock.Unlock();
+  }
+
+  bool finished() override
+  {
+    return done;
+  }
+
+  string getType() override
+  {
+    return "IsDirtyOp";
+  }
+};
+
+
+
+class CacheFlushOp : public TestOp {
+public:
+  librados::AioCompletion *completion;
+  librados::ObjectReadOperation op;
+  string oid;
+  bool blocking;
+  int snap;
+  bool can_fail;
+  ceph::shared_ptr<int> in_use;
+
+  CacheFlushOp(int n,
+              RadosTestContext *context,
+              const string &oid,
+              TestOpStat *stat,
+              bool b)
+    : TestOp(n, context, stat),
+      completion(NULL),
+      oid(oid),
+      blocking(b),
+      snap(0),
+      can_fail(false)
+  {}
+
+  void _begin() override
+  {
+    context->state_lock.Lock();
+
+    if (!(rand() % 4) && !context->snaps.empty()) {
+      snap = rand_choose(context->snaps)->first;
+      in_use = context->snaps_in_use.lookup_or_create(snap, snap);
+    } else {
+      snap = -1;
+    }
+    // not being particularly specific here about knowing which
+    // flushes are on the oldest clean snap and which ones are not.
+    can_fail = !blocking || !context->snaps.empty();
+    // FIXME: we could fail if we've ever removed a snap due to
+    // the async snap trimming.
+    can_fail = true;
+    cout << num << ": " << (blocking ? "cache_flush" : "cache_try_flush")
+        << " oid " << oid << " snap " << snap << std::endl;
+
+    if (snap >= 0) {
+      context->io_ctx.snap_set_read(context->snaps[snap]);
+    }
+
+    pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                              new TestOp::CallbackInfo(0));
+    completion = context->rados.aio_create_completion((void *) cb_arg, NULL,
+                                                     &write_callback);
+    context->oid_flushing.insert(oid);
+    context->oid_not_flushing.erase(oid);
+    context->state_lock.Unlock();
+
+    unsigned flags = librados::OPERATION_IGNORE_CACHE;
+    if (blocking) {
+      op.cache_flush();
+    } else {
+      op.cache_try_flush();
+      flags = librados::OPERATION_SKIPRWLOCKS;
+    }
+    int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
+                                       &op, flags, NULL);
+    assert(!r);
+
+    if (snap >= 0) {
+      context->io_ctx.snap_set_read(0);
+    }
+  }
+
+  void _finish(CallbackInfo *info) override
+  {
+    context->state_lock.Lock();
+    assert(!done);
+    assert(completion->is_complete());
+    context->oid_flushing.erase(oid);
+    context->oid_not_flushing.insert(oid);
+    int r = completion->get_return_value();
+    cout << num << ":  got " << cpp_strerror(r) << std::endl;
+    if (r == 0) {
+      context->update_object_version(oid, 0, snap);
+    } else if (r == -EBUSY) {
+      assert(can_fail);
+    } else if (r == -EINVAL) {
+      // caching not enabled?
+    } else if (r == -ENOENT) {
+      // may have raced with a remove?
+    } else {
+      assert(0 == "shouldn't happen");
+    }
+    context->kick();
+    done = true;
+    context->state_lock.Unlock();
+  }
+
+  bool finished() override
+  {
+    return done;
+  }
+
+  string getType() override
+  {
+    return "CacheFlushOp";
+  }
+};
+
+class CacheEvictOp : public TestOp {
+public:
+  librados::AioCompletion *completion;
+  librados::ObjectReadOperation op;
+  string oid;
+  ceph::shared_ptr<int> in_use;
+
+  CacheEvictOp(int n,
+              RadosTestContext *context,
+              const string &oid,
+              TestOpStat *stat)
+    : TestOp(n, context, stat),
+      completion(NULL),
+      oid(oid)
+  {}
+
+  void _begin() override
+  {
+    context->state_lock.Lock();
+
+    int snap;
+    if (!(rand() % 4) && !context->snaps.empty()) {
+      snap = rand_choose(context->snaps)->first;
+      in_use = context->snaps_in_use.lookup_or_create(snap, snap);
+    } else {
+      snap = -1;
+    }
+    cout << num << ": cache_evict oid " << oid << " snap " << snap << std::endl;
+
+    if (snap >= 0) {
+      context->io_ctx.snap_set_read(context->snaps[snap]);
+    }
+
+    pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+      new pair<TestOp*, TestOp::CallbackInfo*>(this,
+                                              new TestOp::CallbackInfo(0));
+    completion = context->rados.aio_create_completion((void *) cb_arg, NULL,
+                                                     &write_callback);
+    context->state_lock.Unlock();
+
+    op.cache_evict();
+    int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
+                                       &op, librados::OPERATION_IGNORE_CACHE,
+                                       NULL);
+    assert(!r);
+
+    if (snap >= 0) {
+      context->io_ctx.snap_set_read(0);
+    }
+  }
+
+  void _finish(CallbackInfo *info) override
+  {
+    context->state_lock.Lock();
+    assert(!done);
+    assert(completion->is_complete());
+
+    int r = completion->get_return_value();
+    cout << num << ":  got " << cpp_strerror(r) << std::endl;
+    if (r == 0) {
+      // yay!
+    } else if (r == -EBUSY) {
+      // raced with something that dirtied the object
+    } else if (r == -EINVAL) {
+      // caching not enabled?
+    } else if (r == -ENOENT) {
+      // may have raced with a remove?
+    } else {
+      assert(0 == "shouldn't happen");
+    }
+    context->kick();
+    done = true;
+    context->state_lock.Unlock();
+  }
+
+  bool finished() override
+  {
+    return done;
+  }
+
+  string getType() override
+  {
+    return "CacheEvictOp";
+  }
+};
+
+
+#endif