// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "msg/Messenger.h"
#include "ObjectCacher.h"
#include "WritebackHandler.h"
#include "common/errno.h"
#include "common/perf_counters.h"

#include "include/assert.h"
#define MAX_FLUSH_UNDER_LOCK 20  ///< max bh's we start writeback on
                                 ///  while holding the lock
#define BUFFER_MEMORY_WEIGHT 12  // memory usage of BufferHead, count in (1<<n)

using std::chrono::seconds;
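// Illustrative sketch (not from the original file): BUFFER_MEMORY_WEIGHT is
// a shift amount, so byte budgets convert to BufferHead-count budgets via >>,
// with 1<<12 = 4 KiB accounted per bufferhead.  For example, in trim():
//   uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
//   // a hypothetical max_size of 64 MiB allows 67108864 >> 12 == 16384
//   // clean bufferheads before trimming kicks in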
/*** ObjectCacher::BufferHead ***/

/*** ObjectCacher::Object ***/

#define dout_subsys ceph_subsys_objectcacher
#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
class ObjectCacher::C_ReadFinish : public Context {
  xlist<C_ReadFinish*>::item set_item;

  C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
               uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
    set_item(this), trust_enoent(true),
    tid(t), trace(trace) {
    ob->reads.push_back(&set_item);

  void finish(int r) override {
    oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
    trace.event("finish");

    // object destructor clears the list
    if (set_item.is_on_list())
      set_item.remove_myself();

  void distrust_enoent() {
class ObjectCacher::C_RetryRead : public Context {
  C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
              const ZTracer::Trace &trace)
    : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {

  void finish(int r) override {
    r = oc->_readx(rd, oset, onfinish, false, &trace);

    // read is still in-progress
    trace.event("finish");
    onfinish->complete(r);
ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
  assert(oc->lock.is_locked());
  ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;

  ObjectCacher::BufferHead *right = new BufferHead(this);
  // inherit the dontneed/nocache hints; a later access clears them
  // automatically.
  right->set_dontneed(left->get_dontneed());
  right->set_nocache(left->get_nocache());
  right->last_write_tid = left->last_write_tid;
  right->last_read_tid = left->last_read_tid;
  right->set_state(left->get_state());
  right->snapc = left->snapc;
  right->set_journal_tid(left->journal_tid);

  loff_t newleftlen = off - left->start();
  right->set_start(off);
  right->set_length(left->length() - newleftlen);

  oc->bh_stat_sub(left);
  left->set_length(newleftlen);
  oc->bh_stat_add(left);

  oc->bh_add(this, right);

  assert(bl.length() == (left->length() + right->length()));
  right->bl.substr_of(bl, left->length(), right->length());
  left->bl.substr_of(bl, 0, left->length());
  if (!left->waitfor_read.empty()) {
    map<loff_t, list<Context*> >::iterator start_remove
      = left->waitfor_read.begin();
    while (start_remove != left->waitfor_read.end() &&
           start_remove->first < right->start())

    for (map<loff_t, list<Context*> >::iterator p = start_remove;
         p != left->waitfor_read.end(); ++p) {
      ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
                         << " to right bh" << dendl;
      right->waitfor_read[p->first].swap(p->second);
      assert(p->second.empty());

    left->waitfor_read.erase(start_remove, left->waitfor_read.end());

  ldout(oc->cct, 20) << "split left is " << *left << dendl;
  ldout(oc->cct, 20) << "split right is " << *right << dendl;
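// Illustrative sketch (hypothetical values, not from the original file):
// splitting a bh that covers [0, 100) at off = 40 yields
//   left  = [0, 40)    keeps its map slot; length shrinks to 40
//   right = [40, 100)  new bh inheriting state, snapc, and tids
// with left->bl/right->bl carved out of the original data via substr_of,
// and any waitfor_read waiters at byte >= 40 moved onto right.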
void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
  assert(oc->lock.is_locked());
  assert(left->end() == right->start());
  assert(left->get_state() == right->get_state());
  assert(left->can_merge_journal(right));

  ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
  if (left->get_journal_tid() == 0) {
    left->set_journal_tid(right->get_journal_tid());

  right->set_journal_tid(0);

  oc->bh_remove(this, right);
  oc->bh_stat_sub(left);
  left->set_length(left->length() + right->length());
  oc->bh_stat_add(left);

  left->bl.claim_append(right->bl);

  // note: this is sorta busted, but should only be used for dirty buffers
  left->last_write_tid = MAX(left->last_write_tid, right->last_write_tid);
  left->last_write = MAX(left->last_write, right->last_write);

  left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
  left->set_nocache(right->get_nocache() ? left->get_nocache() : false);

  for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
       p != right->waitfor_read.end();
    left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),

  ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
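// Illustrative sketch (hypothetical values, not from the original file):
// merging left = [0, 40) with an adjacent right = [40, 100) in the same
// state produces left = [0, 100): the bufferlists are concatenated, the
// larger last_write_tid/last_write win, right's read waiters are spliced
// onto left, and right is removed from the object.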
void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;

  // do not merge rx buffers; last_read_tid may not match
  map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
  assert(p->second == bh);
  if (p != data.begin()) {
    if (p->second->end() == bh->start() &&
        p->second->get_state() == bh->get_state() &&
        p->second->can_merge_journal(bh)) {
      merge_left(p->second, bh);

  assert(p->second == bh);

  if (p != data.end() &&
      p->second->start() == bh->end() &&
      p->second->get_state() == bh->get_state() &&
      p->second->can_merge_journal(bh))
    merge_left(bh, p->second);
 * count bytes we have cached in given range
bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
  assert(oc->lock.is_locked());
  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);

  if (p->first <= cur) {
    loff_t lenfromcur = MIN(p->second->end() - cur, left);
  } else if (p->first > cur) {
 * all cached data in the range [off, off+len]
bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
  assert(oc->lock.is_locked());

  map<loff_t, BufferHead*>::iterator first = data.begin();
  map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
  if (first->second->start() >= off && last->second->end() <= (off + len))
 * map a range of bytes into buffer_heads.
 * - create missing buffer_heads as necessary.
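// Illustrative sketch (hypothetical layout, not from the original file):
// reading 0~90 against cached bhs clean [0, 30) and rx [30, 60) with a
// hole [60, 90) would populate the out-maps roughly as
//   hits[0]     -> the clean bh  (readable now)
//   rx[30]      -> the rx bh     (a read is already in flight)
//   missing[60] -> a new bh      (created here for the caller to fetch)
// letting the caller serve hits, wait on rx, and issue reads for missing.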
int ObjectCacher::Object::map_read(ObjectExtent &ex,
                                   map<loff_t, BufferHead*>& hits,
                                   map<loff_t, BufferHead*>& missing,
                                   map<loff_t, BufferHead*>& rx,
                                   map<loff_t, BufferHead*>& errors)
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "map_read " << ex.oid << " "
                     << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);

  if (p == data.end()) {
    BufferHead *n = new BufferHead(this);
      ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
      ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
    assert(cur == (loff_t)ex.offset + (loff_t)ex.length);

  if (p->first <= cur) {
    // have it (or part of it)
    BufferHead *e = p->second;
      hits[cur] = e;     // readable!
      ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
    } else if (e->is_rx()) {
      rx[cur] = e;       // missing, not readable.
      ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
    } else if (e->is_error()) {
      ldout(oc->cct, 20) << "map_read error " << *e << dendl;

    loff_t lenfromcur = MIN(e->end() - cur, left);
  } else if (p->first > cur) {
    loff_t next = p->first;
    BufferHead *n = new BufferHead(this);
    loff_t len = MIN(next - cur, left);
      ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
      ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
    cur += MIN(left, n->length());
    left -= MIN(left, n->length());
void ObjectCacher::Object::audit_buffers()
  for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
       it != data.end(); ++it) {
    if (it->first != it->second->start()) {
      lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
                     << " does not match bh start position: "
                     << *it->second << dendl;
      assert(it->first == it->second->start());
    if (it->first < offset) {
      lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
                     << " overlaps with previous bh " << *((--it)->second)
      assert(it->first >= offset);
    BufferHead *bh = it->second;
    map<loff_t, list<Context*> >::const_iterator w_it;
    for (w_it = bh->waitfor_read.begin();
         w_it != bh->waitfor_read.end(); ++w_it) {
      if (w_it->first < bh->start() ||
          w_it->first >= bh->start() + bh->length()) {
        lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
                       << " is not within bh " << *bh << dendl;
        assert(w_it->first >= bh->start());
        assert(w_it->first < bh->start() + bh->length());
    offset = it->first + it->second->length();
 * map a range of extents on an object's buffer cache.
 * - combine any bh's we're writing into one
 * - break up bufferheads that don't fall completely within the range
 * //no! - return a bh that includes the write.  may also include
 * other dirty data to left and/or right.
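// Illustrative sketch (hypothetical values, not from the original file):
// writing 40~20 into an object holding a single bh [0, 100) takes the
// "middle bit (two splices)" path below:
//   split(bh, 40)    -> [0, 40) + [40, 100)
//   split(final, 60) -> [0, 40) + [40, 60) + [60, 100)
// and [40, 60) becomes 'final', tagged with this write's journal tid.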
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
  assert(oc->lock.is_locked());
  BufferHead *final = 0;

  ldout(oc->cct, 10) << "map_write oex " << ex.oid
                     << " " << ex.offset << "~" << ex.length << dendl;

  loff_t cur = ex.offset;
  loff_t left = ex.length;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);

    if (p == data.end()) {
        final = new BufferHead(this);
        replace_journal_tid(final, tid);
        final->set_start(cur);
        final->set_length(max);
        oc->bh_add(this, final);
        ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
        oc->bh_stat_sub(final);
        final->set_length(final->length() + max);
        oc->bh_stat_add(final);

    ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
    //oc->verify_stats();

    if (p->first <= cur) {
      BufferHead *bh = p->second;
      ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;

      if (p->first < cur) {
        if (cur + max >= bh->end()) {
          // we want right bit (one splice)
          final = split(bh, cur);   // just split it, take right half.
          replace_journal_tid(final, tid);
          assert(p->second == final);
          // we want middle bit (two splices)
          final = split(bh, cur);
          assert(p->second == final);
          split(final, cur+max);
          replace_journal_tid(final, tid);
        assert(p->first == cur);
        if (bh->length() <= max) {
          // whole bufferhead, piece of cake.
          // we want left bit (one splice)
          split(bh, cur + max);   // just split
            oc->mark_dirty(final);
            --p;  // move iterator back to final
            assert(p->second == final);
            replace_journal_tid(bh, tid);
            merge_left(final, bh);
          replace_journal_tid(final, tid);

      loff_t lenfromcur = final->end() - cur;
      loff_t next = p->first;
      loff_t glen = MIN(next - cur, max);
      ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
        oc->bh_stat_sub(final);
        final->set_length(final->length() + glen);
        oc->bh_stat_add(final);
        final = new BufferHead(this);
        replace_journal_tid(final, tid);
        final->set_start(cur);
        final->set_length(glen);
        oc->bh_add(this, final);

  assert(final->get_journal_tid() == tid);
  ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
  ceph_tid_t bh_tid = bh->get_journal_tid();

  assert(tid == 0 || bh_tid <= tid);
  if (bh_tid != 0 && bh_tid != tid) {
    // inform journal that it should not expect a writeback from this extent
    oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
                                           bh->length(), bh_tid, tid);
  bh->set_journal_tid(tid);

void ObjectCacher::Object::truncate(loff_t s)
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;

  while (!data.empty()) {
    BufferHead *bh = data.rbegin()->second;

    // split bh at truncation point?
    if (bh->start() < s) {

    // remove bh entirely
    assert(bh->start() >= s);
    assert(bh->waitfor_read.empty());
    replace_journal_tid(bh, 0);
    oc->bh_remove(this, bh);
void ObjectCacher::Object::discard(loff_t off, loff_t len)
  assert(oc->lock.is_locked());
  ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len

    ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
    ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;

  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
  while (p != data.end()) {
    BufferHead *bh = p->second;
    if (bh->start() >= off + len)

    // split bh at truncation point?
    if (bh->start() < off) {

    assert(bh->start() >= off);
    if (bh->end() > off + len) {
      split(bh, off + len);

    ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
    assert(bh->waitfor_read.empty());
    replace_journal_tid(bh, 0);
    oc->bh_remove(this, bh);

/*** ObjectCacher ***/

#define dout_prefix *_dout << "objectcacher "
ObjectCacher::ObjectCacher(CephContext *cct_, string name,
                           WritebackHandler& wb, Mutex& l,
                           flush_set_callback_t flush_callback,
                           void *flush_callback_arg, uint64_t max_bytes,
                           uint64_t max_objects, uint64_t max_dirty,
                           uint64_t target_dirty, double max_dirty_age,
                           bool block_writes_upfront)
  cct(cct_), writeback_handler(wb), name(name), lock(l),
    max_dirty(max_dirty), target_dirty(target_dirty),
    max_size(max_bytes), max_objects(max_objects),
    max_dirty_age(ceph::make_timespan(max_dirty_age)),
    block_writes_upfront(block_writes_upfront),
    trace_endpoint("ObjectCacher"),
    flush_set_callback(flush_callback),
    flush_set_callback_arg(flush_callback_arg),
    last_read_tid(0), flusher_stop(false), flusher_thread(this), finisher(cct),
    stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
    stat_missing(0), stat_error(0), stat_dirty_waiting(0),
    stat_nr_dirty_waiters(0), reads_outstanding(0)

  scattered_write = writeback_handler.can_scattered_write();
ObjectCacher::~ObjectCacher()

  // we should be empty.
  for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i

  assert(bh_lru_rest.lru_get_size() == 0);
  assert(bh_lru_dirty.lru_get_size() == 0);
  assert(ob_lru.lru_get_size() == 0);
  assert(dirty_or_tx_bh.empty());

void ObjectCacher::perf_start()
  string n = "objectcacher-" + name;
  PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);

  plb.add_u64_counter(l_objectcacher_cache_ops_hit,
                      "cache_ops_hit", "Hit operations");
  plb.add_u64_counter(l_objectcacher_cache_ops_miss,
                      "cache_ops_miss", "Miss operations");
  plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
                      "cache_bytes_hit", "Hit data");
  plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
                      "cache_bytes_miss", "Miss data");
  plb.add_u64_counter(l_objectcacher_data_read,
                      "data_read", "Read data");
  plb.add_u64_counter(l_objectcacher_data_written,
                      "data_written", "Data written to cache");
  plb.add_u64_counter(l_objectcacher_data_flushed,
                      "data_flushed", "Data flushed");
  plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
                      "data_overwritten_while_flushing",
                      "Data overwritten while flushing");
  plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
                      "Write operations, delayed due to dirty limits");
  plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
                      "write_bytes_blocked",
                      "Write data blocked on dirty limit");
  plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
               "Time spent blocking a write due to dirty limits");

  perfcounter = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perfcounter);
void ObjectCacher::perf_stop()
  cct->get_perfcounters_collection()->remove(perfcounter);

ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
                                               uint64_t truncate_size,
                                               uint64_t truncate_seq)
  // XXX: Add handling of nspace in object_locator_t in cache
  assert(lock.is_locked());

  if ((uint32_t)l.pool < objects.size()) {
    if (objects[l.pool].count(oid)) {
      Object *o = objects[l.pool][oid];
      o->object_no = object_no;
      o->truncate_size = truncate_size;
      o->truncate_seq = truncate_seq;
    objects.resize(l.pool+1);

  Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
  objects[l.pool][oid] = o;
  ob_lru.lru_insert_top(o);

void ObjectCacher::close_object(Object *ob)
  assert(lock.is_locked());
  ldout(cct, 10) << "close_object " << *ob << dendl;
  assert(ob->can_close());

  ob_lru.lru_remove(ob);
  objects[ob->oloc.pool].erase(ob->get_soid());
  ob->set_item.remove_myself();
void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
                           const ZTracer::Trace &parent_trace)
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
                << reads_outstanding << dendl;

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_read " + bh->ob->get_oid().name);
    trace.event("start");

  bh->last_read_tid = ++last_read_tid;

  C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
                                            bh->start(), bh->length(), trace);

  writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
                         bh->ob->get_oloc(), bh->start(), bh->length(),
                         bh->ob->get_snap(), &onfinish->bl,
                         bh->ob->truncate_size, bh->ob->truncate_seq,
                         op_flags, trace, onfinish);
void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
                                  ceph_tid_t tid, loff_t start,
                                  uint64_t length, bufferlist &bl, int r,
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_read_finish "
                << " " << start << "~" << length
                << " (bl is " << bl.length() << ")"
                << " outstanding reads " << reads_outstanding

  if (r >= 0 && bl.length() < length) {
    ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
                  << length << " with " << length - bl.length() << " bytes of zeroes"
    bl.append_zero(length - bl.length());

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
    Object *ob = objects[poolid][oid];

    if (r == -ENOENT && !ob->complete) {
      // wake up *all* rx waiters, or else we risk reordering
      // identical reads. e.g.
      //   reply to unrelated 3~1 -> !exists
      //   read 1~1 -> immediate ENOENT
      //   reply to first 1~1 -> ooo ENOENT
      for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
           p != ob->data.end(); ++p) {
        BufferHead *bh = p->second;
        for (map<loff_t, list<Context*> >::iterator p
               = bh->waitfor_read.begin();
             p != bh->waitfor_read.end();
          ls.splice(ls.end(), p->second);
        bh->waitfor_read.clear();
        if (!bh->is_zero() && !bh->is_rx())

      // just pass through and retry all waiters if we don't trust
      // -ENOENT for this read
          << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
      /* If all the bhs are effectively zero, get rid of them.  All
       * the waiters will be retried and get -ENOENT immediately, so
       * it's safe to clean up the unneeded bh's now.  Since we know
       * it's safe to remove them now, do so, so they aren't hanging
       * around waiting for more -ENOENTs from rados while the cache
       * is being shut down.
       *
       * Only do this when all the bhs are rx or clean, to match the
       * condition in _readx().  If there are any non-rx or non-clean
       * bhs, _readx() will wait for the final result instead of
       * returning -ENOENT immediately.
       */
          << "bh_read_finish ENOENT and allzero, getting rid of "
          << "bhs for " << *ob << dendl;
      map<loff_t, BufferHead*>::iterator p = ob->data.begin();
      while (p != ob->data.end()) {
        BufferHead *bh = p->second;
        // current iterator will be invalidated by bh_remove()

  map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
    if (p == ob->data.end())
    if (opos >= start+(loff_t)length) {
      ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
                     << start << "+" << length << "=" << start+(loff_t)length

    BufferHead *bh = p->second;
    ldout(cct, 20) << "checking bh " << *bh << dendl;

    for (map<loff_t, list<Context*> >::iterator it
           = bh->waitfor_read.begin();
         it != bh->waitfor_read.end();
      ls.splice(ls.end(), it->second);
    bh->waitfor_read.clear();

    if (bh->start() > opos) {
      ldout(cct, 1) << "bh_read_finish skipping gap "
                    << opos << "~" << bh->start() - opos
      ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;

    if (bh->last_read_tid != tid) {
      ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
                     << bh->last_read_tid << " != tid " << tid
                     << ", skipping" << dendl;

    assert(opos >= bh->start());
    assert(bh->start() == opos);   // we don't merge rx bh's... yet!
    assert(bh->length() <= start+(loff_t)length-opos);

      ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
      ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
      ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
    ob->try_merge_bh(bh);

  // called with lock held.
  ldout(cct, 20) << "finishing waiters " << ls << dendl;

  finish_contexts(cct, ls, err);
  retry_waiting_reads();

void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
                                        int64_t *max_amount, int *max_count)
  list<BufferHead*> blist;

  int64_t total_len = 0;
  set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
  assert(it != dirty_or_tx_bh.end());
  for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
       p != dirty_or_tx_bh.end();
    BufferHead *obh = *p;
    if (obh->ob != bh->ob)
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_back(obh);
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
          (max_amount && total_len > *max_amount))

  while (it != dirty_or_tx_bh.begin()) {
    BufferHead *obh = *it;
    if (obh->ob != bh->ob)
    if (obh->is_dirty() && obh->last_write <= cutoff) {
      blist.push_front(obh);
      total_len += obh->length();
      if ((max_count && count > *max_count) ||
          (max_amount && total_len > *max_amount))

    *max_amount -= total_len;

  bh_write_scattered(blist);
class ObjectCacher::C_WriteCommit : public Context {
  vector<pair<loff_t, uint64_t> > ranges;
  ZTracer::Trace trace;
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
                uint64_t l, const ZTracer::Trace &trace) :
    oc(c), poolid(_poolid), oid(o), trace(trace) {
    ranges.push_back(make_pair(s, l));
  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
                vector<pair<loff_t, uint64_t> >& _ranges) :
    oc(c), poolid(_poolid), oid(o), tid(0) {
    ranges.swap(_ranges);
  void finish(int r) override {
    oc->bh_write_commit(poolid, oid, ranges, tid, r);
    trace.event("finish");
void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
  assert(lock.is_locked());

  Object *ob = blist.front()->ob;

  ceph::real_time last_write;
  vector<pair<loff_t, uint64_t> > ranges;
  vector<pair<uint64_t, bufferlist> > io_vec;
  ranges.reserve(blist.size());
  io_vec.reserve(blist.size());

  uint64_t total_len = 0;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
    assert(bh->ob == ob);
    assert(bh->bl.length() == bh->length());
    ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));

    int n = io_vec.size();
    io_vec.resize(n + 1);
    io_vec[n].first = bh->start();
    io_vec[n].second = bh->bl;

    total_len += bh->length();
    if (bh->snapc.seq > snapc.seq)
    if (bh->last_write > last_write)
      last_write = bh->last_write;

  C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);

  ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
                                           io_vec, snapc, last_write,
                                           ob->truncate_size, ob->truncate_seq,
  oncommit->tid = tid;
  ob->last_write_tid = tid;
  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
    BufferHead *bh = *p;
    bh->last_write_tid = tid;

    perfcounter->inc(l_objectcacher_data_flushed, total_len);
void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_write " << *bh << dendl;

  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &trace_endpoint, &parent_trace);
    trace.copy_name("bh_write " + bh->ob->get_oid().name);
    trace.event("start");

  C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
                                              bh->ob->get_soid(), bh->start(),
                                              bh->length(), trace);

  ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
                                           bh->start(), bh->length(),
                                           bh->snapc, bh->bl, bh->last_write,
                                           bh->ob->truncate_size,
                                           bh->ob->truncate_seq,
                                           bh->journal_tid, trace, oncommit);
  ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;

  // set bh last_write_tid
  oncommit->tid = tid;
  bh->ob->last_write_tid = tid;
  bh->last_write_tid = tid;

    perfcounter->inc(l_objectcacher_data_flushed, bh->length());
void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
                                   vector<pair<loff_t, uint64_t> >& ranges,
                                   ceph_tid_t tid, int r)
  assert(lock.is_locked());
  ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
                << " ranges " << ranges << " returned " << r << dendl;

  if (objects[poolid].count(oid) == 0) {
    ldout(cct, 7) << "bh_write_commit no object cache" << dendl;

  Object *ob = objects[poolid][oid];
  int was_dirty_or_tx = ob->oset->dirty_or_tx;

  for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
    loff_t start = p->first;
    uint64_t length = p->second;
      ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;

      if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
        ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
          "complete on " << *ob << dendl;
        ob->complete = false;

    vector<pair<loff_t, BufferHead*>> hit;

    for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
         p != ob->data.end();
      BufferHead *bh = p->second;

      if (bh->start() > start+(loff_t)length)

      if (bh->start() < start &&
          bh->end() > start+(loff_t)length) {
        ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl;

      // make sure bh is tx
        ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;

      // make sure bh tid matches
      if (bh->last_write_tid != tid) {
        assert(bh->last_write_tid > tid);
        ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;

      // ok!  mark bh clean and error-free
      bh->set_journal_tid(0);
      if (bh->get_nocache())
        bh_lru_rest.lru_bottouch(bh);
      hit.push_back(make_pair(bh->start(), bh));
      ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;

      ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
                     << *bh << " r = " << r << " " << cpp_strerror(-r)
    for (auto& p : hit) {
      // p.second may have been merged and deleted by merge_left
      if (ob->data.count(p.first))
        ob->try_merge_bh(p.second);
  // update last_commit.
  assert(ob->last_commit_tid < tid);
  ob->last_commit_tid = tid;

  if (ob->waitfor_commit.count(tid)) {
    ls.splice(ls.begin(), ob->waitfor_commit[tid]);
    ob->waitfor_commit.erase(tid);

  // is the entire object set now clean and fully committed?
  ObjectSet *oset = ob->oset;

  if (flush_set_callback &&
      was_dirty_or_tx > 0 &&
      oset->dirty_or_tx == 0) {        // nothing dirty/tx
    flush_set_callback(flush_set_callback_arg, oset);

  finish_contexts(cct, ls, r);

void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
  assert(trace != nullptr);
  assert(lock.is_locked());
  ceph::real_time cutoff = ceph::real_clock::now();

  ldout(cct, 10) << "flush " << amount << dendl;

  /*
   * NOTE: we aren't actually pulling things off the LRU here, just
   * looking at the tail item.  Then we call bh_write, which moves it
   * to the other LRU, so that we can call
   * lru_dirty.lru_get_next_expire() again.
   */
  int64_t left = amount;
  while (amount == 0 || left > 0) {
    BufferHead *bh = static_cast<BufferHead*>(
      bh_lru_dirty.lru_get_next_expire());
    if (bh->last_write > cutoff) break;

    if (scattered_write) {
      bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
      left -= bh->length();
      bh_write(bh, *trace);
void ObjectCacher::trim()
  assert(lock.is_locked());
  ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;

  uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
  uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
  while (get_stat_clean() > 0 &&
         ((uint64_t)get_stat_clean() > max_size ||
          nr_clean_bh > max_clean_bh)) {
    BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());

    ldout(cct, 10) << "trim trimming " << *bh << dendl;
    assert(bh->is_clean() || bh->is_zero() || bh->is_error());

    Object *ob = bh->ob;
      ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
      ob->complete = false;

  while (ob_lru.lru_get_size() > max_objects) {
    Object *ob = static_cast<Object*>(ob_lru.lru_expire());

    ldout(cct, 10) << "trim trimming " << *ob << dendl;

  ldout(cct, 10) << "trim finish: max " << max_size << " clean "
                 << get_stat_clean() << ", objects: max " << max_objects
                 << " current " << ob_lru.lru_get_size() << dendl;
bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
  assert(lock.is_locked());
  for (vector<ObjectExtent>::iterator ex_it = extents.begin();
       ex_it != extents.end();
    ldout(cct, 10) << "is_cached " << *ex_it << dendl;

    sobject_t soid(ex_it->oid, snapid);
    Object *o = get_object_maybe(soid, ex_it->oloc);

    if (!o->is_cached(ex_it->offset, ex_it->length))

/*
 * returns # bytes read (if in cache).  onfinish is untouched (caller
 * returns 0 if doing async read
 */
int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                        ZTracer::Trace *parent_trace)
  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("read", &trace_endpoint, parent_trace);
    trace.event("start");

  int r = _readx(rd, oset, onfinish, true, &trace);
    trace.event("finish");
int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                         bool external_call, ZTracer::Trace *trace)
  assert(trace != nullptr);
  assert(lock.is_locked());
  bool success = true;
  uint64_t bytes_in_cache = 0;
  uint64_t bytes_not_in_cache = 0;
  uint64_t total_bytes_read = 0;
  map<uint64_t, bufferlist> stripe_map;  // final buffer offset -> substring
  bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  /*
   * WARNING: we can only meaningfully return ENOENT if the read request
   * passed in a single ObjectExtent.  Any caller who wants ENOENT instead of
   * zeroed buffers needs to feed single extents into readx().
   */
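// Illustrative note (an assumption drawn from the WARNING above, not from
// the original file): a caller that needs -ENOENT, e.g. an existence probe
// on an ObjectSet with return_enoent set, must issue one-extent reads:
//   rd->extents.size() == 1;           // required by the assert below
//   readx(rd, oset, onfinish, &trace);
// multi-extent reads always see absent ranges as zeroed buffers instead.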
  assert(!oset->return_enoent || rd->extents.size() == 1);

  for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
       ex_it != rd->extents.end();
    ldout(cct, 10) << "readx " << *ex_it << dendl;

    total_bytes_read += ex_it->length;

    sobject_t soid(ex_it->oid, rd->snap);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
                           ex_it->truncate_size, oset->truncate_seq);

    // does not exist and no hits?
    if (oset->return_enoent && !o->exists) {
      ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;

      // should we worry about COW underneath us?
      if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
                                              ex_it->length, soid.snap)) {
        ldout(cct, 20) << "readx may copy on write" << dendl;

        list<BufferHead*> blist;
        for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
             bh_it != o->data.end();
          BufferHead *bh = bh_it->second;
          if (bh->is_dirty() || bh->is_tx()) {
            ldout(cct, 10) << "readx flushing " << *bh << dendl;
            if (bh->is_dirty()) {
              if (scattered_write)
                blist.push_back(bh);
                bh_write(bh, *trace);
        if (scattered_write && !blist.empty())
          bh_write_scattered(blist);

          ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
                         << " on " << *o << dendl;
          o->waitfor_commit[o->last_write_tid].push_back(
            new C_RetryRead(this, rd, oset, onfinish, *trace));
          // FIXME: perfcounter!

      // can we return ENOENT?
      bool allzero = true;
      for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
           bh_it != o->data.end();
        ldout(cct, 20) << "readx ob has bh " << *bh_it->second << dendl;
        if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
        ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT"

    // map extent into bufferheads
    map<loff_t, BufferHead*> hits, missing, rx, errors;
    o->map_read(*ex_it, hits, missing, rx, errors);
    if (external_call) {
      // retry reading error buffers
      missing.insert(errors.begin(), errors.end());
      // some reads had errors, fail later so completions
      // are cleaned up properly
      // TODO: make read path not call _readx for every completion
      hits.insert(errors.begin(), errors.end());
    if (!missing.empty() || !rx.empty()) {
      map<loff_t, BufferHead*>::iterator last = missing.end();
      for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
           bh_it != missing.end();
        uint64_t rx_bytes = static_cast<uint64_t>(
          stat_rx + bh_it->second->length());
        bytes_not_in_cache += bh_it->second->length();
        if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
          // cache is full with concurrent reads -- wait for rx's to complete
          // to constrain memory growth (especially during copy-ups)
          ldout(cct, 10) << "readx missed, waiting on cache to complete "
                         << waitfor_read.size() << " blocked reads, "
                         << (MAX(rx_bytes, max_size) - max_size)
                         << " read bytes" << dendl;
          waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,

          bh_remove(o, bh_it->second);
          delete bh_it->second;
          bh_it->second->set_nocache(nocache);
          bh_read(bh_it->second, rd->fadvise_flags, *trace);
          if ((success && onfinish) || last != missing.end())
      // add the waiter to the last missing bh so we aren't woken up
      // early; reads complete in order
      if (last != missing.end()) {
        ldout(cct, 10) << "readx missed, waiting on " << *last->second
                       << " off " << last->first << dendl;
        last->second->waitfor_read[last->first].push_back(
          new C_RetryRead(this, rd, oset, onfinish, *trace));
      for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
        touch_bh(bh_it->second);  // bump in lru, so we don't lose it.
        if (success && onfinish) {
          ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
                         << " off " << bh_it->first << dendl;
          bh_it->second->waitfor_read[bh_it->first].push_back(
            new C_RetryRead(this, rd, oset, onfinish, *trace));
        bytes_not_in_cache += bh_it->second->length();

      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
           bh_it != hits.end(); ++bh_it)
        // bump in lru, so we don't lose it on a later read
        touch_bh(bh_it->second);
      assert(!hits.empty());

      // make a plain list
      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
           bh_it != hits.end();
        BufferHead *bh = bh_it->second;
        ldout(cct, 10) << "readx hit bh " << *bh << dendl;
        if (bh->is_error() && bh->error)
        bytes_in_cache += bh->length();

        if (bh->get_nocache() && bh->is_clean())
          bh_lru_rest.lru_bottouch(bh);
        // must come after touch_bh, because touch_bh sets dontneed to false
            ((loff_t)ex_it->offset <= bh->start() &&
             (bh->end() <= (loff_t)(ex_it->offset + ex_it->length)))) {
          bh->set_dontneed(true);  // if dirty
            bh_lru_rest.lru_bottouch(bh);
      // create reverse map of buffer offset -> object for the
      // eventual result.  this is over a single ObjectExtent, so we
      // - the bh's are contiguous
      // - the buffer frags need not be (and almost certainly aren't)
      loff_t opos = ex_it->offset;
      map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
      assert(bh_it->second->start() <= opos);
      uint64_t bhoff = opos - bh_it->second->start();
      vector<pair<uint64_t,uint64_t> >::iterator f_it
        = ex_it->buffer_extents.begin();

        BufferHead *bh = bh_it->second;
        assert(opos == (loff_t)(bh->start() + bhoff));
        uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
        ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
                       << bhoff << " frag " << f_it->first << "~"
                       << f_it->second << " +" << foff << "~" << len

        // put substr here first, since substr_of clobbers, and we
        // may get multiple bh's at this stripe_map position
        if (bh->is_zero()) {
          stripe_map[f_it->first].append_zero(len);
          bit.substr_of(bh->bl,
          stripe_map[f_it->first].claim_append(bit);

        if (opos == bh->end()) {
        if (foff == f_it->second) {
        if (bh_it == hits.end()) break;
        if (f_it == ex_it->buffer_extents.end())
      assert(f_it == ex_it->buffer_extents.end());
      assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
    if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))

  if (perfcounter && external_call) {
    perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
    perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
    perfcounter->inc(l_objectcacher_cache_ops_miss);
      ldout(cct, 20) << "readx defer " << rd << dendl;
      ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"

  if (perfcounter && external_call) {
    perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
    perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
    perfcounter->inc(l_objectcacher_cache_ops_hit);

  // no misses... success!  do the read.
  ldout(cct, 10) << "readx has all buffers" << dendl;

  // ok, assemble into result buffer.
  if (rd->bl && !error) {
    for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
         i != stripe_map.end();
      assert(pos == i->first);
      ldout(cct, 10) << "readx adding buffer len " << i->second.length()
                     << " at " << pos << dendl;
      pos += i->second.length();
      rd->bl->claim_append(i->second);
      assert(rd->bl->length() == pos);
    ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
  } else if (!error) {
    ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
    map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
    pos = i->first + i->second.length();

  int ret = error ? error : pos;
  ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
  assert(pos <= (uint64_t) INT_MAX);

void ObjectCacher::retry_waiting_reads()
  ls.swap(waitfor_read);

  while (!ls.empty() && waitfor_read.empty()) {
    Context *ctx = ls.front();

  waitfor_read.splice(waitfor_read.end(), ls);
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
                         ZTracer::Trace *parent_trace)
  assert(lock.is_locked());
  ceph::real_time now = ceph::real_clock::now();
  uint64_t bytes_written = 0;
  uint64_t bytes_written_in_flush = 0;
  bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
  bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;

  ZTracer::Trace trace;
  if (parent_trace != nullptr) {
    trace.init("write", &trace_endpoint, parent_trace);
    trace.event("start");

  for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
       ex_it != wr->extents.end();

    sobject_t soid(ex_it->oid, CEPH_NOSNAP);
    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
                           ex_it->truncate_size, oset->truncate_seq);

    // map it all into a single bufferhead.
    BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
    bool missing = bh->is_missing();
    bh->snapc = wr->snapc;

    bytes_written += ex_it->length;
      bytes_written_in_flush += ex_it->length;

    // adjust buffer pointers (ie "copy" data into my cache)
    // this is over a single ObjectExtent, so we know that
    // - there is one contiguous bh
    // - the buffer frags need not be (and almost certainly aren't)
    // note: i assume striping is monotonic... no jumps backwards, ever!
    loff_t opos = ex_it->offset;
    for (vector<pair<uint64_t, uint64_t> >::iterator f_it
           = ex_it->buffer_extents.begin();
         f_it != ex_it->buffer_extents.end();
      ldout(cct, 10) << "writex writing " << f_it->first << "~"
                     << f_it->second << " into " << *bh << " at " << opos
      // bh covers this extent, so the offset into bh is opos - bh->start()
      uint64_t bhoff = opos - bh->start();
      assert(f_it->second <= bh->length() - bhoff);
      // get the frag we're mapping in
      frag.substr_of(wr->bl,
                     f_it->first, f_it->second);

      // keep anything left of bhoff
      newbl.substr_of(bh->bl, 0, bhoff);
      newbl.claim_append(frag);

      opos += f_it->second;

    // ok, now bh is dirty.
      bh->set_dontneed(true);
    else if (nocache && missing)
      bh->set_nocache(true);

    bh->last_write = now;

    o->try_merge_bh(bh);

    perfcounter->inc(l_objectcacher_data_written, bytes_written);
    if (bytes_written_in_flush) {
      perfcounter->inc(l_objectcacher_overwritten_in_flush,
                       bytes_written_in_flush);

  int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
class ObjectCacher::C_WaitForWrite : public Context {
  C_WaitForWrite(ObjectCacher *oc, uint64_t len,
                 const ZTracer::Trace &trace, Context *onfinish) :
    m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
  void finish(int r) override;
  ZTracer::Trace m_trace;
  Context *m_onfinish;

void ObjectCacher::C_WaitForWrite::finish(int r)
  Mutex::Locker l(m_oc->lock);
  m_oc->maybe_wait_for_writeback(m_len, &m_trace);
  m_onfinish->complete(r);
void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
                                            ZTracer::Trace *trace)
  assert(lock.is_locked());
  ceph::mono_time start = ceph::mono_clock::now();

  // wait for writeback?
  //  - wait for dirty and tx bytes (relative to the max_dirty threshold)
  //  - do not wait for bytes other waiters are waiting on.  this means that
  //    threads do not wait for each other.  this effectively allows the cache
  //    size to balloon proportional to the data that is in flight.
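  // Illustrative sketch (hypothetical numbers, not from the original file):
  // with max_dirty = 24 MiB and 20 MiB already dirty+tx, a writer of 8 MiB
  // sleeps while
  //   dirty + tx >= max_dirty + stat_dirty_waiting
  // holds; its own 8 MiB is counted in stat_dirty_waiting while it waits,
  // so a second writer compares against 24 MiB + 8 MiB and does not also
  // block on the first writer's in-flight bytes.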
  uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
  while (get_stat_dirty() + get_stat_tx() > 0 &&
         (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
           max_dirty + get_stat_dirty_waiting()) ||
          (dirty_or_tx_bh.size() >=
           max_dirty_bh + get_stat_nr_dirty_waiters()))) {

      trace->event("start wait for writeback");

    ldout(cct, 10) << __func__ << " waiting for dirty|tx "
                   << (get_stat_dirty() + get_stat_tx()) << " >= max "
                   << max_dirty << " + dirty_waiting "
                   << get_stat_dirty_waiting() << dendl;
    flusher_cond.Signal();
    stat_dirty_waiting += len;
    ++stat_nr_dirty_waiters;
    stat_cond.Wait(lock);
    stat_dirty_waiting -= len;
    --stat_nr_dirty_waiters;

    ldout(cct, 10) << __func__ << " woke up" << dendl;

    trace->event("finish wait for writeback");
  if (blocked && perfcounter) {
    perfcounter->inc(l_objectcacher_write_ops_blocked);
    perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
    ceph::timespan blocked = ceph::mono_clock::now() - start;
    perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
// blocking wait for write.
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
                                  ZTracer::Trace *trace, Context *onfreespace)
  assert(lock.is_locked());
  assert(trace != nullptr);

  if (max_dirty > 0) {
    if (block_writes_upfront) {
      maybe_wait_for_writeback(len, trace);
        onfreespace->complete(0);
      assert(onfreespace);
      finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));

    // write-thru!  flush what we just wrote.
    Context *fin = block_writes_upfront ?
      new C_Cond(&cond, &done, &ret) : onfreespace;

    bool flushed = flush_set(oset, wr->extents, trace, fin);
    assert(!flushed);   // we just dirtied it, and didn't drop our lock!
    ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
                   << " bytes" << dendl;
    if (block_writes_upfront) {
      ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
        onfreespace->complete(ret);

  // start writeback anyway?
  if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
    ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
                   << target_dirty << ", nudging flusher" << dendl;
    flusher_cond.Signal();
void ObjectCacher::flusher_entry()
  ldout(cct, 10) << "flusher start" << dendl;

  while (!flusher_stop) {
    loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
    ldout(cct, 11) << "flusher "
                   << all << " / " << max_size << ": "
                   << get_stat_tx() << " tx, "
                   << get_stat_rx() << " rx, "
                   << get_stat_clean() << " clean, "
                   << get_stat_dirty() << " dirty ("
                   << target_dirty << " target, "
                   << max_dirty << " max)"
    loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();

    ZTracer::Trace trace;
    if (cct->_conf->osdc_blkin_trace_all) {
      trace.init("flusher", &trace_endpoint);
      trace.event("start");

    if (actual > 0 && (uint64_t) actual > target_dirty) {
      // flush some dirty pages
      ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
                     << get_stat_dirty_waiting() << " dirty_waiting > target "
                     << target_dirty << ", flushing some dirty bhs" << dendl;
      flush(&trace, actual - target_dirty);
      // check tail of lru for old dirty items
      ceph::real_time cutoff = ceph::real_clock::now();
      cutoff -= max_dirty_age;

      int max = MAX_FLUSH_UNDER_LOCK;
      while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
                                              lru_get_next_expire())) != 0 &&
             bh->last_write <= cutoff &&
        ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
        if (scattered_write) {
          bh_write_adjacencies(bh, cutoff, NULL, &max);
          bh_write(bh, trace);

    // back off the lock to avoid starving other threads
    trace.event("backoff");

      trace.event("finish");

    flusher_cond.WaitInterval(lock, seconds(1));

  /* Wait for reads to finish.  This is only possible if handling
   * -ENOENT made some read completions finish before their rados read
   * came back.  If we don't wait for them, and destroy the cache, when
   * the rados reads do come back their callback will try to access the
   * no-longer-valid ObjectCacher.
   */
  while (reads_outstanding > 0) {
    ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
                   << reads_outstanding << dendl;
    read_cond.Wait(lock);

  ldout(cct, 10) << "flusher finish" << dendl;
// -------------------------------------------------

bool ObjectCacher::set_is_empty(ObjectSet *oset)
  assert(lock.is_locked());
  if (oset->objects.empty())

  for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
    if (!(*p)->is_empty())

bool ObjectCacher::set_is_cached(ObjectSet *oset)
  assert(lock.is_locked());
  if (oset->objects.empty())

  for (xlist<Object*>::iterator p = oset->objects.begin();
    for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
         q != ob->data.end();
      BufferHead *bh = q->second;
      if (!bh->is_dirty() && !bh->is_tx())

bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
  assert(lock.is_locked());
  if (oset->objects.empty())

  for (xlist<Object*>::iterator i = oset->objects.begin();
    for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
         p != ob->data.end();
      BufferHead *bh = p->second;
      if (bh->is_dirty() || bh->is_tx())
// purge.  non-blocking.  violently removes dirty buffers from cache.
void ObjectCacher::purge(Object *ob)
  assert(lock.is_locked());
  ldout(cct, 10) << "purge " << *ob << dendl;

// flush.  non-blocking.  no callback.
// true if clean, already flushed.
// false if we wrote something.
// be sloppy about the ranges and flush any buffer it touches
bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
                         ZTracer::Trace *trace)
  assert(trace != nullptr);
  assert(lock.is_locked());
  list<BufferHead*> blist;

  ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
  for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
       p != ob->data.end();
    BufferHead *bh = p->second;
    ldout(cct, 20) << "flush " << *bh << dendl;
    if (length && bh->start() > offset+length) {

    if (!bh->is_dirty()) {

    if (scattered_write)
      blist.push_back(bh);
      bh_write(bh, *trace);
  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);
bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
  assert(lock.is_locked());
  if (gather->has_subs()) {
    gather->set_finisher(onfinish);

  ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
  onfinish->complete(0);

// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
  assert(lock.is_locked());
  assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);

  ldout(cct, 10) << "flush_set " << oset << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;

  // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
  // order.  But items in oset->objects are not sorted.  So the iterator can
  // point to any buffer head in the ObjectSet
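  // Illustrative sketch (not from the original file): ptr_lt orders
  // dirty_or_tx_bh by (ObjectSet, Object, offset), so lower_bound on a key
  // built from any object in this set lands somewhere inside the set's run
  // of buffer heads; the loops below then walk backwards and forwards from
  // that point until they see a bh whose ob->oset != oset on each side.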
  BufferHead key(*oset->objects.begin());
  it = dirty_or_tx_bh.lower_bound(&key);

  bool backwards = true;
  if (it != dirty_or_tx_bh.begin())

  for (; p != dirty_or_tx_bh.end(); p = q) {
    BufferHead *bh = *p;
    if (bh->ob->oset != oset)
    waitfor_commit.insert(bh->ob);
    if (bh->is_dirty()) {
      if (scattered_write) {
        if (last_ob != bh->ob) {
          if (!blist.empty()) {
            bh_write_scattered(blist);
        blist.push_back(bh);

  for (p = q = it; true; p = q) {
    if (q != dirty_or_tx_bh.begin())
    BufferHead *bh = *p;
    if (bh->ob->oset != oset)
    waitfor_commit.insert(bh->ob);
    if (bh->is_dirty()) {
      if (scattered_write) {
        if (last_ob != bh->ob) {
          if (!blist.empty()) {
            bh_write_scattered(blist);
        blist.push_front(bh);

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end(); ++i) {
    // we'll need to gather...
    ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
                   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());

  return _flush_set_finish(&gather, onfinish);

// flush.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
                             ZTracer::Trace *trace, Context *onfinish)
  assert(lock.is_locked());
  assert(trace != nullptr);
  assert(onfinish != NULL);
  if (oset->objects.empty()) {
    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
    onfinish->complete(0);

  ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
                 << " ObjectExtents" << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);

  for (vector<ObjectExtent>::iterator p = exv.begin();
    ObjectExtent &ex = *p;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
    Object *ob = objects[oset->poolid][soid];

    ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
                   << " " << ob << dendl;

    if (!flush(ob, ex.offset, ex.length, trace)) {
      // we'll need to gather...
      ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
                     << ob->last_write_tid << " on " << *ob << dendl;
      ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());

  return _flush_set_finish(&gather, onfinish);
// flush all dirty data.  non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_all(Context *onfinish)
  assert(lock.is_locked());
  assert(onfinish != NULL);

  ldout(cct, 10) << "flush_all " << dendl;

  // we'll need to wait for all objects to flush!
  C_GatherBuilder gather(cct);
  set<Object*> waitfor_commit;

  list<BufferHead*> blist;
  Object *last_ob = NULL;
  set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
  next = it = dirty_or_tx_bh.begin();
  while (it != dirty_or_tx_bh.end()) {
    BufferHead *bh = *it;
    waitfor_commit.insert(bh->ob);

    if (bh->is_dirty()) {
      if (scattered_write) {
        if (last_ob != bh->ob) {
          if (!blist.empty()) {
            bh_write_scattered(blist);
        blist.push_back(bh);

  if (scattered_write && !blist.empty())
    bh_write_scattered(blist);

  for (set<Object*>::iterator i = waitfor_commit.begin();
       i != waitfor_commit.end();
    // we'll need to gather...
    ldout(cct, 10) << "flush_all will wait for ack tid "
                   << ob->last_write_tid << " on " << *ob << dendl;
    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());

  return _flush_set_finish(&gather, onfinish);
void ObjectCacher::purge_set(ObjectSet *oset)
  assert(lock.is_locked());
  if (oset->objects.empty()) {
    ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;

  ldout(cct, 10) << "purge_set " << oset << dendl;
  const bool were_dirty = oset->dirty_or_tx > 0;

  for (xlist<Object*>::iterator i = oset->objects.begin();
  // Although we have purged rather than flushed, the caller should still
  // drop any resources associated with dirty data.
  assert(oset->dirty_or_tx == 0);
  if (flush_set_callback && were_dirty) {
    flush_set_callback(flush_set_callback_arg, oset);
loff_t ObjectCacher::release(Object *ob)
  assert(lock.is_locked());
  list<BufferHead*> clean;
  loff_t o_unclean = 0;

  for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
       p != ob->data.end();
    BufferHead *bh = p->second;
    if (bh->is_clean() || bh->is_zero() || bh->is_error())
      clean.push_back(bh);
      o_unclean += bh->length();

  for (list<BufferHead*>::iterator p = clean.begin();

  if (ob->can_close()) {
    ldout(cct, 10) << "release trimming " << *ob << dendl;
    assert(o_unclean == 0);

    ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
    ob->complete = false;
    ldout(cct, 10) << "release setting exists on " << *ob << dendl;

loff_t ObjectCacher::release_set(ObjectSet *oset)
  assert(lock.is_locked());
  // return # bytes not clean (and thus not released).

  if (oset->objects.empty()) {
    ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;

  ldout(cct, 10) << "release_set " << oset << dendl;

  xlist<Object*>::iterator q;
  for (xlist<Object*>::iterator p = oset->objects.begin();

    loff_t o_unclean = release(ob);
    unclean += o_unclean;

      ldout(cct, 10) << "release_set " << oset << " " << *ob
                     << " has " << o_unclean << " bytes left"

  ldout(cct, 10) << "release_set " << oset
                 << ", " << unclean << " bytes left" << dendl;

uint64_t ObjectCacher::release_all()
  assert(lock.is_locked());
  ldout(cct, 10) << "release_all" << dendl;
  uint64_t unclean = 0;

  vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
  while (i != objects.end()) {
    ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
    while (p != i->end()) {
      ceph::unordered_map<sobject_t, Object*>::iterator n = p;
      Object *ob = p->second;

      loff_t o_unclean = release(ob);
      unclean += o_unclean;

        ldout(cct, 10) << "release_all " << *ob
                       << " has " << o_unclean << " bytes left"

  ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
void ObjectCacher::clear_nonexistence(ObjectSet *oset)
  assert(lock.is_locked());
  ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;

  for (xlist<Object*>::iterator p = oset->objects.begin();
    ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
    ob->complete = false;
    for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
      C_ReadFinish *comp = *q;
      comp->distrust_enoent();

/*
 * discard object extents from an ObjectSet by removing the objects in
 * exls from the in-memory oset.
 */
void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
  assert(lock.is_locked());
  if (oset->objects.empty()) {
    ldout(cct, 10) << "discard_set on " << oset << " dne" << dendl;

  ldout(cct, 10) << "discard_set " << oset << dendl;

  bool were_dirty = oset->dirty_or_tx > 0;

  for (vector<ObjectExtent>::const_iterator p = exls.begin();
    ldout(cct, 10) << "discard_set " << oset << " ex " << *p << dendl;
    const ObjectExtent &ex = *p;
    sobject_t soid(ex.oid, CEPH_NOSNAP);
    if (objects[oset->poolid].count(soid) == 0)
    Object *ob = objects[oset->poolid][soid];

    ob->discard(ex.offset, ex.length);

  // did we truncate off dirty data?
  if (flush_set_callback &&
      were_dirty && oset->dirty_or_tx == 0)
    flush_set_callback(flush_set_callback_arg, oset);
void ObjectCacher::verify_stats() const
  assert(lock.is_locked());
  ldout(cct, 10) << "verify_stats" << dendl;

  loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
  for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
    for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
      Object *ob = p->second;
      for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
           q != ob->data.end();
        BufferHead *bh = q->second;
        switch (bh->get_state()) {
        case BufferHead::STATE_MISSING:
          missing += bh->length();
        case BufferHead::STATE_CLEAN:
          clean += bh->length();
        case BufferHead::STATE_ZERO:
          zero += bh->length();
        case BufferHead::STATE_DIRTY:
          dirty += bh->length();
        case BufferHead::STATE_TX:
        case BufferHead::STATE_RX:
        case BufferHead::STATE_ERROR:
          error += bh->length();

  ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
                 << " dirty " << dirty << " missing " << missing
                 << " error " << error << dendl;
  assert(clean == stat_clean);
  assert(rx == stat_rx);
  assert(tx == stat_tx);
  assert(dirty == stat_dirty);
  assert(missing == stat_missing);
  assert(zero == stat_zero);
  assert(error == stat_error);
void ObjectCacher::bh_stat_add(BufferHead *bh)
  assert(lock.is_locked());
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing += bh->length();
  case BufferHead::STATE_CLEAN:
    stat_clean += bh->length();
  case BufferHead::STATE_ZERO:
    stat_zero += bh->length();
  case BufferHead::STATE_DIRTY:
    stat_dirty += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
  case BufferHead::STATE_TX:
    stat_tx += bh->length();
    bh->ob->dirty_or_tx += bh->length();
    bh->ob->oset->dirty_or_tx += bh->length();
  case BufferHead::STATE_RX:
    stat_rx += bh->length();
  case BufferHead::STATE_ERROR:
    stat_error += bh->length();
    assert(0 == "bh_stat_add: invalid bufferhead state");
  if (get_stat_dirty_waiting() > 0)

void ObjectCacher::bh_stat_sub(BufferHead *bh)
  assert(lock.is_locked());
  switch (bh->get_state()) {
  case BufferHead::STATE_MISSING:
    stat_missing -= bh->length();
  case BufferHead::STATE_CLEAN:
    stat_clean -= bh->length();
  case BufferHead::STATE_ZERO:
    stat_zero -= bh->length();
  case BufferHead::STATE_DIRTY:
    stat_dirty -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
  case BufferHead::STATE_TX:
    stat_tx -= bh->length();
    bh->ob->dirty_or_tx -= bh->length();
    bh->ob->oset->dirty_or_tx -= bh->length();
  case BufferHead::STATE_RX:
    stat_rx -= bh->length();
  case BufferHead::STATE_ERROR:
    stat_error -= bh->length();
    assert(0 == "bh_stat_sub: invalid bufferhead state");
void ObjectCacher::bh_set_state(BufferHead *bh, int s)
  assert(lock.is_locked());
  int state = bh->get_state();
  // move between lru lists?
  if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
    bh_lru_rest.lru_remove(bh);
    bh_lru_dirty.lru_insert_top(bh);
  } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
    bh_lru_dirty.lru_remove(bh);
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
      bh_lru_rest.lru_insert_top(bh);

  if ((s == BufferHead::STATE_TX ||
       s == BufferHead::STATE_DIRTY) &&
      state != BufferHead::STATE_TX &&
      state != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.insert(bh);
  } else if ((state == BufferHead::STATE_TX ||
              state == BufferHead::STATE_DIRTY) &&
             s != BufferHead::STATE_TX &&
             s != BufferHead::STATE_DIRTY) {
    dirty_or_tx_bh.erase(bh);

  if (s != BufferHead::STATE_ERROR &&
      state == BufferHead::STATE_ERROR) {

void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
  assert(lock.is_locked());
  ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;

  if (bh->is_dirty()) {
    bh_lru_dirty.lru_insert_top(bh);
    dirty_or_tx_bh.insert(bh);
    if (bh->get_dontneed())
      bh_lru_rest.lru_insert_bot(bh);
      bh_lru_rest.lru_insert_top(bh);

    dirty_or_tx_bh.insert(bh);

void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
  assert(lock.is_locked());
  assert(bh->get_journal_tid() == 0);
  ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;

  if (bh->is_dirty()) {
    bh_lru_dirty.lru_remove(bh);
    dirty_or_tx_bh.erase(bh);
    bh_lru_rest.lru_remove(bh);

    dirty_or_tx_bh.erase(bh);

  if (get_stat_dirty_waiting() > 0)