Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osdc / ObjectCacher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <limits.h>
5
6 #include "msg/Messenger.h"
7 #include "ObjectCacher.h"
8 #include "WritebackHandler.h"
9 #include "common/errno.h"
10 #include "common/perf_counters.h"
11
12 #include "include/assert.h"
13
14 #define MAX_FLUSH_UNDER_LOCK 20  ///< max bh's we start writeback on
15 #define BUFFER_MEMORY_WEIGHT 12   // memory usage of BufferHead, count in (1<<n)
16
17 using std::chrono::seconds;
18                                  /// while holding the lock
19
20 /*** ObjectCacher::BufferHead ***/
21
22
23 /*** ObjectCacher::Object ***/
24
25 #define dout_subsys ceph_subsys_objectcacher
26 #undef dout_prefix
27 #define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
28
29
30
31 class ObjectCacher::C_ReadFinish : public Context {
32   ObjectCacher *oc;
33   int64_t poolid;
34   sobject_t oid;
35   loff_t start;
36   uint64_t length;
37   xlist<C_ReadFinish*>::item set_item;
38   bool trust_enoent;
39   ceph_tid_t tid;
40   ZTracer::Trace trace;
41
42 public:
43   bufferlist bl;
44   C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
45                uint64_t l, const ZTracer::Trace &trace) :
46     oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
47     set_item(this), trust_enoent(true),
48     tid(t), trace(trace) {
49     ob->reads.push_back(&set_item);
50   }
51
52   void finish(int r) override {
53     oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
54     trace.event("finish");
55
56     // object destructor clears the list
57     if (set_item.is_on_list())
58       set_item.remove_myself();
59   }
60
61   void distrust_enoent() {
62     trust_enoent = false;
63   }
64 };
65
66 class ObjectCacher::C_RetryRead : public Context {
67   ObjectCacher *oc;
68   OSDRead *rd;
69   ObjectSet *oset;
70   Context *onfinish;
71   ZTracer::Trace trace;
72 public:
73   C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
74               const ZTracer::Trace &trace)
75     : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
76   }
77   void finish(int r) override {
78     if (r >= 0) {
79       r = oc->_readx(rd, oset, onfinish, false, &trace);
80     }
81
82     if (r == 0) {
83       // read is still in-progress
84       return;
85     }
86
87     trace.event("finish");
88     if (onfinish) {
89       onfinish->complete(r);
90     }
91   }
92 };
93
94 ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
95                                                       loff_t off)
96 {
97   assert(oc->lock.is_locked());
98   ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
99
100   // split off right
101   ObjectCacher::BufferHead *right = new BufferHead(this);
102
103   //inherit and if later access, this auto clean.
104   right->set_dontneed(left->get_dontneed());
105   right->set_nocache(left->get_nocache());
106
107   right->last_write_tid = left->last_write_tid;
108   right->last_read_tid = left->last_read_tid;
109   right->set_state(left->get_state());
110   right->snapc = left->snapc;
111   right->set_journal_tid(left->journal_tid);
112
113   loff_t newleftlen = off - left->start();
114   right->set_start(off);
115   right->set_length(left->length() - newleftlen);
116
117   // shorten left
118   oc->bh_stat_sub(left);
119   left->set_length(newleftlen);
120   oc->bh_stat_add(left);
121
122   // add right
123   oc->bh_add(this, right);
124
125   // split buffers too
126   bufferlist bl;
127   bl.claim(left->bl);
128   if (bl.length()) {
129     assert(bl.length() == (left->length() + right->length()));
130     right->bl.substr_of(bl, left->length(), right->length());
131     left->bl.substr_of(bl, 0, left->length());
132   }
133
134   // move read waiters
135   if (!left->waitfor_read.empty()) {
136     map<loff_t, list<Context*> >::iterator start_remove
137       = left->waitfor_read.begin();
138     while (start_remove != left->waitfor_read.end() &&
139            start_remove->first < right->start())
140       ++start_remove;
141     for (map<loff_t, list<Context*> >::iterator p = start_remove;
142          p != left->waitfor_read.end(); ++p) {
143       ldout(oc->cct, 20) << "split  moving waiters at byte " << p->first
144                          << " to right bh" << dendl;
145       right->waitfor_read[p->first].swap( p->second );
146       assert(p->second.empty());
147     }
148     left->waitfor_read.erase(start_remove, left->waitfor_read.end());
149   }
150
151   ldout(oc->cct, 20) << "split    left is " << *left << dendl;
152   ldout(oc->cct, 20) << "split   right is " << *right << dendl;
153   return right;
154 }
155
156
157 void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
158 {
159   assert(oc->lock.is_locked());
160   assert(left->end() == right->start());
161   assert(left->get_state() == right->get_state());
162   assert(left->can_merge_journal(right));
163
164   ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
165   if (left->get_journal_tid() == 0) {
166     left->set_journal_tid(right->get_journal_tid());
167   }
168   right->set_journal_tid(0);
169
170   oc->bh_remove(this, right);
171   oc->bh_stat_sub(left);
172   left->set_length(left->length() + right->length());
173   oc->bh_stat_add(left);
174
175   // data
176   left->bl.claim_append(right->bl);
177
178   // version
179   // note: this is sorta busted, but should only be used for dirty buffers
180   left->last_write_tid =  MAX( left->last_write_tid, right->last_write_tid );
181   left->last_write = MAX( left->last_write, right->last_write );
182
183   left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
184   left->set_nocache(right->get_nocache() ? left->get_nocache() : false);
185
186   // waiters
187   for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
188        p != right->waitfor_read.end();
189        ++p)
190     left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
191                                         p->second );
192
193   // hose right
194   delete right;
195
196   ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
197 }
198
199 void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
200 {
201   assert(oc->lock.is_locked());
202   ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
203
204   // do not merge rx buffers; last_read_tid may not match
205   if (bh->is_rx())
206     return;
207
208   // to the left?
209   map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
210   assert(p->second == bh);
211   if (p != data.begin()) {
212     --p;
213     if (p->second->end() == bh->start() &&
214         p->second->get_state() == bh->get_state() &&
215         p->second->can_merge_journal(bh)) {
216       merge_left(p->second, bh);
217       bh = p->second;
218     } else {
219       ++p;
220     }
221   }
222   // to the right?
223   assert(p->second == bh);
224   ++p;
225   if (p != data.end() &&
226       p->second->start() == bh->end() &&
227       p->second->get_state() == bh->get_state() &&
228       p->second->can_merge_journal(bh))
229     merge_left(bh, p->second);
230 }
231
232 /*
233  * count bytes we have cached in given range
234  */
235 bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
236 {
237   assert(oc->lock.is_locked());
238   map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
239   while (left > 0) {
240     if (p == data.end())
241       return false;
242
243     if (p->first <= cur) {
244       // have part of it
245       loff_t lenfromcur = MIN(p->second->end() - cur, left);
246       cur += lenfromcur;
247       left -= lenfromcur;
248       ++p;
249       continue;
250     } else if (p->first > cur) {
251       // gap
252       return false;
253     } else
254       ceph_abort();
255   }
256
257   return true;
258 }
259
260 /*
261  * all cached data in this range[off, off+len]
262  */
263 bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
264 {
265   assert(oc->lock.is_locked());
266   if (data.empty())
267       return true;
268   map<loff_t, BufferHead*>::iterator first = data.begin();
269   map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
270   if (first->second->start() >= off && last->second->end() <= (off + len))
271     return true;
272   else
273     return false;
274 }
275
276 /*
277  * map a range of bytes into buffer_heads.
278  * - create missing buffer_heads as necessary.
279  */
280 int ObjectCacher::Object::map_read(ObjectExtent &ex,
281                                    map<loff_t, BufferHead*>& hits,
282                                    map<loff_t, BufferHead*>& missing,
283                                    map<loff_t, BufferHead*>& rx,
284                                    map<loff_t, BufferHead*>& errors)
285 {
286   assert(oc->lock.is_locked());
287   ldout(oc->cct, 10) << "map_read " << ex.oid << " "
288                      << ex.offset << "~" << ex.length << dendl;
289
290   loff_t cur = ex.offset;
291   loff_t left = ex.length;
292
293   map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
294   while (left > 0) {
295     // at end?
296     if (p == data.end()) {
297       // rest is a miss.
298       BufferHead *n = new BufferHead(this);
299       n->set_start(cur);
300       n->set_length(left);
301       oc->bh_add(this, n);
302       if (complete) {
303         oc->mark_zero(n);
304         hits[cur] = n;
305         ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
306       } else {
307         missing[cur] = n;
308         ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
309       }
310       cur += left;
311       assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
312       break;  // no more.
313     }
314
315     if (p->first <= cur) {
316       // have it (or part of it)
317       BufferHead *e = p->second;
318
319       if (e->is_clean() ||
320           e->is_dirty() ||
321           e->is_tx() ||
322           e->is_zero()) {
323         hits[cur] = e;     // readable!
324         ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
325       } else if (e->is_rx()) {
326         rx[cur] = e;       // missing, not readable.
327         ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
328       } else if (e->is_error()) {
329         errors[cur] = e;
330         ldout(oc->cct, 20) << "map_read error " << *e << dendl;
331       } else {
332         ceph_abort();
333       }
334
335       loff_t lenfromcur = MIN(e->end() - cur, left);
336       cur += lenfromcur;
337       left -= lenfromcur;
338       ++p;
339       continue;  // more?
340
341     } else if (p->first > cur) {
342       // gap.. miss
343       loff_t next = p->first;
344       BufferHead *n = new BufferHead(this);
345       loff_t len = MIN(next - cur, left);
346       n->set_start(cur);
347       n->set_length(len);
348       oc->bh_add(this,n);
349       if (complete) {
350         oc->mark_zero(n);
351         hits[cur] = n;
352         ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
353       } else {
354         missing[cur] = n;
355         ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
356       }
357       cur += MIN(left, n->length());
358       left -= MIN(left, n->length());
359       continue;    // more?
360     } else {
361       ceph_abort();
362     }
363   }
364   return 0;
365 }
366
367 void ObjectCacher::Object::audit_buffers()
368 {
369   loff_t offset = 0;
370   for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
371        it != data.end(); ++it) {
372     if (it->first != it->second->start()) {
373       lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
374                      << " does not match bh start position: "
375                      << *it->second << dendl;
376       assert(it->first == it->second->start());
377     }
378     if (it->first < offset) {
379       lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
380                      << " overlaps with previous bh " << *((--it)->second)
381                      << dendl;
382       assert(it->first >= offset);
383     }
384     BufferHead *bh = it->second;
385     map<loff_t, list<Context*> >::const_iterator w_it;
386     for (w_it = bh->waitfor_read.begin();
387          w_it != bh->waitfor_read.end(); ++w_it) {
388       if (w_it->first < bh->start() ||
389             w_it->first >= bh->start() + bh->length()) {
390         lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
391                        << " is not within bh " << *bh << dendl;
392         assert(w_it->first >= bh->start());
393         assert(w_it->first < bh->start() + bh->length());
394       }
395     }
396     offset = it->first + it->second->length();
397   }
398 }
399
400 /*
401  * map a range of extents on an object's buffer cache.
402  * - combine any bh's we're writing into one
403  * - break up bufferheads that don't fall completely within the range
404  * //no! - return a bh that includes the write.  may also include
405  * other dirty data to left and/or right.
406  */
407 ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
408                                                           ceph_tid_t tid)
409 {
410   assert(oc->lock.is_locked());
411   BufferHead *final = 0;
412
413   ldout(oc->cct, 10) << "map_write oex " << ex.oid
414                << " " << ex.offset << "~" << ex.length << dendl;
415
416   loff_t cur = ex.offset;
417   loff_t left = ex.length;
418
419   map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
420   while (left > 0) {
421     loff_t max = left;
422
423     // at end ?
424     if (p == data.end()) {
425       if (final == NULL) {
426         final = new BufferHead(this);
427         replace_journal_tid(final, tid);
428         final->set_start( cur );
429         final->set_length( max );
430         oc->bh_add(this, final);
431         ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
432       } else {
433         oc->bh_stat_sub(final);
434         final->set_length(final->length() + max);
435         oc->bh_stat_add(final);
436       }
437       left -= max;
438       cur += max;
439       continue;
440     }
441
442     ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
443     //oc->verify_stats();
444
445     if (p->first <= cur) {
446       BufferHead *bh = p->second;
447       ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
448
449       if (p->first < cur) {
450         assert(final == 0);
451         if (cur + max >= bh->end()) {
452           // we want right bit (one splice)
453           final = split(bh, cur);   // just split it, take right half.
454           replace_journal_tid(final, tid);
455           ++p;
456           assert(p->second == final);
457         } else {
458           // we want middle bit (two splices)
459           final = split(bh, cur);
460           ++p;
461           assert(p->second == final);
462           split(final, cur+max);
463           replace_journal_tid(final, tid);
464         }
465       } else {
466         assert(p->first == cur);
467         if (bh->length() <= max) {
468           // whole bufferhead, piece of cake.
469         } else {
470           // we want left bit (one splice)
471           split(bh, cur + max);        // just split
472         }
473         if (final) {
474           oc->mark_dirty(bh);
475           oc->mark_dirty(final);
476           --p;  // move iterator back to final
477           assert(p->second == final);
478           replace_journal_tid(bh, tid);
479           merge_left(final, bh);
480         } else {
481           final = bh;
482           replace_journal_tid(final, tid);
483         }
484       }
485
486       // keep going.
487       loff_t lenfromcur = final->end() - cur;
488       cur += lenfromcur;
489       left -= lenfromcur;
490       ++p;
491       continue;
492     } else {
493       // gap!
494       loff_t next = p->first;
495       loff_t glen = MIN(next - cur, max);
496       ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
497       if (final) {
498         oc->bh_stat_sub(final);
499         final->set_length(final->length() + glen);
500         oc->bh_stat_add(final);
501       } else {
502         final = new BufferHead(this);
503         replace_journal_tid(final, tid);
504         final->set_start( cur );
505         final->set_length( glen );
506         oc->bh_add(this, final);
507       }
508
509       cur += glen;
510       left -= glen;
511       continue;    // more?
512     }
513   }
514
515   // set version
516   assert(final);
517   assert(final->get_journal_tid() == tid);
518   ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
519
520   return final;
521 }
522
523 void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
524                                                ceph_tid_t tid) {
525   ceph_tid_t bh_tid = bh->get_journal_tid();
526
527   assert(tid == 0 || bh_tid <= tid);
528   if (bh_tid != 0 && bh_tid != tid) {
529     // inform journal that it should not expect a writeback from this extent
530     oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
531                                            bh->length(), bh_tid, tid);
532   }
533   bh->set_journal_tid(tid);
534 }
535
536 void ObjectCacher::Object::truncate(loff_t s)
537 {
538   assert(oc->lock.is_locked());
539   ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
540
541   while (!data.empty()) {
542     BufferHead *bh = data.rbegin()->second;
543     if (bh->end() <= s)
544       break;
545
546     // split bh at truncation point?
547     if (bh->start() < s) {
548       split(bh, s);
549       continue;
550     }
551
552     // remove bh entirely
553     assert(bh->start() >= s);
554     assert(bh->waitfor_read.empty());
555     replace_journal_tid(bh, 0);
556     oc->bh_remove(this, bh);
557     delete bh;
558   }
559 }
560
561 void ObjectCacher::Object::discard(loff_t off, loff_t len)
562 {
563   assert(oc->lock.is_locked());
564   ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
565                      << dendl;
566
567   if (!exists) {
568     ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
569     exists = true;
570   }
571   if (complete) {
572     ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
573     complete = false;
574   }
575
576   map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
577   while (p != data.end()) {
578     BufferHead *bh = p->second;
579     if (bh->start() >= off + len)
580       break;
581
582     // split bh at truncation point?
583     if (bh->start() < off) {
584       split(bh, off);
585       ++p;
586       continue;
587     }
588
589     assert(bh->start() >= off);
590     if (bh->end() > off + len) {
591       split(bh, off + len);
592     }
593
594     ++p;
595     ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
596     assert(bh->waitfor_read.empty());
597     replace_journal_tid(bh, 0);
598     oc->bh_remove(this, bh);
599     delete bh;
600   }
601 }
602
603
604
605 /*** ObjectCacher ***/
606
607 #undef dout_prefix
608 #define dout_prefix *_dout << "objectcacher "
609
610
611 ObjectCacher::ObjectCacher(CephContext *cct_, string name,
612                            WritebackHandler& wb, Mutex& l,
613                            flush_set_callback_t flush_callback,
614                            void *flush_callback_arg, uint64_t max_bytes,
615                            uint64_t max_objects, uint64_t max_dirty,
616                            uint64_t target_dirty, double max_dirty_age,
617                            bool block_writes_upfront)
618   : perfcounter(NULL),
619     cct(cct_), writeback_handler(wb), name(name), lock(l),
620     max_dirty(max_dirty), target_dirty(target_dirty),
621     max_size(max_bytes), max_objects(max_objects),
622     max_dirty_age(ceph::make_timespan(max_dirty_age)),
623     block_writes_upfront(block_writes_upfront),
624     trace_endpoint("ObjectCacher"),
625     flush_set_callback(flush_callback),
626     flush_set_callback_arg(flush_callback_arg),
627     last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
628     stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
629     stat_missing(0), stat_error(0), stat_dirty_waiting(0),
630     stat_nr_dirty_waiters(0), reads_outstanding(0)
631 {
632   perf_start();
633   finisher.start();
634   scattered_write = writeback_handler.can_scattered_write();
635 }
636
637 ObjectCacher::~ObjectCacher()
638 {
639   finisher.stop();
640   perf_stop();
641   // we should be empty.
642   for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
643          = objects.begin();
644        i != objects.end();
645        ++i)
646     assert(i->empty());
647   assert(bh_lru_rest.lru_get_size() == 0);
648   assert(bh_lru_dirty.lru_get_size() == 0);
649   assert(ob_lru.lru_get_size() == 0);
650   assert(dirty_or_tx_bh.empty());
651 }
652
653 void ObjectCacher::perf_start()
654 {
655   string n = "objectcacher-" + name;
656   PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);
657
658   plb.add_u64_counter(l_objectcacher_cache_ops_hit,
659                       "cache_ops_hit", "Hit operations");
660   plb.add_u64_counter(l_objectcacher_cache_ops_miss,
661                       "cache_ops_miss", "Miss operations");
662   plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
663                       "cache_bytes_hit", "Hit data");
664   plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
665                       "cache_bytes_miss", "Miss data");
666   plb.add_u64_counter(l_objectcacher_data_read,
667                       "data_read", "Read data");
668   plb.add_u64_counter(l_objectcacher_data_written,
669                       "data_written", "Data written to cache");
670   plb.add_u64_counter(l_objectcacher_data_flushed,
671                       "data_flushed", "Data flushed");
672   plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
673                       "data_overwritten_while_flushing",
674                       "Data overwritten while flushing");
675   plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
676                       "Write operations, delayed due to dirty limits");
677   plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
678                       "write_bytes_blocked",
679                       "Write data blocked on dirty limit");
680   plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
681                "Time spent blocking a write due to dirty limits");
682
683   perfcounter = plb.create_perf_counters();
684   cct->get_perfcounters_collection()->add(perfcounter);
685 }
686
687 void ObjectCacher::perf_stop()
688 {
689   assert(perfcounter);
690   cct->get_perfcounters_collection()->remove(perfcounter);
691   delete perfcounter;
692 }
693
694 /* private */
695 ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
696                                                uint64_t object_no,
697                                                ObjectSet *oset,
698                                                object_locator_t &l,
699                                                uint64_t truncate_size,
700                                                uint64_t truncate_seq)
701 {
702   // XXX: Add handling of nspace in object_locator_t in cache
703   assert(lock.is_locked());
704   // have it?
705   if ((uint32_t)l.pool < objects.size()) {
706     if (objects[l.pool].count(oid)) {
707       Object *o = objects[l.pool][oid];
708       o->object_no = object_no;
709       o->truncate_size = truncate_size;
710       o->truncate_seq = truncate_seq;
711       return o;
712     }
713   } else {
714     objects.resize(l.pool+1);
715   }
716
717   // create it.
718   Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
719                          truncate_seq);
720   objects[l.pool][oid] = o;
721   ob_lru.lru_insert_top(o);
722   return o;
723 }
724
725 void ObjectCacher::close_object(Object *ob)
726 {
727   assert(lock.is_locked());
728   ldout(cct, 10) << "close_object " << *ob << dendl;
729   assert(ob->can_close());
730
731   // ok!
732   ob_lru.lru_remove(ob);
733   objects[ob->oloc.pool].erase(ob->get_soid());
734   ob->set_item.remove_myself();
735   delete ob;
736 }
737
738 void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
739                            const ZTracer::Trace &parent_trace)
740 {
741   assert(lock.is_locked());
742   ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
743                 << reads_outstanding << dendl;
744
745   ZTracer::Trace trace;
746   if (parent_trace.valid()) {
747     trace.init("", &trace_endpoint, &parent_trace);
748     trace.copy_name("bh_read " + bh->ob->get_oid().name);
749     trace.event("start");
750   }
751
752   mark_rx(bh);
753   bh->last_read_tid = ++last_read_tid;
754
755   // finisher
756   C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
757                                             bh->start(), bh->length(), trace);
758   // go
759   writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
760                          bh->ob->get_oloc(), bh->start(), bh->length(),
761                          bh->ob->get_snap(), &onfinish->bl,
762                          bh->ob->truncate_size, bh->ob->truncate_seq,
763                          op_flags, trace, onfinish);
764
765   ++reads_outstanding;
766 }
767
768 void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
769                                   ceph_tid_t tid, loff_t start,
770                                   uint64_t length, bufferlist &bl, int r,
771                                   bool trust_enoent)
772 {
773   assert(lock.is_locked());
774   ldout(cct, 7) << "bh_read_finish "
775                 << oid
776                 << " tid " << tid
777                 << " " << start << "~" << length
778                 << " (bl is " << bl.length() << ")"
779                 << " returned " << r
780                 << " outstanding reads " << reads_outstanding
781                 << dendl;
782
783   if (r >= 0 && bl.length() < length) {
784     ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
785                   << length << " with " << length - bl.length() << " bytes of zeroes"
786                   << dendl;
787     bl.append_zero(length - bl.length());
788   }
789
790   list<Context*> ls;
791   int err = 0;
792
793   if (objects[poolid].count(oid) == 0) {
794     ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
795   } else {
796     Object *ob = objects[poolid][oid];
797
798     if (r == -ENOENT && !ob->complete) {
799       // wake up *all* rx waiters, or else we risk reordering
800       // identical reads. e.g.
801       //   read 1~1
802       //   reply to unrelated 3~1 -> !exists
803       //   read 1~1 -> immediate ENOENT
804       //   reply to first 1~1 -> ooo ENOENT
805       bool allzero = true;
806       for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
807            p != ob->data.end(); ++p) {
808         BufferHead *bh = p->second;
809         for (map<loff_t, list<Context*> >::iterator p
810                = bh->waitfor_read.begin();
811              p != bh->waitfor_read.end();
812              ++p)
813           ls.splice(ls.end(), p->second);
814         bh->waitfor_read.clear();
815         if (!bh->is_zero() && !bh->is_rx())
816           allzero = false;
817       }
818
819       // just pass through and retry all waiters if we don't trust
820       // -ENOENT for this read
821       if (trust_enoent) {
822         ldout(cct, 7)
823           << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
824           << dendl;
825         ob->complete = true;
826         ob->exists = false;
827
828         /* If all the bhs are effectively zero, get rid of them.  All
829          * the waiters will be retried and get -ENOENT immediately, so
830          * it's safe to clean up the unneeded bh's now. Since we know
831          * it's safe to remove them now, do so, so they aren't hanging
832          *around waiting for more -ENOENTs from rados while the cache
833          * is being shut down.
834          *
835          * Only do this when all the bhs are rx or clean, to match the
836          * condition in _readx(). If there are any non-rx or non-clean
837          * bhs, _readx() will wait for the final result instead of
838          * returning -ENOENT immediately.
839          */
840         if (allzero) {
841           ldout(cct, 10)
842             << "bh_read_finish ENOENT and allzero, getting rid of "
843             << "bhs for " << *ob << dendl;
844           map<loff_t, BufferHead*>::iterator p = ob->data.begin();
845           while (p != ob->data.end()) {
846             BufferHead *bh = p->second;
847             // current iterator will be invalidated by bh_remove()
848             ++p;
849             bh_remove(ob, bh);
850             delete bh;
851           }
852         }
853       }
854     }
855
856     // apply to bh's!
857     loff_t opos = start;
858     while (true) {
859       map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
860       if (p == ob->data.end())
861         break;
862       if (opos >= start+(loff_t)length) {
863         ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
864                        << start << "+" << length << "=" << start+(loff_t)length
865                        << dendl;
866         break;
867       }
868
869       BufferHead *bh = p->second;
870       ldout(cct, 20) << "checking bh " << *bh << dendl;
871
872       // finishers?
873       for (map<loff_t, list<Context*> >::iterator it
874              = bh->waitfor_read.begin();
875            it != bh->waitfor_read.end();
876            ++it)
877         ls.splice(ls.end(), it->second);
878       bh->waitfor_read.clear();
879
880       if (bh->start() > opos) {
881         ldout(cct, 1) << "bh_read_finish skipping gap "
882                       << opos << "~" << bh->start() - opos
883                       << dendl;
884         opos = bh->start();
885         continue;
886       }
887
888       if (!bh->is_rx()) {
889         ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
890         opos = bh->end();
891         continue;
892       }
893
894       if (bh->last_read_tid != tid) {
895         ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
896                        << bh->last_read_tid << " != tid " << tid
897                        << ", skipping" << dendl;
898         opos = bh->end();
899         continue;
900       }
901
902       assert(opos >= bh->start());
903       assert(bh->start() == opos);   // we don't merge rx bh's... yet!
904       assert(bh->length() <= start+(loff_t)length-opos);
905
906       if (bh->error < 0)
907         err = bh->error;
908
909       opos = bh->end();
910
911       if (r == -ENOENT) {
912         if (trust_enoent) {
913           ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
914           bh_remove(ob, bh);
915           delete bh;
916         } else {
917           ldout(cct, 10) << "skipping unstrusted -ENOENT and will retry for "
918                          << *bh << dendl;
919         }
920         continue;
921       }
922
923       if (r < 0) {
924         bh->error = r;
925         mark_error(bh);
926       } else {
927         bh->bl.substr_of(bl,
928                          bh->start() - start,
929                          bh->length());
930         mark_clean(bh);
931       }
932
933       ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
934
935       ob->try_merge_bh(bh);
936     }
937   }
938
939   // called with lock held.
940   ldout(cct, 20) << "finishing waiters " << ls << dendl;
941
942   finish_contexts(cct, ls, err);
943   retry_waiting_reads();
944
945   --reads_outstanding;
946   read_cond.Signal();
947 }
948
949 void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
950                                         int64_t *max_amount, int *max_count)
951 {
952   list<BufferHead*> blist;
953
954   int count = 0;
955   int64_t total_len = 0;
956   set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
957   assert(it != dirty_or_tx_bh.end());
958   for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
959        p != dirty_or_tx_bh.end();
960        ++p) {
961     BufferHead *obh = *p;
962     if (obh->ob != bh->ob)
963       break;
964     if (obh->is_dirty() && obh->last_write <= cutoff) {
965       blist.push_back(obh);
966       ++count;
967       total_len += obh->length();
968       if ((max_count && count > *max_count) ||
969           (max_amount && total_len > *max_amount))
970         break;
971     }
972   }
973
974   while (it != dirty_or_tx_bh.begin()) {
975     --it;
976     BufferHead *obh = *it;
977     if (obh->ob != bh->ob)
978       break;
979     if (obh->is_dirty() && obh->last_write <= cutoff) {
980       blist.push_front(obh);
981       ++count;
982       total_len += obh->length();
983       if ((max_count && count > *max_count) ||
984           (max_amount && total_len > *max_amount))
985         break;
986     }
987   }
988   if (max_count)
989     *max_count -= count;
990   if (max_amount)
991     *max_amount -= total_len;
992
993   bh_write_scattered(blist);
994 }
995
996 class ObjectCacher::C_WriteCommit : public Context {
997   ObjectCacher *oc;
998   int64_t poolid;
999   sobject_t oid;
1000   vector<pair<loff_t, uint64_t> > ranges;
1001   ZTracer::Trace trace;
1002 public:
1003   ceph_tid_t tid = 0;
1004   C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
1005                 uint64_t l, const ZTracer::Trace &trace) :
1006     oc(c), poolid(_poolid), oid(o), trace(trace) {
1007       ranges.push_back(make_pair(s, l));
1008     }
1009   C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
1010                 vector<pair<loff_t, uint64_t> >& _ranges) :
1011     oc(c), poolid(_poolid), oid(o), tid(0) {
1012       ranges.swap(_ranges);
1013     }
1014   void finish(int r) override {
1015     oc->bh_write_commit(poolid, oid, ranges, tid, r);
1016     trace.event("finish");
1017   }
1018 };
1019 void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
1020 {
1021   assert(lock.is_locked());
1022
1023   Object *ob = blist.front()->ob;
1024   ob->get();
1025
1026   ceph::real_time last_write;
1027   SnapContext snapc;
1028   vector<pair<loff_t, uint64_t> > ranges;
1029   vector<pair<uint64_t, bufferlist> > io_vec;
1030
1031   ranges.reserve(blist.size());
1032   io_vec.reserve(blist.size());
1033
1034   uint64_t total_len = 0;
1035   for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1036     BufferHead *bh = *p;
1037     ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
1038     assert(bh->ob == ob);
1039     assert(bh->bl.length() == bh->length());
1040     ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
1041
1042     int n = io_vec.size();
1043     io_vec.resize(n + 1);
1044     io_vec[n].first = bh->start();
1045     io_vec[n].second = bh->bl;
1046
1047     total_len += bh->length();
1048     if (bh->snapc.seq > snapc.seq)
1049       snapc = bh->snapc;
1050     if (bh->last_write > last_write)
1051       last_write = bh->last_write;
1052   }
1053
1054   C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
1055
1056   ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
1057                                            io_vec, snapc, last_write,
1058                                            ob->truncate_size, ob->truncate_seq,
1059                                            oncommit);
1060   oncommit->tid = tid;
1061   ob->last_write_tid = tid;
1062   for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1063     BufferHead *bh = *p;
1064     bh->last_write_tid = tid;
1065     mark_tx(bh);
1066   }
1067
1068   if (perfcounter)
1069     perfcounter->inc(l_objectcacher_data_flushed, total_len);
1070 }
1071
1072 void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
1073 {
1074   assert(lock.is_locked());
1075   ldout(cct, 7) << "bh_write " << *bh << dendl;
1076
1077   bh->ob->get();
1078
1079   ZTracer::Trace trace;
1080   if (parent_trace.valid()) {
1081     trace.init("", &trace_endpoint, &parent_trace);
1082     trace.copy_name("bh_write " + bh->ob->get_oid().name);
1083     trace.event("start");
1084   }
1085
1086   // finishers
1087   C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
1088                                               bh->ob->get_soid(), bh->start(),
1089                                               bh->length(), trace);
1090   // go
1091   ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
1092                                            bh->ob->get_oloc(),
1093                                            bh->start(), bh->length(),
1094                                            bh->snapc, bh->bl, bh->last_write,
1095                                            bh->ob->truncate_size,
1096                                            bh->ob->truncate_seq,
1097                                            bh->journal_tid, trace, oncommit);
1098   ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
1099
1100   // set bh last_write_tid
1101   oncommit->tid = tid;
1102   bh->ob->last_write_tid = tid;
1103   bh->last_write_tid = tid;
1104
1105   if (perfcounter) {
1106     perfcounter->inc(l_objectcacher_data_flushed, bh->length());
1107   }
1108
1109   mark_tx(bh);
1110 }
1111
1112 void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
1113                                    vector<pair<loff_t, uint64_t> >& ranges,
1114                                    ceph_tid_t tid, int r)
1115 {
1116   assert(lock.is_locked());
1117   ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
1118                 << " ranges " << ranges << " returned " << r << dendl;
1119
1120   if (objects[poolid].count(oid) == 0) {
1121     ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
1122     return;
1123   }
1124
1125   Object *ob = objects[poolid][oid];
1126   int was_dirty_or_tx = ob->oset->dirty_or_tx;
1127
1128   for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
1129        p != ranges.end();
1130        ++p) {
1131     loff_t start = p->first;
1132     uint64_t length = p->second;
1133     if (!ob->exists) {
1134       ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
1135       ob->exists = true;
1136
1137       if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
1138                                               ob->get_snap())) {
1139         ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
1140           "complete on " << *ob << dendl;
1141         ob->complete = false;
1142       }
1143     }
1144
1145     vector<pair<loff_t, BufferHead*>> hit;
1146     // apply to bh's!
1147     for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
1148          p != ob->data.end();
1149          ++p) {
1150       BufferHead *bh = p->second;
1151
1152       if (bh->start() > start+(loff_t)length)
1153         break;
1154
1155       if (bh->start() < start &&
1156           bh->end() > start+(loff_t)length) {
1157         ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl;
1158         continue;
1159       }
1160
1161       // make sure bh is tx
1162       if (!bh->is_tx()) {
1163         ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
1164         continue;
1165       }
1166
1167       // make sure bh tid matches
1168       if (bh->last_write_tid != tid) {
1169         assert(bh->last_write_tid > tid);
1170         ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
1171         continue;
1172       }
1173
1174       if (r >= 0) {
1175         // ok!  mark bh clean and error-free
1176         mark_clean(bh);
1177         bh->set_journal_tid(0);
1178         if (bh->get_nocache())
1179           bh_lru_rest.lru_bottouch(bh);
1180         hit.push_back(make_pair(bh->start(), bh));
1181         ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
1182       } else {
1183         mark_dirty(bh);
1184         ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
1185                        << *bh << " r = " << r << " " << cpp_strerror(-r)
1186                        << dendl;
1187       }
1188     }
1189
1190     for (auto& p : hit) {
1191       //p.second maybe merged and deleted in merge_left
1192       if (ob->data.count(p.first))
1193         ob->try_merge_bh(p.second);
1194     }
1195   }
1196
1197   // update last_commit.
1198   assert(ob->last_commit_tid < tid);
1199   ob->last_commit_tid = tid;
1200
1201   // waiters?
1202   list<Context*> ls;
1203   if (ob->waitfor_commit.count(tid)) {
1204     ls.splice(ls.begin(), ob->waitfor_commit[tid]);
1205     ob->waitfor_commit.erase(tid);
1206   }
1207
1208   // is the entire object set now clean and fully committed?
1209   ObjectSet *oset = ob->oset;
1210   ob->put();
1211
1212   if (flush_set_callback &&
1213       was_dirty_or_tx > 0 &&
1214       oset->dirty_or_tx == 0) {        // nothing dirty/tx
1215     flush_set_callback(flush_set_callback_arg, oset);
1216   }
1217
1218   if (!ls.empty())
1219     finish_contexts(cct, ls, r);
1220 }
1221
1222 void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
1223 {
1224   assert(trace != nullptr);
1225   assert(lock.is_locked());
1226   ceph::real_time cutoff = ceph::real_clock::now();
1227
1228   ldout(cct, 10) << "flush " << amount << dendl;
1229
1230   /*
1231    * NOTE: we aren't actually pulling things off the LRU here, just
1232    * looking at the tail item.  Then we call bh_write, which moves it
1233    * to the other LRU, so that we can call
1234    * lru_dirty.lru_get_next_expire() again.
1235    */
1236   int64_t left = amount;
1237   while (amount == 0 || left > 0) {
1238     BufferHead *bh = static_cast<BufferHead*>(
1239       bh_lru_dirty.lru_get_next_expire());
1240     if (!bh) break;
1241     if (bh->last_write > cutoff) break;
1242
1243     if (scattered_write) {
1244       bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
1245     } else {
1246       left -= bh->length();
1247       bh_write(bh, *trace);
1248     }
1249   }
1250 }
1251
1252
1253 void ObjectCacher::trim()
1254 {
1255   assert(lock.is_locked());
1256   ldout(cct, 10) << "trim  start: bytes: max " << max_size << "  clean "
1257                  << get_stat_clean() << ", objects: max " << max_objects
1258                  << " current " << ob_lru.lru_get_size() << dendl;
1259
1260   uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
1261   uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
1262   while (get_stat_clean() > 0 &&
1263          ((uint64_t)get_stat_clean() > max_size ||
1264           nr_clean_bh > max_clean_bh)) {
1265     BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
1266     if (!bh)
1267       break;
1268
1269     ldout(cct, 10) << "trim trimming " << *bh << dendl;
1270     assert(bh->is_clean() || bh->is_zero() || bh->is_error());
1271
1272     Object *ob = bh->ob;
1273     bh_remove(ob, bh);
1274     delete bh;
1275
1276     --nr_clean_bh;
1277
1278     if (ob->complete) {
1279       ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
1280       ob->complete = false;
1281     }
1282   }
1283
1284   while (ob_lru.lru_get_size() > max_objects) {
1285     Object *ob = static_cast<Object*>(ob_lru.lru_expire());
1286     if (!ob)
1287       break;
1288
1289     ldout(cct, 10) << "trim trimming " << *ob << dendl;
1290     close_object(ob);
1291   }
1292
1293   ldout(cct, 10) << "trim finish:  max " << max_size << "  clean "
1294                  << get_stat_clean() << ", objects: max " << max_objects
1295                  << " current " << ob_lru.lru_get_size() << dendl;
1296 }
1297
1298
1299
1300 /* public */
1301
1302 bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
1303                              snapid_t snapid)
1304 {
1305   assert(lock.is_locked());
1306   for (vector<ObjectExtent>::iterator ex_it = extents.begin();
1307        ex_it != extents.end();
1308        ++ex_it) {
1309     ldout(cct, 10) << "is_cached " << *ex_it << dendl;
1310
1311     // get Object cache
1312     sobject_t soid(ex_it->oid, snapid);
1313     Object *o = get_object_maybe(soid, ex_it->oloc);
1314     if (!o)
1315       return false;
1316     if (!o->is_cached(ex_it->offset, ex_it->length))
1317       return false;
1318   }
1319   return true;
1320 }
1321
1322
1323 /*
1324  * returns # bytes read (if in cache).  onfinish is untouched (caller
1325  *           must delete it)
1326  * returns 0 if doing async read
1327  */
1328 int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1329                         ZTracer::Trace *parent_trace)
1330 {
1331   ZTracer::Trace trace;
1332   if (parent_trace != nullptr) {
1333     trace.init("read", &trace_endpoint, parent_trace);
1334     trace.event("start");
1335   }
1336
1337   int r =_readx(rd, oset, onfinish, true, &trace);
1338   if (r < 0) {
1339     trace.event("finish");
1340   }
1341   return r;
1342 }
1343
1344 int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1345                          bool external_call, ZTracer::Trace *trace)
1346 {
1347   assert(trace != nullptr);
1348   assert(lock.is_locked());
1349   bool success = true;
1350   int error = 0;
1351   uint64_t bytes_in_cache = 0;
1352   uint64_t bytes_not_in_cache = 0;
1353   uint64_t total_bytes_read = 0;
1354   map<uint64_t, bufferlist> stripe_map;  // final buffer offset -> substring
1355   bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1356   bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1357
1358   /*
1359    * WARNING: we can only meaningfully return ENOENT if the read request
1360    * passed in a single ObjectExtent.  Any caller who wants ENOENT instead of
1361    * zeroed buffers needs to feed single extents into readx().
1362    */
1363   assert(!oset->return_enoent || rd->extents.size() == 1);
1364
1365   for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
1366        ex_it != rd->extents.end();
1367        ++ex_it) {
1368     ldout(cct, 10) << "readx " << *ex_it << dendl;
1369
1370     total_bytes_read += ex_it->length;
1371
1372     // get Object cache
1373     sobject_t soid(ex_it->oid, rd->snap);
1374     Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1375                            ex_it->truncate_size, oset->truncate_seq);
1376     if (external_call)
1377       touch_ob(o);
1378
1379     // does not exist and no hits?
1380     if (oset->return_enoent && !o->exists) {
1381       ldout(cct, 10) << "readx  object !exists, 1 extent..." << dendl;
1382
1383       // should we worry about COW underneath us?
1384       if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
1385                                               ex_it->length, soid.snap)) {
1386         ldout(cct, 20) << "readx  may copy on write" << dendl;
1387         bool wait = false;
1388         list<BufferHead*> blist;
1389         for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1390              bh_it != o->data.end();
1391              ++bh_it) {
1392           BufferHead *bh = bh_it->second;
1393           if (bh->is_dirty() || bh->is_tx()) {
1394             ldout(cct, 10) << "readx  flushing " << *bh << dendl;
1395             wait = true;
1396             if (bh->is_dirty()) {
1397               if (scattered_write)
1398                 blist.push_back(bh);
1399               else
1400                 bh_write(bh, *trace);
1401             }
1402           }
1403         }
1404         if (scattered_write && !blist.empty())
1405           bh_write_scattered(blist);
1406         if (wait) {
1407           ldout(cct, 10) << "readx  waiting on tid " << o->last_write_tid
1408                          << " on " << *o << dendl;
1409           o->waitfor_commit[o->last_write_tid].push_back(
1410             new C_RetryRead(this,rd, oset, onfinish, *trace));
1411           // FIXME: perfcounter!
1412           return 0;
1413         }
1414       }
1415
1416       // can we return ENOENT?
1417       bool allzero = true;
1418       for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1419            bh_it != o->data.end();
1420            ++bh_it) {
1421         ldout(cct, 20) << "readx  ob has bh " << *bh_it->second << dendl;
1422         if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
1423           allzero = false;
1424           break;
1425         }
1426       }
1427       if (allzero) {
1428         ldout(cct, 10) << "readx  ob has all zero|rx, returning ENOENT"
1429                        << dendl;
1430         delete rd;
1431         if (dontneed)
1432           bottouch_ob(o);
1433         return -ENOENT;
1434       }
1435     }
1436
1437     // map extent into bufferheads
1438     map<loff_t, BufferHead*> hits, missing, rx, errors;
1439     o->map_read(*ex_it, hits, missing, rx, errors);
1440     if (external_call) {
1441       // retry reading error buffers
1442       missing.insert(errors.begin(), errors.end());
1443     } else {
1444       // some reads had errors, fail later so completions
1445       // are cleaned up properly
1446       // TODO: make read path not call _readx for every completion
1447       hits.insert(errors.begin(), errors.end());
1448     }
1449
1450     if (!missing.empty() || !rx.empty()) {
1451       // read missing
1452       map<loff_t, BufferHead*>::iterator last = missing.end();
1453       for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
1454            bh_it != missing.end();
1455            ++bh_it) {
1456         uint64_t rx_bytes = static_cast<uint64_t>(
1457           stat_rx + bh_it->second->length());
1458         bytes_not_in_cache += bh_it->second->length();
1459         if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
1460           // cache is full with concurrent reads -- wait for rx's to complete
1461           // to constrain memory growth (especially during copy-ups)
1462           if (success) {
1463             ldout(cct, 10) << "readx missed, waiting on cache to complete "
1464                            << waitfor_read.size() << " blocked reads, "
1465                            << (MAX(rx_bytes, max_size) - max_size)
1466                            << " read bytes" << dendl;
1467             waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
1468                                                    *trace));
1469           }
1470
1471           bh_remove(o, bh_it->second);
1472           delete bh_it->second;
1473         } else {
1474           bh_it->second->set_nocache(nocache);
1475           bh_read(bh_it->second, rd->fadvise_flags, *trace);
1476           if ((success && onfinish) || last != missing.end())
1477             last = bh_it;
1478         }
1479         success = false;
1480       }
1481
1482       //add wait in last bh avoid wakeup early. Because read is order
1483       if (last != missing.end()) {
1484         ldout(cct, 10) << "readx missed, waiting on " << *last->second
1485           << " off " << last->first << dendl;
1486         last->second->waitfor_read[last->first].push_back(
1487           new C_RetryRead(this, rd, oset, onfinish, *trace) );
1488
1489       }
1490
1491       // bump rx
1492       for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
1493            bh_it != rx.end();
1494            ++bh_it) {
1495         touch_bh(bh_it->second); // bump in lru, so we don't lose it.
1496         if (success && onfinish) {
1497           ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
1498                          << " off " << bh_it->first << dendl;
1499           bh_it->second->waitfor_read[bh_it->first].push_back(
1500             new C_RetryRead(this, rd, oset, onfinish, *trace) );
1501         }
1502         bytes_not_in_cache += bh_it->second->length();
1503         success = false;
1504       }
1505
1506       for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1507            bh_it != hits.end();  ++bh_it)
1508         //bump in lru, so we don't lose it when later read
1509         touch_bh(bh_it->second);
1510
1511     } else {
1512       assert(!hits.empty());
1513
1514       // make a plain list
1515       for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1516            bh_it != hits.end();
1517            ++bh_it) {
1518         BufferHead *bh = bh_it->second;
1519         ldout(cct, 10) << "readx hit bh " << *bh << dendl;
1520         if (bh->is_error() && bh->error)
1521           error = bh->error;
1522         bytes_in_cache += bh->length();
1523
1524         if (bh->get_nocache() && bh->is_clean())
1525           bh_lru_rest.lru_bottouch(bh);
1526         else
1527           touch_bh(bh);
1528         //must be after touch_bh because touch_bh set dontneed false
1529         if (dontneed &&
1530             ((loff_t)ex_it->offset <= bh->start() &&
1531              (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
1532           bh->set_dontneed(true); //if dirty
1533           if (bh->is_clean())
1534             bh_lru_rest.lru_bottouch(bh);
1535         }
1536       }
1537
1538       if (!error) {
1539         // create reverse map of buffer offset -> object for the
1540         // eventual result.  this is over a single ObjectExtent, so we
1541         // know that
1542         //  - the bh's are contiguous
1543         //  - the buffer frags need not be (and almost certainly aren't)
1544         loff_t opos = ex_it->offset;
1545         map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1546         assert(bh_it->second->start() <= opos);
1547         uint64_t bhoff = opos - bh_it->second->start();
1548         vector<pair<uint64_t,uint64_t> >::iterator f_it
1549           = ex_it->buffer_extents.begin();
1550         uint64_t foff = 0;
1551         while (1) {
1552           BufferHead *bh = bh_it->second;
1553           assert(opos == (loff_t)(bh->start() + bhoff));
1554
1555           uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
1556           ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
1557                          << bhoff << " frag " << f_it->first << "~"
1558                          << f_it->second << " +" << foff << "~" << len
1559                          << dendl;
1560
1561           bufferlist bit;
1562           // put substr here first, since substr_of clobbers, and we
1563           // may get multiple bh's at this stripe_map position
1564           if (bh->is_zero()) {
1565             stripe_map[f_it->first].append_zero(len);
1566           } else {
1567             bit.substr_of(bh->bl,
1568                 opos - bh->start(),
1569                 len);
1570             stripe_map[f_it->first].claim_append(bit);
1571           }
1572
1573           opos += len;
1574           bhoff += len;
1575           foff += len;
1576           if (opos == bh->end()) {
1577             ++bh_it;
1578             bhoff = 0;
1579           }
1580           if (foff == f_it->second) {
1581             ++f_it;
1582             foff = 0;
1583           }
1584           if (bh_it == hits.end()) break;
1585           if (f_it == ex_it->buffer_extents.end())
1586             break;
1587         }
1588         assert(f_it == ex_it->buffer_extents.end());
1589         assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
1590       }
1591
1592       if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
1593           bottouch_ob(o);
1594     }
1595   }
1596
1597   if (!success) {
1598     if (perfcounter && external_call) {
1599       perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1600       perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
1601       perfcounter->inc(l_objectcacher_cache_ops_miss);
1602     }
1603     if (onfinish) {
1604       ldout(cct, 20) << "readx defer " << rd << dendl;
1605     } else {
1606       ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
1607                      << dendl;
1608       delete rd;
1609     }
1610     return 0;  // wait!
1611   }
1612   if (perfcounter && external_call) {
1613     perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1614     perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
1615     perfcounter->inc(l_objectcacher_cache_ops_hit);
1616   }
1617
1618   // no misses... success!  do the read.
1619   ldout(cct, 10) << "readx has all buffers" << dendl;
1620
1621   // ok, assemble into result buffer.
1622   uint64_t pos = 0;
1623   if (rd->bl && !error) {
1624     rd->bl->clear();
1625     for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
1626          i != stripe_map.end();
1627          ++i) {
1628       assert(pos == i->first);
1629       ldout(cct, 10) << "readx  adding buffer len " << i->second.length()
1630                      << " at " << pos << dendl;
1631       pos += i->second.length();
1632       rd->bl->claim_append(i->second);
1633       assert(rd->bl->length() == pos);
1634     }
1635     ldout(cct, 10) << "readx  result is " << rd->bl->length() << dendl;
1636   } else if (!error) {
1637     ldout(cct, 10) << "readx  no bufferlist ptr (readahead?), done." << dendl;
1638     map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
1639     pos = i->first + i->second.length();
1640   }
1641
1642   // done with read.
1643   int ret = error ? error : pos;
1644   ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
1645   assert(pos <= (uint64_t) INT_MAX);
1646
1647   delete rd;
1648
1649   trim();
1650
1651   return ret;
1652 }
1653
1654 void ObjectCacher::retry_waiting_reads()
1655 {
1656   list<Context *> ls;
1657   ls.swap(waitfor_read);
1658
1659   while (!ls.empty() && waitfor_read.empty()) {
1660     Context *ctx = ls.front();
1661     ls.pop_front();
1662     ctx->complete(0);
1663   }
1664   waitfor_read.splice(waitfor_read.end(), ls);
1665 }
1666
1667 int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
1668                          ZTracer::Trace *parent_trace)
1669 {
1670   assert(lock.is_locked());
1671   ceph::real_time now = ceph::real_clock::now();
1672   uint64_t bytes_written = 0;
1673   uint64_t bytes_written_in_flush = 0;
1674   bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1675   bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1676
1677   ZTracer::Trace trace;
1678   if (parent_trace != nullptr) {
1679     trace.init("write", &trace_endpoint, parent_trace);
1680     trace.event("start");
1681   }
1682
1683   for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
1684        ex_it != wr->extents.end();
1685        ++ex_it) {
1686     // get object cache
1687     sobject_t soid(ex_it->oid, CEPH_NOSNAP);
1688     Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1689                            ex_it->truncate_size, oset->truncate_seq);
1690
1691     // map it all into a single bufferhead.
1692     BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
1693     bool missing = bh->is_missing();
1694     bh->snapc = wr->snapc;
1695
1696     bytes_written += ex_it->length;
1697     if (bh->is_tx()) {
1698       bytes_written_in_flush += ex_it->length;
1699     }
1700
1701     // adjust buffer pointers (ie "copy" data into my cache)
1702     // this is over a single ObjectExtent, so we know that
1703     //  - there is one contiguous bh
1704     //  - the buffer frags need not be (and almost certainly aren't)
1705     // note: i assume striping is monotonic... no jumps backwards, ever!
1706     loff_t opos = ex_it->offset;
1707     for (vector<pair<uint64_t, uint64_t> >::iterator f_it
1708            = ex_it->buffer_extents.begin();
1709          f_it != ex_it->buffer_extents.end();
1710          ++f_it) {
1711       ldout(cct, 10) << "writex writing " << f_it->first << "~"
1712                      << f_it->second << " into " << *bh << " at " << opos
1713                      << dendl;
1714       uint64_t bhoff = bh->start() - opos;
1715       assert(f_it->second <= bh->length() - bhoff);
1716
1717       // get the frag we're mapping in
1718       bufferlist frag;
1719       frag.substr_of(wr->bl,
1720                      f_it->first, f_it->second);
1721
1722       // keep anything left of bhoff
1723       bufferlist newbl;
1724       if (bhoff)
1725         newbl.substr_of(bh->bl, 0, bhoff);
1726       newbl.claim_append(frag);
1727       bh->bl.swap(newbl);
1728
1729       opos += f_it->second;
1730     }
1731
1732     // ok, now bh is dirty.
1733     mark_dirty(bh);
1734     if (dontneed)
1735       bh->set_dontneed(true);
1736     else if (nocache && missing)
1737       bh->set_nocache(true);
1738     else
1739       touch_bh(bh);
1740
1741     bh->last_write = now;
1742
1743     o->try_merge_bh(bh);
1744   }
1745
1746   if (perfcounter) {
1747     perfcounter->inc(l_objectcacher_data_written, bytes_written);
1748     if (bytes_written_in_flush) {
1749       perfcounter->inc(l_objectcacher_overwritten_in_flush,
1750                        bytes_written_in_flush);
1751     }
1752   }
1753
1754   int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
1755   delete wr;
1756
1757   //verify_stats();
1758   trim();
1759   return r;
1760 }
1761
1762 class ObjectCacher::C_WaitForWrite : public Context {
1763 public:
1764   C_WaitForWrite(ObjectCacher *oc, uint64_t len,
1765                  const ZTracer::Trace &trace, Context *onfinish) :
1766     m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
1767   void finish(int r) override;
1768 private:
1769   ObjectCacher *m_oc;
1770   uint64_t m_len;
1771   ZTracer::Trace m_trace;
1772   Context *m_onfinish;
1773 };
1774
1775 void ObjectCacher::C_WaitForWrite::finish(int r)
1776 {
1777   Mutex::Locker l(m_oc->lock);
1778   m_oc->maybe_wait_for_writeback(m_len, &m_trace);
1779   m_onfinish->complete(r);
1780 }
1781
1782 void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
1783                                             ZTracer::Trace *trace)
1784 {
1785   assert(lock.is_locked());
1786   ceph::mono_time start = ceph::mono_clock::now();
1787   int blocked = 0;
1788   // wait for writeback?
1789   //  - wait for dirty and tx bytes (relative to the max_dirty threshold)
1790   //  - do not wait for bytes other waiters are waiting on.  this means that
1791   //    threads do not wait for each other.  this effectively allows the cache
1792   //    size to balloon proportional to the data that is in flight.
1793
1794   uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
1795   while (get_stat_dirty() + get_stat_tx() > 0 &&
1796          (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
1797           max_dirty + get_stat_dirty_waiting()) ||
1798          (dirty_or_tx_bh.size() >=
1799           max_dirty_bh + get_stat_nr_dirty_waiters()))) {
1800
1801     if (blocked == 0) {
1802       trace->event("start wait for writeback");
1803     }
1804     ldout(cct, 10) << __func__ << " waiting for dirty|tx "
1805                    << (get_stat_dirty() + get_stat_tx()) << " >= max "
1806                    << max_dirty << " + dirty_waiting "
1807                    << get_stat_dirty_waiting() << dendl;
1808     flusher_cond.Signal();
1809     stat_dirty_waiting += len;
1810     ++stat_nr_dirty_waiters;
1811     stat_cond.Wait(lock);
1812     stat_dirty_waiting -= len;
1813     --stat_nr_dirty_waiters;
1814     ++blocked;
1815     ldout(cct, 10) << __func__ << " woke up" << dendl;
1816   }
1817   if (blocked > 0) {
1818     trace->event("finish wait for writeback");
1819   }
1820   if (blocked && perfcounter) {
1821     perfcounter->inc(l_objectcacher_write_ops_blocked);
1822     perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
1823     ceph::timespan blocked = ceph::mono_clock::now() - start;
1824     perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
1825   }
1826 }
1827
1828 // blocking wait for write.
1829 int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
1830                                   ZTracer::Trace *trace, Context *onfreespace)
1831 {
1832   assert(lock.is_locked());
1833   assert(trace != nullptr);
1834   int ret = 0;
1835
1836   if (max_dirty > 0) {
1837     if (block_writes_upfront) {
1838       maybe_wait_for_writeback(len, trace);
1839       if (onfreespace)
1840         onfreespace->complete(0);
1841     } else {
1842       assert(onfreespace);
1843       finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
1844     }
1845   } else {
1846     // write-thru!  flush what we just wrote.
1847     Cond cond;
1848     bool done = false;
1849     Context *fin = block_writes_upfront ?
1850       new C_Cond(&cond, &done, &ret) : onfreespace;
1851     assert(fin);
1852     bool flushed = flush_set(oset, wr->extents, trace, fin);
1853     assert(!flushed);   // we just dirtied it, and didn't drop our lock!
1854     ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
1855                    << " bytes" << dendl;
1856     if (block_writes_upfront) {
1857       while (!done)
1858         cond.Wait(lock);
1859       ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
1860       if (onfreespace)
1861         onfreespace->complete(ret);
1862     }
1863   }
1864
1865   // start writeback anyway?
1866   if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
1867     ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
1868                    << target_dirty << ", nudging flusher" << dendl;
1869     flusher_cond.Signal();
1870   }
1871   return ret;
1872 }
1873
1874 void ObjectCacher::flusher_entry()
1875 {
1876   ldout(cct, 10) << "flusher start" << dendl;
1877   lock.Lock();
1878   while (!flusher_stop) {
1879     loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
1880       get_stat_dirty();
1881     ldout(cct, 11) << "flusher "
1882                    << all << " / " << max_size << ":  "
1883                    << get_stat_tx() << " tx, "
1884                    << get_stat_rx() << " rx, "
1885                    << get_stat_clean() << " clean, "
1886                    << get_stat_dirty() << " dirty ("
1887                    << target_dirty << " target, "
1888                    << max_dirty << " max)"
1889                    << dendl;
1890     loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
1891
1892     ZTracer::Trace trace;
1893     if (cct->_conf->osdc_blkin_trace_all) {
1894       trace.init("flusher", &trace_endpoint);
1895       trace.event("start");
1896     }
1897
1898     if (actual > 0 && (uint64_t) actual > target_dirty) {
1899       // flush some dirty pages
1900       ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
1901                      << get_stat_dirty_waiting() << " dirty_waiting > target "
1902                      << target_dirty << ", flushing some dirty bhs" << dendl;
1903       flush(&trace, actual - target_dirty);
1904     } else {
1905       // check tail of lru for old dirty items
1906       ceph::real_time cutoff = ceph::real_clock::now();
1907       cutoff -= max_dirty_age;
1908       BufferHead *bh = 0;
1909       int max = MAX_FLUSH_UNDER_LOCK;
1910       while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
1911                                             lru_get_next_expire())) != 0 &&
1912              bh->last_write <= cutoff &&
1913              max > 0) {
1914         ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
1915         if (scattered_write) {
1916           bh_write_adjacencies(bh, cutoff, NULL, &max);
1917         } else {
1918           bh_write(bh, trace);
1919           --max;
1920         }
1921       }
1922       if (!max) {
1923         // back off the lock to avoid starving other threads
1924         trace.event("backoff");
1925         lock.Unlock();
1926         lock.Lock();
1927         continue;
1928       }
1929     }
1930
1931     trace.event("finish");
1932     if (flusher_stop)
1933       break;
1934
1935     flusher_cond.WaitInterval(lock, seconds(1));
1936   }
1937
1938   /* Wait for reads to finish. This is only possible if handling
1939    * -ENOENT made some read completions finish before their rados read
1940    * came back. If we don't wait for them, and destroy the cache, when
1941    * the rados reads do come back their callback will try to access the
1942    * no-longer-valid ObjectCacher.
1943    */
1944   while (reads_outstanding > 0) {
1945     ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
1946                    << reads_outstanding << dendl;
1947     read_cond.Wait(lock);
1948   }
1949
1950   lock.Unlock();
1951   ldout(cct, 10) << "flusher finish" << dendl;
1952 }
1953
1954
1955 // -------------------------------------------------
1956
1957 bool ObjectCacher::set_is_empty(ObjectSet *oset)
1958 {
1959   assert(lock.is_locked());
1960   if (oset->objects.empty())
1961     return true;
1962
1963   for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
1964     if (!(*p)->is_empty())
1965       return false;
1966
1967   return true;
1968 }
1969
1970 bool ObjectCacher::set_is_cached(ObjectSet *oset)
1971 {
1972   assert(lock.is_locked());
1973   if (oset->objects.empty())
1974     return false;
1975
1976   for (xlist<Object*>::iterator p = oset->objects.begin();
1977        !p.end(); ++p) {
1978     Object *ob = *p;
1979     for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
1980          q != ob->data.end();
1981          ++q) {
1982       BufferHead *bh = q->second;
1983       if (!bh->is_dirty() && !bh->is_tx())
1984         return true;
1985     }
1986   }
1987
1988   return false;
1989 }
1990
1991 bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
1992 {
1993   assert(lock.is_locked());
1994   if (oset->objects.empty())
1995     return false;
1996
1997   for (xlist<Object*>::iterator i = oset->objects.begin();
1998        !i.end(); ++i) {
1999     Object *ob = *i;
2000
2001     for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2002          p != ob->data.end();
2003          ++p) {
2004       BufferHead *bh = p->second;
2005       if (bh->is_dirty() || bh->is_tx())
2006         return true;
2007     }
2008   }
2009
2010   return false;
2011 }
2012
2013
2014 // purge.  non-blocking.  violently removes dirty buffers from cache.
2015 void ObjectCacher::purge(Object *ob)
2016 {
2017   assert(lock.is_locked());
2018   ldout(cct, 10) << "purge " << *ob << dendl;
2019
2020   ob->truncate(0);
2021 }
2022
2023
2024 // flush.  non-blocking.  no callback.
2025 // true if clean, already flushed.
2026 // false if we wrote something.
2027 // be sloppy about the ranges and flush any buffer it touches
2028 bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
2029                          ZTracer::Trace *trace)
2030 {
2031   assert(trace != nullptr);
2032   assert(lock.is_locked());
2033   list<BufferHead*> blist;
2034   bool clean = true;
2035   ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
2036   for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
2037        p != ob->data.end();
2038        ++p) {
2039     BufferHead *bh = p->second;
2040     ldout(cct, 20) << "flush  " << *bh << dendl;
2041     if (length && bh->start() > offset+length) {
2042       break;
2043     }
2044     if (bh->is_tx()) {
2045       clean = false;
2046       continue;
2047     }
2048     if (!bh->is_dirty()) {
2049       continue;
2050     }
2051
2052     if (scattered_write)
2053       blist.push_back(bh);
2054     else
2055       bh_write(bh, *trace);
2056     clean = false;
2057   }
2058   if (scattered_write && !blist.empty())
2059     bh_write_scattered(blist);
2060
2061   return clean;
2062 }
2063
2064 bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
2065                                      Context *onfinish)
2066 {
2067   assert(lock.is_locked());
2068   if (gather->has_subs()) {
2069     gather->set_finisher(onfinish);
2070     gather->activate();
2071     return false;
2072   }
2073
2074   ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
2075   onfinish->complete(0);
2076   return true;
2077 }
2078
2079 // flush.  non-blocking, takes callback.
2080 // returns true if already flushed
2081 bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
2082 {
2083   assert(lock.is_locked());
2084   assert(onfinish != NULL);
2085   if (oset->objects.empty()) {
2086     ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2087     onfinish->complete(0);
2088     return true;
2089   }
2090
2091   ldout(cct, 10) << "flush_set " << oset << dendl;
2092
2093   // we'll need to wait for all objects to flush!
2094   C_GatherBuilder gather(cct);
2095   set<Object*> waitfor_commit;
2096
2097   list<BufferHead*> blist;
2098   Object *last_ob = NULL;
2099   set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;
2100
2101   // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
2102   // order. But items in oset->objects are not sorted. So the iterator can
2103   // point to any buffer head in the ObjectSet
2104   BufferHead key(*oset->objects.begin());
2105   it = dirty_or_tx_bh.lower_bound(&key);
2106   p = q = it;
2107
2108   bool backwards = true;
2109   if (it != dirty_or_tx_bh.begin())
2110     --it;
2111   else
2112     backwards = false;
2113
2114   for (; p != dirty_or_tx_bh.end(); p = q) {
2115     ++q;
2116     BufferHead *bh = *p;
2117     if (bh->ob->oset != oset)
2118       break;
2119     waitfor_commit.insert(bh->ob);
2120     if (bh->is_dirty()) {
2121       if (scattered_write) {
2122         if (last_ob != bh->ob) {
2123           if (!blist.empty()) {
2124             bh_write_scattered(blist);
2125             blist.clear();
2126           }
2127           last_ob = bh->ob;
2128         }
2129         blist.push_back(bh);
2130       } else {
2131         bh_write(bh, {});
2132       }
2133     }
2134   }
2135
2136   if (backwards) {
2137     for(p = q = it; true; p = q) {
2138       if (q != dirty_or_tx_bh.begin())
2139         --q;
2140       else
2141         backwards = false;
2142       BufferHead *bh = *p;
2143       if (bh->ob->oset != oset)
2144         break;
2145       waitfor_commit.insert(bh->ob);
2146       if (bh->is_dirty()) {
2147         if (scattered_write) {
2148           if (last_ob != bh->ob) {
2149             if (!blist.empty()) {
2150               bh_write_scattered(blist);
2151               blist.clear();
2152             }
2153             last_ob = bh->ob;
2154           }
2155           blist.push_front(bh);
2156         } else {
2157           bh_write(bh, {});
2158         }
2159       }
2160       if (!backwards)
2161         break;
2162     }
2163   }
2164
2165   if (scattered_write && !blist.empty())
2166     bh_write_scattered(blist);
2167
2168   for (set<Object*>::iterator i = waitfor_commit.begin();
2169        i != waitfor_commit.end(); ++i) {
2170     Object *ob = *i;
2171
2172     // we'll need to gather...
2173     ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2174                    << ob->last_write_tid << " on " << *ob << dendl;
2175     ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2176   }
2177
2178   return _flush_set_finish(&gather, onfinish);
2179 }
2180
2181 // flush.  non-blocking, takes callback.
2182 // returns true if already flushed
2183 bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
2184                              ZTracer::Trace *trace, Context *onfinish)
2185 {
2186   assert(lock.is_locked());
2187   assert(trace != nullptr);
2188   assert(onfinish != NULL);
2189   if (oset->objects.empty()) {
2190     ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2191     onfinish->complete(0);
2192     return true;
2193   }
2194
2195   ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
2196                  << " ObjectExtents" << dendl;
2197
2198   // we'll need to wait for all objects to flush!
2199   C_GatherBuilder gather(cct);
2200
2201   for (vector<ObjectExtent>::iterator p = exv.begin();
2202        p != exv.end();
2203        ++p) {
2204     ObjectExtent &ex = *p;
2205     sobject_t soid(ex.oid, CEPH_NOSNAP);
2206     if (objects[oset->poolid].count(soid) == 0)
2207       continue;
2208     Object *ob = objects[oset->poolid][soid];
2209
2210     ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
2211                    << " " << ob << dendl;
2212
2213     if (!flush(ob, ex.offset, ex.length, trace)) {
2214       // we'll need to gather...
2215       ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2216                      << ob->last_write_tid << " on " << *ob << dendl;
2217       ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2218     }
2219   }
2220
2221   return _flush_set_finish(&gather, onfinish);
2222 }
2223
2224 // flush all dirty data.  non-blocking, takes callback.
2225 // returns true if already flushed
2226 bool ObjectCacher::flush_all(Context *onfinish)
2227 {
2228   assert(lock.is_locked());
2229   assert(onfinish != NULL);
2230
2231   ldout(cct, 10) << "flush_all " << dendl;
2232
2233   // we'll need to wait for all objects to flush!
2234   C_GatherBuilder gather(cct);
2235   set<Object*> waitfor_commit;
2236
2237   list<BufferHead*> blist;
2238   Object *last_ob = NULL;
2239   set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
2240   next = it = dirty_or_tx_bh.begin();
2241   while (it != dirty_or_tx_bh.end()) {
2242     ++next;
2243     BufferHead *bh = *it;
2244     waitfor_commit.insert(bh->ob);
2245
2246     if (bh->is_dirty()) {
2247       if (scattered_write) {
2248         if (last_ob != bh->ob) {
2249           if (!blist.empty()) {
2250             bh_write_scattered(blist);
2251             blist.clear();
2252           }
2253           last_ob = bh->ob;
2254         }
2255         blist.push_back(bh);
2256       } else {
2257         bh_write(bh, {});
2258       }
2259     }
2260
2261     it = next;
2262   }
2263
2264   if (scattered_write && !blist.empty())
2265     bh_write_scattered(blist);
2266
2267   for (set<Object*>::iterator i = waitfor_commit.begin();
2268        i != waitfor_commit.end();
2269        ++i) {
2270     Object *ob = *i;
2271
2272     // we'll need to gather...
2273     ldout(cct, 10) << "flush_all will wait for ack tid "
2274                    << ob->last_write_tid << " on " << *ob << dendl;
2275     ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2276   }
2277
2278   return _flush_set_finish(&gather, onfinish);
2279 }
2280
2281 void ObjectCacher::purge_set(ObjectSet *oset)
2282 {
2283   assert(lock.is_locked());
2284   if (oset->objects.empty()) {
2285     ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
2286     return;
2287   }
2288
2289   ldout(cct, 10) << "purge_set " << oset << dendl;
2290   const bool were_dirty = oset->dirty_or_tx > 0;
2291
2292   for (xlist<Object*>::iterator i = oset->objects.begin();
2293        !i.end(); ++i) {
2294     Object *ob = *i;
2295         purge(ob);
2296   }
2297
2298   // Although we have purged rather than flushed, caller should still
2299   // drop any resources associate with dirty data.
2300   assert(oset->dirty_or_tx == 0);
2301   if (flush_set_callback && were_dirty) {
2302     flush_set_callback(flush_set_callback_arg, oset);
2303   }
2304 }
2305
2306
2307 loff_t ObjectCacher::release(Object *ob)
2308 {
2309   assert(lock.is_locked());
2310   list<BufferHead*> clean;
2311   loff_t o_unclean = 0;
2312
2313   for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2314        p != ob->data.end();
2315        ++p) {
2316     BufferHead *bh = p->second;
2317     if (bh->is_clean() || bh->is_zero() || bh->is_error())
2318       clean.push_back(bh);
2319     else
2320       o_unclean += bh->length();
2321   }
2322
2323   for (list<BufferHead*>::iterator p = clean.begin();
2324        p != clean.end();
2325        ++p) {
2326     bh_remove(ob, *p);
2327     delete *p;
2328   }
2329
2330   if (ob->can_close()) {
2331     ldout(cct, 10) << "release trimming " << *ob << dendl;
2332     close_object(ob);
2333     assert(o_unclean == 0);
2334     return 0;
2335   }
2336
2337   if (ob->complete) {
2338     ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
2339     ob->complete = false;
2340   }
2341   if (!ob->exists) {
2342     ldout(cct, 10) << "release setting exists on " << *ob << dendl;
2343     ob->exists = true;
2344   }
2345
2346   return o_unclean;
2347 }
2348
2349 loff_t ObjectCacher::release_set(ObjectSet *oset)
2350 {
2351   assert(lock.is_locked());
2352   // return # bytes not clean (and thus not released).
2353   loff_t unclean = 0;
2354
2355   if (oset->objects.empty()) {
2356     ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
2357     return 0;
2358   }
2359
2360   ldout(cct, 10) << "release_set " << oset << dendl;
2361
2362   xlist<Object*>::iterator q;
2363   for (xlist<Object*>::iterator p = oset->objects.begin();
2364        !p.end(); ) {
2365     q = p;
2366     ++q;
2367     Object *ob = *p;
2368
2369     loff_t o_unclean = release(ob);
2370     unclean += o_unclean;
2371
2372     if (o_unclean)
2373       ldout(cct, 10) << "release_set " << oset << " " << *ob
2374                      << " has " << o_unclean << " bytes left"
2375                      << dendl;
2376     p = q;
2377   }
2378
2379   if (unclean) {
2380     ldout(cct, 10) << "release_set " << oset
2381                    << ", " << unclean << " bytes left" << dendl;
2382   }
2383
2384   return unclean;
2385 }
2386
2387
2388 uint64_t ObjectCacher::release_all()
2389 {
2390   assert(lock.is_locked());
2391   ldout(cct, 10) << "release_all" << dendl;
2392   uint64_t unclean = 0;
2393
2394   vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
2395     = objects.begin();
2396   while (i != objects.end()) {
2397     ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
2398     while (p != i->end()) {
2399       ceph::unordered_map<sobject_t, Object*>::iterator n = p;
2400       ++n;
2401
2402       Object *ob = p->second;
2403
2404       loff_t o_unclean = release(ob);
2405       unclean += o_unclean;
2406
2407       if (o_unclean)
2408         ldout(cct, 10) << "release_all " << *ob
2409                        << " has " << o_unclean << " bytes left"
2410                        << dendl;
2411     p = n;
2412     }
2413     ++i;
2414   }
2415
2416   if (unclean) {
2417     ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
2418                    << dendl;
2419   }
2420
2421   return unclean;
2422 }
2423
2424 void ObjectCacher::clear_nonexistence(ObjectSet *oset)
2425 {
2426   assert(lock.is_locked());
2427   ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
2428
2429   for (xlist<Object*>::iterator p = oset->objects.begin();
2430        !p.end(); ++p) {
2431     Object *ob = *p;
2432     if (!ob->exists) {
2433       ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
2434       ob->exists = true;
2435       ob->complete = false;
2436     }
2437     for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
2438          !q.end(); ++q) {
2439       C_ReadFinish *comp = *q;
2440       comp->distrust_enoent();
2441     }
2442   }
2443 }
2444
2445 /**
2446  * discard object extents from an ObjectSet by removing the objects in
2447  * exls from the in-memory oset.
2448  */
2449 void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
2450 {
2451   assert(lock.is_locked());
2452   if (oset->objects.empty()) {
2453     ldout(cct, 10) << "discard_set on " << oset << " dne" << dendl;
2454     return;
2455   }
2456
2457   ldout(cct, 10) << "discard_set " << oset << dendl;
2458
2459   bool were_dirty = oset->dirty_or_tx > 0;
2460
2461   for (vector<ObjectExtent>::const_iterator p = exls.begin();
2462        p != exls.end();
2463        ++p) {
2464     ldout(cct, 10) << "discard_set " << oset << " ex " << *p << dendl;
2465     const ObjectExtent &ex = *p;
2466     sobject_t soid(ex.oid, CEPH_NOSNAP);
2467     if (objects[oset->poolid].count(soid) == 0)
2468       continue;
2469     Object *ob = objects[oset->poolid][soid];
2470
2471     ob->discard(ex.offset, ex.length);
2472   }
2473
2474   // did we truncate off dirty data?
2475   if (flush_set_callback &&
2476       were_dirty && oset->dirty_or_tx == 0)
2477     flush_set_callback(flush_set_callback_arg, oset);
2478 }
2479
2480 void ObjectCacher::verify_stats() const
2481 {
2482   assert(lock.is_locked());
2483   ldout(cct, 10) << "verify_stats" << dendl;
2484
2485   loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
2486     error = 0;
2487   for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
2488          = objects.begin();
2489        i != objects.end();
2490        ++i) {
2491     for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
2492            = i->begin();
2493          p != i->end();
2494          ++p) {
2495       Object *ob = p->second;
2496       for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
2497            q != ob->data.end();
2498           ++q) {
2499         BufferHead *bh = q->second;
2500         switch (bh->get_state()) {
2501         case BufferHead::STATE_MISSING:
2502           missing += bh->length();
2503           break;
2504         case BufferHead::STATE_CLEAN:
2505           clean += bh->length();
2506           break;
2507         case BufferHead::STATE_ZERO:
2508           zero += bh->length();
2509           break;
2510         case BufferHead::STATE_DIRTY:
2511           dirty += bh->length();
2512           break;
2513         case BufferHead::STATE_TX:
2514           tx += bh->length();
2515           break;
2516         case BufferHead::STATE_RX:
2517           rx += bh->length();
2518           break;
2519         case BufferHead::STATE_ERROR:
2520           error += bh->length();
2521           break;
2522         default:
2523           ceph_abort();
2524         }
2525       }
2526     }
2527   }
2528
2529   ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
2530                  << " dirty " << dirty << " missing " << missing
2531                  << " error " << error << dendl;
2532   assert(clean == stat_clean);
2533   assert(rx == stat_rx);
2534   assert(tx == stat_tx);
2535   assert(dirty == stat_dirty);
2536   assert(missing == stat_missing);
2537   assert(zero == stat_zero);
2538   assert(error == stat_error);
2539 }
2540
2541 void ObjectCacher::bh_stat_add(BufferHead *bh)
2542 {
2543   assert(lock.is_locked());
2544   switch (bh->get_state()) {
2545   case BufferHead::STATE_MISSING:
2546     stat_missing += bh->length();
2547     break;
2548   case BufferHead::STATE_CLEAN:
2549     stat_clean += bh->length();
2550     break;
2551   case BufferHead::STATE_ZERO:
2552     stat_zero += bh->length();
2553     break;
2554   case BufferHead::STATE_DIRTY:
2555     stat_dirty += bh->length();
2556     bh->ob->dirty_or_tx += bh->length();
2557     bh->ob->oset->dirty_or_tx += bh->length();
2558     break;
2559   case BufferHead::STATE_TX:
2560     stat_tx += bh->length();
2561     bh->ob->dirty_or_tx += bh->length();
2562     bh->ob->oset->dirty_or_tx += bh->length();
2563     break;
2564   case BufferHead::STATE_RX:
2565     stat_rx += bh->length();
2566     break;
2567   case BufferHead::STATE_ERROR:
2568     stat_error += bh->length();
2569     break;
2570   default:
2571     assert(0 == "bh_stat_add: invalid bufferhead state");
2572   }
2573   if (get_stat_dirty_waiting() > 0)
2574     stat_cond.Signal();
2575 }
2576
2577 void ObjectCacher::bh_stat_sub(BufferHead *bh)
2578 {
2579   assert(lock.is_locked());
2580   switch (bh->get_state()) {
2581   case BufferHead::STATE_MISSING:
2582     stat_missing -= bh->length();
2583     break;
2584   case BufferHead::STATE_CLEAN:
2585     stat_clean -= bh->length();
2586     break;
2587   case BufferHead::STATE_ZERO:
2588     stat_zero -= bh->length();
2589     break;
2590   case BufferHead::STATE_DIRTY:
2591     stat_dirty -= bh->length();
2592     bh->ob->dirty_or_tx -= bh->length();
2593     bh->ob->oset->dirty_or_tx -= bh->length();
2594     break;
2595   case BufferHead::STATE_TX:
2596     stat_tx -= bh->length();
2597     bh->ob->dirty_or_tx -= bh->length();
2598     bh->ob->oset->dirty_or_tx -= bh->length();
2599     break;
2600   case BufferHead::STATE_RX:
2601     stat_rx -= bh->length();
2602     break;
2603   case BufferHead::STATE_ERROR:
2604     stat_error -= bh->length();
2605     break;
2606   default:
2607     assert(0 == "bh_stat_sub: invalid bufferhead state");
2608   }
2609 }
2610
2611 void ObjectCacher::bh_set_state(BufferHead *bh, int s)
2612 {
2613   assert(lock.is_locked());
2614   int state = bh->get_state();
2615   // move between lru lists?
2616   if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
2617     bh_lru_rest.lru_remove(bh);
2618     bh_lru_dirty.lru_insert_top(bh);
2619   } else if (s != BufferHead::STATE_DIRTY &&state == BufferHead::STATE_DIRTY) {
2620     bh_lru_dirty.lru_remove(bh);
2621     if (bh->get_dontneed())
2622       bh_lru_rest.lru_insert_bot(bh);
2623     else
2624       bh_lru_rest.lru_insert_top(bh);
2625   }
2626
2627   if ((s == BufferHead::STATE_TX ||
2628        s == BufferHead::STATE_DIRTY) &&
2629       state != BufferHead::STATE_TX &&
2630       state != BufferHead::STATE_DIRTY) {
2631     dirty_or_tx_bh.insert(bh);
2632   } else if ((state == BufferHead::STATE_TX ||
2633               state == BufferHead::STATE_DIRTY) &&
2634              s != BufferHead::STATE_TX &&
2635              s != BufferHead::STATE_DIRTY) {
2636     dirty_or_tx_bh.erase(bh);
2637   }
2638
2639   if (s != BufferHead::STATE_ERROR &&
2640       state == BufferHead::STATE_ERROR) {
2641     bh->error = 0;
2642   }
2643
2644   // set state
2645   bh_stat_sub(bh);
2646   bh->set_state(s);
2647   bh_stat_add(bh);
2648 }
2649
2650 void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
2651 {
2652   assert(lock.is_locked());
2653   ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
2654   ob->add_bh(bh);
2655   if (bh->is_dirty()) {
2656     bh_lru_dirty.lru_insert_top(bh);
2657     dirty_or_tx_bh.insert(bh);
2658   } else {
2659     if (bh->get_dontneed())
2660       bh_lru_rest.lru_insert_bot(bh);
2661     else
2662       bh_lru_rest.lru_insert_top(bh);
2663   }
2664
2665   if (bh->is_tx()) {
2666     dirty_or_tx_bh.insert(bh);
2667   }
2668   bh_stat_add(bh);
2669 }
2670
2671 void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
2672 {
2673   assert(lock.is_locked());
2674   assert(bh->get_journal_tid() == 0);
2675   ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
2676   ob->remove_bh(bh);
2677   if (bh->is_dirty()) {
2678     bh_lru_dirty.lru_remove(bh);
2679     dirty_or_tx_bh.erase(bh);
2680   } else {
2681     bh_lru_rest.lru_remove(bh);
2682   }
2683
2684   if (bh->is_tx()) {
2685     dirty_or_tx_bh.erase(bh);
2686   }
2687   bh_stat_sub(bh);
2688   if (get_stat_dirty_waiting() > 0)
2689     stat_cond.Signal();
2690 }
2691