diff --git a/src/ceph/src/osdc/ObjectCacher.cc b/src/ceph/src/osdc/ObjectCacher.cc
new file mode 100644 (file)
index 0000000..4afd1de
--- /dev/null
@@ -0,0 +1,2691 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <limits.h>
+
+#include "msg/Messenger.h"
+#include "ObjectCacher.h"
+#include "WritebackHandler.h"
+#include "common/errno.h"
+#include "common/perf_counters.h"
+
+#include "include/assert.h"
+
+#define MAX_FLUSH_UNDER_LOCK 20  ///< max bh's we start writeback on
+                                 /// while holding the lock
+#define BUFFER_MEMORY_WEIGHT 12   // memory usage of BufferHead, count in (1<<n)
+
+using std::chrono::seconds;
+
+/*** ObjectCacher::BufferHead ***/
+
+
+/*** ObjectCacher::Object ***/
+
+#define dout_subsys ceph_subsys_objectcacher
+#undef dout_prefix
+#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
+
+
+
+class ObjectCacher::C_ReadFinish : public Context {
+  ObjectCacher *oc;
+  int64_t poolid;
+  sobject_t oid;
+  loff_t start;
+  uint64_t length;
+  xlist<C_ReadFinish*>::item set_item;
+  bool trust_enoent;
+  ceph_tid_t tid;
+  ZTracer::Trace trace;
+
+public:
+  bufferlist bl;
+  C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
+              uint64_t l, const ZTracer::Trace &trace) :
+    oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
+    set_item(this), trust_enoent(true),
+    tid(t), trace(trace) {
+    ob->reads.push_back(&set_item);
+  }
+
+  void finish(int r) override {
+    oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
+    trace.event("finish");
+
+    // object destructor clears the list
+    if (set_item.is_on_list())
+      set_item.remove_myself();
+  }
+
+  void distrust_enoent() {
+    trust_enoent = false;
+  }
+};
+
+class ObjectCacher::C_RetryRead : public Context {
+  ObjectCacher *oc;
+  OSDRead *rd;
+  ObjectSet *oset;
+  Context *onfinish;
+  ZTracer::Trace trace;
+public:
+  C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
+             const ZTracer::Trace &trace)
+    : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
+  }
+  void finish(int r) override {
+    if (r >= 0) {
+      r = oc->_readx(rd, oset, onfinish, false, &trace);
+    }
+
+    if (r == 0) {
+      // read is still in-progress
+      return;
+    }
+
+    trace.event("finish");
+    if (onfinish) {
+      onfinish->complete(r);
+    }
+  }
+};
+
+ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
+                                                     loff_t off)
+{
+  assert(oc->lock.is_locked());
+  ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
+
+  // split off right
+  ObjectCacher::BufferHead *right = new BufferHead(this);
+
+  // inherit the dontneed/nocache hints; a later access clears them automatically.
+  right->set_dontneed(left->get_dontneed());
+  right->set_nocache(left->get_nocache());
+
+  right->last_write_tid = left->last_write_tid;
+  right->last_read_tid = left->last_read_tid;
+  right->set_state(left->get_state());
+  right->snapc = left->snapc;
+  right->set_journal_tid(left->journal_tid);
+
+  loff_t newleftlen = off - left->start();
+  right->set_start(off);
+  right->set_length(left->length() - newleftlen);
+
+  // shorten left
+  oc->bh_stat_sub(left);
+  left->set_length(newleftlen);
+  oc->bh_stat_add(left);
+
+  // add right
+  oc->bh_add(this, right);
+
+  // split buffers too
+  bufferlist bl;
+  bl.claim(left->bl);
+  if (bl.length()) {
+    assert(bl.length() == (left->length() + right->length()));
+    right->bl.substr_of(bl, left->length(), right->length());
+    left->bl.substr_of(bl, 0, left->length());
+  }
+
+  // move read waiters
+  if (!left->waitfor_read.empty()) {
+    map<loff_t, list<Context*> >::iterator start_remove
+      = left->waitfor_read.begin();
+    while (start_remove != left->waitfor_read.end() &&
+          start_remove->first < right->start())
+      ++start_remove;
+    for (map<loff_t, list<Context*> >::iterator p = start_remove;
+        p != left->waitfor_read.end(); ++p) {
+      ldout(oc->cct, 20) << "split  moving waiters at byte " << p->first
+                        << " to right bh" << dendl;
+      right->waitfor_read[p->first].swap( p->second );
+      assert(p->second.empty());
+    }
+    left->waitfor_read.erase(start_remove, left->waitfor_read.end());
+  }
+
+  ldout(oc->cct, 20) << "split    left is " << *left << dendl;
+  ldout(oc->cct, 20) << "split   right is " << *right << dendl;
+  return right;
+}
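+// Example (illustrative): splitting a bh that covers [0, 8192) at off == 4096
+// shortens the left bh to [0, 4096) and creates a right bh for [4096, 8192).
+// The right bh inherits the state, snapc, journal tid and dontneed/nocache
+// hints, the cached bufferlist is divided with substr_of(), and read waiters
+// registered at offsets >= 4096 are moved to the right bh.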
+
+
+void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
+{
+  assert(oc->lock.is_locked());
+  assert(left->end() == right->start());
+  assert(left->get_state() == right->get_state());
+  assert(left->can_merge_journal(right));
+
+  ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
+  if (left->get_journal_tid() == 0) {
+    left->set_journal_tid(right->get_journal_tid());
+  }
+  right->set_journal_tid(0);
+
+  oc->bh_remove(this, right);
+  oc->bh_stat_sub(left);
+  left->set_length(left->length() + right->length());
+  oc->bh_stat_add(left);
+
+  // data
+  left->bl.claim_append(right->bl);
+
+  // version
+  // note: this is sorta busted, but should only be used for dirty buffers
+  left->last_write_tid =  MAX( left->last_write_tid, right->last_write_tid );
+  left->last_write = MAX( left->last_write, right->last_write );
+
+  left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
+  left->set_nocache(right->get_nocache() ? left->get_nocache() : false);
+
+  // waiters
+  for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
+       p != right->waitfor_read.end();
+       ++p)
+    left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
+                                       p->second );
+
+  // hose right
+  delete right;
+
+  ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
+}
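+// Example (illustrative): merging adjacent bhs [0, 4096) and [4096, 8192)
+// that share the same state and compatible journal tids yields one bh
+// covering [0, 8192); the right bh's data is claim_append()ed, its read
+// waiters are spliced over, and the right bh is deleted.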
+
+void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
+{
+  assert(oc->lock.is_locked());
+  ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
+
+  // do not merge rx buffers; last_read_tid may not match
+  if (bh->is_rx())
+    return;
+
+  // to the left?
+  map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
+  assert(p->second == bh);
+  if (p != data.begin()) {
+    --p;
+    if (p->second->end() == bh->start() &&
+       p->second->get_state() == bh->get_state() &&
+       p->second->can_merge_journal(bh)) {
+      merge_left(p->second, bh);
+      bh = p->second;
+    } else {
+      ++p;
+    }
+  }
+  // to the right?
+  assert(p->second == bh);
+  ++p;
+  if (p != data.end() &&
+      p->second->start() == bh->end() &&
+      p->second->get_state() == bh->get_state() &&
+      p->second->can_merge_journal(bh))
+    merge_left(bh, p->second);
+}
+
+/*
+ * return true if the given byte range is fully covered by cached buffers
+ */
+bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
+{
+  assert(oc->lock.is_locked());
+  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
+  while (left > 0) {
+    if (p == data.end())
+      return false;
+
+    if (p->first <= cur) {
+      // have part of it
+      loff_t lenfromcur = MIN(p->second->end() - cur, left);
+      cur += lenfromcur;
+      left -= lenfromcur;
+      ++p;
+      continue;
+    } else if (p->first > cur) {
+      // gap
+      return false;
+    } else
+      ceph_abort();
+  }
+
+  return true;
+}
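+// Example (illustrative): with bhs cached at [0, 4096) and [8192, 12288),
+// is_cached(0, 6000) returns false because of the gap at [4096, 8192),
+// while is_cached(8192, 4096) returns true.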
+
+/*
+ * return true if all of this object's cached data lies within [off, off+len]
+ */
+bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
+{
+  assert(oc->lock.is_locked());
+  if (data.empty())
+      return true;
+  map<loff_t, BufferHead*>::iterator first = data.begin();
+  map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
+  if (first->second->start() >= off && last->second->end() <= (off + len))
+    return true;
+  else
+    return false;
+}
+
+/*
+ * map a range of bytes into buffer_heads.
+ * - create missing buffer_heads as necessary.
+ */
+int ObjectCacher::Object::map_read(ObjectExtent &ex,
+                                   map<loff_t, BufferHead*>& hits,
+                                   map<loff_t, BufferHead*>& missing,
+                                   map<loff_t, BufferHead*>& rx,
+                                  map<loff_t, BufferHead*>& errors)
+{
+  assert(oc->lock.is_locked());
+  ldout(oc->cct, 10) << "map_read " << ex.oid << " "
+                     << ex.offset << "~" << ex.length << dendl;
+
+  loff_t cur = ex.offset;
+  loff_t left = ex.length;
+
+  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
+  while (left > 0) {
+    // at end?
+    if (p == data.end()) {
+      // rest is a miss.
+      BufferHead *n = new BufferHead(this);
+      n->set_start(cur);
+      n->set_length(left);
+      oc->bh_add(this, n);
+      if (complete) {
+        oc->mark_zero(n);
+        hits[cur] = n;
+        ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
+      } else {
+        missing[cur] = n;
+        ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
+      }
+      cur += left;
+      assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
+      break;  // no more.
+    }
+
+    if (p->first <= cur) {
+      // have it (or part of it)
+      BufferHead *e = p->second;
+
+      if (e->is_clean() ||
+          e->is_dirty() ||
+          e->is_tx() ||
+          e->is_zero()) {
+        hits[cur] = e;     // readable!
+        ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
+      } else if (e->is_rx()) {
+        rx[cur] = e;       // missing, not readable.
+        ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
+      } else if (e->is_error()) {
+        errors[cur] = e;
+        ldout(oc->cct, 20) << "map_read error " << *e << dendl;
+      } else {
+        ceph_abort();
+      }
+
+      loff_t lenfromcur = MIN(e->end() - cur, left);
+      cur += lenfromcur;
+      left -= lenfromcur;
+      ++p;
+      continue;  // more?
+
+    } else if (p->first > cur) {
+      // gap.. miss
+      loff_t next = p->first;
+      BufferHead *n = new BufferHead(this);
+      loff_t len = MIN(next - cur, left);
+      n->set_start(cur);
+      n->set_length(len);
+      oc->bh_add(this,n);
+      if (complete) {
+        oc->mark_zero(n);
+        hits[cur] = n;
+        ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
+      } else {
+        missing[cur] = n;
+        ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
+      }
+      cur += MIN(left, n->length());
+      left -= MIN(left, n->length());
+      continue;    // more?
+    } else {
+      ceph_abort();
+    }
+  }
+  return 0;
+}
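+// Example (illustrative): reading 0~12288 against an object that caches a
+// clean bh [0, 4096) and an in-flight rx bh [8192, 12288) yields hits[0],
+// a newly created missing bh for the gap [4096, 8192), and rx[8192];
+// if the object were marked complete, the new bh would instead be
+// zero-filled and counted as a hit.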
+
+void ObjectCacher::Object::audit_buffers()
+{
+  loff_t offset = 0;
+  for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
+       it != data.end(); ++it) {
+    if (it->first != it->second->start()) {
+      lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
+                    << " does not match bh start position: "
+                    << *it->second << dendl;
+      assert(it->first == it->second->start());
+    }
+    if (it->first < offset) {
+      lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
+                    << " overlaps with previous bh " << *((--it)->second)
+                    << dendl;
+      assert(it->first >= offset);
+    }
+    BufferHead *bh = it->second;
+    map<loff_t, list<Context*> >::const_iterator w_it;
+    for (w_it = bh->waitfor_read.begin();
+        w_it != bh->waitfor_read.end(); ++w_it) {
+      if (w_it->first < bh->start() ||
+           w_it->first >= bh->start() + bh->length()) {
+       lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
+                      << " is not within bh " << *bh << dendl;
+       assert(w_it->first >= bh->start());
+       assert(w_it->first < bh->start() + bh->length());
+      }
+    }
+    offset = it->first + it->second->length();
+  }
+}
+
+/*
+ * map a range of extents on an object's buffer cache.
+ * - combine any bh's we're writing into one
+ * - break up bufferheads that don't fall completely within the range
+ * - note: the returned bh covers just this write; it does *not* also
+ *   include other dirty data to the left and/or right.
+ */
+ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
+                                                         ceph_tid_t tid)
+{
+  assert(oc->lock.is_locked());
+  BufferHead *final = 0;
+
+  ldout(oc->cct, 10) << "map_write oex " << ex.oid
+              << " " << ex.offset << "~" << ex.length << dendl;
+
+  loff_t cur = ex.offset;
+  loff_t left = ex.length;
+
+  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
+  while (left > 0) {
+    loff_t max = left;
+
+    // at end ?
+    if (p == data.end()) {
+      if (final == NULL) {
+        final = new BufferHead(this);
+        replace_journal_tid(final, tid);
+        final->set_start( cur );
+        final->set_length( max );
+        oc->bh_add(this, final);
+        ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
+      } else {
+        oc->bh_stat_sub(final);
+        final->set_length(final->length() + max);
+        oc->bh_stat_add(final);
+      }
+      left -= max;
+      cur += max;
+      continue;
+    }
+
+    ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
+    //oc->verify_stats();
+
+    if (p->first <= cur) {
+      BufferHead *bh = p->second;
+      ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
+
+      if (p->first < cur) {
+        assert(final == 0);
+        if (cur + max >= bh->end()) {
+          // we want right bit (one splice)
+          final = split(bh, cur);   // just split it, take right half.
+          replace_journal_tid(final, tid);
+          ++p;
+          assert(p->second == final);
+        } else {
+          // we want middle bit (two splices)
+          final = split(bh, cur);
+          ++p;
+          assert(p->second == final);
+          split(final, cur+max);
+          replace_journal_tid(final, tid);
+        }
+      } else {
+        assert(p->first == cur);
+        if (bh->length() <= max) {
+          // whole bufferhead, piece of cake.
+        } else {
+          // we want left bit (one splice)
+          split(bh, cur + max);        // just split
+        }
+        if (final) {
+          oc->mark_dirty(bh);
+          oc->mark_dirty(final);
+          --p;  // move iterator back to final
+          assert(p->second == final);
+          replace_journal_tid(bh, tid);
+          merge_left(final, bh);
+        } else {
+          final = bh;
+          replace_journal_tid(final, tid);
+        }
+      }
+
+      // keep going.
+      loff_t lenfromcur = final->end() - cur;
+      cur += lenfromcur;
+      left -= lenfromcur;
+      ++p;
+      continue;
+    } else {
+      // gap!
+      loff_t next = p->first;
+      loff_t glen = MIN(next - cur, max);
+      ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
+      if (final) {
+        oc->bh_stat_sub(final);
+        final->set_length(final->length() + glen);
+        oc->bh_stat_add(final);
+      } else {
+        final = new BufferHead(this);
+       replace_journal_tid(final, tid);
+        final->set_start( cur );
+        final->set_length( glen );
+        oc->bh_add(this, final);
+      }
+
+      cur += glen;
+      left -= glen;
+      continue;    // more?
+    }
+  }
+
+  // set version
+  assert(final);
+  assert(final->get_journal_tid() == tid);
+  ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
+
+  return final;
+}
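+// Note (illustrative): map_write always hands back a single bh that covers
+// exactly ex.offset~ex.length for this write: existing bhs that straddle the
+// range are split, bhs inside the range are merged into it, and any gaps are
+// filled with freshly allocated bhs; the resulting bh carries the caller's
+// journal tid (set via replace_journal_tid()).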
+
+void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
+                                              ceph_tid_t tid) {
+  ceph_tid_t bh_tid = bh->get_journal_tid();
+
+  assert(tid == 0 || bh_tid <= tid);
+  if (bh_tid != 0 && bh_tid != tid) {
+    // inform journal that it should not expect a writeback from this extent
+    oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
+                                          bh->length(), bh_tid, tid);
+  }
+  bh->set_journal_tid(tid);
+}
+
+void ObjectCacher::Object::truncate(loff_t s)
+{
+  assert(oc->lock.is_locked());
+  ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
+
+  while (!data.empty()) {
+    BufferHead *bh = data.rbegin()->second;
+    if (bh->end() <= s)
+      break;
+
+    // split bh at truncation point?
+    if (bh->start() < s) {
+      split(bh, s);
+      continue;
+    }
+
+    // remove bh entirely
+    assert(bh->start() >= s);
+    assert(bh->waitfor_read.empty());
+    replace_journal_tid(bh, 0);
+    oc->bh_remove(this, bh);
+    delete bh;
+  }
+}
+
+void ObjectCacher::Object::discard(loff_t off, loff_t len)
+{
+  assert(oc->lock.is_locked());
+  ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
+                    << dendl;
+
+  if (!exists) {
+    ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
+    exists = true;
+  }
+  if (complete) {
+    ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
+    complete = false;
+  }
+
+  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
+  while (p != data.end()) {
+    BufferHead *bh = p->second;
+    if (bh->start() >= off + len)
+      break;
+
+    // split bh at truncation point?
+    if (bh->start() < off) {
+      split(bh, off);
+      ++p;
+      continue;
+    }
+
+    assert(bh->start() >= off);
+    if (bh->end() > off + len) {
+      split(bh, off + len);
+    }
+
+    ++p;
+    ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
+    assert(bh->waitfor_read.empty());
+    replace_journal_tid(bh, 0);
+    oc->bh_remove(this, bh);
+    delete bh;
+  }
+}
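+// Note (illustrative): discard() removes only whole bhs that lie inside
+// [off, off+len); a bh straddling either boundary is first split at off or
+// at off+len so that cached data outside the range is preserved.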
+
+
+
+/*** ObjectCacher ***/
+
+#undef dout_prefix
+#define dout_prefix *_dout << "objectcacher "
+
+
+ObjectCacher::ObjectCacher(CephContext *cct_, string name,
+                          WritebackHandler& wb, Mutex& l,
+                          flush_set_callback_t flush_callback,
+                          void *flush_callback_arg, uint64_t max_bytes,
+                          uint64_t max_objects, uint64_t max_dirty,
+                          uint64_t target_dirty, double max_dirty_age,
+                          bool block_writes_upfront)
+  : perfcounter(NULL),
+    cct(cct_), writeback_handler(wb), name(name), lock(l),
+    max_dirty(max_dirty), target_dirty(target_dirty),
+    max_size(max_bytes), max_objects(max_objects),
+    max_dirty_age(ceph::make_timespan(max_dirty_age)),
+    block_writes_upfront(block_writes_upfront),
+    trace_endpoint("ObjectCacher"),
+    flush_set_callback(flush_callback),
+    flush_set_callback_arg(flush_callback_arg),
+    last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
+    stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
+    stat_missing(0), stat_error(0), stat_dirty_waiting(0),
+    stat_nr_dirty_waiters(0), reads_outstanding(0)
+{
+  perf_start();
+  finisher.start();
+  scattered_write = writeback_handler.can_scattered_write();
+}
+
+ObjectCacher::~ObjectCacher()
+{
+  finisher.stop();
+  perf_stop();
+  // we should be empty.
+  for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
+        = objects.begin();
+       i != objects.end();
+       ++i)
+    assert(i->empty());
+  assert(bh_lru_rest.lru_get_size() == 0);
+  assert(bh_lru_dirty.lru_get_size() == 0);
+  assert(ob_lru.lru_get_size() == 0);
+  assert(dirty_or_tx_bh.empty());
+}
+
+void ObjectCacher::perf_start()
+{
+  string n = "objectcacher-" + name;
+  PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);
+
+  plb.add_u64_counter(l_objectcacher_cache_ops_hit,
+                     "cache_ops_hit", "Hit operations");
+  plb.add_u64_counter(l_objectcacher_cache_ops_miss,
+                     "cache_ops_miss", "Miss operations");
+  plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
+                     "cache_bytes_hit", "Hit data");
+  plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
+                     "cache_bytes_miss", "Miss data");
+  plb.add_u64_counter(l_objectcacher_data_read,
+                     "data_read", "Read data");
+  plb.add_u64_counter(l_objectcacher_data_written,
+                     "data_written", "Data written to cache");
+  plb.add_u64_counter(l_objectcacher_data_flushed,
+                     "data_flushed", "Data flushed");
+  plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
+                     "data_overwritten_while_flushing",
+                     "Data overwritten while flushing");
+  plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
+                     "Write operations, delayed due to dirty limits");
+  plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
+                     "write_bytes_blocked",
+                     "Write data blocked on dirty limit");
+  plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
+              "Time spent blocking a write due to dirty limits");
+
+  perfcounter = plb.create_perf_counters();
+  cct->get_perfcounters_collection()->add(perfcounter);
+}
+
+void ObjectCacher::perf_stop()
+{
+  assert(perfcounter);
+  cct->get_perfcounters_collection()->remove(perfcounter);
+  delete perfcounter;
+}
+
+/* private */
+ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
+                                              uint64_t object_no,
+                                              ObjectSet *oset,
+                                              object_locator_t &l,
+                                              uint64_t truncate_size,
+                                              uint64_t truncate_seq)
+{
+  // XXX: Add handling of nspace in object_locator_t in cache
+  assert(lock.is_locked());
+  // have it?
+  if ((uint32_t)l.pool < objects.size()) {
+    if (objects[l.pool].count(oid)) {
+      Object *o = objects[l.pool][oid];
+      o->object_no = object_no;
+      o->truncate_size = truncate_size;
+      o->truncate_seq = truncate_seq;
+      return o;
+    }
+  } else {
+    objects.resize(l.pool+1);
+  }
+
+  // create it.
+  Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
+                        truncate_seq);
+  objects[l.pool][oid] = o;
+  ob_lru.lru_insert_top(o);
+  return o;
+}
+
+void ObjectCacher::close_object(Object *ob)
+{
+  assert(lock.is_locked());
+  ldout(cct, 10) << "close_object " << *ob << dendl;
+  assert(ob->can_close());
+
+  // ok!
+  ob_lru.lru_remove(ob);
+  objects[ob->oloc.pool].erase(ob->get_soid());
+  ob->set_item.remove_myself();
+  delete ob;
+}
+
+void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
+                           const ZTracer::Trace &parent_trace)
+{
+  assert(lock.is_locked());
+  ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
+               << reads_outstanding << dendl;
+
+  ZTracer::Trace trace;
+  if (parent_trace.valid()) {
+    trace.init("", &trace_endpoint, &parent_trace);
+    trace.copy_name("bh_read " + bh->ob->get_oid().name);
+    trace.event("start");
+  }
+
+  mark_rx(bh);
+  bh->last_read_tid = ++last_read_tid;
+
+  // finisher
+  C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
+                                           bh->start(), bh->length(), trace);
+  // go
+  writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
+                        bh->ob->get_oloc(), bh->start(), bh->length(),
+                        bh->ob->get_snap(), &onfinish->bl,
+                        bh->ob->truncate_size, bh->ob->truncate_seq,
+                        op_flags, trace, onfinish);
+
+  ++reads_outstanding;
+}
+
+void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
+                                 ceph_tid_t tid, loff_t start,
+                                 uint64_t length, bufferlist &bl, int r,
+                                 bool trust_enoent)
+{
+  assert(lock.is_locked());
+  ldout(cct, 7) << "bh_read_finish "
+               << oid
+               << " tid " << tid
+               << " " << start << "~" << length
+               << " (bl is " << bl.length() << ")"
+               << " returned " << r
+               << " outstanding reads " << reads_outstanding
+               << dendl;
+
+  if (r >= 0 && bl.length() < length) {
+    ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
+                 << length << " with " << length - bl.length() << " bytes of zeroes"
+                 << dendl;
+    bl.append_zero(length - bl.length());
+  }
+
+  list<Context*> ls;
+  int err = 0;
+
+  if (objects[poolid].count(oid) == 0) {
+    ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
+  } else {
+    Object *ob = objects[poolid][oid];
+
+    if (r == -ENOENT && !ob->complete) {
+      // wake up *all* rx waiters, or else we risk reordering
+      // identical reads. e.g.
+      //   read 1~1
+      //   reply to unrelated 3~1 -> !exists
+      //   read 1~1 -> immediate ENOENT
+      //   reply to first 1~1 -> ooo ENOENT
+      bool allzero = true;
+      for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
+          p != ob->data.end(); ++p) {
+       BufferHead *bh = p->second;
+       for (map<loff_t, list<Context*> >::iterator p
+              = bh->waitfor_read.begin();
+            p != bh->waitfor_read.end();
+            ++p)
+         ls.splice(ls.end(), p->second);
+       bh->waitfor_read.clear();
+       if (!bh->is_zero() && !bh->is_rx())
+         allzero = false;
+      }
+
+      // just pass through and retry all waiters if we don't trust
+      // -ENOENT for this read
+      if (trust_enoent) {
+       ldout(cct, 7)
+         << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
+         << dendl;
+       ob->complete = true;
+       ob->exists = false;
+
+       /* If all the bhs are effectively zero, get rid of them.  All
+        * the waiters will be retried and get -ENOENT immediately, so
+        * it's safe to clean up the unneeded bh's now. Since we know
+        * it's safe to remove them now, do so, so they aren't hanging
+        * around waiting for more -ENOENTs from rados while the cache
+        * is being shut down.
+        *
+        * Only do this when all the bhs are rx or clean, to match the
+        * condition in _readx(). If there are any non-rx or non-clean
+        * bhs, _readx() will wait for the final result instead of
+        * returning -ENOENT immediately.
+        */
+       if (allzero) {
+         ldout(cct, 10)
+           << "bh_read_finish ENOENT and allzero, getting rid of "
+           << "bhs for " << *ob << dendl;
+         map<loff_t, BufferHead*>::iterator p = ob->data.begin();
+         while (p != ob->data.end()) {
+           BufferHead *bh = p->second;
+           // current iterator will be invalidated by bh_remove()
+           ++p;
+           bh_remove(ob, bh);
+           delete bh;
+         }
+       }
+      }
+    }
+
+    // apply to bh's!
+    loff_t opos = start;
+    while (true) {
+      map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
+      if (p == ob->data.end())
+       break;
+      if (opos >= start+(loff_t)length) {
+       ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
+                      << start << "+" << length << "=" << start+(loff_t)length
+                      << dendl;
+       break;
+      }
+
+      BufferHead *bh = p->second;
+      ldout(cct, 20) << "checking bh " << *bh << dendl;
+
+      // finishers?
+      for (map<loff_t, list<Context*> >::iterator it
+            = bh->waitfor_read.begin();
+          it != bh->waitfor_read.end();
+          ++it)
+       ls.splice(ls.end(), it->second);
+      bh->waitfor_read.clear();
+
+      if (bh->start() > opos) {
+       ldout(cct, 1) << "bh_read_finish skipping gap "
+                     << opos << "~" << bh->start() - opos
+                     << dendl;
+       opos = bh->start();
+       continue;
+      }
+
+      if (!bh->is_rx()) {
+       ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
+       opos = bh->end();
+       continue;
+      }
+
+      if (bh->last_read_tid != tid) {
+       ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
+                      << bh->last_read_tid << " != tid " << tid
+                      << ", skipping" << dendl;
+       opos = bh->end();
+       continue;
+      }
+
+      assert(opos >= bh->start());
+      assert(bh->start() == opos);   // we don't merge rx bh's... yet!
+      assert(bh->length() <= start+(loff_t)length-opos);
+
+      if (bh->error < 0)
+       err = bh->error;
+
+      opos = bh->end();
+
+      if (r == -ENOENT) {
+       if (trust_enoent) {
+         ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
+         bh_remove(ob, bh);
+         delete bh;
+       } else {
+         ldout(cct, 10) << "skipping unstrusted -ENOENT and will retry for "
+                        << *bh << dendl;
+       }
+       continue;
+      }
+
+      if (r < 0) {
+       bh->error = r;
+       mark_error(bh);
+      } else {
+       bh->bl.substr_of(bl,
+                        bh->start() - start,
+                        bh->length());
+       mark_clean(bh);
+      }
+
+      ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
+
+      ob->try_merge_bh(bh);
+    }
+  }
+
+  // called with lock held.
+  ldout(cct, 20) << "finishing waiters " << ls << dendl;
+
+  finish_contexts(cct, ls, err);
+  retry_waiting_reads();
+
+  --reads_outstanding;
+  read_cond.Signal();
+}
+
+void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
+                                       int64_t *max_amount, int *max_count)
+{
+  list<BufferHead*> blist;
+
+  int count = 0;
+  int64_t total_len = 0;
+  set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
+  assert(it != dirty_or_tx_bh.end());
+  for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
+       p != dirty_or_tx_bh.end();
+       ++p) {
+    BufferHead *obh = *p;
+    if (obh->ob != bh->ob)
+      break;
+    if (obh->is_dirty() && obh->last_write <= cutoff) {
+      blist.push_back(obh);
+      ++count;
+      total_len += obh->length();
+      if ((max_count && count > *max_count) ||
+         (max_amount && total_len > *max_amount))
+       break;
+    }
+  }
+
+  while (it != dirty_or_tx_bh.begin()) {
+    --it;
+    BufferHead *obh = *it;
+    if (obh->ob != bh->ob)
+      break;
+    if (obh->is_dirty() && obh->last_write <= cutoff) {
+      blist.push_front(obh);
+      ++count;
+      total_len += obh->length();
+      if ((max_count && count > *max_count) ||
+         (max_amount && total_len > *max_amount))
+       break;
+    }
+  }
+  if (max_count)
+    *max_count -= count;
+  if (max_amount)
+    *max_amount -= total_len;
+
+  bh_write_scattered(blist);
+}
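+// Note (illustrative): starting from the given bh, this walks forward and
+// backward through dirty_or_tx_bh collecting dirty bhs on the same object
+// whose last_write is older than the cutoff, stops once *max_count or
+// *max_amount would be exceeded, and then submits the whole batch as a
+// single scattered write.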
+
+class ObjectCacher::C_WriteCommit : public Context {
+  ObjectCacher *oc;
+  int64_t poolid;
+  sobject_t oid;
+  vector<pair<loff_t, uint64_t> > ranges;
+  ZTracer::Trace trace;
+public:
+  ceph_tid_t tid = 0;
+  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
+               uint64_t l, const ZTracer::Trace &trace) :
+    oc(c), poolid(_poolid), oid(o), trace(trace) {
+      ranges.push_back(make_pair(s, l));
+    }
+  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
+               vector<pair<loff_t, uint64_t> >& _ranges) :
+    oc(c), poolid(_poolid), oid(o), tid(0) {
+      ranges.swap(_ranges);
+    }
+  void finish(int r) override {
+    oc->bh_write_commit(poolid, oid, ranges, tid, r);
+    trace.event("finish");
+  }
+};
+
+void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
+{
+  assert(lock.is_locked());
+
+  Object *ob = blist.front()->ob;
+  ob->get();
+
+  ceph::real_time last_write;
+  SnapContext snapc;
+  vector<pair<loff_t, uint64_t> > ranges;
+  vector<pair<uint64_t, bufferlist> > io_vec;
+
+  ranges.reserve(blist.size());
+  io_vec.reserve(blist.size());
+
+  uint64_t total_len = 0;
+  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
+    BufferHead *bh = *p;
+    ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
+    assert(bh->ob == ob);
+    assert(bh->bl.length() == bh->length());
+    ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
+
+    int n = io_vec.size();
+    io_vec.resize(n + 1);
+    io_vec[n].first = bh->start();
+    io_vec[n].second = bh->bl;
+
+    total_len += bh->length();
+    if (bh->snapc.seq > snapc.seq)
+      snapc = bh->snapc;
+    if (bh->last_write > last_write)
+      last_write = bh->last_write;
+  }
+
+  C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
+
+  ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
+                                          io_vec, snapc, last_write,
+                                          ob->truncate_size, ob->truncate_seq,
+                                          oncommit);
+  oncommit->tid = tid;
+  ob->last_write_tid = tid;
+  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
+    BufferHead *bh = *p;
+    bh->last_write_tid = tid;
+    mark_tx(bh);
+  }
+
+  if (perfcounter)
+    perfcounter->inc(l_objectcacher_data_flushed, total_len);
+}
+
+void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
+{
+  assert(lock.is_locked());
+  ldout(cct, 7) << "bh_write " << *bh << dendl;
+
+  bh->ob->get();
+
+  ZTracer::Trace trace;
+  if (parent_trace.valid()) {
+    trace.init("", &trace_endpoint, &parent_trace);
+    trace.copy_name("bh_write " + bh->ob->get_oid().name);
+    trace.event("start");
+  }
+
+  // finishers
+  C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
+                                             bh->ob->get_soid(), bh->start(),
+                                             bh->length(), trace);
+  // go
+  ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
+                                          bh->ob->get_oloc(),
+                                          bh->start(), bh->length(),
+                                          bh->snapc, bh->bl, bh->last_write,
+                                          bh->ob->truncate_size,
+                                          bh->ob->truncate_seq,
+                                          bh->journal_tid, trace, oncommit);
+  ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
+
+  // set bh last_write_tid
+  oncommit->tid = tid;
+  bh->ob->last_write_tid = tid;
+  bh->last_write_tid = tid;
+
+  if (perfcounter) {
+    perfcounter->inc(l_objectcacher_data_flushed, bh->length());
+  }
+
+  mark_tx(bh);
+}
+
+void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
+                                  vector<pair<loff_t, uint64_t> >& ranges,
+                                  ceph_tid_t tid, int r)
+{
+  assert(lock.is_locked());
+  ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
+               << " ranges " << ranges << " returned " << r << dendl;
+
+  if (objects[poolid].count(oid) == 0) {
+    ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
+    return;
+  }
+
+  Object *ob = objects[poolid][oid];
+  int was_dirty_or_tx = ob->oset->dirty_or_tx;
+
+  for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
+       p != ranges.end();
+       ++p) {
+    loff_t start = p->first;
+    uint64_t length = p->second;
+    if (!ob->exists) {
+      ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
+      ob->exists = true;
+
+      if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
+                                             ob->get_snap())) {
+       ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
+         "complete on " << *ob << dendl;
+       ob->complete = false;
+      }
+    }
+
+    vector<pair<loff_t, BufferHead*>> hit;
+    // apply to bh's!
+    for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
+        p != ob->data.end();
+        ++p) {
+      BufferHead *bh = p->second;
+
+      if (bh->start() > start+(loff_t)length)
+       break;
+
+      if (bh->start() < start &&
+         bh->end() > start+(loff_t)length) {
+       ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl;
+       continue;
+      }
+
+      // make sure bh is tx
+      if (!bh->is_tx()) {
+       ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
+       continue;
+      }
+
+      // make sure bh tid matches
+      if (bh->last_write_tid != tid) {
+       assert(bh->last_write_tid > tid);
+       ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
+       continue;
+      }
+
+      if (r >= 0) {
+       // ok!  mark bh clean and error-free
+       mark_clean(bh);
+       bh->set_journal_tid(0);
+       if (bh->get_nocache())
+         bh_lru_rest.lru_bottouch(bh);
+       hit.push_back(make_pair(bh->start(), bh));
+       ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
+      } else {
+       mark_dirty(bh);
+       ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
+                      << *bh << " r = " << r << " " << cpp_strerror(-r)
+                      << dendl;
+      }
+    }
+
+    for (auto& p : hit) {
+      // p.second may have been merged and deleted by merge_left
+      if (ob->data.count(p.first))
+       ob->try_merge_bh(p.second);
+    }
+  }
+
+  // update last_commit.
+  assert(ob->last_commit_tid < tid);
+  ob->last_commit_tid = tid;
+
+  // waiters?
+  list<Context*> ls;
+  if (ob->waitfor_commit.count(tid)) {
+    ls.splice(ls.begin(), ob->waitfor_commit[tid]);
+    ob->waitfor_commit.erase(tid);
+  }
+
+  // is the entire object set now clean and fully committed?
+  ObjectSet *oset = ob->oset;
+  ob->put();
+
+  if (flush_set_callback &&
+      was_dirty_or_tx > 0 &&
+      oset->dirty_or_tx == 0) {        // nothing dirty/tx
+    flush_set_callback(flush_set_callback_arg, oset);
+  }
+
+  if (!ls.empty())
+    finish_contexts(cct, ls, r);
+}
+
+void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
+{
+  assert(trace != nullptr);
+  assert(lock.is_locked());
+  ceph::real_time cutoff = ceph::real_clock::now();
+
+  ldout(cct, 10) << "flush " << amount << dendl;
+
+  /*
+   * NOTE: we aren't actually pulling things off the LRU here, just
+   * looking at the tail item.  Then we call bh_write, which moves it
+   * to the other LRU, so that we can call
+   * lru_dirty.lru_get_next_expire() again.
+   */
+  int64_t left = amount;
+  while (amount == 0 || left > 0) {
+    BufferHead *bh = static_cast<BufferHead*>(
+      bh_lru_dirty.lru_get_next_expire());
+    if (!bh) break;
+    if (bh->last_write > cutoff) break;
+
+    if (scattered_write) {
+      bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
+    } else {
+      left -= bh->length();
+      bh_write(bh, *trace);
+    }
+  }
+}
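+// Note (illustrative): flush(trace, 0) starts writeback on every dirty bh
+// whose last_write is older than "now", while a non-zero amount stops once
+// roughly that many bytes have been queued; with scattered writes the
+// per-object batching in bh_write_adjacencies() decrements the remaining
+// byte budget.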
+
+
+void ObjectCacher::trim()
+{
+  assert(lock.is_locked());
+  ldout(cct, 10) << "trim  start: bytes: max " << max_size << "  clean "
+                << get_stat_clean() << ", objects: max " << max_objects
+                << " current " << ob_lru.lru_get_size() << dendl;
+
+  uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
+  uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
+  while (get_stat_clean() > 0 &&
+        ((uint64_t)get_stat_clean() > max_size ||
+         nr_clean_bh > max_clean_bh)) {
+    BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
+    if (!bh)
+      break;
+
+    ldout(cct, 10) << "trim trimming " << *bh << dendl;
+    assert(bh->is_clean() || bh->is_zero() || bh->is_error());
+
+    Object *ob = bh->ob;
+    bh_remove(ob, bh);
+    delete bh;
+
+    --nr_clean_bh;
+
+    if (ob->complete) {
+      ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
+      ob->complete = false;
+    }
+  }
+
+  while (ob_lru.lru_get_size() > max_objects) {
+    Object *ob = static_cast<Object*>(ob_lru.lru_expire());
+    if (!ob)
+      break;
+
+    ldout(cct, 10) << "trim trimming " << *ob << dendl;
+    close_object(ob);
+  }
+
+  ldout(cct, 10) << "trim finish:  max " << max_size << "  clean "
+                << get_stat_clean() << ", objects: max " << max_objects
+                << " current " << ob_lru.lru_get_size() << dendl;
+}
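+// Note (illustrative): BUFFER_MEMORY_WEIGHT (12) treats each BufferHead as
+// costing on the order of 1<<12 bytes of memory, so the clean-bh cap works
+// out to max_clean_bh = max_size >> 12; e.g. with max_size = 32 MiB that is
+// (32 << 20) >> 12 == 8192 clean BufferHeads before trim() starts expiring
+// them from bh_lru_rest.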
+
+
+
+/* public */
+
+bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
+                            snapid_t snapid)
+{
+  assert(lock.is_locked());
+  for (vector<ObjectExtent>::iterator ex_it = extents.begin();
+       ex_it != extents.end();
+       ++ex_it) {
+    ldout(cct, 10) << "is_cached " << *ex_it << dendl;
+
+    // get Object cache
+    sobject_t soid(ex_it->oid, snapid);
+    Object *o = get_object_maybe(soid, ex_it->oloc);
+    if (!o)
+      return false;
+    if (!o->is_cached(ex_it->offset, ex_it->length))
+      return false;
+  }
+  return true;
+}
+
+
+/*
+ * returns # bytes read (if in cache).  onfinish is untouched (caller
+ *           must delete it)
+ * returns 0 if doing async read
+ */
+int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
+                       ZTracer::Trace *parent_trace)
+{
+  ZTracer::Trace trace;
+  if (parent_trace != nullptr) {
+    trace.init("read", &trace_endpoint, parent_trace);
+    trace.event("start");
+  }
+
+  int r =_readx(rd, oset, onfinish, true, &trace);
+  if (r < 0) {
+    trace.event("finish");
+  }
+  return r;
+}
+
+int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
+                        bool external_call, ZTracer::Trace *trace)
+{
+  assert(trace != nullptr);
+  assert(lock.is_locked());
+  bool success = true;
+  int error = 0;
+  uint64_t bytes_in_cache = 0;
+  uint64_t bytes_not_in_cache = 0;
+  uint64_t total_bytes_read = 0;
+  map<uint64_t, bufferlist> stripe_map;  // final buffer offset -> substring
+  bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
+  bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
+
+  /*
+   * WARNING: we can only meaningfully return ENOENT if the read request
+   * passed in a single ObjectExtent.  Any caller who wants ENOENT instead of
+   * zeroed buffers needs to feed single extents into readx().
+   */
+  assert(!oset->return_enoent || rd->extents.size() == 1);
+
+  for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
+       ex_it != rd->extents.end();
+       ++ex_it) {
+    ldout(cct, 10) << "readx " << *ex_it << dendl;
+
+    total_bytes_read += ex_it->length;
+
+    // get Object cache
+    sobject_t soid(ex_it->oid, rd->snap);
+    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
+                          ex_it->truncate_size, oset->truncate_seq);
+    if (external_call)
+      touch_ob(o);
+
+    // does not exist and no hits?
+    if (oset->return_enoent && !o->exists) {
+      ldout(cct, 10) << "readx  object !exists, 1 extent..." << dendl;
+
+      // should we worry about COW underneath us?
+      if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
+                                             ex_it->length, soid.snap)) {
+       ldout(cct, 20) << "readx  may copy on write" << dendl;
+       bool wait = false;
+       list<BufferHead*> blist;
+       for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
+            bh_it != o->data.end();
+            ++bh_it) {
+         BufferHead *bh = bh_it->second;
+         if (bh->is_dirty() || bh->is_tx()) {
+           ldout(cct, 10) << "readx  flushing " << *bh << dendl;
+           wait = true;
+           if (bh->is_dirty()) {
+             if (scattered_write)
+               blist.push_back(bh);
+             else
+               bh_write(bh, *trace);
+           }
+         }
+       }
+       if (scattered_write && !blist.empty())
+         bh_write_scattered(blist);
+       if (wait) {
+         ldout(cct, 10) << "readx  waiting on tid " << o->last_write_tid
+                        << " on " << *o << dendl;
+         o->waitfor_commit[o->last_write_tid].push_back(
+           new C_RetryRead(this,rd, oset, onfinish, *trace));
+         // FIXME: perfcounter!
+         return 0;
+       }
+      }
+
+      // can we return ENOENT?
+      bool allzero = true;
+      for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
+          bh_it != o->data.end();
+          ++bh_it) {
+       ldout(cct, 20) << "readx  ob has bh " << *bh_it->second << dendl;
+       if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
+         allzero = false;
+         break;
+       }
+      }
+      if (allzero) {
+       ldout(cct, 10) << "readx  ob has all zero|rx, returning ENOENT"
+                      << dendl;
+       delete rd;
+       if (dontneed)
+         bottouch_ob(o);
+       return -ENOENT;
+      }
+    }
+
+    // map extent into bufferheads
+    map<loff_t, BufferHead*> hits, missing, rx, errors;
+    o->map_read(*ex_it, hits, missing, rx, errors);
+    if (external_call) {
+      // retry reading error buffers
+      missing.insert(errors.begin(), errors.end());
+    } else {
+      // some reads had errors, fail later so completions
+      // are cleaned up properly
+      // TODO: make read path not call _readx for every completion
+      hits.insert(errors.begin(), errors.end());
+    }
+
+    if (!missing.empty() || !rx.empty()) {
+      // read missing
+      map<loff_t, BufferHead*>::iterator last = missing.end();
+      for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
+          bh_it != missing.end();
+          ++bh_it) {
+       uint64_t rx_bytes = static_cast<uint64_t>(
+         stat_rx + bh_it->second->length());
+       bytes_not_in_cache += bh_it->second->length();
+       if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
+         // cache is full with concurrent reads -- wait for rx's to complete
+         // to constrain memory growth (especially during copy-ups)
+         if (success) {
+           ldout(cct, 10) << "readx missed, waiting on cache to complete "
+                          << waitfor_read.size() << " blocked reads, "
+                          << (MAX(rx_bytes, max_size) - max_size)
+                          << " read bytes" << dendl;
+           waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
+                                                  *trace));
+         }
+
+         bh_remove(o, bh_it->second);
+         delete bh_it->second;
+       } else {
+         bh_it->second->set_nocache(nocache);
+         bh_read(bh_it->second, rd->fadvise_flags, *trace);
+         if ((success && onfinish) || last != missing.end())
+           last = bh_it;
+       }
+       success = false;
+      }
+
+      // add the waiter to the last missing bh to avoid an early wakeup, since reads complete in order
+      if (last != missing.end()) {
+       ldout(cct, 10) << "readx missed, waiting on " << *last->second
+         << " off " << last->first << dendl;
+       last->second->waitfor_read[last->first].push_back(
+         new C_RetryRead(this, rd, oset, onfinish, *trace) );
+
+      }
+
+      // bump rx
+      for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
+          bh_it != rx.end();
+          ++bh_it) {
+       touch_bh(bh_it->second); // bump in lru, so we don't lose it.
+       if (success && onfinish) {
+         ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
+                        << " off " << bh_it->first << dendl;
+         bh_it->second->waitfor_read[bh_it->first].push_back(
+           new C_RetryRead(this, rd, oset, onfinish, *trace) );
+       }
+       bytes_not_in_cache += bh_it->second->length();
+       success = false;
+      }
+
+      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
+          bh_it != hits.end();  ++bh_it)
+       // bump in lru, so we don't lose it on a later read
+       touch_bh(bh_it->second);
+
+    } else {
+      assert(!hits.empty());
+
+      // make a plain list
+      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
+          bh_it != hits.end();
+          ++bh_it) {
+       BufferHead *bh = bh_it->second;
+       ldout(cct, 10) << "readx hit bh " << *bh << dendl;
+       if (bh->is_error() && bh->error)
+         error = bh->error;
+       bytes_in_cache += bh->length();
+
+       if (bh->get_nocache() && bh->is_clean())
+         bh_lru_rest.lru_bottouch(bh);
+       else
+         touch_bh(bh);
+       // must come after touch_bh, because touch_bh sets dontneed to false
+       if (dontneed &&
+           ((loff_t)ex_it->offset <= bh->start() &&
+            (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
+         bh->set_dontneed(true); //if dirty
+         if (bh->is_clean())
+           bh_lru_rest.lru_bottouch(bh);
+       }
+      }
+
+      if (!error) {
+       // create reverse map of buffer offset -> object for the
+       // eventual result.  this is over a single ObjectExtent, so we
+       // know that
+       //  - the bh's are contiguous
+       //  - the buffer frags need not be (and almost certainly aren't)
+       loff_t opos = ex_it->offset;
+       map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
+       assert(bh_it->second->start() <= opos);
+       uint64_t bhoff = opos - bh_it->second->start();
+       vector<pair<uint64_t,uint64_t> >::iterator f_it
+         = ex_it->buffer_extents.begin();
+       uint64_t foff = 0;
+       while (1) {
+         BufferHead *bh = bh_it->second;
+         assert(opos == (loff_t)(bh->start() + bhoff));
+
+         uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
+         ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
+                        << bhoff << " frag " << f_it->first << "~"
+                        << f_it->second << " +" << foff << "~" << len
+                        << dendl;
+
+         bufferlist bit;
+         // put substr here first, since substr_of clobbers, and we
+         // may get multiple bh's at this stripe_map position
+         if (bh->is_zero()) {
+           stripe_map[f_it->first].append_zero(len);
+         } else {
+           bit.substr_of(bh->bl,
+               opos - bh->start(),
+               len);
+           stripe_map[f_it->first].claim_append(bit);
+         }
+
+         opos += len;
+         bhoff += len;
+         foff += len;
+         if (opos == bh->end()) {
+           ++bh_it;
+           bhoff = 0;
+         }
+         if (foff == f_it->second) {
+           ++f_it;
+           foff = 0;
+         }
+         if (bh_it == hits.end()) break;
+         if (f_it == ex_it->buffer_extents.end())
+           break;
+       }
+       assert(f_it == ex_it->buffer_extents.end());
+       assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
+      }
+
+      if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
+         bottouch_ob(o);
+    }
+  }
+
+  if (!success) {
+    if (perfcounter && external_call) {
+      perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
+      perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
+      perfcounter->inc(l_objectcacher_cache_ops_miss);
+    }
+    if (onfinish) {
+      ldout(cct, 20) << "readx defer " << rd << dendl;
+    } else {
+      ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
+                    << dendl;
+      delete rd;
+    }
+    return 0;  // wait!
+  }
+  if (perfcounter && external_call) {
+    perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
+    perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
+    perfcounter->inc(l_objectcacher_cache_ops_hit);
+  }
+
+  // no misses... success!  do the read.
+  ldout(cct, 10) << "readx has all buffers" << dendl;
+
+  // ok, assemble into result buffer.
+  uint64_t pos = 0;
+  if (rd->bl && !error) {
+    rd->bl->clear();
+    for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
+        i != stripe_map.end();
+        ++i) {
+      assert(pos == i->first);
+      ldout(cct, 10) << "readx  adding buffer len " << i->second.length()
+                    << " at " << pos << dendl;
+      pos += i->second.length();
+      rd->bl->claim_append(i->second);
+      assert(rd->bl->length() == pos);
+    }
+    ldout(cct, 10) << "readx  result is " << rd->bl->length() << dendl;
+  } else if (!error) {
+    ldout(cct, 10) << "readx  no bufferlist ptr (readahead?), done." << dendl;
+    map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
+    pos = i->first + i->second.length();
+  }
+
+  // done with read.
+  int ret = error ? error : pos;
+  ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
+  assert(pos <= (uint64_t) INT_MAX);
+
+  delete rd;
+
+  trim();
+
+  return ret;
+}
+
+void ObjectCacher::retry_waiting_reads()
+{
+  list<Context *> ls;
+  ls.swap(waitfor_read);
+
+  while (!ls.empty() && waitfor_read.empty()) {
+    Context *ctx = ls.front();
+    ls.pop_front();
+    ctx->complete(0);
+  }
+  waitfor_read.splice(waitfor_read.end(), ls);
+}
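+// Note (illustrative): waiters are retried one at a time, and the loop stops
+// as soon as a retried read re-queues itself on waitfor_read (i.e. the cache
+// is still full); whatever remains in the local list is spliced back onto
+// the end of waitfor_read.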
+
+int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
+                        ZTracer::Trace *parent_trace)
+{
+  assert(lock.is_locked());
+  ceph::real_time now = ceph::real_clock::now();
+  uint64_t bytes_written = 0;
+  uint64_t bytes_written_in_flush = 0;
+  bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
+  bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
+
+  ZTracer::Trace trace;
+  if (parent_trace != nullptr) {
+    trace.init("write", &trace_endpoint, parent_trace);
+    trace.event("start");
+  }
+
+  for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
+       ex_it != wr->extents.end();
+       ++ex_it) {
+    // get object cache
+    sobject_t soid(ex_it->oid, CEPH_NOSNAP);
+    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
+                          ex_it->truncate_size, oset->truncate_seq);
+
+    // map it all into a single bufferhead.
+    BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
+    bool missing = bh->is_missing();
+    bh->snapc = wr->snapc;
+
+    bytes_written += ex_it->length;
+    if (bh->is_tx()) {
+      bytes_written_in_flush += ex_it->length;
+    }
+
+    // adjust buffer pointers (ie "copy" data into my cache)
+    // this is over a single ObjectExtent, so we know that
+    //  - there is one contiguous bh
+    //  - the buffer frags need not be (and almost certainly aren't)
+    // note: i assume striping is monotonic... no jumps backwards, ever!
+    loff_t opos = ex_it->offset;
+    for (vector<pair<uint64_t, uint64_t> >::iterator f_it
+          = ex_it->buffer_extents.begin();
+        f_it != ex_it->buffer_extents.end();
+        ++f_it) {
+      ldout(cct, 10) << "writex writing " << f_it->first << "~"
+                    << f_it->second << " into " << *bh << " at " << opos
+                    << dendl;
+      uint64_t bhoff = bh->start() - opos;
+      assert(f_it->second <= bh->length() - bhoff);
+
+      // get the frag we're mapping in
+      bufferlist frag;
+      frag.substr_of(wr->bl,
+                    f_it->first, f_it->second);
+
+      // keep anything left of bhoff
+      bufferlist newbl;
+      if (bhoff)
+       newbl.substr_of(bh->bl, 0, bhoff);
+      newbl.claim_append(frag);
+      bh->bl.swap(newbl);
+
+      opos += f_it->second;
+    }
+
+    // ok, now bh is dirty.
+    mark_dirty(bh);
+    if (dontneed)
+      bh->set_dontneed(true);
+    else if (nocache && missing)
+      bh->set_nocache(true);
+    else
+      touch_bh(bh);
+
+    bh->last_write = now;
+
+    o->try_merge_bh(bh);
+  }
+
+  if (perfcounter) {
+    perfcounter->inc(l_objectcacher_data_written, bytes_written);
+    if (bytes_written_in_flush) {
+      perfcounter->inc(l_objectcacher_overwritten_in_flush,
+                      bytes_written_in_flush);
+    }
+  }
+
+  int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
+  delete wr;
+
+  //verify_stats();
+  trim();
+  return r;
+}
+
+class ObjectCacher::C_WaitForWrite : public Context {
+public:
+  C_WaitForWrite(ObjectCacher *oc, uint64_t len,
+                 const ZTracer::Trace &trace, Context *onfinish) :
+    m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
+  void finish(int r) override;
+private:
+  ObjectCacher *m_oc;
+  uint64_t m_len;
+  ZTracer::Trace m_trace;
+  Context *m_onfinish;
+};
+
+void ObjectCacher::C_WaitForWrite::finish(int r)
+{
+  Mutex::Locker l(m_oc->lock);
+  m_oc->maybe_wait_for_writeback(m_len, &m_trace);
+  m_onfinish->complete(r);
+}
+
+void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
+                                            ZTracer::Trace *trace)
+{
+  assert(lock.is_locked());
+  ceph::mono_time start = ceph::mono_clock::now();
+  int blocked = 0;
+  // wait for writeback?
+  //  - wait for dirty and tx bytes (relative to the max_dirty threshold)
+  //  - do not wait for bytes other waiters are waiting on.  this means that
+  //    threads do not wait for each other.  this effectively allows the cache
+  //    size to balloon proportional to the data that is in flight.
+
+  uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
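+  // In addition to the byte threshold, cap the number of dirty/tx
+  // bufferheads, assuming each BufferHead costs roughly
+  // 2^BUFFER_MEMORY_WEIGHT (4 KiB) of memory; e.g. a 24 MiB max_dirty
+  // would permit ~6144 of them before writers block.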
+  while (get_stat_dirty() + get_stat_tx() > 0 &&
+        (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
+         max_dirty + get_stat_dirty_waiting()) ||
+        (dirty_or_tx_bh.size() >=
+         max_dirty_bh + get_stat_nr_dirty_waiters()))) {
+
+    if (blocked == 0) {
+      trace->event("start wait for writeback");
+    }
+    ldout(cct, 10) << __func__ << " waiting for dirty|tx "
+                  << (get_stat_dirty() + get_stat_tx()) << " >= max "
+                  << max_dirty << " + dirty_waiting "
+                  << get_stat_dirty_waiting() << dendl;
+    flusher_cond.Signal();
+    stat_dirty_waiting += len;
+    ++stat_nr_dirty_waiters;
+    stat_cond.Wait(lock);
+    stat_dirty_waiting -= len;
+    --stat_nr_dirty_waiters;
+    ++blocked;
+    ldout(cct, 10) << __func__ << " woke up" << dendl;
+  }
+  if (blocked > 0) {
+    trace->event("finish wait for writeback");
+  }
+  if (blocked && perfcounter) {
+    perfcounter->inc(l_objectcacher_write_ops_blocked);
+    perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
+    ceph::timespan blocked = ceph::mono_clock::now() - start;
+    perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
+  }
+}
+
+// blocking wait for write.
+int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
+                                 ZTracer::Trace *trace, Context *onfreespace)
+{
+  assert(lock.is_locked());
+  assert(trace != nullptr);
+  int ret = 0;
+
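+  // Three completion paths:
+  //  - writeback with block_writes_upfront: throttle here, then complete
+  //    onfreespace inline.
+  //  - writeback without it: hand the throttling off to the finisher via
+  //    C_WaitForWrite so this thread does not block.
+  //  - max_dirty == 0 (write-thru): flush the extents we just dirtied;
+  //    either block here until the flush commits, or let the flush
+  //    completion drive onfreespace.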
+  if (max_dirty > 0) {
+    if (block_writes_upfront) {
+      maybe_wait_for_writeback(len, trace);
+      if (onfreespace)
+       onfreespace->complete(0);
+    } else {
+      assert(onfreespace);
+      finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
+    }
+  } else {
+    // write-thru!  flush what we just wrote.
+    Cond cond;
+    bool done = false;
+    Context *fin = block_writes_upfront ?
+      new C_Cond(&cond, &done, &ret) : onfreespace;
+    assert(fin);
+    bool flushed = flush_set(oset, wr->extents, trace, fin);
+    assert(!flushed);   // we just dirtied it, and didn't drop our lock!
+    ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
+                  << " bytes" << dendl;
+    if (block_writes_upfront) {
+      while (!done)
+       cond.Wait(lock);
+      ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
+      if (onfreespace)
+       onfreespace->complete(ret);
+    }
+  }
+
+  // start writeback anyway?
+  if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
+    ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
+                  << target_dirty << ", nudging flusher" << dendl;
+    flusher_cond.Signal();
+  }
+  return ret;
+}
+
+void ObjectCacher::flusher_entry()
+{
+  ldout(cct, 10) << "flusher start" << dendl;
+  lock.Lock();
+  while (!flusher_stop) {
+    loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
+      get_stat_dirty();
+    ldout(cct, 11) << "flusher "
+                  << all << " / " << max_size << ":  "
+                  << get_stat_tx() << " tx, "
+                  << get_stat_rx() << " rx, "
+                  << get_stat_clean() << " clean, "
+                  << get_stat_dirty() << " dirty ("
+                  << target_dirty << " target, "
+                  << max_dirty << " max)"
+                  << dendl;
+    loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
+
+    ZTracer::Trace trace;
+    if (cct->_conf->osdc_blkin_trace_all) {
+      trace.init("flusher", &trace_endpoint);
+      trace.event("start");
+    }
+
+    if (actual > 0 && (uint64_t) actual > target_dirty) {
+      // flush some dirty pages
+      ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
+                    << get_stat_dirty_waiting() << " dirty_waiting > target "
+                    << target_dirty << ", flushing some dirty bhs" << dendl;
+      flush(&trace, actual - target_dirty);
+    } else {
+      // check tail of lru for old dirty items
+      ceph::real_time cutoff = ceph::real_clock::now();
+      cutoff -= max_dirty_age;
+      BufferHead *bh = 0;
+      int max = MAX_FLUSH_UNDER_LOCK;
+      while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
+                                           lru_get_next_expire())) != 0 &&
+            bh->last_write <= cutoff &&
+            max > 0) {
+       ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
+       if (scattered_write) {
+         bh_write_adjacencies(bh, cutoff, NULL, &max);
+        } else {
+         bh_write(bh, trace);
+         --max;
+       }
+      }
+      if (!max) {
+       // back off the lock to avoid starving other threads
+        trace.event("backoff");
+       lock.Unlock();
+       lock.Lock();
+       continue;
+      }
+    }
+
+    trace.event("finish");
+    if (flusher_stop)
+      break;
+
+    flusher_cond.WaitInterval(lock, seconds(1));
+  }
+
+  /* Wait for reads to finish. This is only possible if handling
+   * -ENOENT made some read completions finish before their rados read
+   * came back. If we don't wait for them, and destroy the cache, when
+   * the rados reads do come back their callback will try to access the
+   * no-longer-valid ObjectCacher.
+   */
+  while (reads_outstanding > 0) {
+    ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
+                  << reads_outstanding << dendl;
+    read_cond.Wait(lock);
+  }
+
+  lock.Unlock();
+  ldout(cct, 10) << "flusher finish" << dendl;
+}
+
+
+// -------------------------------------------------
+
+bool ObjectCacher::set_is_empty(ObjectSet *oset)
+{
+  assert(lock.is_locked());
+  if (oset->objects.empty())
+    return true;
+
+  for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
+    if (!(*p)->is_empty())
+      return false;
+
+  return true;
+}
+
+bool ObjectCacher::set_is_cached(ObjectSet *oset)
+{
+  assert(lock.is_locked());
+  if (oset->objects.empty())
+    return false;
+
+  for (xlist<Object*>::iterator p = oset->objects.begin();
+       !p.end(); ++p) {
+    Object *ob = *p;
+    for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
+        q != ob->data.end();
+        ++q) {
+      BufferHead *bh = q->second;
+      if (!bh->is_dirty() && !bh->is_tx())
+       return true;
+    }
+  }
+
+  return false;
+}
+
+bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
+{
+  assert(lock.is_locked());
+  if (oset->objects.empty())
+    return false;
+
+  for (xlist<Object*>::iterator i = oset->objects.begin();
+       !i.end(); ++i) {
+    Object *ob = *i;
+
+    for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
+        p != ob->data.end();
+        ++p) {
+      BufferHead *bh = p->second;
+      if (bh->is_dirty() || bh->is_tx())
+       return true;
+    }
+  }
+
+  return false;
+}
+
+
+// purge.  non-blocking.  violently removes dirty buffers from cache.
+void ObjectCacher::purge(Object *ob)
+{
+  assert(lock.is_locked());
+  ldout(cct, 10) << "purge " << *ob << dendl;
+
+  ob->truncate(0);
+}
+
+
+// flush.  non-blocking.  no callback.
+// true if clean, already flushed.
+// false if we wrote something.
+// be sloppy about the ranges and flush any buffer the range touches
+bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
+                         ZTracer::Trace *trace)
+{
+  assert(trace != nullptr);
+  assert(lock.is_locked());
+  list<BufferHead*> blist;
+  bool clean = true;
+  ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
+  for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
+       p != ob->data.end();
+       ++p) {
+    BufferHead *bh = p->second;
+    ldout(cct, 20) << "flush  " << *bh << dendl;
+    if (length && bh->start() > offset+length) {
+      break;
+    }
+    if (bh->is_tx()) {
+      clean = false;
+      continue;
+    }
+    if (!bh->is_dirty()) {
+      continue;
+    }
+
+    if (scattered_write)
+      blist.push_back(bh);
+    else
+      bh_write(bh, *trace);
+    clean = false;
+  }
+  if (scattered_write && !blist.empty())
+    bh_write_scattered(blist);
+
+  return clean;
+}
+
+bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
+                                    Context *onfinish)
+{
+  assert(lock.is_locked());
+  if (gather->has_subs()) {
+    gather->set_finisher(onfinish);
+    gather->activate();
+    return false;
+  }
+
+  ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
+  onfinish->complete(0);
+  return true;
+}
+
+// flush.  non-blocking, takes callback.
+// returns true if already flushed
+bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
+{
+  assert(lock.is_locked());
+  assert(onfinish != NULL);
+  if (oset->objects.empty()) {
+    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
+    onfinish->complete(0);
+    return true;
+  }
+
+  ldout(cct, 10) << "flush_set " << oset << dendl;
+
+  // we'll need to wait for all objects to flush!
+  C_GatherBuilder gather(cct);
+  set<Object*> waitfor_commit;
+
+  list<BufferHead*> blist;
+  Object *last_ob = NULL;
+  set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;
+
+  // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
+  // order, but items in oset->objects are not sorted, so the lower_bound()
+  // below may land on any buffer head belonging to this ObjectSet.
+  BufferHead key(*oset->objects.begin());
+  it = dirty_or_tx_bh.lower_bound(&key);
+  p = q = it;
+
+  bool backwards = true;
+  if (it != dirty_or_tx_bh.begin())
+    --it;
+  else
+    backwards = false;
+
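+  // sweep forward from the starting point until we reach a buffer head
+  // belonging to a different ObjectSet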
+  for (; p != dirty_or_tx_bh.end(); p = q) {
+    ++q;
+    BufferHead *bh = *p;
+    if (bh->ob->oset != oset)
+      break;
+    waitfor_commit.insert(bh->ob);
+    if (bh->is_dirty()) {
+      if (scattered_write) {
+       if (last_ob != bh->ob) {
+         if (!blist.empty()) {
+           bh_write_scattered(blist);
+           blist.clear();
+         }
+         last_ob = bh->ob;
+       }
+       blist.push_back(bh);
+      } else {
+       bh_write(bh, {});
+      }
+    }
+  }
+
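+  // then sweep backward from just before the starting point, since
+  // lower_bound() may have landed in the middle of this oset's buffer heads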
+  if (backwards) {
+    for (p = q = it; true; p = q) {
+      if (q != dirty_or_tx_bh.begin())
+       --q;
+      else
+       backwards = false;
+      BufferHead *bh = *p;
+      if (bh->ob->oset != oset)
+       break;
+      waitfor_commit.insert(bh->ob);
+      if (bh->is_dirty()) {
+       if (scattered_write) {
+         if (last_ob != bh->ob) {
+           if (!blist.empty()) {
+             bh_write_scattered(blist);
+             blist.clear();
+           }
+           last_ob = bh->ob;
+         }
+         blist.push_front(bh);
+       } else {
+         bh_write(bh, {});
+       }
+      }
+      if (!backwards)
+       break;
+    }
+  }
+
+  if (scattered_write && !blist.empty())
+    bh_write_scattered(blist);
+
+  for (set<Object*>::iterator i = waitfor_commit.begin();
+       i != waitfor_commit.end(); ++i) {
+    Object *ob = *i;
+
+    // we'll need to gather...
+    ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
+                  << ob->last_write_tid << " on " << *ob << dendl;
+    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
+  }
+
+  return _flush_set_finish(&gather, onfinish);
+}
+
+// flush.  non-blocking, takes callback.
+// returns true if already flushed
+bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
+                            ZTracer::Trace *trace, Context *onfinish)
+{
+  assert(lock.is_locked());
+  assert(trace != nullptr);
+  assert(onfinish != NULL);
+  if (oset->objects.empty()) {
+    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
+    onfinish->complete(0);
+    return true;
+  }
+
+  ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
+                << " ObjectExtents" << dendl;
+
+  // we'll need to wait for all objects to flush!
+  C_GatherBuilder gather(cct);
+
+  for (vector<ObjectExtent>::iterator p = exv.begin();
+       p != exv.end();
+       ++p) {
+    ObjectExtent &ex = *p;
+    sobject_t soid(ex.oid, CEPH_NOSNAP);
+    if (objects[oset->poolid].count(soid) == 0)
+      continue;
+    Object *ob = objects[oset->poolid][soid];
+
+    ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
+                  << " " << ob << dendl;
+
+    if (!flush(ob, ex.offset, ex.length, trace)) {
+      // we'll need to gather...
+      ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
+                    << ob->last_write_tid << " on " << *ob << dendl;
+      ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
+    }
+  }
+
+  return _flush_set_finish(&gather, onfinish);
+}
+
+// flush all dirty data.  non-blocking, takes callback.
+// returns true if already flushed
+bool ObjectCacher::flush_all(Context *onfinish)
+{
+  assert(lock.is_locked());
+  assert(onfinish != NULL);
+
+  ldout(cct, 10) << "flush_all " << dendl;
+
+  // we'll need to wait for all objects to flush!
+  C_GatherBuilder gather(cct);
+  set<Object*> waitfor_commit;
+
+  list<BufferHead*> blist;
+  Object *last_ob = NULL;
+  set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
+  next = it = dirty_or_tx_bh.begin();
+  while (it != dirty_or_tx_bh.end()) {
+    ++next;
+    BufferHead *bh = *it;
+    waitfor_commit.insert(bh->ob);
+
+    if (bh->is_dirty()) {
+      if (scattered_write) {
+       if (last_ob != bh->ob) {
+         if (!blist.empty()) {
+           bh_write_scattered(blist);
+           blist.clear();
+         }
+         last_ob = bh->ob;
+       }
+       blist.push_back(bh);
+      } else {
+       bh_write(bh, {});
+      }
+    }
+
+    it = next;
+  }
+
+  if (scattered_write && !blist.empty())
+    bh_write_scattered(blist);
+
+  for (set<Object*>::iterator i = waitfor_commit.begin();
+       i != waitfor_commit.end();
+       ++i) {
+    Object *ob = *i;
+
+    // we'll need to gather...
+    ldout(cct, 10) << "flush_all will wait for ack tid "
+                  << ob->last_write_tid << " on " << *ob << dendl;
+    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
+  }
+
+  return _flush_set_finish(&gather, onfinish);
+}
+
+void ObjectCacher::purge_set(ObjectSet *oset)
+{
+  assert(lock.is_locked());
+  if (oset->objects.empty()) {
+    ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
+    return;
+  }
+
+  ldout(cct, 10) << "purge_set " << oset << dendl;
+  const bool were_dirty = oset->dirty_or_tx > 0;
+
+  for (xlist<Object*>::iterator i = oset->objects.begin();
+       !i.end(); ++i) {
+    Object *ob = *i;
+    purge(ob);
+  }
+
+  // Although we have purged rather than flushed, the caller should still
+  // drop any resources associated with the dirty data.
+  assert(oset->dirty_or_tx == 0);
+  if (flush_set_callback && were_dirty) {
+    flush_set_callback(flush_set_callback_arg, oset);
+  }
+}
+
+
+loff_t ObjectCacher::release(Object *ob)
+{
+  assert(lock.is_locked());
+  list<BufferHead*> clean;
+  loff_t o_unclean = 0;
+
+  for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
+       p != ob->data.end();
+       ++p) {
+    BufferHead *bh = p->second;
+    if (bh->is_clean() || bh->is_zero() || bh->is_error())
+      clean.push_back(bh);
+    else
+      o_unclean += bh->length();
+  }
+
+  for (list<BufferHead*>::iterator p = clean.begin();
+       p != clean.end();
+       ++p) {
+    bh_remove(ob, *p);
+    delete *p;
+  }
+
+  if (ob->can_close()) {
+    ldout(cct, 10) << "release trimming " << *ob << dendl;
+    close_object(ob);
+    assert(o_unclean == 0);
+    return 0;
+  }
+
+  if (ob->complete) {
+    ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
+    ob->complete = false;
+  }
+  if (!ob->exists) {
+    ldout(cct, 10) << "release setting exists on " << *ob << dendl;
+    ob->exists = true;
+  }
+
+  return o_unclean;
+}
+
+loff_t ObjectCacher::release_set(ObjectSet *oset)
+{
+  assert(lock.is_locked());
+  // return # bytes not clean (and thus not released).
+  loff_t unclean = 0;
+
+  if (oset->objects.empty()) {
+    ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
+    return 0;
+  }
+
+  ldout(cct, 10) << "release_set " << oset << dendl;
+
+  xlist<Object*>::iterator q;
+  for (xlist<Object*>::iterator p = oset->objects.begin();
+       !p.end(); ) {
+    q = p;
+    ++q;
+    Object *ob = *p;
+
+    loff_t o_unclean = release(ob);
+    unclean += o_unclean;
+
+    if (o_unclean)
+      ldout(cct, 10) << "release_set " << oset << " " << *ob
+                    << " has " << o_unclean << " bytes left"
+                    << dendl;
+    p = q;
+  }
+
+  if (unclean) {
+    ldout(cct, 10) << "release_set " << oset
+                  << ", " << unclean << " bytes left" << dendl;
+  }
+
+  return unclean;
+}
+
+
+uint64_t ObjectCacher::release_all()
+{
+  assert(lock.is_locked());
+  ldout(cct, 10) << "release_all" << dendl;
+  uint64_t unclean = 0;
+
+  vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
+    = objects.begin();
+  while (i != objects.end()) {
+    ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
+    while (p != i->end()) {
+      ceph::unordered_map<sobject_t, Object*>::iterator n = p;
+      ++n;
+
+      Object *ob = p->second;
+
+      loff_t o_unclean = release(ob);
+      unclean += o_unclean;
+
+      if (o_unclean)
+       ldout(cct, 10) << "release_all " << *ob
+                      << " has " << o_unclean << " bytes left"
+                      << dendl;
+      p = n;
+    }
+    ++i;
+  }
+
+  if (unclean) {
+    ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
+                  << dendl;
+  }
+
+  return unclean;
+}
+
+void ObjectCacher::clear_nonexistence(ObjectSet *oset)
+{
+  assert(lock.is_locked());
+  ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
+
+  for (xlist<Object*>::iterator p = oset->objects.begin();
+       !p.end(); ++p) {
+    Object *ob = *p;
+    if (!ob->exists) {
+      ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
+      ob->exists = true;
+      ob->complete = false;
+    }
+    for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
+        !q.end(); ++q) {
+      C_ReadFinish *comp = *q;
+      comp->distrust_enoent();
+    }
+  }
+}
+
+/**
+ * discard the object extents in exls from the in-memory cache of this
+ * ObjectSet's objects.
+ */
+void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
+{
+  assert(lock.is_locked());
+  if (oset->objects.empty()) {
+    ldout(cct, 10) << "discard_set on " << oset << " dne" << dendl;
+    return;
+  }
+
+  ldout(cct, 10) << "discard_set " << oset << dendl;
+
+  bool were_dirty = oset->dirty_or_tx > 0;
+
+  for (vector<ObjectExtent>::const_iterator p = exls.begin();
+       p != exls.end();
+       ++p) {
+    ldout(cct, 10) << "discard_set " << oset << " ex " << *p << dendl;
+    const ObjectExtent &ex = *p;
+    sobject_t soid(ex.oid, CEPH_NOSNAP);
+    if (objects[oset->poolid].count(soid) == 0)
+      continue;
+    Object *ob = objects[oset->poolid][soid];
+
+    ob->discard(ex.offset, ex.length);
+  }
+
+  // did we truncate off dirty data?
+  if (flush_set_callback &&
+      were_dirty && oset->dirty_or_tx == 0)
+    flush_set_callback(flush_set_callback_arg, oset);
+}
+
+void ObjectCacher::verify_stats() const
+{
+  assert(lock.is_locked());
+  ldout(cct, 10) << "verify_stats" << dendl;
+
+  loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
+    error = 0;
+  for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
+        = objects.begin();
+       i != objects.end();
+       ++i) {
+    for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
+          = i->begin();
+        p != i->end();
+        ++p) {
+      Object *ob = p->second;
+      for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
+          q != ob->data.end();
+         ++q) {
+       BufferHead *bh = q->second;
+       switch (bh->get_state()) {
+       case BufferHead::STATE_MISSING:
+         missing += bh->length();
+         break;
+       case BufferHead::STATE_CLEAN:
+         clean += bh->length();
+         break;
+       case BufferHead::STATE_ZERO:
+         zero += bh->length();
+         break;
+       case BufferHead::STATE_DIRTY:
+         dirty += bh->length();
+         break;
+       case BufferHead::STATE_TX:
+         tx += bh->length();
+         break;
+       case BufferHead::STATE_RX:
+         rx += bh->length();
+         break;
+       case BufferHead::STATE_ERROR:
+         error += bh->length();
+         break;
+       default:
+         ceph_abort();
+       }
+      }
+    }
+  }
+
+  ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
+                << " dirty " << dirty << " missing " << missing
+                << " error " << error << dendl;
+  assert(clean == stat_clean);
+  assert(rx == stat_rx);
+  assert(tx == stat_tx);
+  assert(dirty == stat_dirty);
+  assert(missing == stat_missing);
+  assert(zero == stat_zero);
+  assert(error == stat_error);
+}
+
+void ObjectCacher::bh_stat_add(BufferHead *bh)
+{
+  assert(lock.is_locked());
+  switch (bh->get_state()) {
+  case BufferHead::STATE_MISSING:
+    stat_missing += bh->length();
+    break;
+  case BufferHead::STATE_CLEAN:
+    stat_clean += bh->length();
+    break;
+  case BufferHead::STATE_ZERO:
+    stat_zero += bh->length();
+    break;
+  case BufferHead::STATE_DIRTY:
+    stat_dirty += bh->length();
+    bh->ob->dirty_or_tx += bh->length();
+    bh->ob->oset->dirty_or_tx += bh->length();
+    break;
+  case BufferHead::STATE_TX:
+    stat_tx += bh->length();
+    bh->ob->dirty_or_tx += bh->length();
+    bh->ob->oset->dirty_or_tx += bh->length();
+    break;
+  case BufferHead::STATE_RX:
+    stat_rx += bh->length();
+    break;
+  case BufferHead::STATE_ERROR:
+    stat_error += bh->length();
+    break;
+  default:
+    assert(0 == "bh_stat_add: invalid bufferhead state");
+  }
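+  // wake any throttled writers so they can re-check the dirty thresholds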
+  if (get_stat_dirty_waiting() > 0)
+    stat_cond.Signal();
+}
+
+void ObjectCacher::bh_stat_sub(BufferHead *bh)
+{
+  assert(lock.is_locked());
+  switch (bh->get_state()) {
+  case BufferHead::STATE_MISSING:
+    stat_missing -= bh->length();
+    break;
+  case BufferHead::STATE_CLEAN:
+    stat_clean -= bh->length();
+    break;
+  case BufferHead::STATE_ZERO:
+    stat_zero -= bh->length();
+    break;
+  case BufferHead::STATE_DIRTY:
+    stat_dirty -= bh->length();
+    bh->ob->dirty_or_tx -= bh->length();
+    bh->ob->oset->dirty_or_tx -= bh->length();
+    break;
+  case BufferHead::STATE_TX:
+    stat_tx -= bh->length();
+    bh->ob->dirty_or_tx -= bh->length();
+    bh->ob->oset->dirty_or_tx -= bh->length();
+    break;
+  case BufferHead::STATE_RX:
+    stat_rx -= bh->length();
+    break;
+  case BufferHead::STATE_ERROR:
+    stat_error -= bh->length();
+    break;
+  default:
+    assert(0 == "bh_stat_sub: invalid bufferhead state");
+  }
+}
+
+void ObjectCacher::bh_set_state(BufferHead *bh, int s)
+{
+  assert(lock.is_locked());
+  int state = bh->get_state();
+  // move between lru lists?
+  if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
+    bh_lru_rest.lru_remove(bh);
+    bh_lru_dirty.lru_insert_top(bh);
+  } else if (s != BufferHead::STATE_DIRTY &&
+            state == BufferHead::STATE_DIRTY) {
+    bh_lru_dirty.lru_remove(bh);
+    if (bh->get_dontneed())
+      bh_lru_rest.lru_insert_bot(bh);
+    else
+      bh_lru_rest.lru_insert_top(bh);
+  }
+
+  if ((s == BufferHead::STATE_TX ||
+       s == BufferHead::STATE_DIRTY) &&
+      state != BufferHead::STATE_TX &&
+      state != BufferHead::STATE_DIRTY) {
+    dirty_or_tx_bh.insert(bh);
+  } else if ((state == BufferHead::STATE_TX ||
+             state == BufferHead::STATE_DIRTY) &&
+            s != BufferHead::STATE_TX &&
+            s != BufferHead::STATE_DIRTY) {
+    dirty_or_tx_bh.erase(bh);
+  }
+
+  if (s != BufferHead::STATE_ERROR &&
+      state == BufferHead::STATE_ERROR) {
+    bh->error = 0;
+  }
+
+  // set state
+  bh_stat_sub(bh);
+  bh->set_state(s);
+  bh_stat_add(bh);
+}
+
+void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
+{
+  assert(lock.is_locked());
+  ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
+  ob->add_bh(bh);
+  if (bh->is_dirty()) {
+    bh_lru_dirty.lru_insert_top(bh);
+    dirty_or_tx_bh.insert(bh);
+  } else {
+    if (bh->get_dontneed())
+      bh_lru_rest.lru_insert_bot(bh);
+    else
+      bh_lru_rest.lru_insert_top(bh);
+  }
+
+  if (bh->is_tx()) {
+    dirty_or_tx_bh.insert(bh);
+  }
+  bh_stat_add(bh);
+}
+
+void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
+{
+  assert(lock.is_locked());
+  assert(bh->get_journal_tid() == 0);
+  ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
+  ob->remove_bh(bh);
+  if (bh->is_dirty()) {
+    bh_lru_dirty.lru_remove(bh);
+    dirty_or_tx_bh.erase(bh);
+  } else {
+    bh_lru_rest.lru_remove(bh);
+  }
+
+  if (bh->is_tx()) {
+    dirty_or_tx_bh.erase(bh);
+  }
+  bh_stat_sub(bh);
+  if (get_stat_dirty_waiting() > 0)
+    stat_cond.Signal();
+}
+