initial code repo
[stor4nfv.git] / src / ceph / src / osdc / ObjectCacher.h
diff --git a/src/ceph/src/osdc/ObjectCacher.h b/src/ceph/src/osdc/ObjectCacher.h
new file mode 100644 (file)
index 0000000..58b3e7a
--- /dev/null
@@ -0,0 +1,766 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef CEPH_OBJECTCACHER_H
+#define CEPH_OBJECTCACHER_H
+
+#include "include/types.h"
+#include "include/lru.h"
+#include "include/Context.h"
+#include "include/xlist.h"
+
+#include "common/Cond.h"
+#include "common/Finisher.h"
+#include "common/Thread.h"
+#include "common/zipkin_trace.h"
+
+#include "Objecter.h"
+#include "Striper.h"
+
+class CephContext;
+class WritebackHandler;
+class PerfCounters;
+
+enum {
+  l_objectcacher_first = 25000,
+
+  l_objectcacher_cache_ops_hit, // ops we satisfy completely from cache
+  l_objectcacher_cache_ops_miss, // ops we don't satisfy completely from cache
+
+  l_objectcacher_cache_bytes_hit, // bytes read directly from cache
+
+  l_objectcacher_cache_bytes_miss, // bytes we couldn't read directly
+
+                                  // from cache
+
+  l_objectcacher_data_read, // total bytes read out
+  l_objectcacher_data_written, // bytes written to cache
+  l_objectcacher_data_flushed, // bytes flushed to WritebackHandler
+  l_objectcacher_overwritten_in_flush, // bytes overwritten while
+                                      // flushing is in progress
+
+  l_objectcacher_write_ops_blocked, // total write ops we delayed due
+                                   // to dirty limits
+  l_objectcacher_write_bytes_blocked, // total number of write bytes
+                                     // we delayed due to dirty
+                                     // limits
+  l_objectcacher_write_time_blocked, // total time in seconds spent
+                                    // blocking a write due to dirty
+                                    // limits
+
+  l_objectcacher_last,
+};
+
+class ObjectCacher {
+  PerfCounters *perfcounter;
+ public:
+  CephContext *cct;
+  class Object;
+  struct ObjectSet;
+  class C_ReadFinish;
+
+  typedef void (*flush_set_callback_t) (void *p, ObjectSet *oset);
+
+  // read scatter/gather
+  struct OSDRead {
+    vector<ObjectExtent> extents;
+    snapid_t snap;
+    bufferlist *bl;
+    int fadvise_flags;
+    OSDRead(snapid_t s, bufferlist *b, int f)
+      : snap(s), bl(b), fadvise_flags(f) {}
+  };
+
+  OSDRead *prepare_read(snapid_t snap, bufferlist *b, int f) const {
+    return new OSDRead(snap, b, f);
+  }
+
+  // write scatter/gather
+  struct OSDWrite {
+    vector<ObjectExtent> extents;
+    SnapContext snapc;
+    bufferlist bl;
+    ceph::real_time mtime;
+    int fadvise_flags;
+    ceph_tid_t journal_tid;
+    OSDWrite(const SnapContext& sc, const bufferlist& b, ceph::real_time mt,
+            int f, ceph_tid_t _journal_tid)
+      : snapc(sc), bl(b), mtime(mt), fadvise_flags(f),
+       journal_tid(_journal_tid) {}
+  };
+
+  OSDWrite *prepare_write(const SnapContext& sc,
+                         const bufferlist &b,
+                         ceph::real_time mt,
+                         int f,
+                         ceph_tid_t journal_tid) const {
+    return new OSDWrite(sc, b, mt, f, journal_tid);
+  }
+
+
+
+  // ******* BufferHead *********
+  class BufferHead : public LRUObject {
+  public:
+    // states
+    static const int STATE_MISSING = 0;
+    static const int STATE_CLEAN = 1;
+    static const int STATE_ZERO = 2;   // NOTE: these are *clean* zeros
+    static const int STATE_DIRTY = 3;
+    static const int STATE_RX = 4;
+    static const int STATE_TX = 5;
+    static const int STATE_ERROR = 6; // a read error occurred
+
+  private:
+    // my fields
+    int state;
+    int ref;
+    struct {
+      loff_t start, length;   // bh extent in object
+    } ex;
+    bool dontneed; //indicate bh don't need by anyone
+    bool nocache; //indicate bh don't need by this caller
+
+  public:
+    Object *ob;
+    bufferlist  bl;
+    ceph_tid_t last_write_tid;  // version of bh (if non-zero)
+    ceph_tid_t last_read_tid;   // tid of last read op (if any)
+    ceph::real_time last_write;
+    SnapContext snapc;
+    ceph_tid_t journal_tid;
+    int error; // holds return value for failed reads
+
+    map<loff_t, list<Context*> > waitfor_read;
+
+    // cons
+    explicit BufferHead(Object *o) :
+      state(STATE_MISSING),
+      ref(0),
+      dontneed(false),
+      nocache(false),
+      ob(o),
+      last_write_tid(0),
+      last_read_tid(0),
+      journal_tid(0),
+      error(0) {
+      ex.start = ex.length = 0;
+    }
+
+    // extent
+    loff_t start() const { return ex.start; }
+    void set_start(loff_t s) { ex.start = s; }
+    loff_t length() const { return ex.length; }
+    void set_length(loff_t l) { ex.length = l; }
+    loff_t end() const { return ex.start + ex.length; }
+    loff_t last() const { return end() - 1; }
+
+    // states
+    void set_state(int s) {
+      if (s == STATE_RX || s == STATE_TX) get();
+      if (state == STATE_RX || state == STATE_TX) put();
+      state = s;
+    }
+    int get_state() const { return state; }
+
+    inline ceph_tid_t get_journal_tid() const {
+      return journal_tid;
+    }
+    inline void set_journal_tid(ceph_tid_t _journal_tid) {
+      journal_tid = _journal_tid;
+    }
+
+    bool is_missing() const { return state == STATE_MISSING; }
+    bool is_dirty() const { return state == STATE_DIRTY; }
+    bool is_clean() const { return state == STATE_CLEAN; }
+    bool is_zero() const { return state == STATE_ZERO; }
+    bool is_tx() const { return state == STATE_TX; }
+    bool is_rx() const { return state == STATE_RX; }
+    bool is_error() const { return state == STATE_ERROR; }
+
+    // reference counting
+    int get() {
+      assert(ref >= 0);
+      if (ref == 0) lru_pin();
+      return ++ref;
+    }
+    int put() {
+      assert(ref > 0);
+      if (ref == 1) lru_unpin();
+      --ref;
+      return ref;
+    }
+
+    void set_dontneed(bool v) {
+      dontneed = v;
+    }
+    bool get_dontneed() const {
+      return dontneed;
+    }
+
+    void set_nocache(bool v) {
+      nocache = v;
+    }
+    bool get_nocache() const {
+      return nocache;
+    }
+
+    inline bool can_merge_journal(BufferHead *bh) const {
+      return (get_journal_tid() == bh->get_journal_tid());
+    }
+
+    struct ptr_lt {
+      bool operator()(const BufferHead* l, const BufferHead* r) const {
+       const Object *lob = l->ob;
+       const Object *rob = r->ob;
+       const ObjectSet *loset = lob->oset;
+       const ObjectSet *roset = rob->oset;
+       if (loset != roset)
+         return loset < roset;
+       if (lob != rob)
+         return lob < rob;
+       if (l->start() != r->start())
+         return l->start() < r->start();
+       return l < r;
+      }
+    };
+  };
+
+  // ******* Object *********
+  class Object : public LRUObject {
+  private:
+    // ObjectCacher::Object fields
+    int ref;
+    ObjectCacher *oc;
+    sobject_t oid;
+    friend struct ObjectSet;
+
+  public:
+    uint64_t object_no;
+    ObjectSet *oset;
+    xlist<Object*>::item set_item;
+    object_locator_t oloc;
+    uint64_t truncate_size, truncate_seq;
+
+    bool complete;
+    bool exists;
+
+    map<loff_t, BufferHead*>     data;
+
+    ceph_tid_t last_write_tid;  // version of bh (if non-zero)
+    ceph_tid_t last_commit_tid; // last update commited.
+
+    int dirty_or_tx;
+
+    map< ceph_tid_t, list<Context*> > waitfor_commit;
+    xlist<C_ReadFinish*> reads;
+
+    Object(const Object&) = delete;
+    Object& operator=(const Object&) = delete;
+
+    Object(ObjectCacher *_oc, sobject_t o, uint64_t ono, ObjectSet *os,
+          object_locator_t& l, uint64_t ts, uint64_t tq) :
+      ref(0),
+      oc(_oc),
+      oid(o), object_no(ono), oset(os), set_item(this), oloc(l),
+      truncate_size(ts), truncate_seq(tq),
+      complete(false), exists(true),
+      last_write_tid(0), last_commit_tid(0),
+      dirty_or_tx(0) {
+      // add to set
+      os->objects.push_back(&set_item);
+    }
+    ~Object() {
+      reads.clear();
+      assert(ref == 0);
+      assert(data.empty());
+      assert(dirty_or_tx == 0);
+      set_item.remove_myself();
+    }
+
+    sobject_t get_soid() const { return oid; }
+    object_t get_oid() { return oid.oid; }
+    snapid_t get_snap() { return oid.snap; }
+    ObjectSet *get_object_set() const { return oset; }
+    string get_namespace() { return oloc.nspace; }
+    uint64_t get_object_number() const { return object_no; }
+
+    const object_locator_t& get_oloc() const { return oloc; }
+    void set_object_locator(object_locator_t& l) { oloc = l; }
+
+    bool can_close() const {
+      if (lru_is_expireable()) {
+       assert(data.empty());
+       assert(waitfor_commit.empty());
+       return true;
+      }
+      return false;
+    }
+
+    /**
+     * Check buffers and waiters for consistency
+     * - no overlapping buffers
+     * - index in map matches BH
+     * - waiters fall within BH
+     */
+    void audit_buffers();
+
+    /**
+     * find first buffer that includes or follows an offset
+     *
+     * @param offset object byte offset
+     * @return iterator pointing to buffer, or data.end()
+     */
+    map<loff_t,BufferHead*>::const_iterator data_lower_bound(loff_t offset) const {
+      map<loff_t,BufferHead*>::const_iterator p = data.lower_bound(offset);
+      if (p != data.begin() &&
+         (p == data.end() || p->first > offset)) {
+       --p;     // might overlap!
+       if (p->first + p->second->length() <= offset)
+         ++p;   // doesn't overlap.
+      }
+      return p;
+    }
+
+    // bh
+    // add to my map
+    void add_bh(BufferHead *bh) {
+      if (data.empty())
+       get();
+      assert(data.count(bh->start()) == 0);
+      data[bh->start()] = bh;
+    }
+    void remove_bh(BufferHead *bh) {
+      assert(data.count(bh->start()));
+      data.erase(bh->start());
+      if (data.empty())
+       put();
+    }
+
+    bool is_empty() const { return data.empty(); }
+
+    // mid-level
+    BufferHead *split(BufferHead *bh, loff_t off);
+    void merge_left(BufferHead *left, BufferHead *right);
+    void try_merge_bh(BufferHead *bh);
+
+    bool is_cached(loff_t off, loff_t len) const;
+    bool include_all_cached_data(loff_t off, loff_t len);
+    int map_read(ObjectExtent &ex,
+                 map<loff_t, BufferHead*>& hits,
+                 map<loff_t, BufferHead*>& missing,
+                 map<loff_t, BufferHead*>& rx,
+                map<loff_t, BufferHead*>& errors);
+    BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid);
+
+    void replace_journal_tid(BufferHead *bh, ceph_tid_t tid);
+    void truncate(loff_t s);
+    void discard(loff_t off, loff_t len);
+
+    // reference counting
+    int get() {
+      assert(ref >= 0);
+      if (ref == 0) lru_pin();
+      return ++ref;
+    }
+    int put() {
+      assert(ref > 0);
+      if (ref == 1) lru_unpin();
+      --ref;
+      return ref;
+    }
+  };
+
+
+  struct ObjectSet {
+    void *parent;
+
+    inodeno_t ino;
+    uint64_t truncate_seq, truncate_size;
+
+    int64_t poolid;
+    xlist<Object*> objects;
+
+    int dirty_or_tx;
+    bool return_enoent;
+
+    ObjectSet(void *p, int64_t _poolid, inodeno_t i)
+      : parent(p), ino(i), truncate_seq(0),
+       truncate_size(0), poolid(_poolid), dirty_or_tx(0),
+       return_enoent(false) {}
+
+  };
+
+
+  // ******* ObjectCacher *********
+  // ObjectCacher fields
+ private:
+  WritebackHandler& writeback_handler;
+  bool scattered_write;
+
+  string name;
+  Mutex& lock;
+
+  uint64_t max_dirty, target_dirty, max_size, max_objects;
+  ceph::timespan max_dirty_age;
+  bool block_writes_upfront;
+
+  ZTracer::Endpoint trace_endpoint;
+
+  flush_set_callback_t flush_set_callback;
+  void *flush_set_callback_arg;
+
+  // indexed by pool_id
+  vector<ceph::unordered_map<sobject_t, Object*> > objects;
+
+  list<Context*> waitfor_read;
+
+  ceph_tid_t last_read_tid;
+
+  set<BufferHead*, BufferHead::ptr_lt> dirty_or_tx_bh;
+  LRU   bh_lru_dirty, bh_lru_rest;
+  LRU   ob_lru;
+
+  Cond flusher_cond;
+  bool flusher_stop;
+  void flusher_entry();
+  class FlusherThread : public Thread {
+    ObjectCacher *oc;
+  public:
+    explicit FlusherThread(ObjectCacher *o) : oc(o) {}
+    void *entry() override {
+      oc->flusher_entry();
+      return 0;
+    }
+  } flusher_thread;
+
+  Finisher finisher;
+
+  // objects
+  Object *get_object_maybe(sobject_t oid, object_locator_t &l) {
+    // have it?
+    if (((uint32_t)l.pool < objects.size()) &&
+       (objects[l.pool].count(oid)))
+      return objects[l.pool][oid];
+    return NULL;
+  }
+
+  Object *get_object(sobject_t oid, uint64_t object_no, ObjectSet *oset,
+                    object_locator_t &l, uint64_t truncate_size,
+                    uint64_t truncate_seq);
+  void close_object(Object *ob);
+
+  // bh stats
+  Cond  stat_cond;
+
+  loff_t stat_clean;
+  loff_t stat_zero;
+  loff_t stat_dirty;
+  loff_t stat_rx;
+  loff_t stat_tx;
+  loff_t stat_missing;
+  loff_t stat_error;
+  loff_t stat_dirty_waiting;   // bytes that writers are waiting on to write
+
+  size_t stat_nr_dirty_waiters;
+
+  void verify_stats() const;
+
+  void bh_stat_add(BufferHead *bh);
+  void bh_stat_sub(BufferHead *bh);
+  loff_t get_stat_tx() const { return stat_tx; }
+  loff_t get_stat_rx() const { return stat_rx; }
+  loff_t get_stat_dirty() const { return stat_dirty; }
+  loff_t get_stat_clean() const { return stat_clean; }
+  loff_t get_stat_zero() const { return stat_zero; }
+  loff_t get_stat_dirty_waiting() const { return stat_dirty_waiting; }
+  size_t get_stat_nr_dirty_waiters() const { return stat_nr_dirty_waiters; }
+
+  void touch_bh(BufferHead *bh) {
+    if (bh->is_dirty())
+      bh_lru_dirty.lru_touch(bh);
+    else
+      bh_lru_rest.lru_touch(bh);
+
+    bh->set_dontneed(false);
+    bh->set_nocache(false);
+    touch_ob(bh->ob);
+  }
+  void touch_ob(Object *ob) {
+    ob_lru.lru_touch(ob);
+  }
+  void bottouch_ob(Object *ob) {
+    ob_lru.lru_bottouch(ob);
+  }
+
+  // bh states
+  void bh_set_state(BufferHead *bh, int s);
+  void copy_bh_state(BufferHead *bh1, BufferHead *bh2) {
+    bh_set_state(bh2, bh1->get_state());
+  }
+
+  void mark_missing(BufferHead *bh) {
+    bh_set_state(bh,BufferHead::STATE_MISSING);
+  }
+  void mark_clean(BufferHead *bh) {
+    bh_set_state(bh, BufferHead::STATE_CLEAN);
+  }
+  void mark_zero(BufferHead *bh) {
+    bh_set_state(bh, BufferHead::STATE_ZERO);
+  }
+  void mark_rx(BufferHead *bh) {
+    bh_set_state(bh, BufferHead::STATE_RX);
+  }
+  void mark_tx(BufferHead *bh) {
+    bh_set_state(bh, BufferHead::STATE_TX); }
+  void mark_error(BufferHead *bh) {
+    bh_set_state(bh, BufferHead::STATE_ERROR);
+  }
+  void mark_dirty(BufferHead *bh) {
+    bh_set_state(bh, BufferHead::STATE_DIRTY);
+    bh_lru_dirty.lru_touch(bh);
+    //bh->set_dirty_stamp(ceph_clock_now());
+  }
+
+  void bh_add(Object *ob, BufferHead *bh);
+  void bh_remove(Object *ob, BufferHead *bh);
+
+  // io
+  void bh_read(BufferHead *bh, int op_flags,
+               const ZTracer::Trace &parent_trace);
+  void bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace);
+  void bh_write_scattered(list<BufferHead*>& blist);
+  void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
+                           int64_t *amount, int *max_count);
+
+  void trim();
+  void flush(ZTracer::Trace *trace, loff_t amount=0);
+
+  /**
+   * flush a range of buffers
+   *
+   * Flush any buffers that intersect the specified extent.  If len==0,
+   * flush *all* buffers for the object.
+   *
+   * @param o object
+   * @param off start offset
+   * @param len extent length, or 0 for entire object
+   * @return true if object was already clean/flushed.
+   */
+  bool flush(Object *o, loff_t off, loff_t len,
+             ZTracer::Trace *trace);
+  loff_t release(Object *o);
+  void purge(Object *o);
+
+  int64_t reads_outstanding;
+  Cond read_cond;
+
+  int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
+            bool external_call, ZTracer::Trace *trace);
+  void retry_waiting_reads();
+
+ public:
+  void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
+                     loff_t offset, uint64_t length,
+                     bufferlist &bl, int r,
+                     bool trust_enoent);
+  void bh_write_commit(int64_t poolid, sobject_t oid,
+                      vector<pair<loff_t, uint64_t> >& ranges,
+                      ceph_tid_t t, int r);
+
+  class C_WriteCommit;
+  class C_WaitForWrite;
+
+  void perf_start();
+  void perf_stop();
+
+
+
+  ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb, Mutex& l,
+              flush_set_callback_t flush_callback,
+              void *flush_callback_arg,
+              uint64_t max_bytes, uint64_t max_objects,
+              uint64_t max_dirty, uint64_t target_dirty, double max_age,
+              bool block_writes_upfront);
+  ~ObjectCacher();
+
+  void start() {
+    flusher_thread.create("flusher");
+  }
+  void stop() {
+    assert(flusher_thread.is_started());
+    lock.Lock();  // hmm.. watch out for deadlock!
+    flusher_stop = true;
+    flusher_cond.Signal();
+    lock.Unlock();
+    flusher_thread.join();
+  }
+
+
+  class C_RetryRead;
+
+
+  // non-blocking.  async.
+
+  /**
+   * @note total read size must be <= INT_MAX, since
+   * the return value is total bytes read
+   */
+  int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
+           ZTracer::Trace *parent_trace = nullptr);
+  int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
+            ZTracer::Trace *parent_trace = nullptr);
+  bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
+                snapid_t snapid);
+
+private:
+  // write blocking
+  int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
+                      ZTracer::Trace *trace, Context *onfreespace);
+  void maybe_wait_for_writeback(uint64_t len, ZTracer::Trace *trace);
+  bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish);
+
+public:
+  bool set_is_empty(ObjectSet *oset);
+  bool set_is_cached(ObjectSet *oset);
+  bool set_is_dirty_or_committing(ObjectSet *oset);
+
+  bool flush_set(ObjectSet *oset, Context *onfinish=0);
+  bool flush_set(ObjectSet *oset, vector<ObjectExtent>& ex,
+                 ZTracer::Trace *trace, Context *onfinish = 0);
+  bool flush_all(Context *onfinish = 0);
+
+  void purge_set(ObjectSet *oset);
+
+  // returns # of bytes not released (ie non-clean)
+  loff_t release_set(ObjectSet *oset);
+  uint64_t release_all();
+
+  void discard_set(ObjectSet *oset, const vector<ObjectExtent>& ex);
+
+  /**
+   * Retry any in-flight reads that get -ENOENT instead of marking
+   * them zero, and get rid of any cached -ENOENTs.
+   * After this is called and the cache's lock is unlocked,
+   * any new requests will treat -ENOENT normally.
+   */
+  void clear_nonexistence(ObjectSet *oset);
+
+
+  // cache sizes
+  void set_max_dirty(uint64_t v) {
+    max_dirty = v;
+  }
+  void set_target_dirty(int64_t v) {
+    target_dirty = v;
+  }
+  void set_max_size(int64_t v) {
+    max_size = v;
+  }
+  void set_max_dirty_age(double a) {
+    max_dirty_age = make_timespan(a);
+  }
+  void set_max_objects(int64_t v) {
+    max_objects = v;
+  }
+
+
+  // file functions
+
+  /*** async+caching (non-blocking) file interface ***/
+  int file_is_cached(ObjectSet *oset, file_layout_t *layout,
+                    snapid_t snapid, loff_t offset, uint64_t len) {
+    vector<ObjectExtent> extents;
+    Striper::file_to_extents(cct, oset->ino, layout, offset, len,
+                            oset->truncate_size, extents);
+    return is_cached(oset, extents, snapid);
+  }
+
+  int file_read(ObjectSet *oset, file_layout_t *layout, snapid_t snapid,
+               loff_t offset, uint64_t len, bufferlist *bl, int flags,
+               Context *onfinish) {
+    OSDRead *rd = prepare_read(snapid, bl, flags);
+    Striper::file_to_extents(cct, oset->ino, layout, offset, len,
+                            oset->truncate_size, rd->extents);
+    return readx(rd, oset, onfinish);
+  }
+
+  int file_write(ObjectSet *oset, file_layout_t *layout,
+                const SnapContext& snapc, loff_t offset, uint64_t len,
+                bufferlist& bl, ceph::real_time mtime, int flags) {
+    OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0);
+    Striper::file_to_extents(cct, oset->ino, layout, offset, len,
+                            oset->truncate_size, wr->extents);
+    return writex(wr, oset, NULL);
+  }
+
+  bool file_flush(ObjectSet *oset, file_layout_t *layout,
+                 const SnapContext& snapc, loff_t offset, uint64_t len,
+                 Context *onfinish) {
+    vector<ObjectExtent> extents;
+    Striper::file_to_extents(cct, oset->ino, layout, offset, len,
+                            oset->truncate_size, extents);
+    ZTracer::Trace trace;
+    return flush_set(oset, extents, &trace, onfinish);
+  }
+};
+
+
+inline ostream& operator<<(ostream &out, const ObjectCacher::BufferHead &bh)
+{
+  out << "bh[ " << &bh << " "
+      << bh.start() << "~" << bh.length()
+      << " " << bh.ob
+      << " (" << bh.bl.length() << ")"
+      << " v " << bh.last_write_tid;
+  if (bh.get_journal_tid() != 0) {
+    out << " j " << bh.get_journal_tid();
+  }
+  if (bh.is_tx()) out << " tx";
+  if (bh.is_rx()) out << " rx";
+  if (bh.is_dirty()) out << " dirty";
+  if (bh.is_clean()) out << " clean";
+  if (bh.is_zero()) out << " zero";
+  if (bh.is_missing()) out << " missing";
+  if (bh.bl.length() > 0) out << " firstbyte=" << (int)bh.bl[0];
+  if (bh.error) out << " error=" << bh.error;
+  out << "]";
+  out << " waiters = {";
+  for (map<loff_t, list<Context*> >::const_iterator it
+        = bh.waitfor_read.begin();
+       it != bh.waitfor_read.end(); ++it) {
+    out << " " << it->first << "->[";
+    for (list<Context*>::const_iterator lit = it->second.begin();
+        lit != it->second.end(); ++lit) {
+        out << *lit << ", ";
+    }
+    out << "]";
+  }
+  out << "}";
+  return out;
+}
+
+inline ostream& operator<<(ostream &out, const ObjectCacher::ObjectSet &os)
+{
+  return out << "objectset[" << os.ino
+            << " ts " << os.truncate_seq << "/" << os.truncate_size
+            << " objects " << os.objects.size()
+            << " dirty_or_tx " << os.dirty_or_tx
+            << "]";
+}
+
+inline ostream& operator<<(ostream &out, const ObjectCacher::Object &ob)
+{
+  out << "object["
+      << ob.get_soid() << " oset " << ob.oset << dec
+      << " wr " << ob.last_write_tid << "/" << ob.last_commit_tid;
+
+  if (ob.complete)
+    out << " COMPLETE";
+  if (!ob.exists)
+    out << " !EXISTS";
+
+  out << "]";
+  return out;
+}
+
+#endif