initial code repo
[stor4nfv.git] / src / ceph / src / librados / IoCtxImpl.cc
diff --git a/src/ceph/src/librados/IoCtxImpl.cc b/src/ceph/src/librados/IoCtxImpl.cc
new file mode 100644 (file)
index 0000000..2805ed8
--- /dev/null
@@ -0,0 +1,2247 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <limits.h>
+
+#include "IoCtxImpl.h"
+
+#include "librados/AioCompletionImpl.h"
+#include "librados/PoolAsyncCompletionImpl.h"
+#include "librados/RadosClient.h"
+#include "include/assert.h"
+#include "common/valgrind.h"
+#include "common/EventTrace.h"
+
+#define dout_subsys ceph_subsys_rados
+#undef dout_prefix
+#define dout_prefix *_dout << "librados: "
+
+namespace librados {
+namespace {
+
+struct C_notify_Finish : public Context {
+  CephContext *cct;
+  Context *ctx;
+  Objecter *objecter;
+  Objecter::LingerOp *linger_op;
+  bufferlist reply_bl;
+  bufferlist *preply_bl;
+  char **preply_buf;
+  size_t *preply_buf_len;
+
+  C_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter,
+                  Objecter::LingerOp *_linger_op, bufferlist *_preply_bl,
+                  char **_preply_buf, size_t *_preply_buf_len)
+    : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op),
+      preply_bl(_preply_bl), preply_buf(_preply_buf),
+      preply_buf_len(_preply_buf_len)
+  {
+    linger_op->on_notify_finish = this;
+    linger_op->notify_result_bl = &reply_bl;
+  }
+
+  void finish(int r) override
+  {
+    ldout(cct, 10) << __func__ << " completed notify (linger op "
+                   << linger_op << "), r = " << r << dendl;
+
+    // pass result back to user
+    // NOTE: we do this regardless of what error code we return
+    if (preply_buf) {
+      if (reply_bl.length()) {
+        *preply_buf = (char*)malloc(reply_bl.length());
+        memcpy(*preply_buf, reply_bl.c_str(), reply_bl.length());
+      } else {
+        *preply_buf = NULL;
+      }
+    }
+    if (preply_buf_len)
+      *preply_buf_len = reply_bl.length();
+    if (preply_bl)
+      preply_bl->claim(reply_bl);
+
+    ctx->complete(r);
+  }
+};
+
+struct C_aio_linger_cancel : public Context {
+  Objecter *objecter;
+  Objecter::LingerOp *linger_op;
+
+  C_aio_linger_cancel(Objecter *_objecter, Objecter::LingerOp *_linger_op)
+    : objecter(_objecter), linger_op(_linger_op)
+  {
+  }
+
+  void finish(int r) override
+  {
+    objecter->linger_cancel(linger_op);
+  }
+};
+
+struct C_aio_linger_Complete : public Context {
+  AioCompletionImpl *c;
+  Objecter::LingerOp *linger_op;
+  bool cancel;
+
+  C_aio_linger_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op, bool _cancel)
+    : c(_c), linger_op(_linger_op), cancel(_cancel)
+  {
+    c->get();
+  }
+
+  void finish(int r) override {
+    if (cancel || r < 0)
+      c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
+                                                            linger_op));
+
+    c->lock.Lock();
+    c->rval = r;
+    c->complete = true;
+    c->cond.Signal();
+
+    if (c->callback_complete ||
+       c->callback_safe) {
+      c->io->client->finisher.queue(new C_AioComplete(c));
+    }
+    c->put_unlock();
+  }
+};
+
+struct C_aio_notify_Complete : public C_aio_linger_Complete {
+  Mutex lock;
+  bool acked = false;
+  bool finished = false;
+  int ret_val = 0;
+
+  C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
+    : C_aio_linger_Complete(_c, _linger_op, false),
+      lock("C_aio_notify_Complete::lock") {
+  }
+
+  void handle_ack(int r) {
+    // invoked by C_aio_notify_Ack
+    lock.Lock();
+    acked = true;
+    complete_unlock(r);
+  }
+
+  void complete(int r) override {
+    // invoked by C_notify_Finish (or C_aio_notify_Ack on failure)
+    lock.Lock();
+    finished = true;
+    complete_unlock(r);
+  }
+
+  void complete_unlock(int r) {
+    if (ret_val == 0 && r < 0) {
+      ret_val = r;
+    }
+
+    if (acked && finished) {
+      lock.Unlock();
+      cancel = true;
+      C_aio_linger_Complete::complete(ret_val);
+    } else {
+      lock.Unlock();
+    }
+  }
+};
+
+struct C_aio_notify_Ack : public Context {
+  CephContext *cct;
+  C_notify_Finish *onfinish;
+  C_aio_notify_Complete *oncomplete;
+
+  C_aio_notify_Ack(CephContext *_cct, C_notify_Finish *_onfinish,
+                   C_aio_notify_Complete *_oncomplete)
+    : cct(_cct), onfinish(_onfinish), oncomplete(_oncomplete)
+  {
+  }
+
+  void finish(int r) override
+  {
+    ldout(cct, 10) << __func__ << " linger op " << oncomplete->linger_op << " "
+                   << "acked (" << r << ")" << dendl;
+    oncomplete->handle_ack(r);
+    if (r < 0) {
+      // on failure, we won't expect to see a notify_finish callback
+      onfinish->complete(r);
+    }
+  }
+};
+
+struct C_aio_selfmanaged_snap_op_Complete : public Context {
+  librados::RadosClient *client;
+  librados::AioCompletionImpl *c;
+
+  C_aio_selfmanaged_snap_op_Complete(librados::RadosClient *client,
+                                     librados::AioCompletionImpl *c)
+    : client(client), c(c) {
+    c->get();
+  }
+
+  void finish(int r) override {
+    c->lock.Lock();
+    c->rval = r;
+    c->complete = true;
+    c->cond.Signal();
+
+    if (c->callback_complete || c->callback_safe) {
+      client->finisher.queue(new librados::C_AioComplete(c));
+    }
+    c->put_unlock();
+  }
+};
+
+struct C_aio_selfmanaged_snap_create_Complete : public C_aio_selfmanaged_snap_op_Complete {
+  snapid_t snapid;
+  uint64_t *dest_snapid;
+
+  C_aio_selfmanaged_snap_create_Complete(librados::RadosClient *client,
+                                         librados::AioCompletionImpl *c,
+                                         uint64_t *dest_snapid)
+    : C_aio_selfmanaged_snap_op_Complete(client, c),
+      dest_snapid(dest_snapid) {
+  }
+
+  void finish(int r) override {
+    if (r >= 0) {
+      *dest_snapid = snapid;
+    }
+    C_aio_selfmanaged_snap_op_Complete::finish(r);
+  }
+};
+
+} // anonymous namespace
+} // namespace librados
+
+librados::IoCtxImpl::IoCtxImpl() :
+  ref_cnt(0), client(NULL), poolid(0), assert_ver(0), last_objver(0),
+  notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
+  aio_write_seq(0), objecter(NULL)
+{
+}
+
+librados::IoCtxImpl::IoCtxImpl(RadosClient *c, Objecter *objecter,
+                              int64_t poolid, snapid_t s)
+  : ref_cnt(0), client(c), poolid(poolid), snap_seq(s),
+    assert_ver(0), last_objver(0),
+    notify_timeout(c->cct->_conf->client_notify_timeout),
+    oloc(poolid), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
+    aio_write_seq(0), objecter(objecter)
+{
+}
+
+void librados::IoCtxImpl::set_snap_read(snapid_t s)
+{
+  if (!s)
+    s = CEPH_NOSNAP;
+  ldout(client->cct, 10) << "set snap read " << snap_seq << " -> " << s << dendl;
+  snap_seq = s;
+}
+
+int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector<snapid_t>& snaps)
+{
+  ::SnapContext n;
+  ldout(client->cct, 10) << "set snap write context: seq = " << seq
+                        << " and snaps = " << snaps << dendl;
+  n.seq = seq;
+  n.snaps = snaps;
+  if (!n.is_valid())
+    return -EINVAL;
+  snapc = n;
+  return 0;
+}
+
+int librados::IoCtxImpl::get_object_hash_position(
+    const std::string& oid, uint32_t *hash_position)
+{
+  int64_t r = objecter->get_object_hash_position(poolid, oid, oloc.nspace);
+  if (r < 0)
+    return r;
+  *hash_position = (uint32_t)r;
+  return 0;
+}
+
+int librados::IoCtxImpl::get_object_pg_hash_position(
+    const std::string& oid, uint32_t *pg_hash_position)
+{
+  int64_t r = objecter->get_object_pg_hash_position(poolid, oid, oloc.nspace);
+  if (r < 0)
+    return r;
+  *pg_hash_position = (uint32_t)r;
+  return 0;
+}
+
+void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
+{
+  get();
+  aio_write_list_lock.Lock();
+  assert(c->io == this);
+  c->aio_write_seq = ++aio_write_seq;
+  ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
+                        << " write_seq " << aio_write_seq << dendl;
+  aio_write_list.push_back(&c->aio_write_list_item);
+  aio_write_list_lock.Unlock();
+}
+
+void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
+{
+  ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
+  aio_write_list_lock.Lock();
+  assert(c->io == this);
+  c->aio_write_list_item.remove_myself();
+
+  map<ceph_tid_t, std::list<AioCompletionImpl*> >::iterator waiters = aio_write_waiters.begin();
+  while (waiters != aio_write_waiters.end()) {
+    if (!aio_write_list.empty() &&
+       aio_write_list.front()->aio_write_seq <= waiters->first) {
+      ldout(client->cct, 20) << " next outstanding write is " << aio_write_list.front()->aio_write_seq
+                            << " <= waiter " << waiters->first
+                            << ", stopping" << dendl;
+      break;
+    }
+    ldout(client->cct, 20) << " waking waiters on seq " << waiters->first << dendl;
+    for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
+        it != waiters->second.end(); ++it) {
+      client->finisher.queue(new C_AioCompleteAndSafe(*it));
+      (*it)->put();
+    }
+    aio_write_waiters.erase(waiters++);
+  }
+
+  aio_write_cond.Signal();
+  aio_write_list_lock.Unlock();
+  put();
+}
+
+void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
+{
+  ldout(client->cct, 20) << "flush_aio_writes_async " << this
+                        << " completion " << c << dendl;
+  Mutex::Locker l(aio_write_list_lock);
+  ceph_tid_t seq = aio_write_seq;
+  if (aio_write_list.empty()) {
+    ldout(client->cct, 20) << "flush_aio_writes_async no writes. (tid "
+                          << seq << ")" << dendl;
+    client->finisher.queue(new C_AioCompleteAndSafe(c));
+  } else {
+    ldout(client->cct, 20) << "flush_aio_writes_async " << aio_write_list.size()
+                          << " writes in flight; waiting on tid " << seq << dendl;
+    c->get();
+    aio_write_waiters[seq].push_back(c);
+  }
+}
+
+void librados::IoCtxImpl::flush_aio_writes()
+{
+  ldout(client->cct, 20) << "flush_aio_writes" << dendl;
+  aio_write_list_lock.Lock();
+  ceph_tid_t seq = aio_write_seq;
+  while (!aio_write_list.empty() &&
+        aio_write_list.front()->aio_write_seq <= seq)
+    aio_write_cond.Wait(aio_write_list_lock);
+  aio_write_list_lock.Unlock();
+}
+
+string librados::IoCtxImpl::get_cached_pool_name()
+{
+  std::string pn;
+  client->pool_get_name(get_id(), &pn);
+  return pn;
+}
+
+// SNAPS
+
+int librados::IoCtxImpl::snap_create(const char *snapName)
+{
+  int reply;
+  string sName(snapName);
+
+  Mutex mylock ("IoCtxImpl::snap_create::mylock");
+  Cond cond;
+  bool done;
+  Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
+  reply = objecter->create_pool_snap(poolid, sName, onfinish);
+
+  if (reply < 0) {
+    delete onfinish;
+  } else {
+    mylock.Lock();
+    while (!done)
+      cond.Wait(mylock);
+    mylock.Unlock();
+  }
+  return reply;
+}
+
+int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid)
+{
+  int reply;
+
+  Mutex mylock("IoCtxImpl::selfmanaged_snap_create::mylock");
+  Cond cond;
+  bool done;
+  Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
+  snapid_t snapid;
+  reply = objecter->allocate_selfmanaged_snap(poolid, &snapid, onfinish);
+
+  if (reply < 0) {
+    delete onfinish;
+  } else {
+    mylock.Lock();
+    while (!done)
+      cond.Wait(mylock);
+    mylock.Unlock();
+    if (reply == 0)
+      *psnapid = snapid;
+  }
+  return reply;
+}
+
+void librados::IoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid,
+                                                      AioCompletionImpl *c)
+{
+  C_aio_selfmanaged_snap_create_Complete *onfinish =
+    new C_aio_selfmanaged_snap_create_Complete(client, c, snapid);
+  int r = objecter->allocate_selfmanaged_snap(poolid, &onfinish->snapid,
+                                              onfinish);
+  if (r < 0) {
+    onfinish->complete(r);
+  }
+}
+
+int librados::IoCtxImpl::snap_remove(const char *snapName)
+{
+  int reply;
+  string sName(snapName);
+
+  Mutex mylock ("IoCtxImpl::snap_remove::mylock");
+  Cond cond;
+  bool done;
+  Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
+  reply = objecter->delete_pool_snap(poolid, sName, onfinish);
+
+  if (reply < 0) {
+    delete onfinish; 
+  } else {
+    mylock.Lock();
+    while(!done)
+      cond.Wait(mylock);
+    mylock.Unlock();
+  }
+  return reply;
+}
+
+int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid,
+                                                         ::SnapContext& snapc,
+                                                         uint64_t snapid)
+{
+  int reply;
+
+  Mutex mylock("IoCtxImpl::snap_rollback::mylock");
+  Cond cond;
+  bool done;
+  Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply);
+
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  op.rollback(snapid);
+  objecter->mutate(oid, oloc,
+                  op, snapc, ceph::real_clock::now(), 0,
+                  onack, NULL);
+
+  mylock.Lock();
+  while (!done) cond.Wait(mylock);
+  mylock.Unlock();
+  return reply;
+}
+
+int librados::IoCtxImpl::rollback(const object_t& oid, const char *snapName)
+{
+  snapid_t snap;
+
+  int r = objecter->pool_snap_by_name(poolid, snapName, &snap);
+  if (r < 0) {
+    return r;
+  }
+
+  return selfmanaged_snap_rollback_object(oid, snapc, snap);
+}
+
+int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid)
+{
+  int reply;
+
+  Mutex mylock("IoCtxImpl::selfmanaged_snap_remove::mylock");
+  Cond cond;
+  bool done;
+  objecter->delete_selfmanaged_snap(poolid, snapid_t(snapid),
+                                   new C_SafeCond(&mylock, &cond, &done, &reply));
+
+  mylock.Lock();
+  while (!done) cond.Wait(mylock);
+  mylock.Unlock();
+  return (int)reply;
+}
+
+void librados::IoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid,
+                                                      AioCompletionImpl *c)
+{
+  Context *onfinish = new C_aio_selfmanaged_snap_op_Complete(client, c);
+  objecter->delete_selfmanaged_snap(poolid, snapid, onfinish);
+}
+
+int librados::IoCtxImpl::pool_change_auid(unsigned long long auid)
+{
+  int reply;
+
+  Mutex mylock("IoCtxImpl::pool_change_auid::mylock");
+  Cond cond;
+  bool done;
+  objecter->change_pool_auid(poolid,
+                            new C_SafeCond(&mylock, &cond, &done, &reply),
+                            auid);
+
+  mylock.Lock();
+  while (!done) cond.Wait(mylock);
+  mylock.Unlock();
+  return reply;
+}
+
+int librados::IoCtxImpl::pool_change_auid_async(unsigned long long auid,
+                                                 PoolAsyncCompletionImpl *c)
+{
+  objecter->change_pool_auid(poolid,
+                            new C_PoolAsync_Safe(c),
+                            auid);
+  return 0;
+}
+
+int librados::IoCtxImpl::snap_list(vector<uint64_t> *snaps)
+{
+  return objecter->pool_snap_list(poolid, snaps);
+}
+
+int librados::IoCtxImpl::snap_lookup(const char *name, uint64_t *snapid)
+{
+  return objecter->pool_snap_by_name(poolid, name, (snapid_t *)snapid);
+}
+
+int librados::IoCtxImpl::snap_get_name(uint64_t snapid, std::string *s)
+{
+  pool_snap_info_t info;
+  int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
+  if (ret < 0) {
+    return ret;
+  }
+  *s = info.name.c_str();
+  return 0;
+}
+
+int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid, time_t *t)
+{
+  pool_snap_info_t info;
+  int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
+  if (ret < 0) {
+    return ret;
+  }
+  *t = info.stamp.sec();
+  return 0;
+}
+
+
+// IO
+
+int librados::IoCtxImpl::nlist(Objecter::NListContext *context, int max_entries)
+{
+  Cond cond;
+  bool done;
+  int r = 0;
+  Mutex mylock("IoCtxImpl::nlist::mylock");
+
+  if (context->at_end())
+    return 0;
+
+  context->max_entries = max_entries;
+  context->nspace = oloc.nspace;
+
+  objecter->list_nobjects(context, new C_SafeCond(&mylock, &cond, &done, &r));
+
+  mylock.Lock();
+  while(!done)
+    cond.Wait(mylock);
+  mylock.Unlock();
+
+  return r;
+}
+
+uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
+                                       uint32_t pos)
+{
+  context->list.clear();
+  return objecter->list_nobjects_seek(context, pos);
+}
+
+uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
+                                    const rados_object_list_cursor& cursor)
+{
+  context->list.clear();
+  return objecter->list_nobjects_seek(context, *(const hobject_t *)cursor);
+}
+
+rados_object_list_cursor librados::IoCtxImpl::nlist_get_cursor(Objecter::NListContext *context)
+{
+  hobject_t *c = new hobject_t;
+
+  objecter->list_nobjects_get_cursor(context, c);
+  return (rados_object_list_cursor)c;
+}
+
+int librados::IoCtxImpl::create(const object_t& oid, bool exclusive)
+{
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  op.create(exclusive);
+  return operate(oid, &op, NULL);
+}
+
+/*
+ * add any version assert operations that are appropriate given the
+ * stat in the IoCtx, either the target version assert or any src
+ * object asserts.  these affect a single ioctx operation, so clear
+ * the ioctx state when we're doing.
+ *
+ * return a pointer to the ObjectOperation if we added any events;
+ * this is convenient for passing the extra_ops argument into Objecter
+ * methods.
+ */
+::ObjectOperation *librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation *op)
+{
+  ::ObjectOperation *pop = NULL;
+  if (assert_ver) {
+    op->assert_version(assert_ver);
+    assert_ver = 0;
+    pop = op;
+  }
+  return pop;
+}
+
+int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,
+                              size_t len, uint64_t off)
+{
+  if (len > UINT_MAX/2)
+    return -E2BIG;
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  bufferlist mybl;
+  mybl.substr_of(bl, 0, len);
+  op.write(off, mybl);
+  return operate(oid, &op, NULL);
+}
+
+int librados::IoCtxImpl::append(const object_t& oid, bufferlist& bl, size_t len)
+{
+  if (len > UINT_MAX/2)
+    return -E2BIG;
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  bufferlist mybl;
+  mybl.substr_of(bl, 0, len);
+  op.append(mybl);
+  return operate(oid, &op, NULL);
+}
+
+int librados::IoCtxImpl::write_full(const object_t& oid, bufferlist& bl)
+{
+  if (bl.length() > UINT_MAX/2)
+    return -E2BIG;
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  op.write_full(bl);
+  return operate(oid, &op, NULL);
+}
+
+int librados::IoCtxImpl::writesame(const object_t& oid, bufferlist& bl,
+                                  size_t write_len, uint64_t off)
+{
+  if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
+    return -E2BIG;
+  if ((bl.length() == 0) || (write_len % bl.length()))
+    return -EINVAL;
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  bufferlist mybl;
+  mybl.substr_of(bl, 0, bl.length());
+  op.writesame(off, write_len, mybl);
+  return operate(oid, &op, NULL);
+}
+
+int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
+                                ceph::real_time *pmtime, int flags)
+{
+  ceph::real_time ut = (pmtime ? *pmtime :
+    ceph::real_clock::now());
+
+  /* can't write to a snapshot */
+  if (snap_seq != CEPH_NOSNAP)
+    return -EROFS;
+
+  if (!o->size())
+    return 0;
+
+  Mutex mylock("IoCtxImpl::operate::mylock");
+  Cond cond;
+  bool done;
+  int r;
+  version_t ver;
+
+  Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r);
+
+  int op = o->ops[0].op.op;
+  ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid
+                        << " nspace=" << oloc.nspace << dendl;
+  Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc,
+                                                         *o, snapc, ut, flags,
+                                                         oncommit, &ver);
+  objecter->op_submit(objecter_op);
+
+  mylock.Lock();
+  while (!done)
+    cond.Wait(mylock);
+  mylock.Unlock();
+  ldout(client->cct, 10) << "Objecter returned from "
+       << ceph_osd_op_name(op) << " r=" << r << dendl;
+
+  set_sync_op_version(ver);
+
+  return r;
+}
+
+int librados::IoCtxImpl::operate_read(const object_t& oid,
+                                     ::ObjectOperation *o,
+                                     bufferlist *pbl,
+                                     int flags)
+{
+  if (!o->size())
+    return 0;
+
+  Mutex mylock("IoCtxImpl::operate_read::mylock");
+  Cond cond;
+  bool done;
+  int r;
+  version_t ver;
+
+  Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+
+  int op = o->ops[0].op.op;
+  ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl;
+  Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc,
+                                             *o, snap_seq, pbl, flags,
+                                             onack, &ver);
+  objecter->op_submit(objecter_op);
+
+  mylock.Lock();
+  while (!done)
+    cond.Wait(mylock);
+  mylock.Unlock();
+  ldout(client->cct, 10) << "Objecter returned from "
+       << ceph_osd_op_name(op) << " r=" << r << dendl;
+
+  set_sync_op_version(ver);
+
+  return r;
+}
+
+int librados::IoCtxImpl::aio_operate_read(const object_t &oid,
+                                         ::ObjectOperation *o,
+                                         AioCompletionImpl *c,
+                                         int flags,
+                                         bufferlist *pbl,
+                                          const blkin_trace_info *trace_info)
+{
+  FUNCTRACE();
+  Context *oncomplete = new C_aio_Complete(c);
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ((C_aio_Complete *) oncomplete)->oid = oid;
+#endif
+  c->is_read = true;
+  c->io = this;
+
+  ZTracer::Trace trace;
+  if (trace_info) {
+    ZTracer::Trace parent_trace("", nullptr, trace_info);
+    trace.init("rados operate read", &objecter->trace_endpoint, &parent_trace);
+  }
+
+  trace.event("init root span");
+  Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc,
+                *o, snap_seq, pbl, flags,
+                oncomplete, &c->objver, nullptr, 0, &trace);
+  objecter->op_submit(objecter_op, &c->tid);
+  trace.event("rados operate read submitted");
+
+  return 0;
+}
+
+int librados::IoCtxImpl::aio_operate(const object_t& oid,
+                                    ::ObjectOperation *o, AioCompletionImpl *c,
+                                    const SnapContext& snap_context, int flags,
+                                     const blkin_trace_info *trace_info)
+{
+  FUNCTRACE();
+  OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
+  auto ut = ceph::real_clock::now();
+  /* can't write to a snapshot */
+  if (snap_seq != CEPH_NOSNAP)
+    return -EROFS;
+
+  Context *oncomplete = new C_aio_Complete(c);
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ((C_aio_Complete *) oncomplete)->oid = oid;
+#endif
+
+  c->io = this;
+  queue_aio_write(c);
+
+  ZTracer::Trace trace;
+  if (trace_info) {
+    ZTracer::Trace parent_trace("", nullptr, trace_info);
+    trace.init("rados operate", &objecter->trace_endpoint, &parent_trace);
+  }
+
+  trace.event("init root span");
+  Objecter::Op *op = objecter->prepare_mutate_op(
+    oid, oloc, *o, snap_context, ut, flags,
+    oncomplete, &c->objver, osd_reqid_t(), &trace);
+  objecter->op_submit(op, &c->tid);
+  trace.event("rados operate op submitted");
+
+  return 0;
+}
+
+int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
+                                 bufferlist *pbl, size_t len, uint64_t off,
+                                 uint64_t snapid, const blkin_trace_info *info)
+{
+  FUNCTRACE();
+  if (len > (size_t) INT_MAX)
+    return -EDOM;
+
+  OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
+  Context *oncomplete = new C_aio_Complete(c);
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ((C_aio_Complete *) oncomplete)->oid = oid;
+#endif
+  c->is_read = true;
+  c->io = this;
+  c->blp = pbl;
+
+  ZTracer::Trace trace;
+  if (info)
+    trace.init("rados read", &objecter->trace_endpoint, info);
+
+  Objecter::Op *o = objecter->prepare_read_op(
+    oid, oloc,
+    off, len, snapid, pbl, 0,
+    oncomplete, &c->objver, nullptr, 0, &trace);
+  objecter->op_submit(o, &c->tid);
+  return 0;
+}
+
+int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
+                                 char *buf, size_t len, uint64_t off,
+                                 uint64_t snapid, const blkin_trace_info *info)
+{
+  FUNCTRACE();
+  if (len > (size_t) INT_MAX)
+    return -EDOM;
+
+  OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
+  Context *oncomplete = new C_aio_Complete(c);
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ((C_aio_Complete *) oncomplete)->oid = oid;
+#endif
+  c->is_read = true;
+  c->io = this;
+  c->bl.clear();
+  c->bl.push_back(buffer::create_static(len, buf));
+  c->blp = &c->bl;
+  c->out_buf = buf;
+
+  ZTracer::Trace trace;
+  if (info)
+    trace.init("rados read", &objecter->trace_endpoint, info);
+
+  Objecter::Op *o = objecter->prepare_read_op(
+    oid, oloc,
+    off, len, snapid, &c->bl, 0,
+    oncomplete, &c->objver, nullptr, 0, &trace);
+  objecter->op_submit(o, &c->tid);
+  return 0;
+}
+
+class C_ObjectOperation : public Context {
+public:
+  ::ObjectOperation m_ops;
+  explicit C_ObjectOperation(Context *c) : m_ctx(c) {}
+  void finish(int r) override {
+    m_ctx->complete(r);
+  }
+private:
+  Context *m_ctx;
+};
+
+int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
+                                        AioCompletionImpl *c,
+                                        std::map<uint64_t,uint64_t> *m,
+                                        bufferlist *data_bl, size_t len,
+                                        uint64_t off, uint64_t snapid)
+{
+  FUNCTRACE();
+  if (len > (size_t) INT_MAX)
+    return -EDOM;
+
+  Context *nested = new C_aio_Complete(c);
+  C_ObjectOperation *onack = new C_ObjectOperation(nested);
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ((C_aio_Complete *) nested)->oid = oid;
+#endif
+  c->is_read = true;
+  c->io = this;
+
+  onack->m_ops.sparse_read(off, len, m, data_bl, NULL);
+
+  Objecter::Op *o = objecter->prepare_read_op(
+    oid, oloc,
+    onack->m_ops, snapid, NULL, 0,
+    onack, &c->objver);
+  objecter->op_submit(o, &c->tid);
+  return 0;
+}
+
+int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
+                                   AioCompletionImpl *c,
+                                   uint64_t off,
+                                   bufferlist& cmp_bl)
+{
+  if (cmp_bl.length() > UINT_MAX/2)
+    return -E2BIG;
+
+  Context *onack = new C_aio_Complete(c);
+
+  c->is_read = true;
+  c->io = this;
+
+  Objecter::Op *o = objecter->prepare_cmpext_op(
+    oid, oloc, off, cmp_bl, snap_seq, 0,
+    onack, &c->objver);
+  objecter->op_submit(o, &c->tid);
+
+  return 0;
+}
+
+/* use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API */
+int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
+                                   AioCompletionImpl *c,
+                                   const char *cmp_buf,
+                                   size_t cmp_len,
+                                   uint64_t off)
+{
+  if (cmp_len > UINT_MAX/2)
+    return -E2BIG;
+
+  bufferlist cmp_bl;
+  cmp_bl.append(cmp_buf, cmp_len);
+
+  Context *nested = new C_aio_Complete(c);
+  C_ObjectOperation *onack = new C_ObjectOperation(nested);
+
+  c->is_read = true;
+  c->io = this;
+
+  onack->m_ops.cmpext(off, cmp_len, cmp_buf, NULL);
+
+  Objecter::Op *o = objecter->prepare_read_op(
+    oid, oloc, onack->m_ops, snap_seq, NULL, 0, onack, &c->objver);
+  objecter->op_submit(o, &c->tid);
+  return 0;
+}
+
+int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
+                                  const bufferlist& bl, size_t len,
+                                  uint64_t off, const blkin_trace_info *info)
+{
+  FUNCTRACE();
+  auto ut = ceph::real_clock::now();
+  ldout(client->cct, 20) << "aio_write " << oid << " " << off << "~" << len << " snapc=" << snapc << " snap_seq=" << snap_seq << dendl;
+  OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
+
+  if (len > UINT_MAX/2)
+    return -E2BIG;
+  /* can't write to a snapshot */
+  if (snap_seq != CEPH_NOSNAP)
+    return -EROFS;
+
+  Context *oncomplete = new C_aio_Complete(c);
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ((C_aio_Complete *) oncomplete)->oid = oid;
+#endif
+  ZTracer::Trace trace;
+  if (info)
+    trace.init("rados write", &objecter->trace_endpoint, info);
+
+  c->io = this;
+  queue_aio_write(c);
+
+  Objecter::Op *o = objecter->prepare_write_op(
+    oid, oloc,
+    off, len, snapc, bl, ut, 0,
+    oncomplete, &c->objver, nullptr, 0, &trace);
+  objecter->op_submit(o, &c->tid);
+
+  return 0;
+}
+
+int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c,
+                                   const bufferlist& bl, size_t len)
+{
+  FUNCTRACE();
+  auto ut = ceph::real_clock::now();
+
+  if (len > UINT_MAX/2)
+    return -E2BIG;
+  /* can't write to a snapshot */
+  if (snap_seq != CEPH_NOSNAP)
+    return -EROFS;
+
+  Context *oncomplete = new C_aio_Complete(c);
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ((C_aio_Complete *) oncomplete)->oid = oid;
+#endif
+
+  c->io = this;
+  queue_aio_write(c);
+
+  Objecter::Op *o = objecter->prepare_append_op(
+    oid, oloc,
+    len, snapc, bl, ut, 0,
+    oncomplete, &c->objver);
+  objecter->op_submit(o, &c->tid);
+
+  return 0;
+}
+
+int librados::IoCtxImpl::aio_write_full(const object_t &oid,
+                                       AioCompletionImpl *c,
+                                       const bufferlist& bl)
+{
+  FUNCTRACE();
+  auto ut = ceph::real_clock::now();
+
+  if (bl.length() > UINT_MAX/2)
+    return -E2BIG;
+  /* can't write to a snapshot */
+  if (snap_seq != CEPH_NOSNAP)
+    return -EROFS;
+
+  Context *oncomplete = new C_aio_Complete(c);
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ((C_aio_Complete *) oncomplete)->oid = oid;
+#endif
+
+  c->io = this;
+  queue_aio_write(c);
+
+  Objecter::Op *o = objecter->prepare_write_full_op(
+    oid, oloc,
+    snapc, bl, ut, 0,
+    oncomplete, &c->objver);
+  objecter->op_submit(o, &c->tid);
+
+  return 0;
+}
+
+int librados::IoCtxImpl::aio_writesame(const object_t &oid,
+                                      AioCompletionImpl *c,
+                                      const bufferlist& bl,
+                                      size_t write_len,
+                                      uint64_t off)
+{
+  FUNCTRACE();
+  auto ut = ceph::real_clock::now();
+
+  if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
+    return -E2BIG;
+  if ((bl.length() == 0) || (write_len % bl.length()))
+    return -EINVAL;
+  /* can't write to a snapshot */
+  if (snap_seq != CEPH_NOSNAP)
+    return -EROFS;
+
+  Context *oncomplete = new C_aio_Complete(c);
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ((C_aio_Complete *) oncomplete)->oid = oid;
+#endif
+  c->io = this;
+  queue_aio_write(c);
+
+  Objecter::Op *o = objecter->prepare_writesame_op(
+    oid, oloc,
+    write_len, off,
+    snapc, bl, ut, 0,
+    oncomplete, &c->objver);
+  objecter->op_submit(o, &c->tid);
+
+  return 0;
+}
+
+int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c, int flags)
+{
+  FUNCTRACE();
+  auto ut = ceph::real_clock::now();
+
+  /* can't write to a snapshot */
+  if (snap_seq != CEPH_NOSNAP)
+    return -EROFS;
+
+  Context *oncomplete = new C_aio_Complete(c);
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ((C_aio_Complete *) oncomplete)->oid = oid;
+#endif
+  c->io = this;
+  queue_aio_write(c);
+
+  Objecter::Op *o = objecter->prepare_remove_op(
+    oid, oloc,
+    snapc, ut, flags,
+    oncomplete, &c->objver);
+  objecter->op_submit(o, &c->tid);
+
+  return 0;
+}
+
+
+int librados::IoCtxImpl::aio_stat(const object_t& oid, AioCompletionImpl *c,
+                                 uint64_t *psize, time_t *pmtime)
+{
+  C_aio_stat_Ack *onack = new C_aio_stat_Ack(c, pmtime);
+  c->is_read = true;
+  c->io = this;
+  Objecter::Op *o = objecter->prepare_stat_op(
+    oid, oloc,
+    snap_seq, psize, &onack->mtime, 0,
+    onack, &c->objver);
+  objecter->op_submit(o, &c->tid);
+  return 0;
+}
+
+int librados::IoCtxImpl::aio_stat2(const object_t& oid, AioCompletionImpl *c,
+                                 uint64_t *psize, struct timespec *pts)
+{
+  C_aio_stat2_Ack *onack = new C_aio_stat2_Ack(c, pts);
+  c->is_read = true;
+  c->io = this;
+  Objecter::Op *o = objecter->prepare_stat_op(
+    oid, oloc,
+    snap_seq, psize, &onack->mtime, 0,
+    onack, &c->objver);
+  objecter->op_submit(o, &c->tid);
+  return 0;
+}
+
+int librados::IoCtxImpl::aio_getxattr(const object_t& oid, AioCompletionImpl *c,
+                                     const char *name, bufferlist& bl)
+{
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.getxattr(name, &bl, NULL);
+  int r = aio_operate_read(oid, &rd, c, 0, &bl);
+  return r;
+}
+
+int librados::IoCtxImpl::aio_rmxattr(const object_t& oid, AioCompletionImpl *c,
+                                    const char *name)
+{
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  op.rmxattr(name);
+  return aio_operate(oid, &op, c, snapc, 0);
+}
+
+int librados::IoCtxImpl::aio_setxattr(const object_t& oid, AioCompletionImpl *c,
+                                     const char *name, bufferlist& bl)
+{
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  op.setxattr(name, bl);
+  return aio_operate(oid, &op, c, snapc, 0);
+}
+
+namespace {
+struct AioGetxattrsData {
+  AioGetxattrsData(librados::AioCompletionImpl *c, map<string, bufferlist>* attrset,
+                  librados::RadosClient *_client) :
+    user_completion(c), user_attrset(attrset), client(_client) {}
+  struct librados::C_AioCompleteAndSafe user_completion;
+  map<string, bufferlist> result_attrset;
+  map<std::string, bufferlist>* user_attrset;
+  librados::RadosClient *client;
+};
+}
+
+static void aio_getxattrs_complete(rados_completion_t c, void *arg) {
+  AioGetxattrsData *cdata = reinterpret_cast<AioGetxattrsData*>(arg);
+  int rc = rados_aio_get_return_value(c);
+  cdata->user_attrset->clear();
+  if (rc >= 0) {
+    for (map<string,bufferlist>::iterator p = cdata->result_attrset.begin();
+        p != cdata->result_attrset.end();
+        ++p) {
+      ldout(cdata->client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
+      (*cdata->user_attrset)[p->first] = p->second;
+    }
+  }
+  cdata->user_completion.finish(rc);
+  ((librados::AioCompletionImpl*)c)->put();
+  delete cdata;
+}
+
+int librados::IoCtxImpl::aio_getxattrs(const object_t& oid, AioCompletionImpl *c,
+                                      map<std::string, bufferlist>& attrset)
+{
+  AioGetxattrsData *cdata = new AioGetxattrsData(c, &attrset, client);
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.getxattrs(&cdata->result_attrset, NULL);
+  librados::AioCompletionImpl *comp = new librados::AioCompletionImpl;
+  comp->set_complete_callback(cdata, aio_getxattrs_complete);
+  return aio_operate_read(oid, &rd, comp, 0, NULL);
+}
+
+int librados::IoCtxImpl::aio_cancel(AioCompletionImpl *c)
+{
+  return objecter->op_cancel(c->tid, -ECANCELED);
+}
+
+
+int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c,
+                             std::list< std::pair<time_t, time_t> > *pls)
+{
+  Context *oncomplete = new C_aio_Complete(c);
+  c->is_read = true;
+  c->io = this;
+
+  ::ObjectOperation rd;
+  rd.hit_set_ls(pls, NULL);
+  object_locator_t oloc(poolid);
+  Objecter::Op *o = objecter->prepare_pg_read_op(
+    hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL);
+  objecter->op_submit(o, &c->tid);
+  return 0;
+}
+
+int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c,
+                                    time_t stamp,
+                                    bufferlist *pbl)
+{
+  Context *oncomplete = new C_aio_Complete(c);
+  c->is_read = true;
+  c->io = this;
+
+  ::ObjectOperation rd;
+  rd.hit_set_get(ceph::real_clock::from_time_t(stamp), pbl, 0);
+  object_locator_t oloc(poolid);
+  Objecter::Op *o = objecter->prepare_pg_read_op(
+    hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL);
+  objecter->op_submit(o, &c->tid);
+  return 0;
+}
+
+int librados::IoCtxImpl::remove(const object_t& oid)
+{
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  op.remove();
+  return operate(oid, &op, NULL);
+}
+
+int librados::IoCtxImpl::remove(const object_t& oid, int flags)
+{
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  op.remove();
+  return operate(oid, &op, NULL, flags);
+}
+
+int librados::IoCtxImpl::trunc(const object_t& oid, uint64_t size)
+{
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  op.truncate(size);
+  return operate(oid, &op, NULL);
+}
+
+int librados::IoCtxImpl::get_inconsistent_objects(const pg_t& pg,
+                                                 const librados::object_id_t& start_after,
+                                                 uint64_t max_to_get,
+                                                 AioCompletionImpl *c,
+                                                 std::vector<inconsistent_obj_t>* objects,
+                                                 uint32_t* interval)
+{
+  Context *oncomplete = new C_aio_Complete(c);
+  c->is_read = true;
+  c->io = this;
+
+  ::ObjectOperation op;
+  op.scrub_ls(start_after, max_to_get, objects, interval, nullptr);
+  object_locator_t oloc{poolid, pg.ps()};
+  Objecter::Op *o = objecter->prepare_pg_read_op(
+    oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete,
+    nullptr, nullptr);
+  objecter->op_submit(o, &c->tid);
+  return 0;
+}
+
+int librados::IoCtxImpl::get_inconsistent_snapsets(const pg_t& pg,
+                                                  const librados::object_id_t& start_after,
+                                                  uint64_t max_to_get,
+                                                  AioCompletionImpl *c,
+                                                  std::vector<inconsistent_snapset_t>* snapsets,
+                                                  uint32_t* interval)
+{
+  Context *oncomplete = new C_aio_Complete(c);
+  c->is_read = true;
+  c->io = this;
+
+  ::ObjectOperation op;
+  op.scrub_ls(start_after, max_to_get, snapsets, interval, nullptr);
+  object_locator_t oloc{poolid, pg.ps()};
+  Objecter::Op *o = objecter->prepare_pg_read_op(
+    oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete,
+    nullptr, nullptr);
+  objecter->op_submit(o, &c->tid);
+  return 0;
+}
+
+int librados::IoCtxImpl::tmap_update(const object_t& oid, bufferlist& cmdbl)
+{
+  ::ObjectOperation wr;
+  prepare_assert_ops(&wr);
+  wr.tmap_update(cmdbl);
+  return operate(oid, &wr, NULL);
+}
+
+int librados::IoCtxImpl::tmap_put(const object_t& oid, bufferlist& bl)
+{
+  ::ObjectOperation wr;
+  prepare_assert_ops(&wr);
+  wr.tmap_put(bl);
+  return operate(oid, &wr, NULL);
+}
+
+int librados::IoCtxImpl::tmap_get(const object_t& oid, bufferlist& bl)
+{
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.tmap_get(&bl, NULL);
+  return operate_read(oid, &rd, NULL);
+}
+
+int librados::IoCtxImpl::tmap_to_omap(const object_t& oid, bool nullok)
+{
+  ::ObjectOperation wr;
+  prepare_assert_ops(&wr);
+  wr.tmap_to_omap(nullok);
+  return operate(oid, &wr, NULL);
+}
+
+int librados::IoCtxImpl::exec(const object_t& oid,
+                             const char *cls, const char *method,
+                             bufferlist& inbl, bufferlist& outbl)
+{
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.call(cls, method, inbl);
+  return operate_read(oid, &rd, &outbl);
+}
+
+int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
+                                 const char *cls, const char *method,
+                                 bufferlist& inbl, bufferlist *outbl)
+{
+  FUNCTRACE();
+  Context *oncomplete = new C_aio_Complete(c);
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ((C_aio_Complete *) oncomplete)->oid = oid;
+#endif
+  c->is_read = true;
+  c->io = this;
+
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.call(cls, method, inbl);
+  Objecter::Op *o = objecter->prepare_read_op(
+    oid, oloc, rd, snap_seq, outbl, 0, oncomplete, &c->objver);
+  objecter->op_submit(o, &c->tid);
+  return 0;
+}
+
+int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
+                                 const char *cls, const char *method,
+                                 bufferlist& inbl, char *buf, size_t out_len)
+{
+  FUNCTRACE();
+  Context *oncomplete = new C_aio_Complete(c);
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  ((C_aio_Complete *) oncomplete)->oid = oid;
+#endif
+  c->is_read = true;
+  c->io = this;
+  c->bl.clear();
+  c->bl.push_back(buffer::create_static(out_len, buf));
+  c->blp = &c->bl;
+  c->out_buf = buf;
+
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.call(cls, method, inbl);
+  Objecter::Op *o = objecter->prepare_read_op(
+    oid, oloc, rd, snap_seq, &c->bl, 0, oncomplete, &c->objver);
+  objecter->op_submit(o, &c->tid);
+  return 0;
+}
+
+int librados::IoCtxImpl::read(const object_t& oid,
+                             bufferlist& bl, size_t len, uint64_t off)
+{
+  if (len > (size_t) INT_MAX)
+    return -EDOM;
+  OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
+
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.read(off, len, &bl, NULL, NULL);
+  int r = operate_read(oid, &rd, &bl);
+  if (r < 0)
+    return r;
+
+  if (bl.length() < len) {
+    ldout(client->cct, 10) << "Returned length " << bl.length()
+            << " less than original length "<< len << dendl;
+  }
+
+  return bl.length();
+}
+
+int librados::IoCtxImpl::cmpext(const object_t& oid, uint64_t off,
+                                bufferlist& cmp_bl)
+{
+  if (cmp_bl.length() > UINT_MAX/2)
+    return -E2BIG;
+
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  op.cmpext(off, cmp_bl, NULL);
+  return operate_read(oid, &op, NULL);
+}
+
+int librados::IoCtxImpl::mapext(const object_t& oid,
+                               uint64_t off, size_t len,
+                               std::map<uint64_t,uint64_t>& m)
+{
+  bufferlist bl;
+
+  Mutex mylock("IoCtxImpl::read::mylock");
+  Cond cond;
+  bool done;
+  int r;
+  Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+
+  objecter->mapext(oid, oloc,
+                  off, len, snap_seq, &bl, 0,
+                  onack);
+
+  mylock.Lock();
+  while (!done)
+    cond.Wait(mylock);
+  mylock.Unlock();
+  ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl;
+
+  if (r < 0)
+    return r;
+
+  bufferlist::iterator iter = bl.begin();
+  ::decode(m, iter);
+
+  return m.size();
+}
+
+int librados::IoCtxImpl::sparse_read(const object_t& oid,
+                                    std::map<uint64_t,uint64_t>& m,
+                                    bufferlist& data_bl, size_t len,
+                                    uint64_t off)
+{
+  if (len > (size_t) INT_MAX)
+    return -EDOM;
+
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.sparse_read(off, len, &m, &data_bl, NULL);
+
+  int r = operate_read(oid, &rd, NULL);
+  if (r < 0)
+    return r;
+
+  return m.size();
+}
+
+int librados::IoCtxImpl::checksum(const object_t& oid, uint8_t type,
+                                 const bufferlist &init_value, size_t len,
+                                 uint64_t off, size_t chunk_size,
+                                 bufferlist *pbl)
+{
+  if (len > (size_t) INT_MAX) {
+    return -EDOM;
+  }
+
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.checksum(type, init_value, off, len, chunk_size, pbl, nullptr, nullptr);
+
+  int r = operate_read(oid, &rd, nullptr);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime)
+{
+  uint64_t size;
+  real_time mtime;
+
+  if (!psize)
+    psize = &size;
+
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.stat(psize, &mtime, NULL);
+  int r = operate_read(oid, &rd, NULL);
+
+  if (r >= 0 && pmtime) {
+    *pmtime = real_clock::to_time_t(mtime);
+  }
+
+  return r;
+}
+
+int librados::IoCtxImpl::stat2(const object_t& oid, uint64_t *psize, struct timespec *pts)
+{
+  uint64_t size;
+  ceph::real_time mtime;
+
+  if (!psize)
+    psize = &size;
+
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.stat(psize, &mtime, NULL);
+  int r = operate_read(oid, &rd, NULL);
+  if (r < 0) {
+    return r;
+  }
+
+  if (pts) {
+    *pts = ceph::real_clock::to_timespec(mtime);
+  }
+
+  return 0;
+}
+
+int librados::IoCtxImpl::getxattr(const object_t& oid,
+                                   const char *name, bufferlist& bl)
+{
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.getxattr(name, &bl, NULL);
+  int r = operate_read(oid, &rd, &bl);
+  if (r < 0)
+    return r;
+
+  return bl.length();
+}
+
+int librados::IoCtxImpl::rmxattr(const object_t& oid, const char *name)
+{
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  op.rmxattr(name);
+  return operate(oid, &op, NULL);
+}
+
+int librados::IoCtxImpl::setxattr(const object_t& oid,
+                                   const char *name, bufferlist& bl)
+{
+  ::ObjectOperation op;
+  prepare_assert_ops(&op);
+  op.setxattr(name, bl);
+  return operate(oid, &op, NULL);
+}
+
+int librados::IoCtxImpl::getxattrs(const object_t& oid,
+                                    map<std::string, bufferlist>& attrset)
+{
+  map<string, bufferlist> aset;
+
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.getxattrs(&aset, NULL);
+  int r = operate_read(oid, &rd, NULL);
+
+  attrset.clear();
+  if (r >= 0) {
+    for (map<string,bufferlist>::iterator p = aset.begin(); p != aset.end(); ++p) {
+      ldout(client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
+      attrset[p->first.c_str()] = p->second;
+    }
+  }
+
+  return r;
+}
+
+void librados::IoCtxImpl::set_sync_op_version(version_t ver)
+{
+  ANNOTATE_BENIGN_RACE_SIZED(&last_objver, sizeof(last_objver),
+                             "IoCtxImpl last_objver");
+  last_objver = ver;
+}
+
+struct WatchInfo : public Objecter::WatchContext {
+  librados::IoCtxImpl *ioctx;
+  object_t oid;
+  librados::WatchCtx *ctx;
+  librados::WatchCtx2 *ctx2;
+  bool internal = false;
+
+  WatchInfo(librados::IoCtxImpl *io, object_t o,
+           librados::WatchCtx *c, librados::WatchCtx2 *c2,
+            bool inter)
+    : ioctx(io), oid(o), ctx(c), ctx2(c2), internal(inter) {
+    ioctx->get();
+  }
+  ~WatchInfo() override {
+    ioctx->put();
+    if (internal) {
+      delete ctx;
+      delete ctx2;
+    }
+  }
+
+  void handle_notify(uint64_t notify_id,
+                    uint64_t cookie,
+                    uint64_t notifier_id,
+                    bufferlist& bl) override {
+    ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
+                                 << " cookie " << cookie
+                                 << " notifier_id " << notifier_id
+                                 << " len " << bl.length()
+                                 << dendl;
+
+    if (ctx2)
+      ctx2->handle_notify(notify_id, cookie, notifier_id, bl);
+    if (ctx) {
+      ctx->notify(0, 0, bl);
+
+      // send ACK back to OSD if using legacy protocol
+      bufferlist empty;
+      ioctx->notify_ack(oid, notify_id, cookie, empty);
+    }
+  }
+  void handle_error(uint64_t cookie, int err) override {
+    ldout(ioctx->client->cct, 10) << __func__ << " cookie " << cookie
+                                 << " err " << err
+                                 << dendl;
+    if (ctx2)
+      ctx2->handle_error(cookie, err);
+  }
+};
+
+int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
+                               librados::WatchCtx *ctx,
+                               librados::WatchCtx2 *ctx2,
+                               bool internal)
+{
+  return watch(oid, handle, ctx, ctx2, 0, internal);
+}
+
+int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
+                               librados::WatchCtx *ctx,
+                               librados::WatchCtx2 *ctx2,
+                               uint32_t timeout,
+                               bool internal)
+{
+  ::ObjectOperation wr;
+  version_t objver;
+  C_SaferCond onfinish;
+
+  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
+  *handle = linger_op->get_cookie();
+  linger_op->watch_context = new WatchInfo(this,
+                                          oid, ctx, ctx2, internal);
+
+  prepare_assert_ops(&wr);
+  wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
+  bufferlist bl;
+  objecter->linger_watch(linger_op, wr,
+                        snapc, ceph::real_clock::now(), bl,
+                        &onfinish,
+                        &objver);
+
+  int r = onfinish.wait();
+
+  set_sync_op_version(objver);
+
+  if (r < 0) {
+    objecter->linger_cancel(linger_op);
+    *handle = 0;
+  }
+
+  return r;
+}
+
+int librados::IoCtxImpl::aio_watch(const object_t& oid,
+                                   AioCompletionImpl *c,
+                                   uint64_t *handle,
+                                   librados::WatchCtx *ctx,
+                                   librados::WatchCtx2 *ctx2,
+                                   bool internal) {
+  return aio_watch(oid, c, handle, ctx, ctx2, 0, internal);
+}
+
+int librados::IoCtxImpl::aio_watch(const object_t& oid,
+                                   AioCompletionImpl *c,
+                                   uint64_t *handle,
+                                   librados::WatchCtx *ctx,
+                                   librados::WatchCtx2 *ctx2,
+                                   uint32_t timeout,
+                                   bool internal)
+{
+  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
+  c->io = this;
+  Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false);
+
+  ::ObjectOperation wr;
+  *handle = linger_op->get_cookie();
+  linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal);
+
+  prepare_assert_ops(&wr);
+  wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
+  bufferlist bl;
+  objecter->linger_watch(linger_op, wr,
+                         snapc, ceph::real_clock::now(), bl,
+                         oncomplete, &c->objver);
+
+  return 0;
+}
+
+
+int librados::IoCtxImpl::notify_ack(
+  const object_t& oid,
+  uint64_t notify_id,
+  uint64_t cookie,
+  bufferlist& bl)
+{
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  rd.notify_ack(notify_id, cookie, bl);
+  objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
+  return 0;
+}
+
+int librados::IoCtxImpl::watch_check(uint64_t cookie)
+{
+  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+  return objecter->linger_check(linger_op);
+}
+
+int librados::IoCtxImpl::unwatch(uint64_t cookie)
+{
+  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+  C_SaferCond onfinish;
+  version_t ver = 0;
+
+  ::ObjectOperation wr;
+  prepare_assert_ops(&wr);
+  wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
+  objecter->mutate(linger_op->target.base_oid, oloc, wr,
+                  snapc, ceph::real_clock::now(), 0,
+                  &onfinish, &ver);
+  objecter->linger_cancel(linger_op);
+
+  int r = onfinish.wait();
+  set_sync_op_version(ver);
+  return r;
+}
+
+int librados::IoCtxImpl::aio_unwatch(uint64_t cookie, AioCompletionImpl *c)
+{
+  c->io = this;
+  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+  Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true);
+
+  ::ObjectOperation wr;
+  prepare_assert_ops(&wr);
+  wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
+  objecter->mutate(linger_op->target.base_oid, oloc, wr,
+                  snapc, ceph::real_clock::now(), 0,
+                  oncomplete, &c->objver);
+  return 0;
+}
+
+int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
+                               uint64_t timeout_ms,
+                               bufferlist *preply_bl,
+                               char **preply_buf, size_t *preply_buf_len)
+{
+  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
+
+  C_SaferCond notify_finish_cond;
+  Context *notify_finish = new C_notify_Finish(client->cct, &notify_finish_cond,
+                                               objecter, linger_op, preply_bl,
+                                               preply_buf, preply_buf_len);
+
+  uint32_t timeout = notify_timeout;
+  if (timeout_ms)
+    timeout = timeout_ms / 1000;
+
+  // Construct RADOS op
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  bufferlist inbl;
+  rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
+
+  // Issue RADOS op
+  C_SaferCond onack;
+  version_t objver;
+  objecter->linger_notify(linger_op,
+                         rd, snap_seq, inbl, NULL,
+                         &onack, &objver);
+
+  ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
+  int r = onack.wait();
+  ldout(client->cct, 10) << __func__ << " linger op " << linger_op
+                        << " acked (" << r << ")" << dendl;
+
+  if (r == 0) {
+    ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
+                          << linger_op << dendl;
+    r = notify_finish_cond.wait();
+
+  } else {
+    ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = "
+                          << r << dendl;
+    notify_finish->complete(r);
+  }
+
+  objecter->linger_cancel(linger_op);
+
+  set_sync_op_version(objver);
+  return r;
+}
+
+int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
+                                    bufferlist& bl, uint64_t timeout_ms,
+                                    bufferlist *preply_bl, char **preply_buf,
+                                    size_t *preply_buf_len)
+{
+  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
+
+  c->io = this;
+
+  C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op);
+  C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete,
+                                                  objecter, linger_op,
+                                                  preply_bl, preply_buf,
+                                                  preply_buf_len);
+  Context *onack = new C_aio_notify_Ack(client->cct, onnotify, oncomplete);
+
+  uint32_t timeout = notify_timeout;
+  if (timeout_ms)
+    timeout = timeout_ms / 1000;
+
+  // Construct RADOS op
+  ::ObjectOperation rd;
+  prepare_assert_ops(&rd);
+  bufferlist inbl;
+  rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
+
+  // Issue RADOS op
+  objecter->linger_notify(linger_op,
+                         rd, snap_seq, inbl, NULL,
+                         onack, &c->objver);
+  return 0;
+}
+
+int librados::IoCtxImpl::set_alloc_hint(const object_t& oid,
+                                        uint64_t expected_object_size,
+                                        uint64_t expected_write_size,
+                                       uint32_t flags)
+{
+  ::ObjectOperation wr;
+  prepare_assert_ops(&wr);
+  wr.set_alloc_hint(expected_object_size, expected_write_size, flags);
+  return operate(oid, &wr, NULL);
+}
+
+version_t librados::IoCtxImpl::last_version()
+{
+  return last_objver;
+}
+
+void librados::IoCtxImpl::set_assert_version(uint64_t ver)
+{
+  assert_ver = ver;
+}
+
+void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout)
+{
+  notify_timeout = timeout;
+}
+
+int librados::IoCtxImpl::cache_pin(const object_t& oid)
+{
+  ::ObjectOperation wr;
+  prepare_assert_ops(&wr);
+  wr.cache_pin();
+  return operate(oid, &wr, NULL);
+}
+
+int librados::IoCtxImpl::cache_unpin(const object_t& oid)
+{
+  ::ObjectOperation wr;
+  prepare_assert_ops(&wr);
+  wr.cache_unpin();
+  return operate(oid, &wr, NULL);
+}
+
+
+///////////////////////////// C_aio_stat_Ack ////////////////////////////
+
+librados::IoCtxImpl::C_aio_stat_Ack::C_aio_stat_Ack(AioCompletionImpl *_c,
+                                                   time_t *pm)
+   : c(_c), pmtime(pm)
+{
+  assert(!c->io);
+  c->get();
+}
+
+void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r)
+{
+  c->lock.Lock();
+  c->rval = r;
+  c->complete = true;
+  c->cond.Signal();
+
+  if (r >= 0 && pmtime) {
+    *pmtime = real_clock::to_time_t(mtime);
+  }
+
+  if (c->callback_complete) {
+    c->io->client->finisher.queue(new C_AioComplete(c));
+  }
+
+  c->put_unlock();
+}
+
+///////////////////////////// C_aio_stat2_Ack ////////////////////////////
+
+librados::IoCtxImpl::C_aio_stat2_Ack::C_aio_stat2_Ack(AioCompletionImpl *_c,
+                                                    struct timespec *pt)
+   : c(_c), pts(pt)
+{
+  assert(!c->io);
+  c->get();
+}
+
+void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r)
+{
+  c->lock.Lock();
+  c->rval = r;
+  c->complete = true;
+  c->cond.Signal();
+
+  if (r >= 0 && pts) {
+    *pts = real_clock::to_timespec(mtime);
+  }
+
+  if (c->callback_complete) {
+    c->io->client->finisher.queue(new C_AioComplete(c));
+  }
+
+  c->put_unlock();
+}
+
+//////////////////////////// C_aio_Complete ////////////////////////////////
+
+librados::IoCtxImpl::C_aio_Complete::C_aio_Complete(AioCompletionImpl *_c)
+  : c(_c)
+{
+  c->get();
+}
+
+void librados::IoCtxImpl::C_aio_Complete::finish(int r)
+{
+  c->lock.Lock();
+  c->rval = r;
+  c->complete = true;
+  c->cond.Signal();
+
+  if (r == 0 && c->blp && c->blp->length() > 0) {
+    if (c->out_buf && !c->blp->is_provided_buffer(c->out_buf))
+      c->blp->copy(0, c->blp->length(), c->out_buf);
+    c->rval = c->blp->length();
+  }
+
+  if (c->callback_complete ||
+      c->callback_safe) {
+    c->io->client->finisher.queue(new C_AioComplete(c));
+  }
+
+  if (c->aio_write_seq) {
+    c->io->complete_aio_write(c);
+  }
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+  OID_EVENT_TRACE(oid.name.c_str(), "RADOS_OP_COMPLETE");
+#endif
+  c->put_unlock();
+}
+
+void librados::IoCtxImpl::object_list_slice(
+  const hobject_t start,
+  const hobject_t finish,
+  const size_t n,
+  const size_t m,
+  hobject_t *split_start,
+  hobject_t *split_finish)
+{
+  if (start.is_max()) {
+    *split_start = hobject_t::get_max();
+    *split_finish = hobject_t::get_max();
+    return;
+  }
+
+  uint64_t start_hash = hobject_t::_reverse_bits(start.get_hash());
+  uint64_t finish_hash =
+    finish.is_max() ? 0x100000000 :
+    hobject_t::_reverse_bits(finish.get_hash());
+
+  uint64_t diff = finish_hash - start_hash;
+  uint64_t rev_start = start_hash + (diff * n / m);
+  uint64_t rev_finish = start_hash + (diff * (n + 1) / m);
+  if (n == 0) {
+    *split_start = start;
+  } else {
+    *split_start = hobject_t(
+      object_t(), string(), CEPH_NOSNAP,
+      hobject_t::_reverse_bits(rev_start), poolid, string());
+  }
+
+  if (n == m - 1)
+    *split_finish = finish;
+  else if (rev_finish >= 0x100000000)
+    *split_finish = hobject_t::get_max();
+  else
+    *split_finish = hobject_t(
+      object_t(), string(), CEPH_NOSNAP,
+      hobject_t::_reverse_bits(rev_finish), poolid, string());
+}
+
+int librados::IoCtxImpl::application_enable(const std::string& app_name,
+                                            bool force)
+{
+  auto c = new PoolAsyncCompletionImpl();
+  application_enable_async(app_name, force, c);
+
+  int r = c->wait();
+  assert(r == 0);
+
+  r = c->get_return_value();
+  c->release();
+  if (r < 0) {
+    return r;
+  }
+
+  return client->wait_for_latest_osdmap();
+}
+
+void librados::IoCtxImpl::application_enable_async(const std::string& app_name,
+                                                   bool force,
+                                                   PoolAsyncCompletionImpl *c)
+{
+  // pre-Luminous clusters will return -EINVAL and application won't be
+  // preserved until Luminous is configured as minimim version.
+  if (!client->get_required_monitor_features().contains_all(
+        ceph::features::mon::FEATURE_LUMINOUS)) {
+    client->finisher.queue(new C_PoolAsync_Safe(c), -EOPNOTSUPP);
+    return;
+  }
+
+  std::stringstream cmd;
+  cmd << "{"
+      << "\"prefix\": \"osd pool application enable\","
+      << "\"pool\": \"" << get_cached_pool_name() << "\","
+      << "\"app\": \"" << app_name << "\"";
+  if (force) {
+    cmd << ",\"force\":\"--yes-i-really-mean-it\"";
+  }
+  cmd << "}";
+
+  std::vector<std::string> cmds;
+  cmds.push_back(cmd.str());
+  bufferlist inbl;
+  client->mon_command_async(cmds, inbl, nullptr, nullptr,
+                            new C_PoolAsync_Safe(c));
+}
+
+int librados::IoCtxImpl::application_list(std::set<std::string> *app_names)
+{
+  int r = 0;
+  app_names->clear();
+  objecter->with_osdmap([&](const OSDMap& o) {
+      auto pg_pool = o.get_pg_pool(poolid);
+      if (pg_pool == nullptr) {
+       r = -ENOENT;
+        return;
+      }
+
+      for (auto &pair : pg_pool->application_metadata) {
+        app_names->insert(pair.first);
+      }
+    });
+  return r;
+}
+
+int librados::IoCtxImpl::application_metadata_get(const std::string& app_name,
+                                                  const std::string &key,
+                                                  std::string* value)
+{
+  int r = 0;
+  objecter->with_osdmap([&](const OSDMap& o) {
+      auto pg_pool = o.get_pg_pool(poolid);
+      if (pg_pool == nullptr) {
+       r = -ENOENT;
+        return;
+      }
+
+      auto app_it = pg_pool->application_metadata.find(app_name);
+      if (app_it == pg_pool->application_metadata.end()) {
+        r = -ENOENT;
+        return;
+      }
+
+      auto it = app_it->second.find(key);
+      if (it == app_it->second.end()) {
+        r = -ENOENT;
+        return;
+      }
+
+      *value = it->second;
+    });
+  return r;
+}
+
+int librados::IoCtxImpl::application_metadata_set(const std::string& app_name,
+                                                  const std::string &key,
+                                                  const std::string& value)
+{
+  std::stringstream cmd;
+  cmd << "{"
+      << "\"prefix\":\"osd pool application set\","
+      << "\"pool\":\"" << get_cached_pool_name() << "\","
+      << "\"app\":\"" << app_name << "\","
+      << "\"key\":\"" << key << "\","
+      << "\"value\":\"" << value << "\""
+      << "}";
+
+  std::vector<std::string> cmds;
+  cmds.push_back(cmd.str());
+  bufferlist inbl;
+  int r = client->mon_command(cmds, inbl, nullptr, nullptr);
+  if (r < 0) {
+    return r;
+  }
+
+  // ensure we have the latest osd map epoch before proceeding
+  return client->wait_for_latest_osdmap();
+}
+
+int librados::IoCtxImpl::application_metadata_remove(const std::string& app_name,
+                                                     const std::string &key)
+{
+  std::stringstream cmd;
+  cmd << "{"
+      << "\"prefix\":\"osd pool application rm\","
+      << "\"pool\":\"" << get_cached_pool_name() << "\","
+      << "\"app\":\"" << app_name << "\","
+      << "\"key\":\"" << key << "\""
+      << "}";
+
+  std::vector<std::string> cmds;
+  cmds.push_back(cmd.str());
+  bufferlist inbl;
+  int r = client->mon_command(cmds, inbl, nullptr, nullptr);
+  if (r < 0) {
+    return r;
+  }
+
+  // ensure we have the latest osd map epoch before proceeding
+  return client->wait_for_latest_osdmap();
+}
+
+int librados::IoCtxImpl::application_metadata_list(const std::string& app_name,
+                                                   std::map<std::string, std::string> *values)
+{
+  int r = 0;
+  values->clear();
+  objecter->with_osdmap([&](const OSDMap& o) {
+      auto pg_pool = o.get_pg_pool(poolid);
+      if (pg_pool == nullptr) {
+        r = -ENOENT;
+        return;
+      }
+
+      auto it = pg_pool->application_metadata.find(app_name);
+      if (it == pg_pool->application_metadata.end()) {
+        r = -ENOENT;
+        return;
+      }
+
+      *values = it->second;
+    });
+  return r;
+}
+