remove ceph code
[stor4nfv.git] / src / ceph / src / librados / IoCtxImpl.cc
diff --git a/src/ceph/src/librados/IoCtxImpl.cc b/src/ceph/src/librados/IoCtxImpl.cc
deleted file mode 100644 (file)
index 2805ed8..0000000
+++ /dev/null
@@ -1,2247 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#include <limits.h>
-
-#include "IoCtxImpl.h"
-
-#include "librados/AioCompletionImpl.h"
-#include "librados/PoolAsyncCompletionImpl.h"
-#include "librados/RadosClient.h"
-#include "include/assert.h"
-#include "common/valgrind.h"
-#include "common/EventTrace.h"
-
-#define dout_subsys ceph_subsys_rados
-#undef dout_prefix
-#define dout_prefix *_dout << "librados: "
-
-namespace librados {
-namespace {
-
-struct C_notify_Finish : public Context {
-  CephContext *cct;
-  Context *ctx;
-  Objecter *objecter;
-  Objecter::LingerOp *linger_op;
-  bufferlist reply_bl;
-  bufferlist *preply_bl;
-  char **preply_buf;
-  size_t *preply_buf_len;
-
-  C_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter,
-                  Objecter::LingerOp *_linger_op, bufferlist *_preply_bl,
-                  char **_preply_buf, size_t *_preply_buf_len)
-    : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op),
-      preply_bl(_preply_bl), preply_buf(_preply_buf),
-      preply_buf_len(_preply_buf_len)
-  {
-    linger_op->on_notify_finish = this;
-    linger_op->notify_result_bl = &reply_bl;
-  }
-
-  void finish(int r) override
-  {
-    ldout(cct, 10) << __func__ << " completed notify (linger op "
-                   << linger_op << "), r = " << r << dendl;
-
-    // pass result back to user
-    // NOTE: we do this regardless of what error code we return
-    if (preply_buf) {
-      if (reply_bl.length()) {
-        *preply_buf = (char*)malloc(reply_bl.length());
-        memcpy(*preply_buf, reply_bl.c_str(), reply_bl.length());
-      } else {
-        *preply_buf = NULL;
-      }
-    }
-    if (preply_buf_len)
-      *preply_buf_len = reply_bl.length();
-    if (preply_bl)
-      preply_bl->claim(reply_bl);
-
-    ctx->complete(r);
-  }
-};
-
-struct C_aio_linger_cancel : public Context {
-  Objecter *objecter;
-  Objecter::LingerOp *linger_op;
-
-  C_aio_linger_cancel(Objecter *_objecter, Objecter::LingerOp *_linger_op)
-    : objecter(_objecter), linger_op(_linger_op)
-  {
-  }
-
-  void finish(int r) override
-  {
-    objecter->linger_cancel(linger_op);
-  }
-};
-
-struct C_aio_linger_Complete : public Context {
-  AioCompletionImpl *c;
-  Objecter::LingerOp *linger_op;
-  bool cancel;
-
-  C_aio_linger_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op, bool _cancel)
-    : c(_c), linger_op(_linger_op), cancel(_cancel)
-  {
-    c->get();
-  }
-
-  void finish(int r) override {
-    if (cancel || r < 0)
-      c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
-                                                            linger_op));
-
-    c->lock.Lock();
-    c->rval = r;
-    c->complete = true;
-    c->cond.Signal();
-
-    if (c->callback_complete ||
-       c->callback_safe) {
-      c->io->client->finisher.queue(new C_AioComplete(c));
-    }
-    c->put_unlock();
-  }
-};
-
-struct C_aio_notify_Complete : public C_aio_linger_Complete {
-  Mutex lock;
-  bool acked = false;
-  bool finished = false;
-  int ret_val = 0;
-
-  C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
-    : C_aio_linger_Complete(_c, _linger_op, false),
-      lock("C_aio_notify_Complete::lock") {
-  }
-
-  void handle_ack(int r) {
-    // invoked by C_aio_notify_Ack
-    lock.Lock();
-    acked = true;
-    complete_unlock(r);
-  }
-
-  void complete(int r) override {
-    // invoked by C_notify_Finish (or C_aio_notify_Ack on failure)
-    lock.Lock();
-    finished = true;
-    complete_unlock(r);
-  }
-
-  void complete_unlock(int r) {
-    if (ret_val == 0 && r < 0) {
-      ret_val = r;
-    }
-
-    if (acked && finished) {
-      lock.Unlock();
-      cancel = true;
-      C_aio_linger_Complete::complete(ret_val);
-    } else {
-      lock.Unlock();
-    }
-  }
-};
-
-struct C_aio_notify_Ack : public Context {
-  CephContext *cct;
-  C_notify_Finish *onfinish;
-  C_aio_notify_Complete *oncomplete;
-
-  C_aio_notify_Ack(CephContext *_cct, C_notify_Finish *_onfinish,
-                   C_aio_notify_Complete *_oncomplete)
-    : cct(_cct), onfinish(_onfinish), oncomplete(_oncomplete)
-  {
-  }
-
-  void finish(int r) override
-  {
-    ldout(cct, 10) << __func__ << " linger op " << oncomplete->linger_op << " "
-                   << "acked (" << r << ")" << dendl;
-    oncomplete->handle_ack(r);
-    if (r < 0) {
-      // on failure, we won't expect to see a notify_finish callback
-      onfinish->complete(r);
-    }
-  }
-};
-
-struct C_aio_selfmanaged_snap_op_Complete : public Context {
-  librados::RadosClient *client;
-  librados::AioCompletionImpl *c;
-
-  C_aio_selfmanaged_snap_op_Complete(librados::RadosClient *client,
-                                     librados::AioCompletionImpl *c)
-    : client(client), c(c) {
-    c->get();
-  }
-
-  void finish(int r) override {
-    c->lock.Lock();
-    c->rval = r;
-    c->complete = true;
-    c->cond.Signal();
-
-    if (c->callback_complete || c->callback_safe) {
-      client->finisher.queue(new librados::C_AioComplete(c));
-    }
-    c->put_unlock();
-  }
-};
-
-struct C_aio_selfmanaged_snap_create_Complete : public C_aio_selfmanaged_snap_op_Complete {
-  snapid_t snapid;
-  uint64_t *dest_snapid;
-
-  C_aio_selfmanaged_snap_create_Complete(librados::RadosClient *client,
-                                         librados::AioCompletionImpl *c,
-                                         uint64_t *dest_snapid)
-    : C_aio_selfmanaged_snap_op_Complete(client, c),
-      dest_snapid(dest_snapid) {
-  }
-
-  void finish(int r) override {
-    if (r >= 0) {
-      *dest_snapid = snapid;
-    }
-    C_aio_selfmanaged_snap_op_Complete::finish(r);
-  }
-};
-
-} // anonymous namespace
-} // namespace librados
-
-librados::IoCtxImpl::IoCtxImpl() :
-  ref_cnt(0), client(NULL), poolid(0), assert_ver(0), last_objver(0),
-  notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
-  aio_write_seq(0), objecter(NULL)
-{
-}
-
-librados::IoCtxImpl::IoCtxImpl(RadosClient *c, Objecter *objecter,
-                              int64_t poolid, snapid_t s)
-  : ref_cnt(0), client(c), poolid(poolid), snap_seq(s),
-    assert_ver(0), last_objver(0),
-    notify_timeout(c->cct->_conf->client_notify_timeout),
-    oloc(poolid), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
-    aio_write_seq(0), objecter(objecter)
-{
-}
-
-void librados::IoCtxImpl::set_snap_read(snapid_t s)
-{
-  if (!s)
-    s = CEPH_NOSNAP;
-  ldout(client->cct, 10) << "set snap read " << snap_seq << " -> " << s << dendl;
-  snap_seq = s;
-}
-
-int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector<snapid_t>& snaps)
-{
-  ::SnapContext n;
-  ldout(client->cct, 10) << "set snap write context: seq = " << seq
-                        << " and snaps = " << snaps << dendl;
-  n.seq = seq;
-  n.snaps = snaps;
-  if (!n.is_valid())
-    return -EINVAL;
-  snapc = n;
-  return 0;
-}
-
-int librados::IoCtxImpl::get_object_hash_position(
-    const std::string& oid, uint32_t *hash_position)
-{
-  int64_t r = objecter->get_object_hash_position(poolid, oid, oloc.nspace);
-  if (r < 0)
-    return r;
-  *hash_position = (uint32_t)r;
-  return 0;
-}
-
-int librados::IoCtxImpl::get_object_pg_hash_position(
-    const std::string& oid, uint32_t *pg_hash_position)
-{
-  int64_t r = objecter->get_object_pg_hash_position(poolid, oid, oloc.nspace);
-  if (r < 0)
-    return r;
-  *pg_hash_position = (uint32_t)r;
-  return 0;
-}
-
-void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
-{
-  get();
-  aio_write_list_lock.Lock();
-  assert(c->io == this);
-  c->aio_write_seq = ++aio_write_seq;
-  ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
-                        << " write_seq " << aio_write_seq << dendl;
-  aio_write_list.push_back(&c->aio_write_list_item);
-  aio_write_list_lock.Unlock();
-}
-
-void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
-{
-  ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
-  aio_write_list_lock.Lock();
-  assert(c->io == this);
-  c->aio_write_list_item.remove_myself();
-
-  map<ceph_tid_t, std::list<AioCompletionImpl*> >::iterator waiters = aio_write_waiters.begin();
-  while (waiters != aio_write_waiters.end()) {
-    if (!aio_write_list.empty() &&
-       aio_write_list.front()->aio_write_seq <= waiters->first) {
-      ldout(client->cct, 20) << " next outstanding write is " << aio_write_list.front()->aio_write_seq
-                            << " <= waiter " << waiters->first
-                            << ", stopping" << dendl;
-      break;
-    }
-    ldout(client->cct, 20) << " waking waiters on seq " << waiters->first << dendl;
-    for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
-        it != waiters->second.end(); ++it) {
-      client->finisher.queue(new C_AioCompleteAndSafe(*it));
-      (*it)->put();
-    }
-    aio_write_waiters.erase(waiters++);
-  }
-
-  aio_write_cond.Signal();
-  aio_write_list_lock.Unlock();
-  put();
-}
-
-void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
-{
-  ldout(client->cct, 20) << "flush_aio_writes_async " << this
-                        << " completion " << c << dendl;
-  Mutex::Locker l(aio_write_list_lock);
-  ceph_tid_t seq = aio_write_seq;
-  if (aio_write_list.empty()) {
-    ldout(client->cct, 20) << "flush_aio_writes_async no writes. (tid "
-                          << seq << ")" << dendl;
-    client->finisher.queue(new C_AioCompleteAndSafe(c));
-  } else {
-    ldout(client->cct, 20) << "flush_aio_writes_async " << aio_write_list.size()
-                          << " writes in flight; waiting on tid " << seq << dendl;
-    c->get();
-    aio_write_waiters[seq].push_back(c);
-  }
-}
-
-void librados::IoCtxImpl::flush_aio_writes()
-{
-  ldout(client->cct, 20) << "flush_aio_writes" << dendl;
-  aio_write_list_lock.Lock();
-  ceph_tid_t seq = aio_write_seq;
-  while (!aio_write_list.empty() &&
-        aio_write_list.front()->aio_write_seq <= seq)
-    aio_write_cond.Wait(aio_write_list_lock);
-  aio_write_list_lock.Unlock();
-}
-
-string librados::IoCtxImpl::get_cached_pool_name()
-{
-  std::string pn;
-  client->pool_get_name(get_id(), &pn);
-  return pn;
-}
-
-// SNAPS
-
-int librados::IoCtxImpl::snap_create(const char *snapName)
-{
-  int reply;
-  string sName(snapName);
-
-  Mutex mylock ("IoCtxImpl::snap_create::mylock");
-  Cond cond;
-  bool done;
-  Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
-  reply = objecter->create_pool_snap(poolid, sName, onfinish);
-
-  if (reply < 0) {
-    delete onfinish;
-  } else {
-    mylock.Lock();
-    while (!done)
-      cond.Wait(mylock);
-    mylock.Unlock();
-  }
-  return reply;
-}
-
-int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid)
-{
-  int reply;
-
-  Mutex mylock("IoCtxImpl::selfmanaged_snap_create::mylock");
-  Cond cond;
-  bool done;
-  Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
-  snapid_t snapid;
-  reply = objecter->allocate_selfmanaged_snap(poolid, &snapid, onfinish);
-
-  if (reply < 0) {
-    delete onfinish;
-  } else {
-    mylock.Lock();
-    while (!done)
-      cond.Wait(mylock);
-    mylock.Unlock();
-    if (reply == 0)
-      *psnapid = snapid;
-  }
-  return reply;
-}
-
-void librados::IoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid,
-                                                      AioCompletionImpl *c)
-{
-  C_aio_selfmanaged_snap_create_Complete *onfinish =
-    new C_aio_selfmanaged_snap_create_Complete(client, c, snapid);
-  int r = objecter->allocate_selfmanaged_snap(poolid, &onfinish->snapid,
-                                              onfinish);
-  if (r < 0) {
-    onfinish->complete(r);
-  }
-}
-
-int librados::IoCtxImpl::snap_remove(const char *snapName)
-{
-  int reply;
-  string sName(snapName);
-
-  Mutex mylock ("IoCtxImpl::snap_remove::mylock");
-  Cond cond;
-  bool done;
-  Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
-  reply = objecter->delete_pool_snap(poolid, sName, onfinish);
-
-  if (reply < 0) {
-    delete onfinish; 
-  } else {
-    mylock.Lock();
-    while(!done)
-      cond.Wait(mylock);
-    mylock.Unlock();
-  }
-  return reply;
-}
-
-int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid,
-                                                         ::SnapContext& snapc,
-                                                         uint64_t snapid)
-{
-  int reply;
-
-  Mutex mylock("IoCtxImpl::snap_rollback::mylock");
-  Cond cond;
-  bool done;
-  Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply);
-
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  op.rollback(snapid);
-  objecter->mutate(oid, oloc,
-                  op, snapc, ceph::real_clock::now(), 0,
-                  onack, NULL);
-
-  mylock.Lock();
-  while (!done) cond.Wait(mylock);
-  mylock.Unlock();
-  return reply;
-}
-
-int librados::IoCtxImpl::rollback(const object_t& oid, const char *snapName)
-{
-  snapid_t snap;
-
-  int r = objecter->pool_snap_by_name(poolid, snapName, &snap);
-  if (r < 0) {
-    return r;
-  }
-
-  return selfmanaged_snap_rollback_object(oid, snapc, snap);
-}
-
-int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid)
-{
-  int reply;
-
-  Mutex mylock("IoCtxImpl::selfmanaged_snap_remove::mylock");
-  Cond cond;
-  bool done;
-  objecter->delete_selfmanaged_snap(poolid, snapid_t(snapid),
-                                   new C_SafeCond(&mylock, &cond, &done, &reply));
-
-  mylock.Lock();
-  while (!done) cond.Wait(mylock);
-  mylock.Unlock();
-  return (int)reply;
-}
-
-void librados::IoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid,
-                                                      AioCompletionImpl *c)
-{
-  Context *onfinish = new C_aio_selfmanaged_snap_op_Complete(client, c);
-  objecter->delete_selfmanaged_snap(poolid, snapid, onfinish);
-}
-
-int librados::IoCtxImpl::pool_change_auid(unsigned long long auid)
-{
-  int reply;
-
-  Mutex mylock("IoCtxImpl::pool_change_auid::mylock");
-  Cond cond;
-  bool done;
-  objecter->change_pool_auid(poolid,
-                            new C_SafeCond(&mylock, &cond, &done, &reply),
-                            auid);
-
-  mylock.Lock();
-  while (!done) cond.Wait(mylock);
-  mylock.Unlock();
-  return reply;
-}
-
-int librados::IoCtxImpl::pool_change_auid_async(unsigned long long auid,
-                                                 PoolAsyncCompletionImpl *c)
-{
-  objecter->change_pool_auid(poolid,
-                            new C_PoolAsync_Safe(c),
-                            auid);
-  return 0;
-}
-
-int librados::IoCtxImpl::snap_list(vector<uint64_t> *snaps)
-{
-  return objecter->pool_snap_list(poolid, snaps);
-}
-
-int librados::IoCtxImpl::snap_lookup(const char *name, uint64_t *snapid)
-{
-  return objecter->pool_snap_by_name(poolid, name, (snapid_t *)snapid);
-}
-
-int librados::IoCtxImpl::snap_get_name(uint64_t snapid, std::string *s)
-{
-  pool_snap_info_t info;
-  int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
-  if (ret < 0) {
-    return ret;
-  }
-  *s = info.name.c_str();
-  return 0;
-}
-
-int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid, time_t *t)
-{
-  pool_snap_info_t info;
-  int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
-  if (ret < 0) {
-    return ret;
-  }
-  *t = info.stamp.sec();
-  return 0;
-}
-
-
-// IO
-
-int librados::IoCtxImpl::nlist(Objecter::NListContext *context, int max_entries)
-{
-  Cond cond;
-  bool done;
-  int r = 0;
-  Mutex mylock("IoCtxImpl::nlist::mylock");
-
-  if (context->at_end())
-    return 0;
-
-  context->max_entries = max_entries;
-  context->nspace = oloc.nspace;
-
-  objecter->list_nobjects(context, new C_SafeCond(&mylock, &cond, &done, &r));
-
-  mylock.Lock();
-  while(!done)
-    cond.Wait(mylock);
-  mylock.Unlock();
-
-  return r;
-}
-
-uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
-                                       uint32_t pos)
-{
-  context->list.clear();
-  return objecter->list_nobjects_seek(context, pos);
-}
-
-uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
-                                    const rados_object_list_cursor& cursor)
-{
-  context->list.clear();
-  return objecter->list_nobjects_seek(context, *(const hobject_t *)cursor);
-}
-
-rados_object_list_cursor librados::IoCtxImpl::nlist_get_cursor(Objecter::NListContext *context)
-{
-  hobject_t *c = new hobject_t;
-
-  objecter->list_nobjects_get_cursor(context, c);
-  return (rados_object_list_cursor)c;
-}
-
-int librados::IoCtxImpl::create(const object_t& oid, bool exclusive)
-{
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  op.create(exclusive);
-  return operate(oid, &op, NULL);
-}
-
-/*
- * add any version assert operations that are appropriate given the
- * stat in the IoCtx, either the target version assert or any src
- * object asserts.  these affect a single ioctx operation, so clear
- * the ioctx state when we're doing.
- *
- * return a pointer to the ObjectOperation if we added any events;
- * this is convenient for passing the extra_ops argument into Objecter
- * methods.
- */
-::ObjectOperation *librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation *op)
-{
-  ::ObjectOperation *pop = NULL;
-  if (assert_ver) {
-    op->assert_version(assert_ver);
-    assert_ver = 0;
-    pop = op;
-  }
-  return pop;
-}
-
-int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,
-                              size_t len, uint64_t off)
-{
-  if (len > UINT_MAX/2)
-    return -E2BIG;
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  bufferlist mybl;
-  mybl.substr_of(bl, 0, len);
-  op.write(off, mybl);
-  return operate(oid, &op, NULL);
-}
-
-int librados::IoCtxImpl::append(const object_t& oid, bufferlist& bl, size_t len)
-{
-  if (len > UINT_MAX/2)
-    return -E2BIG;
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  bufferlist mybl;
-  mybl.substr_of(bl, 0, len);
-  op.append(mybl);
-  return operate(oid, &op, NULL);
-}
-
-int librados::IoCtxImpl::write_full(const object_t& oid, bufferlist& bl)
-{
-  if (bl.length() > UINT_MAX/2)
-    return -E2BIG;
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  op.write_full(bl);
-  return operate(oid, &op, NULL);
-}
-
-int librados::IoCtxImpl::writesame(const object_t& oid, bufferlist& bl,
-                                  size_t write_len, uint64_t off)
-{
-  if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
-    return -E2BIG;
-  if ((bl.length() == 0) || (write_len % bl.length()))
-    return -EINVAL;
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  bufferlist mybl;
-  mybl.substr_of(bl, 0, bl.length());
-  op.writesame(off, write_len, mybl);
-  return operate(oid, &op, NULL);
-}
-
-int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
-                                ceph::real_time *pmtime, int flags)
-{
-  ceph::real_time ut = (pmtime ? *pmtime :
-    ceph::real_clock::now());
-
-  /* can't write to a snapshot */
-  if (snap_seq != CEPH_NOSNAP)
-    return -EROFS;
-
-  if (!o->size())
-    return 0;
-
-  Mutex mylock("IoCtxImpl::operate::mylock");
-  Cond cond;
-  bool done;
-  int r;
-  version_t ver;
-
-  Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r);
-
-  int op = o->ops[0].op.op;
-  ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid
-                        << " nspace=" << oloc.nspace << dendl;
-  Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc,
-                                                         *o, snapc, ut, flags,
-                                                         oncommit, &ver);
-  objecter->op_submit(objecter_op);
-
-  mylock.Lock();
-  while (!done)
-    cond.Wait(mylock);
-  mylock.Unlock();
-  ldout(client->cct, 10) << "Objecter returned from "
-       << ceph_osd_op_name(op) << " r=" << r << dendl;
-
-  set_sync_op_version(ver);
-
-  return r;
-}
-
-int librados::IoCtxImpl::operate_read(const object_t& oid,
-                                     ::ObjectOperation *o,
-                                     bufferlist *pbl,
-                                     int flags)
-{
-  if (!o->size())
-    return 0;
-
-  Mutex mylock("IoCtxImpl::operate_read::mylock");
-  Cond cond;
-  bool done;
-  int r;
-  version_t ver;
-
-  Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
-
-  int op = o->ops[0].op.op;
-  ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl;
-  Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc,
-                                             *o, snap_seq, pbl, flags,
-                                             onack, &ver);
-  objecter->op_submit(objecter_op);
-
-  mylock.Lock();
-  while (!done)
-    cond.Wait(mylock);
-  mylock.Unlock();
-  ldout(client->cct, 10) << "Objecter returned from "
-       << ceph_osd_op_name(op) << " r=" << r << dendl;
-
-  set_sync_op_version(ver);
-
-  return r;
-}
-
-int librados::IoCtxImpl::aio_operate_read(const object_t &oid,
-                                         ::ObjectOperation *o,
-                                         AioCompletionImpl *c,
-                                         int flags,
-                                         bufferlist *pbl,
-                                          const blkin_trace_info *trace_info)
-{
-  FUNCTRACE();
-  Context *oncomplete = new C_aio_Complete(c);
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ((C_aio_Complete *) oncomplete)->oid = oid;
-#endif
-  c->is_read = true;
-  c->io = this;
-
-  ZTracer::Trace trace;
-  if (trace_info) {
-    ZTracer::Trace parent_trace("", nullptr, trace_info);
-    trace.init("rados operate read", &objecter->trace_endpoint, &parent_trace);
-  }
-
-  trace.event("init root span");
-  Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc,
-                *o, snap_seq, pbl, flags,
-                oncomplete, &c->objver, nullptr, 0, &trace);
-  objecter->op_submit(objecter_op, &c->tid);
-  trace.event("rados operate read submitted");
-
-  return 0;
-}
-
-int librados::IoCtxImpl::aio_operate(const object_t& oid,
-                                    ::ObjectOperation *o, AioCompletionImpl *c,
-                                    const SnapContext& snap_context, int flags,
-                                     const blkin_trace_info *trace_info)
-{
-  FUNCTRACE();
-  OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
-  auto ut = ceph::real_clock::now();
-  /* can't write to a snapshot */
-  if (snap_seq != CEPH_NOSNAP)
-    return -EROFS;
-
-  Context *oncomplete = new C_aio_Complete(c);
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ((C_aio_Complete *) oncomplete)->oid = oid;
-#endif
-
-  c->io = this;
-  queue_aio_write(c);
-
-  ZTracer::Trace trace;
-  if (trace_info) {
-    ZTracer::Trace parent_trace("", nullptr, trace_info);
-    trace.init("rados operate", &objecter->trace_endpoint, &parent_trace);
-  }
-
-  trace.event("init root span");
-  Objecter::Op *op = objecter->prepare_mutate_op(
-    oid, oloc, *o, snap_context, ut, flags,
-    oncomplete, &c->objver, osd_reqid_t(), &trace);
-  objecter->op_submit(op, &c->tid);
-  trace.event("rados operate op submitted");
-
-  return 0;
-}
-
-int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
-                                 bufferlist *pbl, size_t len, uint64_t off,
-                                 uint64_t snapid, const blkin_trace_info *info)
-{
-  FUNCTRACE();
-  if (len > (size_t) INT_MAX)
-    return -EDOM;
-
-  OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
-  Context *oncomplete = new C_aio_Complete(c);
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ((C_aio_Complete *) oncomplete)->oid = oid;
-#endif
-  c->is_read = true;
-  c->io = this;
-  c->blp = pbl;
-
-  ZTracer::Trace trace;
-  if (info)
-    trace.init("rados read", &objecter->trace_endpoint, info);
-
-  Objecter::Op *o = objecter->prepare_read_op(
-    oid, oloc,
-    off, len, snapid, pbl, 0,
-    oncomplete, &c->objver, nullptr, 0, &trace);
-  objecter->op_submit(o, &c->tid);
-  return 0;
-}
-
-int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
-                                 char *buf, size_t len, uint64_t off,
-                                 uint64_t snapid, const blkin_trace_info *info)
-{
-  FUNCTRACE();
-  if (len > (size_t) INT_MAX)
-    return -EDOM;
-
-  OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
-  Context *oncomplete = new C_aio_Complete(c);
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ((C_aio_Complete *) oncomplete)->oid = oid;
-#endif
-  c->is_read = true;
-  c->io = this;
-  c->bl.clear();
-  c->bl.push_back(buffer::create_static(len, buf));
-  c->blp = &c->bl;
-  c->out_buf = buf;
-
-  ZTracer::Trace trace;
-  if (info)
-    trace.init("rados read", &objecter->trace_endpoint, info);
-
-  Objecter::Op *o = objecter->prepare_read_op(
-    oid, oloc,
-    off, len, snapid, &c->bl, 0,
-    oncomplete, &c->objver, nullptr, 0, &trace);
-  objecter->op_submit(o, &c->tid);
-  return 0;
-}
-
-class C_ObjectOperation : public Context {
-public:
-  ::ObjectOperation m_ops;
-  explicit C_ObjectOperation(Context *c) : m_ctx(c) {}
-  void finish(int r) override {
-    m_ctx->complete(r);
-  }
-private:
-  Context *m_ctx;
-};
-
-int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
-                                        AioCompletionImpl *c,
-                                        std::map<uint64_t,uint64_t> *m,
-                                        bufferlist *data_bl, size_t len,
-                                        uint64_t off, uint64_t snapid)
-{
-  FUNCTRACE();
-  if (len > (size_t) INT_MAX)
-    return -EDOM;
-
-  Context *nested = new C_aio_Complete(c);
-  C_ObjectOperation *onack = new C_ObjectOperation(nested);
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ((C_aio_Complete *) nested)->oid = oid;
-#endif
-  c->is_read = true;
-  c->io = this;
-
-  onack->m_ops.sparse_read(off, len, m, data_bl, NULL);
-
-  Objecter::Op *o = objecter->prepare_read_op(
-    oid, oloc,
-    onack->m_ops, snapid, NULL, 0,
-    onack, &c->objver);
-  objecter->op_submit(o, &c->tid);
-  return 0;
-}
-
-int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
-                                   AioCompletionImpl *c,
-                                   uint64_t off,
-                                   bufferlist& cmp_bl)
-{
-  if (cmp_bl.length() > UINT_MAX/2)
-    return -E2BIG;
-
-  Context *onack = new C_aio_Complete(c);
-
-  c->is_read = true;
-  c->io = this;
-
-  Objecter::Op *o = objecter->prepare_cmpext_op(
-    oid, oloc, off, cmp_bl, snap_seq, 0,
-    onack, &c->objver);
-  objecter->op_submit(o, &c->tid);
-
-  return 0;
-}
-
-/* use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API */
-int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
-                                   AioCompletionImpl *c,
-                                   const char *cmp_buf,
-                                   size_t cmp_len,
-                                   uint64_t off)
-{
-  if (cmp_len > UINT_MAX/2)
-    return -E2BIG;
-
-  bufferlist cmp_bl;
-  cmp_bl.append(cmp_buf, cmp_len);
-
-  Context *nested = new C_aio_Complete(c);
-  C_ObjectOperation *onack = new C_ObjectOperation(nested);
-
-  c->is_read = true;
-  c->io = this;
-
-  onack->m_ops.cmpext(off, cmp_len, cmp_buf, NULL);
-
-  Objecter::Op *o = objecter->prepare_read_op(
-    oid, oloc, onack->m_ops, snap_seq, NULL, 0, onack, &c->objver);
-  objecter->op_submit(o, &c->tid);
-  return 0;
-}
-
-int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
-                                  const bufferlist& bl, size_t len,
-                                  uint64_t off, const blkin_trace_info *info)
-{
-  FUNCTRACE();
-  auto ut = ceph::real_clock::now();
-  ldout(client->cct, 20) << "aio_write " << oid << " " << off << "~" << len << " snapc=" << snapc << " snap_seq=" << snap_seq << dendl;
-  OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
-
-  if (len > UINT_MAX/2)
-    return -E2BIG;
-  /* can't write to a snapshot */
-  if (snap_seq != CEPH_NOSNAP)
-    return -EROFS;
-
-  Context *oncomplete = new C_aio_Complete(c);
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ((C_aio_Complete *) oncomplete)->oid = oid;
-#endif
-  ZTracer::Trace trace;
-  if (info)
-    trace.init("rados write", &objecter->trace_endpoint, info);
-
-  c->io = this;
-  queue_aio_write(c);
-
-  Objecter::Op *o = objecter->prepare_write_op(
-    oid, oloc,
-    off, len, snapc, bl, ut, 0,
-    oncomplete, &c->objver, nullptr, 0, &trace);
-  objecter->op_submit(o, &c->tid);
-
-  return 0;
-}
-
-int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c,
-                                   const bufferlist& bl, size_t len)
-{
-  FUNCTRACE();
-  auto ut = ceph::real_clock::now();
-
-  if (len > UINT_MAX/2)
-    return -E2BIG;
-  /* can't write to a snapshot */
-  if (snap_seq != CEPH_NOSNAP)
-    return -EROFS;
-
-  Context *oncomplete = new C_aio_Complete(c);
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ((C_aio_Complete *) oncomplete)->oid = oid;
-#endif
-
-  c->io = this;
-  queue_aio_write(c);
-
-  Objecter::Op *o = objecter->prepare_append_op(
-    oid, oloc,
-    len, snapc, bl, ut, 0,
-    oncomplete, &c->objver);
-  objecter->op_submit(o, &c->tid);
-
-  return 0;
-}
-
-int librados::IoCtxImpl::aio_write_full(const object_t &oid,
-                                       AioCompletionImpl *c,
-                                       const bufferlist& bl)
-{
-  FUNCTRACE();
-  auto ut = ceph::real_clock::now();
-
-  if (bl.length() > UINT_MAX/2)
-    return -E2BIG;
-  /* can't write to a snapshot */
-  if (snap_seq != CEPH_NOSNAP)
-    return -EROFS;
-
-  Context *oncomplete = new C_aio_Complete(c);
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ((C_aio_Complete *) oncomplete)->oid = oid;
-#endif
-
-  c->io = this;
-  queue_aio_write(c);
-
-  Objecter::Op *o = objecter->prepare_write_full_op(
-    oid, oloc,
-    snapc, bl, ut, 0,
-    oncomplete, &c->objver);
-  objecter->op_submit(o, &c->tid);
-
-  return 0;
-}
-
-int librados::IoCtxImpl::aio_writesame(const object_t &oid,
-                                      AioCompletionImpl *c,
-                                      const bufferlist& bl,
-                                      size_t write_len,
-                                      uint64_t off)
-{
-  FUNCTRACE();
-  auto ut = ceph::real_clock::now();
-
-  if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
-    return -E2BIG;
-  if ((bl.length() == 0) || (write_len % bl.length()))
-    return -EINVAL;
-  /* can't write to a snapshot */
-  if (snap_seq != CEPH_NOSNAP)
-    return -EROFS;
-
-  Context *oncomplete = new C_aio_Complete(c);
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ((C_aio_Complete *) oncomplete)->oid = oid;
-#endif
-  c->io = this;
-  queue_aio_write(c);
-
-  Objecter::Op *o = objecter->prepare_writesame_op(
-    oid, oloc,
-    write_len, off,
-    snapc, bl, ut, 0,
-    oncomplete, &c->objver);
-  objecter->op_submit(o, &c->tid);
-
-  return 0;
-}
-
-int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c, int flags)
-{
-  FUNCTRACE();
-  auto ut = ceph::real_clock::now();
-
-  /* can't write to a snapshot */
-  if (snap_seq != CEPH_NOSNAP)
-    return -EROFS;
-
-  Context *oncomplete = new C_aio_Complete(c);
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ((C_aio_Complete *) oncomplete)->oid = oid;
-#endif
-  c->io = this;
-  queue_aio_write(c);
-
-  Objecter::Op *o = objecter->prepare_remove_op(
-    oid, oloc,
-    snapc, ut, flags,
-    oncomplete, &c->objver);
-  objecter->op_submit(o, &c->tid);
-
-  return 0;
-}
-
-
-int librados::IoCtxImpl::aio_stat(const object_t& oid, AioCompletionImpl *c,
-                                 uint64_t *psize, time_t *pmtime)
-{
-  C_aio_stat_Ack *onack = new C_aio_stat_Ack(c, pmtime);
-  c->is_read = true;
-  c->io = this;
-  Objecter::Op *o = objecter->prepare_stat_op(
-    oid, oloc,
-    snap_seq, psize, &onack->mtime, 0,
-    onack, &c->objver);
-  objecter->op_submit(o, &c->tid);
-  return 0;
-}
-
-int librados::IoCtxImpl::aio_stat2(const object_t& oid, AioCompletionImpl *c,
-                                 uint64_t *psize, struct timespec *pts)
-{
-  C_aio_stat2_Ack *onack = new C_aio_stat2_Ack(c, pts);
-  c->is_read = true;
-  c->io = this;
-  Objecter::Op *o = objecter->prepare_stat_op(
-    oid, oloc,
-    snap_seq, psize, &onack->mtime, 0,
-    onack, &c->objver);
-  objecter->op_submit(o, &c->tid);
-  return 0;
-}
-
-int librados::IoCtxImpl::aio_getxattr(const object_t& oid, AioCompletionImpl *c,
-                                     const char *name, bufferlist& bl)
-{
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.getxattr(name, &bl, NULL);
-  int r = aio_operate_read(oid, &rd, c, 0, &bl);
-  return r;
-}
-
-int librados::IoCtxImpl::aio_rmxattr(const object_t& oid, AioCompletionImpl *c,
-                                    const char *name)
-{
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  op.rmxattr(name);
-  return aio_operate(oid, &op, c, snapc, 0);
-}
-
-int librados::IoCtxImpl::aio_setxattr(const object_t& oid, AioCompletionImpl *c,
-                                     const char *name, bufferlist& bl)
-{
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  op.setxattr(name, bl);
-  return aio_operate(oid, &op, c, snapc, 0);
-}
-
-namespace {
-struct AioGetxattrsData {
-  AioGetxattrsData(librados::AioCompletionImpl *c, map<string, bufferlist>* attrset,
-                  librados::RadosClient *_client) :
-    user_completion(c), user_attrset(attrset), client(_client) {}
-  struct librados::C_AioCompleteAndSafe user_completion;
-  map<string, bufferlist> result_attrset;
-  map<std::string, bufferlist>* user_attrset;
-  librados::RadosClient *client;
-};
-}
-
-static void aio_getxattrs_complete(rados_completion_t c, void *arg) {
-  AioGetxattrsData *cdata = reinterpret_cast<AioGetxattrsData*>(arg);
-  int rc = rados_aio_get_return_value(c);
-  cdata->user_attrset->clear();
-  if (rc >= 0) {
-    for (map<string,bufferlist>::iterator p = cdata->result_attrset.begin();
-        p != cdata->result_attrset.end();
-        ++p) {
-      ldout(cdata->client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
-      (*cdata->user_attrset)[p->first] = p->second;
-    }
-  }
-  cdata->user_completion.finish(rc);
-  ((librados::AioCompletionImpl*)c)->put();
-  delete cdata;
-}
-
-int librados::IoCtxImpl::aio_getxattrs(const object_t& oid, AioCompletionImpl *c,
-                                      map<std::string, bufferlist>& attrset)
-{
-  AioGetxattrsData *cdata = new AioGetxattrsData(c, &attrset, client);
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.getxattrs(&cdata->result_attrset, NULL);
-  librados::AioCompletionImpl *comp = new librados::AioCompletionImpl;
-  comp->set_complete_callback(cdata, aio_getxattrs_complete);
-  return aio_operate_read(oid, &rd, comp, 0, NULL);
-}
-
-int librados::IoCtxImpl::aio_cancel(AioCompletionImpl *c)
-{
-  return objecter->op_cancel(c->tid, -ECANCELED);
-}
-
-
-int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c,
-                             std::list< std::pair<time_t, time_t> > *pls)
-{
-  Context *oncomplete = new C_aio_Complete(c);
-  c->is_read = true;
-  c->io = this;
-
-  ::ObjectOperation rd;
-  rd.hit_set_ls(pls, NULL);
-  object_locator_t oloc(poolid);
-  Objecter::Op *o = objecter->prepare_pg_read_op(
-    hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL);
-  objecter->op_submit(o, &c->tid);
-  return 0;
-}
-
-int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c,
-                                    time_t stamp,
-                                    bufferlist *pbl)
-{
-  Context *oncomplete = new C_aio_Complete(c);
-  c->is_read = true;
-  c->io = this;
-
-  ::ObjectOperation rd;
-  rd.hit_set_get(ceph::real_clock::from_time_t(stamp), pbl, 0);
-  object_locator_t oloc(poolid);
-  Objecter::Op *o = objecter->prepare_pg_read_op(
-    hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL);
-  objecter->op_submit(o, &c->tid);
-  return 0;
-}
-
-int librados::IoCtxImpl::remove(const object_t& oid)
-{
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  op.remove();
-  return operate(oid, &op, NULL);
-}
-
-int librados::IoCtxImpl::remove(const object_t& oid, int flags)
-{
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  op.remove();
-  return operate(oid, &op, NULL, flags);
-}
-
-int librados::IoCtxImpl::trunc(const object_t& oid, uint64_t size)
-{
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  op.truncate(size);
-  return operate(oid, &op, NULL);
-}
-
-int librados::IoCtxImpl::get_inconsistent_objects(const pg_t& pg,
-                                                 const librados::object_id_t& start_after,
-                                                 uint64_t max_to_get,
-                                                 AioCompletionImpl *c,
-                                                 std::vector<inconsistent_obj_t>* objects,
-                                                 uint32_t* interval)
-{
-  Context *oncomplete = new C_aio_Complete(c);
-  c->is_read = true;
-  c->io = this;
-
-  ::ObjectOperation op;
-  op.scrub_ls(start_after, max_to_get, objects, interval, nullptr);
-  object_locator_t oloc{poolid, pg.ps()};
-  Objecter::Op *o = objecter->prepare_pg_read_op(
-    oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete,
-    nullptr, nullptr);
-  objecter->op_submit(o, &c->tid);
-  return 0;
-}
-
-int librados::IoCtxImpl::get_inconsistent_snapsets(const pg_t& pg,
-                                                  const librados::object_id_t& start_after,
-                                                  uint64_t max_to_get,
-                                                  AioCompletionImpl *c,
-                                                  std::vector<inconsistent_snapset_t>* snapsets,
-                                                  uint32_t* interval)
-{
-  Context *oncomplete = new C_aio_Complete(c);
-  c->is_read = true;
-  c->io = this;
-
-  ::ObjectOperation op;
-  op.scrub_ls(start_after, max_to_get, snapsets, interval, nullptr);
-  object_locator_t oloc{poolid, pg.ps()};
-  Objecter::Op *o = objecter->prepare_pg_read_op(
-    oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete,
-    nullptr, nullptr);
-  objecter->op_submit(o, &c->tid);
-  return 0;
-}
-
-int librados::IoCtxImpl::tmap_update(const object_t& oid, bufferlist& cmdbl)
-{
-  ::ObjectOperation wr;
-  prepare_assert_ops(&wr);
-  wr.tmap_update(cmdbl);
-  return operate(oid, &wr, NULL);
-}
-
-int librados::IoCtxImpl::tmap_put(const object_t& oid, bufferlist& bl)
-{
-  ::ObjectOperation wr;
-  prepare_assert_ops(&wr);
-  wr.tmap_put(bl);
-  return operate(oid, &wr, NULL);
-}
-
-int librados::IoCtxImpl::tmap_get(const object_t& oid, bufferlist& bl)
-{
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.tmap_get(&bl, NULL);
-  return operate_read(oid, &rd, NULL);
-}
-
-int librados::IoCtxImpl::tmap_to_omap(const object_t& oid, bool nullok)
-{
-  ::ObjectOperation wr;
-  prepare_assert_ops(&wr);
-  wr.tmap_to_omap(nullok);
-  return operate(oid, &wr, NULL);
-}
-
-int librados::IoCtxImpl::exec(const object_t& oid,
-                             const char *cls, const char *method,
-                             bufferlist& inbl, bufferlist& outbl)
-{
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.call(cls, method, inbl);
-  return operate_read(oid, &rd, &outbl);
-}
-
-int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
-                                 const char *cls, const char *method,
-                                 bufferlist& inbl, bufferlist *outbl)
-{
-  FUNCTRACE();
-  Context *oncomplete = new C_aio_Complete(c);
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ((C_aio_Complete *) oncomplete)->oid = oid;
-#endif
-  c->is_read = true;
-  c->io = this;
-
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.call(cls, method, inbl);
-  Objecter::Op *o = objecter->prepare_read_op(
-    oid, oloc, rd, snap_seq, outbl, 0, oncomplete, &c->objver);
-  objecter->op_submit(o, &c->tid);
-  return 0;
-}
-
-int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
-                                 const char *cls, const char *method,
-                                 bufferlist& inbl, char *buf, size_t out_len)
-{
-  FUNCTRACE();
-  Context *oncomplete = new C_aio_Complete(c);
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ((C_aio_Complete *) oncomplete)->oid = oid;
-#endif
-  c->is_read = true;
-  c->io = this;
-  c->bl.clear();
-  c->bl.push_back(buffer::create_static(out_len, buf));
-  c->blp = &c->bl;
-  c->out_buf = buf;
-
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.call(cls, method, inbl);
-  Objecter::Op *o = objecter->prepare_read_op(
-    oid, oloc, rd, snap_seq, &c->bl, 0, oncomplete, &c->objver);
-  objecter->op_submit(o, &c->tid);
-  return 0;
-}
-
-int librados::IoCtxImpl::read(const object_t& oid,
-                             bufferlist& bl, size_t len, uint64_t off)
-{
-  if (len > (size_t) INT_MAX)
-    return -EDOM;
-  OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
-
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.read(off, len, &bl, NULL, NULL);
-  int r = operate_read(oid, &rd, &bl);
-  if (r < 0)
-    return r;
-
-  if (bl.length() < len) {
-    ldout(client->cct, 10) << "Returned length " << bl.length()
-            << " less than original length "<< len << dendl;
-  }
-
-  return bl.length();
-}
-
-int librados::IoCtxImpl::cmpext(const object_t& oid, uint64_t off,
-                                bufferlist& cmp_bl)
-{
-  if (cmp_bl.length() > UINT_MAX/2)
-    return -E2BIG;
-
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  op.cmpext(off, cmp_bl, NULL);
-  return operate_read(oid, &op, NULL);
-}
-
-int librados::IoCtxImpl::mapext(const object_t& oid,
-                               uint64_t off, size_t len,
-                               std::map<uint64_t,uint64_t>& m)
-{
-  bufferlist bl;
-
-  Mutex mylock("IoCtxImpl::read::mylock");
-  Cond cond;
-  bool done;
-  int r;
-  Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
-
-  objecter->mapext(oid, oloc,
-                  off, len, snap_seq, &bl, 0,
-                  onack);
-
-  mylock.Lock();
-  while (!done)
-    cond.Wait(mylock);
-  mylock.Unlock();
-  ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl;
-
-  if (r < 0)
-    return r;
-
-  bufferlist::iterator iter = bl.begin();
-  ::decode(m, iter);
-
-  return m.size();
-}
-
-int librados::IoCtxImpl::sparse_read(const object_t& oid,
-                                    std::map<uint64_t,uint64_t>& m,
-                                    bufferlist& data_bl, size_t len,
-                                    uint64_t off)
-{
-  if (len > (size_t) INT_MAX)
-    return -EDOM;
-
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.sparse_read(off, len, &m, &data_bl, NULL);
-
-  int r = operate_read(oid, &rd, NULL);
-  if (r < 0)
-    return r;
-
-  return m.size();
-}
-
-int librados::IoCtxImpl::checksum(const object_t& oid, uint8_t type,
-                                 const bufferlist &init_value, size_t len,
-                                 uint64_t off, size_t chunk_size,
-                                 bufferlist *pbl)
-{
-  if (len > (size_t) INT_MAX) {
-    return -EDOM;
-  }
-
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.checksum(type, init_value, off, len, chunk_size, pbl, nullptr, nullptr);
-
-  int r = operate_read(oid, &rd, nullptr);
-  if (r < 0) {
-    return r;
-  }
-
-  return 0;
-}
-
-int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime)
-{
-  uint64_t size;
-  real_time mtime;
-
-  if (!psize)
-    psize = &size;
-
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.stat(psize, &mtime, NULL);
-  int r = operate_read(oid, &rd, NULL);
-
-  if (r >= 0 && pmtime) {
-    *pmtime = real_clock::to_time_t(mtime);
-  }
-
-  return r;
-}
-
-int librados::IoCtxImpl::stat2(const object_t& oid, uint64_t *psize, struct timespec *pts)
-{
-  uint64_t size;
-  ceph::real_time mtime;
-
-  if (!psize)
-    psize = &size;
-
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.stat(psize, &mtime, NULL);
-  int r = operate_read(oid, &rd, NULL);
-  if (r < 0) {
-    return r;
-  }
-
-  if (pts) {
-    *pts = ceph::real_clock::to_timespec(mtime);
-  }
-
-  return 0;
-}
-
-int librados::IoCtxImpl::getxattr(const object_t& oid,
-                                   const char *name, bufferlist& bl)
-{
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.getxattr(name, &bl, NULL);
-  int r = operate_read(oid, &rd, &bl);
-  if (r < 0)
-    return r;
-
-  return bl.length();
-}
-
-int librados::IoCtxImpl::rmxattr(const object_t& oid, const char *name)
-{
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  op.rmxattr(name);
-  return operate(oid, &op, NULL);
-}
-
-int librados::IoCtxImpl::setxattr(const object_t& oid,
-                                   const char *name, bufferlist& bl)
-{
-  ::ObjectOperation op;
-  prepare_assert_ops(&op);
-  op.setxattr(name, bl);
-  return operate(oid, &op, NULL);
-}
-
-int librados::IoCtxImpl::getxattrs(const object_t& oid,
-                                    map<std::string, bufferlist>& attrset)
-{
-  map<string, bufferlist> aset;
-
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.getxattrs(&aset, NULL);
-  int r = operate_read(oid, &rd, NULL);
-
-  attrset.clear();
-  if (r >= 0) {
-    for (map<string,bufferlist>::iterator p = aset.begin(); p != aset.end(); ++p) {
-      ldout(client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
-      attrset[p->first.c_str()] = p->second;
-    }
-  }
-
-  return r;
-}
-
-void librados::IoCtxImpl::set_sync_op_version(version_t ver)
-{
-  ANNOTATE_BENIGN_RACE_SIZED(&last_objver, sizeof(last_objver),
-                             "IoCtxImpl last_objver");
-  last_objver = ver;
-}
-
-struct WatchInfo : public Objecter::WatchContext {
-  librados::IoCtxImpl *ioctx;
-  object_t oid;
-  librados::WatchCtx *ctx;
-  librados::WatchCtx2 *ctx2;
-  bool internal = false;
-
-  WatchInfo(librados::IoCtxImpl *io, object_t o,
-           librados::WatchCtx *c, librados::WatchCtx2 *c2,
-            bool inter)
-    : ioctx(io), oid(o), ctx(c), ctx2(c2), internal(inter) {
-    ioctx->get();
-  }
-  ~WatchInfo() override {
-    ioctx->put();
-    if (internal) {
-      delete ctx;
-      delete ctx2;
-    }
-  }
-
-  void handle_notify(uint64_t notify_id,
-                    uint64_t cookie,
-                    uint64_t notifier_id,
-                    bufferlist& bl) override {
-    ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
-                                 << " cookie " << cookie
-                                 << " notifier_id " << notifier_id
-                                 << " len " << bl.length()
-                                 << dendl;
-
-    if (ctx2)
-      ctx2->handle_notify(notify_id, cookie, notifier_id, bl);
-    if (ctx) {
-      ctx->notify(0, 0, bl);
-
-      // send ACK back to OSD if using legacy protocol
-      bufferlist empty;
-      ioctx->notify_ack(oid, notify_id, cookie, empty);
-    }
-  }
-  void handle_error(uint64_t cookie, int err) override {
-    ldout(ioctx->client->cct, 10) << __func__ << " cookie " << cookie
-                                 << " err " << err
-                                 << dendl;
-    if (ctx2)
-      ctx2->handle_error(cookie, err);
-  }
-};
-
-int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
-                               librados::WatchCtx *ctx,
-                               librados::WatchCtx2 *ctx2,
-                               bool internal)
-{
-  return watch(oid, handle, ctx, ctx2, 0, internal);
-}
-
-int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
-                               librados::WatchCtx *ctx,
-                               librados::WatchCtx2 *ctx2,
-                               uint32_t timeout,
-                               bool internal)
-{
-  ::ObjectOperation wr;
-  version_t objver;
-  C_SaferCond onfinish;
-
-  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
-  *handle = linger_op->get_cookie();
-  linger_op->watch_context = new WatchInfo(this,
-                                          oid, ctx, ctx2, internal);
-
-  prepare_assert_ops(&wr);
-  wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
-  bufferlist bl;
-  objecter->linger_watch(linger_op, wr,
-                        snapc, ceph::real_clock::now(), bl,
-                        &onfinish,
-                        &objver);
-
-  int r = onfinish.wait();
-
-  set_sync_op_version(objver);
-
-  if (r < 0) {
-    objecter->linger_cancel(linger_op);
-    *handle = 0;
-  }
-
-  return r;
-}
-
-int librados::IoCtxImpl::aio_watch(const object_t& oid,
-                                   AioCompletionImpl *c,
-                                   uint64_t *handle,
-                                   librados::WatchCtx *ctx,
-                                   librados::WatchCtx2 *ctx2,
-                                   bool internal) {
-  return aio_watch(oid, c, handle, ctx, ctx2, 0, internal);
-}
-
-int librados::IoCtxImpl::aio_watch(const object_t& oid,
-                                   AioCompletionImpl *c,
-                                   uint64_t *handle,
-                                   librados::WatchCtx *ctx,
-                                   librados::WatchCtx2 *ctx2,
-                                   uint32_t timeout,
-                                   bool internal)
-{
-  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
-  c->io = this;
-  Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false);
-
-  ::ObjectOperation wr;
-  *handle = linger_op->get_cookie();
-  linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal);
-
-  prepare_assert_ops(&wr);
-  wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
-  bufferlist bl;
-  objecter->linger_watch(linger_op, wr,
-                         snapc, ceph::real_clock::now(), bl,
-                         oncomplete, &c->objver);
-
-  return 0;
-}
-
-
-int librados::IoCtxImpl::notify_ack(
-  const object_t& oid,
-  uint64_t notify_id,
-  uint64_t cookie,
-  bufferlist& bl)
-{
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  rd.notify_ack(notify_id, cookie, bl);
-  objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
-  return 0;
-}
-
-int librados::IoCtxImpl::watch_check(uint64_t cookie)
-{
-  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
-  return objecter->linger_check(linger_op);
-}
-
-int librados::IoCtxImpl::unwatch(uint64_t cookie)
-{
-  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
-  C_SaferCond onfinish;
-  version_t ver = 0;
-
-  ::ObjectOperation wr;
-  prepare_assert_ops(&wr);
-  wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
-  objecter->mutate(linger_op->target.base_oid, oloc, wr,
-                  snapc, ceph::real_clock::now(), 0,
-                  &onfinish, &ver);
-  objecter->linger_cancel(linger_op);
-
-  int r = onfinish.wait();
-  set_sync_op_version(ver);
-  return r;
-}
-
-int librados::IoCtxImpl::aio_unwatch(uint64_t cookie, AioCompletionImpl *c)
-{
-  c->io = this;
-  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
-  Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true);
-
-  ::ObjectOperation wr;
-  prepare_assert_ops(&wr);
-  wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
-  objecter->mutate(linger_op->target.base_oid, oloc, wr,
-                  snapc, ceph::real_clock::now(), 0,
-                  oncomplete, &c->objver);
-  return 0;
-}
-
-int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
-                               uint64_t timeout_ms,
-                               bufferlist *preply_bl,
-                               char **preply_buf, size_t *preply_buf_len)
-{
-  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
-
-  C_SaferCond notify_finish_cond;
-  Context *notify_finish = new C_notify_Finish(client->cct, &notify_finish_cond,
-                                               objecter, linger_op, preply_bl,
-                                               preply_buf, preply_buf_len);
-
-  uint32_t timeout = notify_timeout;
-  if (timeout_ms)
-    timeout = timeout_ms / 1000;
-
-  // Construct RADOS op
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  bufferlist inbl;
-  rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
-
-  // Issue RADOS op
-  C_SaferCond onack;
-  version_t objver;
-  objecter->linger_notify(linger_op,
-                         rd, snap_seq, inbl, NULL,
-                         &onack, &objver);
-
-  ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
-  int r = onack.wait();
-  ldout(client->cct, 10) << __func__ << " linger op " << linger_op
-                        << " acked (" << r << ")" << dendl;
-
-  if (r == 0) {
-    ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
-                          << linger_op << dendl;
-    r = notify_finish_cond.wait();
-
-  } else {
-    ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = "
-                          << r << dendl;
-    notify_finish->complete(r);
-  }
-
-  objecter->linger_cancel(linger_op);
-
-  set_sync_op_version(objver);
-  return r;
-}
-
-int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
-                                    bufferlist& bl, uint64_t timeout_ms,
-                                    bufferlist *preply_bl, char **preply_buf,
-                                    size_t *preply_buf_len)
-{
-  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
-
-  c->io = this;
-
-  C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op);
-  C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete,
-                                                  objecter, linger_op,
-                                                  preply_bl, preply_buf,
-                                                  preply_buf_len);
-  Context *onack = new C_aio_notify_Ack(client->cct, onnotify, oncomplete);
-
-  uint32_t timeout = notify_timeout;
-  if (timeout_ms)
-    timeout = timeout_ms / 1000;
-
-  // Construct RADOS op
-  ::ObjectOperation rd;
-  prepare_assert_ops(&rd);
-  bufferlist inbl;
-  rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
-
-  // Issue RADOS op
-  objecter->linger_notify(linger_op,
-                         rd, snap_seq, inbl, NULL,
-                         onack, &c->objver);
-  return 0;
-}
-
-int librados::IoCtxImpl::set_alloc_hint(const object_t& oid,
-                                        uint64_t expected_object_size,
-                                        uint64_t expected_write_size,
-                                       uint32_t flags)
-{
-  ::ObjectOperation wr;
-  prepare_assert_ops(&wr);
-  wr.set_alloc_hint(expected_object_size, expected_write_size, flags);
-  return operate(oid, &wr, NULL);
-}
-
-version_t librados::IoCtxImpl::last_version()
-{
-  return last_objver;
-}
-
-void librados::IoCtxImpl::set_assert_version(uint64_t ver)
-{
-  assert_ver = ver;
-}
-
-void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout)
-{
-  notify_timeout = timeout;
-}
-
-int librados::IoCtxImpl::cache_pin(const object_t& oid)
-{
-  ::ObjectOperation wr;
-  prepare_assert_ops(&wr);
-  wr.cache_pin();
-  return operate(oid, &wr, NULL);
-}
-
-int librados::IoCtxImpl::cache_unpin(const object_t& oid)
-{
-  ::ObjectOperation wr;
-  prepare_assert_ops(&wr);
-  wr.cache_unpin();
-  return operate(oid, &wr, NULL);
-}
-
-
-///////////////////////////// C_aio_stat_Ack ////////////////////////////
-
-librados::IoCtxImpl::C_aio_stat_Ack::C_aio_stat_Ack(AioCompletionImpl *_c,
-                                                   time_t *pm)
-   : c(_c), pmtime(pm)
-{
-  assert(!c->io);
-  c->get();
-}
-
-void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r)
-{
-  c->lock.Lock();
-  c->rval = r;
-  c->complete = true;
-  c->cond.Signal();
-
-  if (r >= 0 && pmtime) {
-    *pmtime = real_clock::to_time_t(mtime);
-  }
-
-  if (c->callback_complete) {
-    c->io->client->finisher.queue(new C_AioComplete(c));
-  }
-
-  c->put_unlock();
-}
-
-///////////////////////////// C_aio_stat2_Ack ////////////////////////////
-
-librados::IoCtxImpl::C_aio_stat2_Ack::C_aio_stat2_Ack(AioCompletionImpl *_c,
-                                                    struct timespec *pt)
-   : c(_c), pts(pt)
-{
-  assert(!c->io);
-  c->get();
-}
-
-void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r)
-{
-  c->lock.Lock();
-  c->rval = r;
-  c->complete = true;
-  c->cond.Signal();
-
-  if (r >= 0 && pts) {
-    *pts = real_clock::to_timespec(mtime);
-  }
-
-  if (c->callback_complete) {
-    c->io->client->finisher.queue(new C_AioComplete(c));
-  }
-
-  c->put_unlock();
-}
-
-//////////////////////////// C_aio_Complete ////////////////////////////////
-
-librados::IoCtxImpl::C_aio_Complete::C_aio_Complete(AioCompletionImpl *_c)
-  : c(_c)
-{
-  c->get();
-}
-
-void librados::IoCtxImpl::C_aio_Complete::finish(int r)
-{
-  c->lock.Lock();
-  c->rval = r;
-  c->complete = true;
-  c->cond.Signal();
-
-  if (r == 0 && c->blp && c->blp->length() > 0) {
-    if (c->out_buf && !c->blp->is_provided_buffer(c->out_buf))
-      c->blp->copy(0, c->blp->length(), c->out_buf);
-    c->rval = c->blp->length();
-  }
-
-  if (c->callback_complete ||
-      c->callback_safe) {
-    c->io->client->finisher.queue(new C_AioComplete(c));
-  }
-
-  if (c->aio_write_seq) {
-    c->io->complete_aio_write(c);
-  }
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  OID_EVENT_TRACE(oid.name.c_str(), "RADOS_OP_COMPLETE");
-#endif
-  c->put_unlock();
-}
-
-void librados::IoCtxImpl::object_list_slice(
-  const hobject_t start,
-  const hobject_t finish,
-  const size_t n,
-  const size_t m,
-  hobject_t *split_start,
-  hobject_t *split_finish)
-{
-  if (start.is_max()) {
-    *split_start = hobject_t::get_max();
-    *split_finish = hobject_t::get_max();
-    return;
-  }
-
-  uint64_t start_hash = hobject_t::_reverse_bits(start.get_hash());
-  uint64_t finish_hash =
-    finish.is_max() ? 0x100000000 :
-    hobject_t::_reverse_bits(finish.get_hash());
-
-  uint64_t diff = finish_hash - start_hash;
-  uint64_t rev_start = start_hash + (diff * n / m);
-  uint64_t rev_finish = start_hash + (diff * (n + 1) / m);
-  if (n == 0) {
-    *split_start = start;
-  } else {
-    *split_start = hobject_t(
-      object_t(), string(), CEPH_NOSNAP,
-      hobject_t::_reverse_bits(rev_start), poolid, string());
-  }
-
-  if (n == m - 1)
-    *split_finish = finish;
-  else if (rev_finish >= 0x100000000)
-    *split_finish = hobject_t::get_max();
-  else
-    *split_finish = hobject_t(
-      object_t(), string(), CEPH_NOSNAP,
-      hobject_t::_reverse_bits(rev_finish), poolid, string());
-}
-
-int librados::IoCtxImpl::application_enable(const std::string& app_name,
-                                            bool force)
-{
-  auto c = new PoolAsyncCompletionImpl();
-  application_enable_async(app_name, force, c);
-
-  int r = c->wait();
-  assert(r == 0);
-
-  r = c->get_return_value();
-  c->release();
-  if (r < 0) {
-    return r;
-  }
-
-  return client->wait_for_latest_osdmap();
-}
-
-void librados::IoCtxImpl::application_enable_async(const std::string& app_name,
-                                                   bool force,
-                                                   PoolAsyncCompletionImpl *c)
-{
-  // pre-Luminous clusters will return -EINVAL and application won't be
-  // preserved until Luminous is configured as minimim version.
-  if (!client->get_required_monitor_features().contains_all(
-        ceph::features::mon::FEATURE_LUMINOUS)) {
-    client->finisher.queue(new C_PoolAsync_Safe(c), -EOPNOTSUPP);
-    return;
-  }
-
-  std::stringstream cmd;
-  cmd << "{"
-      << "\"prefix\": \"osd pool application enable\","
-      << "\"pool\": \"" << get_cached_pool_name() << "\","
-      << "\"app\": \"" << app_name << "\"";
-  if (force) {
-    cmd << ",\"force\":\"--yes-i-really-mean-it\"";
-  }
-  cmd << "}";
-
-  std::vector<std::string> cmds;
-  cmds.push_back(cmd.str());
-  bufferlist inbl;
-  client->mon_command_async(cmds, inbl, nullptr, nullptr,
-                            new C_PoolAsync_Safe(c));
-}
-
-int librados::IoCtxImpl::application_list(std::set<std::string> *app_names)
-{
-  int r = 0;
-  app_names->clear();
-  objecter->with_osdmap([&](const OSDMap& o) {
-      auto pg_pool = o.get_pg_pool(poolid);
-      if (pg_pool == nullptr) {
-       r = -ENOENT;
-        return;
-      }
-
-      for (auto &pair : pg_pool->application_metadata) {
-        app_names->insert(pair.first);
-      }
-    });
-  return r;
-}
-
-int librados::IoCtxImpl::application_metadata_get(const std::string& app_name,
-                                                  const std::string &key,
-                                                  std::string* value)
-{
-  int r = 0;
-  objecter->with_osdmap([&](const OSDMap& o) {
-      auto pg_pool = o.get_pg_pool(poolid);
-      if (pg_pool == nullptr) {
-       r = -ENOENT;
-        return;
-      }
-
-      auto app_it = pg_pool->application_metadata.find(app_name);
-      if (app_it == pg_pool->application_metadata.end()) {
-        r = -ENOENT;
-        return;
-      }
-
-      auto it = app_it->second.find(key);
-      if (it == app_it->second.end()) {
-        r = -ENOENT;
-        return;
-      }
-
-      *value = it->second;
-    });
-  return r;
-}
-
-int librados::IoCtxImpl::application_metadata_set(const std::string& app_name,
-                                                  const std::string &key,
-                                                  const std::string& value)
-{
-  std::stringstream cmd;
-  cmd << "{"
-      << "\"prefix\":\"osd pool application set\","
-      << "\"pool\":\"" << get_cached_pool_name() << "\","
-      << "\"app\":\"" << app_name << "\","
-      << "\"key\":\"" << key << "\","
-      << "\"value\":\"" << value << "\""
-      << "}";
-
-  std::vector<std::string> cmds;
-  cmds.push_back(cmd.str());
-  bufferlist inbl;
-  int r = client->mon_command(cmds, inbl, nullptr, nullptr);
-  if (r < 0) {
-    return r;
-  }
-
-  // ensure we have the latest osd map epoch before proceeding
-  return client->wait_for_latest_osdmap();
-}
-
-int librados::IoCtxImpl::application_metadata_remove(const std::string& app_name,
-                                                     const std::string &key)
-{
-  std::stringstream cmd;
-  cmd << "{"
-      << "\"prefix\":\"osd pool application rm\","
-      << "\"pool\":\"" << get_cached_pool_name() << "\","
-      << "\"app\":\"" << app_name << "\","
-      << "\"key\":\"" << key << "\""
-      << "}";
-
-  std::vector<std::string> cmds;
-  cmds.push_back(cmd.str());
-  bufferlist inbl;
-  int r = client->mon_command(cmds, inbl, nullptr, nullptr);
-  if (r < 0) {
-    return r;
-  }
-
-  // ensure we have the latest osd map epoch before proceeding
-  return client->wait_for_latest_osdmap();
-}
-
-int librados::IoCtxImpl::application_metadata_list(const std::string& app_name,
-                                                   std::map<std::string, std::string> *values)
-{
-  int r = 0;
-  values->clear();
-  objecter->with_osdmap([&](const OSDMap& o) {
-      auto pg_pool = o.get_pg_pool(poolid);
-      if (pg_pool == nullptr) {
-        r = -ENOENT;
-        return;
-      }
-
-      auto it = pg_pool->application_metadata.find(app_name);
-      if (it == pg_pool->application_metadata.end()) {
-        r = -ENOENT;
-        return;
-      }
-
-      *values = it->second;
-    });
-  return r;
-}
-