Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osdc / ObjectCacher.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef CEPH_OBJECTCACHER_H
4 #define CEPH_OBJECTCACHER_H
5
6 #include "include/types.h"
7 #include "include/lru.h"
8 #include "include/Context.h"
9 #include "include/xlist.h"
10
11 #include "common/Cond.h"
12 #include "common/Finisher.h"
13 #include "common/Thread.h"
14 #include "common/zipkin_trace.h"
15
16 #include "Objecter.h"
17 #include "Striper.h"
18
19 class CephContext;
20 class WritebackHandler;
21 class PerfCounters;
22
23 enum {
24   l_objectcacher_first = 25000,
25
26   l_objectcacher_cache_ops_hit, // ops we satisfy completely from cache
27   l_objectcacher_cache_ops_miss, // ops we don't satisfy completely from cache
28
29   l_objectcacher_cache_bytes_hit, // bytes read directly from cache
30
31   l_objectcacher_cache_bytes_miss, // bytes we couldn't read directly
32
33                                    // from cache
34
35   l_objectcacher_data_read, // total bytes read out
36   l_objectcacher_data_written, // bytes written to cache
37   l_objectcacher_data_flushed, // bytes flushed to WritebackHandler
38   l_objectcacher_overwritten_in_flush, // bytes overwritten while
39                                        // flushing is in progress
40
41   l_objectcacher_write_ops_blocked, // total write ops we delayed due
42                                     // to dirty limits
43   l_objectcacher_write_bytes_blocked, // total number of write bytes
44                                       // we delayed due to dirty
45                                       // limits
46   l_objectcacher_write_time_blocked, // total time in seconds spent
47                                      // blocking a write due to dirty
48                                      // limits
49
50   l_objectcacher_last,
51 };
52
53 class ObjectCacher {
54   PerfCounters *perfcounter;
55  public:
56   CephContext *cct;
57   class Object;
58   struct ObjectSet;
59   class C_ReadFinish;
60
61   typedef void (*flush_set_callback_t) (void *p, ObjectSet *oset);
62
63   // read scatter/gather
64   struct OSDRead {
65     vector<ObjectExtent> extents;
66     snapid_t snap;
67     bufferlist *bl;
68     int fadvise_flags;
69     OSDRead(snapid_t s, bufferlist *b, int f)
70       : snap(s), bl(b), fadvise_flags(f) {}
71   };
72
73   OSDRead *prepare_read(snapid_t snap, bufferlist *b, int f) const {
74     return new OSDRead(snap, b, f);
75   }
76
77   // write scatter/gather
78   struct OSDWrite {
79     vector<ObjectExtent> extents;
80     SnapContext snapc;
81     bufferlist bl;
82     ceph::real_time mtime;
83     int fadvise_flags;
84     ceph_tid_t journal_tid;
85     OSDWrite(const SnapContext& sc, const bufferlist& b, ceph::real_time mt,
86              int f, ceph_tid_t _journal_tid)
87       : snapc(sc), bl(b), mtime(mt), fadvise_flags(f),
88         journal_tid(_journal_tid) {}
89   };
90
91   OSDWrite *prepare_write(const SnapContext& sc,
92                           const bufferlist &b,
93                           ceph::real_time mt,
94                           int f,
95                           ceph_tid_t journal_tid) const {
96     return new OSDWrite(sc, b, mt, f, journal_tid);
97   }
98
99
100
101   // ******* BufferHead *********
102   class BufferHead : public LRUObject {
103   public:
104     // states
105     static const int STATE_MISSING = 0;
106     static const int STATE_CLEAN = 1;
107     static const int STATE_ZERO = 2;   // NOTE: these are *clean* zeros
108     static const int STATE_DIRTY = 3;
109     static const int STATE_RX = 4;
110     static const int STATE_TX = 5;
111     static const int STATE_ERROR = 6; // a read error occurred
112
113   private:
114     // my fields
115     int state;
116     int ref;
117     struct {
118       loff_t start, length;   // bh extent in object
119     } ex;
120     bool dontneed; //indicate bh don't need by anyone
121     bool nocache; //indicate bh don't need by this caller
122
123   public:
124     Object *ob;
125     bufferlist  bl;
126     ceph_tid_t last_write_tid;  // version of bh (if non-zero)
127     ceph_tid_t last_read_tid;   // tid of last read op (if any)
128     ceph::real_time last_write;
129     SnapContext snapc;
130     ceph_tid_t journal_tid;
131     int error; // holds return value for failed reads
132
133     map<loff_t, list<Context*> > waitfor_read;
134
135     // cons
136     explicit BufferHead(Object *o) :
137       state(STATE_MISSING),
138       ref(0),
139       dontneed(false),
140       nocache(false),
141       ob(o),
142       last_write_tid(0),
143       last_read_tid(0),
144       journal_tid(0),
145       error(0) {
146       ex.start = ex.length = 0;
147     }
148
149     // extent
150     loff_t start() const { return ex.start; }
151     void set_start(loff_t s) { ex.start = s; }
152     loff_t length() const { return ex.length; }
153     void set_length(loff_t l) { ex.length = l; }
154     loff_t end() const { return ex.start + ex.length; }
155     loff_t last() const { return end() - 1; }
156
157     // states
158     void set_state(int s) {
159       if (s == STATE_RX || s == STATE_TX) get();
160       if (state == STATE_RX || state == STATE_TX) put();
161       state = s;
162     }
163     int get_state() const { return state; }
164
165     inline ceph_tid_t get_journal_tid() const {
166       return journal_tid;
167     }
168     inline void set_journal_tid(ceph_tid_t _journal_tid) {
169       journal_tid = _journal_tid;
170     }
171
172     bool is_missing() const { return state == STATE_MISSING; }
173     bool is_dirty() const { return state == STATE_DIRTY; }
174     bool is_clean() const { return state == STATE_CLEAN; }
175     bool is_zero() const { return state == STATE_ZERO; }
176     bool is_tx() const { return state == STATE_TX; }
177     bool is_rx() const { return state == STATE_RX; }
178     bool is_error() const { return state == STATE_ERROR; }
179
180     // reference counting
181     int get() {
182       assert(ref >= 0);
183       if (ref == 0) lru_pin();
184       return ++ref;
185     }
186     int put() {
187       assert(ref > 0);
188       if (ref == 1) lru_unpin();
189       --ref;
190       return ref;
191     }
192
193     void set_dontneed(bool v) {
194       dontneed = v;
195     }
196     bool get_dontneed() const {
197       return dontneed;
198     }
199
200     void set_nocache(bool v) {
201       nocache = v;
202     }
203     bool get_nocache() const {
204       return nocache;
205     }
206
207     inline bool can_merge_journal(BufferHead *bh) const {
208       return (get_journal_tid() == bh->get_journal_tid());
209     }
210
211     struct ptr_lt {
212       bool operator()(const BufferHead* l, const BufferHead* r) const {
213         const Object *lob = l->ob;
214         const Object *rob = r->ob;
215         const ObjectSet *loset = lob->oset;
216         const ObjectSet *roset = rob->oset;
217         if (loset != roset)
218           return loset < roset;
219         if (lob != rob)
220           return lob < rob;
221         if (l->start() != r->start())
222           return l->start() < r->start();
223         return l < r;
224       }
225     };
226   };
227
228   // ******* Object *********
229   class Object : public LRUObject {
230   private:
231     // ObjectCacher::Object fields
232     int ref;
233     ObjectCacher *oc;
234     sobject_t oid;
235     friend struct ObjectSet;
236
237   public:
238     uint64_t object_no;
239     ObjectSet *oset;
240     xlist<Object*>::item set_item;
241     object_locator_t oloc;
242     uint64_t truncate_size, truncate_seq;
243
244     bool complete;
245     bool exists;
246
247     map<loff_t, BufferHead*>     data;
248
249     ceph_tid_t last_write_tid;  // version of bh (if non-zero)
250     ceph_tid_t last_commit_tid; // last update commited.
251
252     int dirty_or_tx;
253
254     map< ceph_tid_t, list<Context*> > waitfor_commit;
255     xlist<C_ReadFinish*> reads;
256
257     Object(const Object&) = delete;
258     Object& operator=(const Object&) = delete;
259
260     Object(ObjectCacher *_oc, sobject_t o, uint64_t ono, ObjectSet *os,
261            object_locator_t& l, uint64_t ts, uint64_t tq) :
262       ref(0),
263       oc(_oc),
264       oid(o), object_no(ono), oset(os), set_item(this), oloc(l),
265       truncate_size(ts), truncate_seq(tq),
266       complete(false), exists(true),
267       last_write_tid(0), last_commit_tid(0),
268       dirty_or_tx(0) {
269       // add to set
270       os->objects.push_back(&set_item);
271     }
272     ~Object() {
273       reads.clear();
274       assert(ref == 0);
275       assert(data.empty());
276       assert(dirty_or_tx == 0);
277       set_item.remove_myself();
278     }
279
280     sobject_t get_soid() const { return oid; }
281     object_t get_oid() { return oid.oid; }
282     snapid_t get_snap() { return oid.snap; }
283     ObjectSet *get_object_set() const { return oset; }
284     string get_namespace() { return oloc.nspace; }
285     uint64_t get_object_number() const { return object_no; }
286
287     const object_locator_t& get_oloc() const { return oloc; }
288     void set_object_locator(object_locator_t& l) { oloc = l; }
289
290     bool can_close() const {
291       if (lru_is_expireable()) {
292         assert(data.empty());
293         assert(waitfor_commit.empty());
294         return true;
295       }
296       return false;
297     }
298
299     /**
300      * Check buffers and waiters for consistency
301      * - no overlapping buffers
302      * - index in map matches BH
303      * - waiters fall within BH
304      */
305     void audit_buffers();
306
307     /**
308      * find first buffer that includes or follows an offset
309      *
310      * @param offset object byte offset
311      * @return iterator pointing to buffer, or data.end()
312      */
313     map<loff_t,BufferHead*>::const_iterator data_lower_bound(loff_t offset) const {
314       map<loff_t,BufferHead*>::const_iterator p = data.lower_bound(offset);
315       if (p != data.begin() &&
316           (p == data.end() || p->first > offset)) {
317         --p;     // might overlap!
318         if (p->first + p->second->length() <= offset)
319           ++p;   // doesn't overlap.
320       }
321       return p;
322     }
323
324     // bh
325     // add to my map
326     void add_bh(BufferHead *bh) {
327       if (data.empty())
328         get();
329       assert(data.count(bh->start()) == 0);
330       data[bh->start()] = bh;
331     }
332     void remove_bh(BufferHead *bh) {
333       assert(data.count(bh->start()));
334       data.erase(bh->start());
335       if (data.empty())
336         put();
337     }
338
339     bool is_empty() const { return data.empty(); }
340
341     // mid-level
342     BufferHead *split(BufferHead *bh, loff_t off);
343     void merge_left(BufferHead *left, BufferHead *right);
344     void try_merge_bh(BufferHead *bh);
345
346     bool is_cached(loff_t off, loff_t len) const;
347     bool include_all_cached_data(loff_t off, loff_t len);
348     int map_read(ObjectExtent &ex,
349                  map<loff_t, BufferHead*>& hits,
350                  map<loff_t, BufferHead*>& missing,
351                  map<loff_t, BufferHead*>& rx,
352                  map<loff_t, BufferHead*>& errors);
353     BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid);
354
355     void replace_journal_tid(BufferHead *bh, ceph_tid_t tid);
356     void truncate(loff_t s);
357     void discard(loff_t off, loff_t len);
358
359     // reference counting
360     int get() {
361       assert(ref >= 0);
362       if (ref == 0) lru_pin();
363       return ++ref;
364     }
365     int put() {
366       assert(ref > 0);
367       if (ref == 1) lru_unpin();
368       --ref;
369       return ref;
370     }
371   };
372
373
374   struct ObjectSet {
375     void *parent;
376
377     inodeno_t ino;
378     uint64_t truncate_seq, truncate_size;
379
380     int64_t poolid;
381     xlist<Object*> objects;
382
383     int dirty_or_tx;
384     bool return_enoent;
385
386     ObjectSet(void *p, int64_t _poolid, inodeno_t i)
387       : parent(p), ino(i), truncate_seq(0),
388         truncate_size(0), poolid(_poolid), dirty_or_tx(0),
389         return_enoent(false) {}
390
391   };
392
393
394   // ******* ObjectCacher *********
395   // ObjectCacher fields
396  private:
397   WritebackHandler& writeback_handler;
398   bool scattered_write;
399
400   string name;
401   Mutex& lock;
402
403   uint64_t max_dirty, target_dirty, max_size, max_objects;
404   ceph::timespan max_dirty_age;
405   bool block_writes_upfront;
406
407   ZTracer::Endpoint trace_endpoint;
408
409   flush_set_callback_t flush_set_callback;
410   void *flush_set_callback_arg;
411
412   // indexed by pool_id
413   vector<ceph::unordered_map<sobject_t, Object*> > objects;
414
415   list<Context*> waitfor_read;
416
417   ceph_tid_t last_read_tid;
418
419   set<BufferHead*, BufferHead::ptr_lt> dirty_or_tx_bh;
420   LRU   bh_lru_dirty, bh_lru_rest;
421   LRU   ob_lru;
422
423   Cond flusher_cond;
424   bool flusher_stop;
425   void flusher_entry();
426   class FlusherThread : public Thread {
427     ObjectCacher *oc;
428   public:
429     explicit FlusherThread(ObjectCacher *o) : oc(o) {}
430     void *entry() override {
431       oc->flusher_entry();
432       return 0;
433     }
434   } flusher_thread;
435
436   Finisher finisher;
437
438   // objects
439   Object *get_object_maybe(sobject_t oid, object_locator_t &l) {
440     // have it?
441     if (((uint32_t)l.pool < objects.size()) &&
442         (objects[l.pool].count(oid)))
443       return objects[l.pool][oid];
444     return NULL;
445   }
446
447   Object *get_object(sobject_t oid, uint64_t object_no, ObjectSet *oset,
448                      object_locator_t &l, uint64_t truncate_size,
449                      uint64_t truncate_seq);
450   void close_object(Object *ob);
451
452   // bh stats
453   Cond  stat_cond;
454
455   loff_t stat_clean;
456   loff_t stat_zero;
457   loff_t stat_dirty;
458   loff_t stat_rx;
459   loff_t stat_tx;
460   loff_t stat_missing;
461   loff_t stat_error;
462   loff_t stat_dirty_waiting;   // bytes that writers are waiting on to write
463
464   size_t stat_nr_dirty_waiters;
465
466   void verify_stats() const;
467
468   void bh_stat_add(BufferHead *bh);
469   void bh_stat_sub(BufferHead *bh);
470   loff_t get_stat_tx() const { return stat_tx; }
471   loff_t get_stat_rx() const { return stat_rx; }
472   loff_t get_stat_dirty() const { return stat_dirty; }
473   loff_t get_stat_clean() const { return stat_clean; }
474   loff_t get_stat_zero() const { return stat_zero; }
475   loff_t get_stat_dirty_waiting() const { return stat_dirty_waiting; }
476   size_t get_stat_nr_dirty_waiters() const { return stat_nr_dirty_waiters; }
477
478   void touch_bh(BufferHead *bh) {
479     if (bh->is_dirty())
480       bh_lru_dirty.lru_touch(bh);
481     else
482       bh_lru_rest.lru_touch(bh);
483
484     bh->set_dontneed(false);
485     bh->set_nocache(false);
486     touch_ob(bh->ob);
487   }
488   void touch_ob(Object *ob) {
489     ob_lru.lru_touch(ob);
490   }
491   void bottouch_ob(Object *ob) {
492     ob_lru.lru_bottouch(ob);
493   }
494
495   // bh states
496   void bh_set_state(BufferHead *bh, int s);
497   void copy_bh_state(BufferHead *bh1, BufferHead *bh2) {
498     bh_set_state(bh2, bh1->get_state());
499   }
500
501   void mark_missing(BufferHead *bh) {
502     bh_set_state(bh,BufferHead::STATE_MISSING);
503   }
504   void mark_clean(BufferHead *bh) {
505     bh_set_state(bh, BufferHead::STATE_CLEAN);
506   }
507   void mark_zero(BufferHead *bh) {
508     bh_set_state(bh, BufferHead::STATE_ZERO);
509   }
510   void mark_rx(BufferHead *bh) {
511     bh_set_state(bh, BufferHead::STATE_RX);
512   }
513   void mark_tx(BufferHead *bh) {
514     bh_set_state(bh, BufferHead::STATE_TX); }
515   void mark_error(BufferHead *bh) {
516     bh_set_state(bh, BufferHead::STATE_ERROR);
517   }
518   void mark_dirty(BufferHead *bh) {
519     bh_set_state(bh, BufferHead::STATE_DIRTY);
520     bh_lru_dirty.lru_touch(bh);
521     //bh->set_dirty_stamp(ceph_clock_now());
522   }
523
524   void bh_add(Object *ob, BufferHead *bh);
525   void bh_remove(Object *ob, BufferHead *bh);
526
527   // io
528   void bh_read(BufferHead *bh, int op_flags,
529                const ZTracer::Trace &parent_trace);
530   void bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace);
531   void bh_write_scattered(list<BufferHead*>& blist);
532   void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
533                             int64_t *amount, int *max_count);
534
535   void trim();
536   void flush(ZTracer::Trace *trace, loff_t amount=0);
537
538   /**
539    * flush a range of buffers
540    *
541    * Flush any buffers that intersect the specified extent.  If len==0,
542    * flush *all* buffers for the object.
543    *
544    * @param o object
545    * @param off start offset
546    * @param len extent length, or 0 for entire object
547    * @return true if object was already clean/flushed.
548    */
549   bool flush(Object *o, loff_t off, loff_t len,
550              ZTracer::Trace *trace);
551   loff_t release(Object *o);
552   void purge(Object *o);
553
554   int64_t reads_outstanding;
555   Cond read_cond;
556
557   int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
558              bool external_call, ZTracer::Trace *trace);
559   void retry_waiting_reads();
560
561  public:
562   void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
563                       loff_t offset, uint64_t length,
564                       bufferlist &bl, int r,
565                       bool trust_enoent);
566   void bh_write_commit(int64_t poolid, sobject_t oid,
567                        vector<pair<loff_t, uint64_t> >& ranges,
568                        ceph_tid_t t, int r);
569
570   class C_WriteCommit;
571   class C_WaitForWrite;
572
573   void perf_start();
574   void perf_stop();
575
576
577
578   ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb, Mutex& l,
579                flush_set_callback_t flush_callback,
580                void *flush_callback_arg,
581                uint64_t max_bytes, uint64_t max_objects,
582                uint64_t max_dirty, uint64_t target_dirty, double max_age,
583                bool block_writes_upfront);
584   ~ObjectCacher();
585
586   void start() {
587     flusher_thread.create("flusher");
588   }
589   void stop() {
590     assert(flusher_thread.is_started());
591     lock.Lock();  // hmm.. watch out for deadlock!
592     flusher_stop = true;
593     flusher_cond.Signal();
594     lock.Unlock();
595     flusher_thread.join();
596   }
597
598
599   class C_RetryRead;
600
601
602   // non-blocking.  async.
603
604   /**
605    * @note total read size must be <= INT_MAX, since
606    * the return value is total bytes read
607    */
608   int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
609             ZTracer::Trace *parent_trace = nullptr);
610   int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
611              ZTracer::Trace *parent_trace = nullptr);
612   bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
613                  snapid_t snapid);
614
615 private:
616   // write blocking
617   int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
618                       ZTracer::Trace *trace, Context *onfreespace);
619   void maybe_wait_for_writeback(uint64_t len, ZTracer::Trace *trace);
620   bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish);
621
622 public:
623   bool set_is_empty(ObjectSet *oset);
624   bool set_is_cached(ObjectSet *oset);
625   bool set_is_dirty_or_committing(ObjectSet *oset);
626
627   bool flush_set(ObjectSet *oset, Context *onfinish=0);
628   bool flush_set(ObjectSet *oset, vector<ObjectExtent>& ex,
629                  ZTracer::Trace *trace, Context *onfinish = 0);
630   bool flush_all(Context *onfinish = 0);
631
632   void purge_set(ObjectSet *oset);
633
634   // returns # of bytes not released (ie non-clean)
635   loff_t release_set(ObjectSet *oset);
636   uint64_t release_all();
637
638   void discard_set(ObjectSet *oset, const vector<ObjectExtent>& ex);
639
640   /**
641    * Retry any in-flight reads that get -ENOENT instead of marking
642    * them zero, and get rid of any cached -ENOENTs.
643    * After this is called and the cache's lock is unlocked,
644    * any new requests will treat -ENOENT normally.
645    */
646   void clear_nonexistence(ObjectSet *oset);
647
648
649   // cache sizes
650   void set_max_dirty(uint64_t v) {
651     max_dirty = v;
652   }
653   void set_target_dirty(int64_t v) {
654     target_dirty = v;
655   }
656   void set_max_size(int64_t v) {
657     max_size = v;
658   }
659   void set_max_dirty_age(double a) {
660     max_dirty_age = make_timespan(a);
661   }
662   void set_max_objects(int64_t v) {
663     max_objects = v;
664   }
665
666
667   // file functions
668
669   /*** async+caching (non-blocking) file interface ***/
670   int file_is_cached(ObjectSet *oset, file_layout_t *layout,
671                      snapid_t snapid, loff_t offset, uint64_t len) {
672     vector<ObjectExtent> extents;
673     Striper::file_to_extents(cct, oset->ino, layout, offset, len,
674                              oset->truncate_size, extents);
675     return is_cached(oset, extents, snapid);
676   }
677
678   int file_read(ObjectSet *oset, file_layout_t *layout, snapid_t snapid,
679                 loff_t offset, uint64_t len, bufferlist *bl, int flags,
680                 Context *onfinish) {
681     OSDRead *rd = prepare_read(snapid, bl, flags);
682     Striper::file_to_extents(cct, oset->ino, layout, offset, len,
683                              oset->truncate_size, rd->extents);
684     return readx(rd, oset, onfinish);
685   }
686
687   int file_write(ObjectSet *oset, file_layout_t *layout,
688                  const SnapContext& snapc, loff_t offset, uint64_t len,
689                  bufferlist& bl, ceph::real_time mtime, int flags) {
690     OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0);
691     Striper::file_to_extents(cct, oset->ino, layout, offset, len,
692                              oset->truncate_size, wr->extents);
693     return writex(wr, oset, NULL);
694   }
695
696   bool file_flush(ObjectSet *oset, file_layout_t *layout,
697                   const SnapContext& snapc, loff_t offset, uint64_t len,
698                   Context *onfinish) {
699     vector<ObjectExtent> extents;
700     Striper::file_to_extents(cct, oset->ino, layout, offset, len,
701                              oset->truncate_size, extents);
702     ZTracer::Trace trace;
703     return flush_set(oset, extents, &trace, onfinish);
704   }
705 };
706
707
708 inline ostream& operator<<(ostream &out, const ObjectCacher::BufferHead &bh)
709 {
710   out << "bh[ " << &bh << " "
711       << bh.start() << "~" << bh.length()
712       << " " << bh.ob
713       << " (" << bh.bl.length() << ")"
714       << " v " << bh.last_write_tid;
715   if (bh.get_journal_tid() != 0) {
716     out << " j " << bh.get_journal_tid();
717   }
718   if (bh.is_tx()) out << " tx";
719   if (bh.is_rx()) out << " rx";
720   if (bh.is_dirty()) out << " dirty";
721   if (bh.is_clean()) out << " clean";
722   if (bh.is_zero()) out << " zero";
723   if (bh.is_missing()) out << " missing";
724   if (bh.bl.length() > 0) out << " firstbyte=" << (int)bh.bl[0];
725   if (bh.error) out << " error=" << bh.error;
726   out << "]";
727   out << " waiters = {";
728   for (map<loff_t, list<Context*> >::const_iterator it
729          = bh.waitfor_read.begin();
730        it != bh.waitfor_read.end(); ++it) {
731     out << " " << it->first << "->[";
732     for (list<Context*>::const_iterator lit = it->second.begin();
733          lit != it->second.end(); ++lit) {
734          out << *lit << ", ";
735     }
736     out << "]";
737   }
738   out << "}";
739   return out;
740 }
741
742 inline ostream& operator<<(ostream &out, const ObjectCacher::ObjectSet &os)
743 {
744   return out << "objectset[" << os.ino
745              << " ts " << os.truncate_seq << "/" << os.truncate_size
746              << " objects " << os.objects.size()
747              << " dirty_or_tx " << os.dirty_or_tx
748              << "]";
749 }
750
751 inline ostream& operator<<(ostream &out, const ObjectCacher::Object &ob)
752 {
753   out << "object["
754       << ob.get_soid() << " oset " << ob.oset << dec
755       << " wr " << ob.last_write_tid << "/" << ob.last_commit_tid;
756
757   if (ob.complete)
758     out << " COMPLETE";
759   if (!ob.exists)
760     out << " !EXISTS";
761
762   out << "]";
763   return out;
764 }
765
766 #endif