+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-#ifndef CEPH_OBJECTCACHER_H
-#define CEPH_OBJECTCACHER_H
-
-#include "include/types.h"
-#include "include/lru.h"
-#include "include/Context.h"
-#include "include/xlist.h"
-
-#include "common/Cond.h"
-#include "common/Finisher.h"
-#include "common/Thread.h"
-#include "common/zipkin_trace.h"
-
-#include "Objecter.h"
-#include "Striper.h"
-
-class CephContext;
-class WritebackHandler;
-class PerfCounters;
-
-enum {
- l_objectcacher_first = 25000,
-
- l_objectcacher_cache_ops_hit, // ops we satisfy completely from cache
- l_objectcacher_cache_ops_miss, // ops we don't satisfy completely from cache
-
- l_objectcacher_cache_bytes_hit, // bytes read directly from cache
-
- l_objectcacher_cache_bytes_miss, // bytes we couldn't read directly
-
- // from cache
-
- l_objectcacher_data_read, // total bytes read out
- l_objectcacher_data_written, // bytes written to cache
- l_objectcacher_data_flushed, // bytes flushed to WritebackHandler
- l_objectcacher_overwritten_in_flush, // bytes overwritten while
- // flushing is in progress
-
- l_objectcacher_write_ops_blocked, // total write ops we delayed due
- // to dirty limits
- l_objectcacher_write_bytes_blocked, // total number of write bytes
- // we delayed due to dirty
- // limits
- l_objectcacher_write_time_blocked, // total time in seconds spent
- // blocking a write due to dirty
- // limits
-
- l_objectcacher_last,
-};
-
-class ObjectCacher {
- PerfCounters *perfcounter;
- public:
- CephContext *cct;
- class Object;
- struct ObjectSet;
- class C_ReadFinish;
-
- typedef void (*flush_set_callback_t) (void *p, ObjectSet *oset);
-
- // read scatter/gather
- struct OSDRead {
- vector<ObjectExtent> extents;
- snapid_t snap;
- bufferlist *bl;
- int fadvise_flags;
- OSDRead(snapid_t s, bufferlist *b, int f)
- : snap(s), bl(b), fadvise_flags(f) {}
- };
-
- OSDRead *prepare_read(snapid_t snap, bufferlist *b, int f) const {
- return new OSDRead(snap, b, f);
- }
-
- // write scatter/gather
- struct OSDWrite {
- vector<ObjectExtent> extents;
- SnapContext snapc;
- bufferlist bl;
- ceph::real_time mtime;
- int fadvise_flags;
- ceph_tid_t journal_tid;
- OSDWrite(const SnapContext& sc, const bufferlist& b, ceph::real_time mt,
- int f, ceph_tid_t _journal_tid)
- : snapc(sc), bl(b), mtime(mt), fadvise_flags(f),
- journal_tid(_journal_tid) {}
- };
-
- OSDWrite *prepare_write(const SnapContext& sc,
- const bufferlist &b,
- ceph::real_time mt,
- int f,
- ceph_tid_t journal_tid) const {
- return new OSDWrite(sc, b, mt, f, journal_tid);
- }
-
-
-
- // ******* BufferHead *********
- class BufferHead : public LRUObject {
- public:
- // states
- static const int STATE_MISSING = 0;
- static const int STATE_CLEAN = 1;
- static const int STATE_ZERO = 2; // NOTE: these are *clean* zeros
- static const int STATE_DIRTY = 3;
- static const int STATE_RX = 4;
- static const int STATE_TX = 5;
- static const int STATE_ERROR = 6; // a read error occurred
-
- private:
- // my fields
- int state;
- int ref;
- struct {
- loff_t start, length; // bh extent in object
- } ex;
- bool dontneed; //indicate bh don't need by anyone
- bool nocache; //indicate bh don't need by this caller
-
- public:
- Object *ob;
- bufferlist bl;
- ceph_tid_t last_write_tid; // version of bh (if non-zero)
- ceph_tid_t last_read_tid; // tid of last read op (if any)
- ceph::real_time last_write;
- SnapContext snapc;
- ceph_tid_t journal_tid;
- int error; // holds return value for failed reads
-
- map<loff_t, list<Context*> > waitfor_read;
-
- // cons
- explicit BufferHead(Object *o) :
- state(STATE_MISSING),
- ref(0),
- dontneed(false),
- nocache(false),
- ob(o),
- last_write_tid(0),
- last_read_tid(0),
- journal_tid(0),
- error(0) {
- ex.start = ex.length = 0;
- }
-
- // extent
- loff_t start() const { return ex.start; }
- void set_start(loff_t s) { ex.start = s; }
- loff_t length() const { return ex.length; }
- void set_length(loff_t l) { ex.length = l; }
- loff_t end() const { return ex.start + ex.length; }
- loff_t last() const { return end() - 1; }
-
- // states
- void set_state(int s) {
- if (s == STATE_RX || s == STATE_TX) get();
- if (state == STATE_RX || state == STATE_TX) put();
- state = s;
- }
- int get_state() const { return state; }
-
- inline ceph_tid_t get_journal_tid() const {
- return journal_tid;
- }
- inline void set_journal_tid(ceph_tid_t _journal_tid) {
- journal_tid = _journal_tid;
- }
-
- bool is_missing() const { return state == STATE_MISSING; }
- bool is_dirty() const { return state == STATE_DIRTY; }
- bool is_clean() const { return state == STATE_CLEAN; }
- bool is_zero() const { return state == STATE_ZERO; }
- bool is_tx() const { return state == STATE_TX; }
- bool is_rx() const { return state == STATE_RX; }
- bool is_error() const { return state == STATE_ERROR; }
-
- // reference counting
- int get() {
- assert(ref >= 0);
- if (ref == 0) lru_pin();
- return ++ref;
- }
- int put() {
- assert(ref > 0);
- if (ref == 1) lru_unpin();
- --ref;
- return ref;
- }
-
- void set_dontneed(bool v) {
- dontneed = v;
- }
- bool get_dontneed() const {
- return dontneed;
- }
-
- void set_nocache(bool v) {
- nocache = v;
- }
- bool get_nocache() const {
- return nocache;
- }
-
- inline bool can_merge_journal(BufferHead *bh) const {
- return (get_journal_tid() == bh->get_journal_tid());
- }
-
- struct ptr_lt {
- bool operator()(const BufferHead* l, const BufferHead* r) const {
- const Object *lob = l->ob;
- const Object *rob = r->ob;
- const ObjectSet *loset = lob->oset;
- const ObjectSet *roset = rob->oset;
- if (loset != roset)
- return loset < roset;
- if (lob != rob)
- return lob < rob;
- if (l->start() != r->start())
- return l->start() < r->start();
- return l < r;
- }
- };
- };
-
- // ******* Object *********
- class Object : public LRUObject {
- private:
- // ObjectCacher::Object fields
- int ref;
- ObjectCacher *oc;
- sobject_t oid;
- friend struct ObjectSet;
-
- public:
- uint64_t object_no;
- ObjectSet *oset;
- xlist<Object*>::item set_item;
- object_locator_t oloc;
- uint64_t truncate_size, truncate_seq;
-
- bool complete;
- bool exists;
-
- map<loff_t, BufferHead*> data;
-
- ceph_tid_t last_write_tid; // version of bh (if non-zero)
- ceph_tid_t last_commit_tid; // last update commited.
-
- int dirty_or_tx;
-
- map< ceph_tid_t, list<Context*> > waitfor_commit;
- xlist<C_ReadFinish*> reads;
-
- Object(const Object&) = delete;
- Object& operator=(const Object&) = delete;
-
- Object(ObjectCacher *_oc, sobject_t o, uint64_t ono, ObjectSet *os,
- object_locator_t& l, uint64_t ts, uint64_t tq) :
- ref(0),
- oc(_oc),
- oid(o), object_no(ono), oset(os), set_item(this), oloc(l),
- truncate_size(ts), truncate_seq(tq),
- complete(false), exists(true),
- last_write_tid(0), last_commit_tid(0),
- dirty_or_tx(0) {
- // add to set
- os->objects.push_back(&set_item);
- }
- ~Object() {
- reads.clear();
- assert(ref == 0);
- assert(data.empty());
- assert(dirty_or_tx == 0);
- set_item.remove_myself();
- }
-
- sobject_t get_soid() const { return oid; }
- object_t get_oid() { return oid.oid; }
- snapid_t get_snap() { return oid.snap; }
- ObjectSet *get_object_set() const { return oset; }
- string get_namespace() { return oloc.nspace; }
- uint64_t get_object_number() const { return object_no; }
-
- const object_locator_t& get_oloc() const { return oloc; }
- void set_object_locator(object_locator_t& l) { oloc = l; }
-
- bool can_close() const {
- if (lru_is_expireable()) {
- assert(data.empty());
- assert(waitfor_commit.empty());
- return true;
- }
- return false;
- }
-
- /**
- * Check buffers and waiters for consistency
- * - no overlapping buffers
- * - index in map matches BH
- * - waiters fall within BH
- */
- void audit_buffers();
-
- /**
- * find first buffer that includes or follows an offset
- *
- * @param offset object byte offset
- * @return iterator pointing to buffer, or data.end()
- */
- map<loff_t,BufferHead*>::const_iterator data_lower_bound(loff_t offset) const {
- map<loff_t,BufferHead*>::const_iterator p = data.lower_bound(offset);
- if (p != data.begin() &&
- (p == data.end() || p->first > offset)) {
- --p; // might overlap!
- if (p->first + p->second->length() <= offset)
- ++p; // doesn't overlap.
- }
- return p;
- }
-
- // bh
- // add to my map
- void add_bh(BufferHead *bh) {
- if (data.empty())
- get();
- assert(data.count(bh->start()) == 0);
- data[bh->start()] = bh;
- }
- void remove_bh(BufferHead *bh) {
- assert(data.count(bh->start()));
- data.erase(bh->start());
- if (data.empty())
- put();
- }
-
- bool is_empty() const { return data.empty(); }
-
- // mid-level
- BufferHead *split(BufferHead *bh, loff_t off);
- void merge_left(BufferHead *left, BufferHead *right);
- void try_merge_bh(BufferHead *bh);
-
- bool is_cached(loff_t off, loff_t len) const;
- bool include_all_cached_data(loff_t off, loff_t len);
- int map_read(ObjectExtent &ex,
- map<loff_t, BufferHead*>& hits,
- map<loff_t, BufferHead*>& missing,
- map<loff_t, BufferHead*>& rx,
- map<loff_t, BufferHead*>& errors);
- BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid);
-
- void replace_journal_tid(BufferHead *bh, ceph_tid_t tid);
- void truncate(loff_t s);
- void discard(loff_t off, loff_t len);
-
- // reference counting
- int get() {
- assert(ref >= 0);
- if (ref == 0) lru_pin();
- return ++ref;
- }
- int put() {
- assert(ref > 0);
- if (ref == 1) lru_unpin();
- --ref;
- return ref;
- }
- };
-
-
- struct ObjectSet {
- void *parent;
-
- inodeno_t ino;
- uint64_t truncate_seq, truncate_size;
-
- int64_t poolid;
- xlist<Object*> objects;
-
- int dirty_or_tx;
- bool return_enoent;
-
- ObjectSet(void *p, int64_t _poolid, inodeno_t i)
- : parent(p), ino(i), truncate_seq(0),
- truncate_size(0), poolid(_poolid), dirty_or_tx(0),
- return_enoent(false) {}
-
- };
-
-
- // ******* ObjectCacher *********
- // ObjectCacher fields
- private:
- WritebackHandler& writeback_handler;
- bool scattered_write;
-
- string name;
- Mutex& lock;
-
- uint64_t max_dirty, target_dirty, max_size, max_objects;
- ceph::timespan max_dirty_age;
- bool block_writes_upfront;
-
- ZTracer::Endpoint trace_endpoint;
-
- flush_set_callback_t flush_set_callback;
- void *flush_set_callback_arg;
-
- // indexed by pool_id
- vector<ceph::unordered_map<sobject_t, Object*> > objects;
-
- list<Context*> waitfor_read;
-
- ceph_tid_t last_read_tid;
-
- set<BufferHead*, BufferHead::ptr_lt> dirty_or_tx_bh;
- LRU bh_lru_dirty, bh_lru_rest;
- LRU ob_lru;
-
- Cond flusher_cond;
- bool flusher_stop;
- void flusher_entry();
- class FlusherThread : public Thread {
- ObjectCacher *oc;
- public:
- explicit FlusherThread(ObjectCacher *o) : oc(o) {}
- void *entry() override {
- oc->flusher_entry();
- return 0;
- }
- } flusher_thread;
-
- Finisher finisher;
-
- // objects
- Object *get_object_maybe(sobject_t oid, object_locator_t &l) {
- // have it?
- if (((uint32_t)l.pool < objects.size()) &&
- (objects[l.pool].count(oid)))
- return objects[l.pool][oid];
- return NULL;
- }
-
- Object *get_object(sobject_t oid, uint64_t object_no, ObjectSet *oset,
- object_locator_t &l, uint64_t truncate_size,
- uint64_t truncate_seq);
- void close_object(Object *ob);
-
- // bh stats
- Cond stat_cond;
-
- loff_t stat_clean;
- loff_t stat_zero;
- loff_t stat_dirty;
- loff_t stat_rx;
- loff_t stat_tx;
- loff_t stat_missing;
- loff_t stat_error;
- loff_t stat_dirty_waiting; // bytes that writers are waiting on to write
-
- size_t stat_nr_dirty_waiters;
-
- void verify_stats() const;
-
- void bh_stat_add(BufferHead *bh);
- void bh_stat_sub(BufferHead *bh);
- loff_t get_stat_tx() const { return stat_tx; }
- loff_t get_stat_rx() const { return stat_rx; }
- loff_t get_stat_dirty() const { return stat_dirty; }
- loff_t get_stat_clean() const { return stat_clean; }
- loff_t get_stat_zero() const { return stat_zero; }
- loff_t get_stat_dirty_waiting() const { return stat_dirty_waiting; }
- size_t get_stat_nr_dirty_waiters() const { return stat_nr_dirty_waiters; }
-
- void touch_bh(BufferHead *bh) {
- if (bh->is_dirty())
- bh_lru_dirty.lru_touch(bh);
- else
- bh_lru_rest.lru_touch(bh);
-
- bh->set_dontneed(false);
- bh->set_nocache(false);
- touch_ob(bh->ob);
- }
- void touch_ob(Object *ob) {
- ob_lru.lru_touch(ob);
- }
- void bottouch_ob(Object *ob) {
- ob_lru.lru_bottouch(ob);
- }
-
- // bh states
- void bh_set_state(BufferHead *bh, int s);
- void copy_bh_state(BufferHead *bh1, BufferHead *bh2) {
- bh_set_state(bh2, bh1->get_state());
- }
-
- void mark_missing(BufferHead *bh) {
- bh_set_state(bh,BufferHead::STATE_MISSING);
- }
- void mark_clean(BufferHead *bh) {
- bh_set_state(bh, BufferHead::STATE_CLEAN);
- }
- void mark_zero(BufferHead *bh) {
- bh_set_state(bh, BufferHead::STATE_ZERO);
- }
- void mark_rx(BufferHead *bh) {
- bh_set_state(bh, BufferHead::STATE_RX);
- }
- void mark_tx(BufferHead *bh) {
- bh_set_state(bh, BufferHead::STATE_TX); }
- void mark_error(BufferHead *bh) {
- bh_set_state(bh, BufferHead::STATE_ERROR);
- }
- void mark_dirty(BufferHead *bh) {
- bh_set_state(bh, BufferHead::STATE_DIRTY);
- bh_lru_dirty.lru_touch(bh);
- //bh->set_dirty_stamp(ceph_clock_now());
- }
-
- void bh_add(Object *ob, BufferHead *bh);
- void bh_remove(Object *ob, BufferHead *bh);
-
- // io
- void bh_read(BufferHead *bh, int op_flags,
- const ZTracer::Trace &parent_trace);
- void bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace);
- void bh_write_scattered(list<BufferHead*>& blist);
- void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
- int64_t *amount, int *max_count);
-
- void trim();
- void flush(ZTracer::Trace *trace, loff_t amount=0);
-
- /**
- * flush a range of buffers
- *
- * Flush any buffers that intersect the specified extent. If len==0,
- * flush *all* buffers for the object.
- *
- * @param o object
- * @param off start offset
- * @param len extent length, or 0 for entire object
- * @return true if object was already clean/flushed.
- */
- bool flush(Object *o, loff_t off, loff_t len,
- ZTracer::Trace *trace);
- loff_t release(Object *o);
- void purge(Object *o);
-
- int64_t reads_outstanding;
- Cond read_cond;
-
- int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
- bool external_call, ZTracer::Trace *trace);
- void retry_waiting_reads();
-
- public:
- void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
- loff_t offset, uint64_t length,
- bufferlist &bl, int r,
- bool trust_enoent);
- void bh_write_commit(int64_t poolid, sobject_t oid,
- vector<pair<loff_t, uint64_t> >& ranges,
- ceph_tid_t t, int r);
-
- class C_WriteCommit;
- class C_WaitForWrite;
-
- void perf_start();
- void perf_stop();
-
-
-
- ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb, Mutex& l,
- flush_set_callback_t flush_callback,
- void *flush_callback_arg,
- uint64_t max_bytes, uint64_t max_objects,
- uint64_t max_dirty, uint64_t target_dirty, double max_age,
- bool block_writes_upfront);
- ~ObjectCacher();
-
- void start() {
- flusher_thread.create("flusher");
- }
- void stop() {
- assert(flusher_thread.is_started());
- lock.Lock(); // hmm.. watch out for deadlock!
- flusher_stop = true;
- flusher_cond.Signal();
- lock.Unlock();
- flusher_thread.join();
- }
-
-
- class C_RetryRead;
-
-
- // non-blocking. async.
-
- /**
- * @note total read size must be <= INT_MAX, since
- * the return value is total bytes read
- */
- int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
- ZTracer::Trace *parent_trace = nullptr);
- int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
- ZTracer::Trace *parent_trace = nullptr);
- bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
- snapid_t snapid);
-
-private:
- // write blocking
- int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
- ZTracer::Trace *trace, Context *onfreespace);
- void maybe_wait_for_writeback(uint64_t len, ZTracer::Trace *trace);
- bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish);
-
-public:
- bool set_is_empty(ObjectSet *oset);
- bool set_is_cached(ObjectSet *oset);
- bool set_is_dirty_or_committing(ObjectSet *oset);
-
- bool flush_set(ObjectSet *oset, Context *onfinish=0);
- bool flush_set(ObjectSet *oset, vector<ObjectExtent>& ex,
- ZTracer::Trace *trace, Context *onfinish = 0);
- bool flush_all(Context *onfinish = 0);
-
- void purge_set(ObjectSet *oset);
-
- // returns # of bytes not released (ie non-clean)
- loff_t release_set(ObjectSet *oset);
- uint64_t release_all();
-
- void discard_set(ObjectSet *oset, const vector<ObjectExtent>& ex);
-
- /**
- * Retry any in-flight reads that get -ENOENT instead of marking
- * them zero, and get rid of any cached -ENOENTs.
- * After this is called and the cache's lock is unlocked,
- * any new requests will treat -ENOENT normally.
- */
- void clear_nonexistence(ObjectSet *oset);
-
-
- // cache sizes
- void set_max_dirty(uint64_t v) {
- max_dirty = v;
- }
- void set_target_dirty(int64_t v) {
- target_dirty = v;
- }
- void set_max_size(int64_t v) {
- max_size = v;
- }
- void set_max_dirty_age(double a) {
- max_dirty_age = make_timespan(a);
- }
- void set_max_objects(int64_t v) {
- max_objects = v;
- }
-
-
- // file functions
-
- /*** async+caching (non-blocking) file interface ***/
- int file_is_cached(ObjectSet *oset, file_layout_t *layout,
- snapid_t snapid, loff_t offset, uint64_t len) {
- vector<ObjectExtent> extents;
- Striper::file_to_extents(cct, oset->ino, layout, offset, len,
- oset->truncate_size, extents);
- return is_cached(oset, extents, snapid);
- }
-
- int file_read(ObjectSet *oset, file_layout_t *layout, snapid_t snapid,
- loff_t offset, uint64_t len, bufferlist *bl, int flags,
- Context *onfinish) {
- OSDRead *rd = prepare_read(snapid, bl, flags);
- Striper::file_to_extents(cct, oset->ino, layout, offset, len,
- oset->truncate_size, rd->extents);
- return readx(rd, oset, onfinish);
- }
-
- int file_write(ObjectSet *oset, file_layout_t *layout,
- const SnapContext& snapc, loff_t offset, uint64_t len,
- bufferlist& bl, ceph::real_time mtime, int flags) {
- OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0);
- Striper::file_to_extents(cct, oset->ino, layout, offset, len,
- oset->truncate_size, wr->extents);
- return writex(wr, oset, NULL);
- }
-
- bool file_flush(ObjectSet *oset, file_layout_t *layout,
- const SnapContext& snapc, loff_t offset, uint64_t len,
- Context *onfinish) {
- vector<ObjectExtent> extents;
- Striper::file_to_extents(cct, oset->ino, layout, offset, len,
- oset->truncate_size, extents);
- ZTracer::Trace trace;
- return flush_set(oset, extents, &trace, onfinish);
- }
-};
-
-
-inline ostream& operator<<(ostream &out, const ObjectCacher::BufferHead &bh)
-{
- out << "bh[ " << &bh << " "
- << bh.start() << "~" << bh.length()
- << " " << bh.ob
- << " (" << bh.bl.length() << ")"
- << " v " << bh.last_write_tid;
- if (bh.get_journal_tid() != 0) {
- out << " j " << bh.get_journal_tid();
- }
- if (bh.is_tx()) out << " tx";
- if (bh.is_rx()) out << " rx";
- if (bh.is_dirty()) out << " dirty";
- if (bh.is_clean()) out << " clean";
- if (bh.is_zero()) out << " zero";
- if (bh.is_missing()) out << " missing";
- if (bh.bl.length() > 0) out << " firstbyte=" << (int)bh.bl[0];
- if (bh.error) out << " error=" << bh.error;
- out << "]";
- out << " waiters = {";
- for (map<loff_t, list<Context*> >::const_iterator it
- = bh.waitfor_read.begin();
- it != bh.waitfor_read.end(); ++it) {
- out << " " << it->first << "->[";
- for (list<Context*>::const_iterator lit = it->second.begin();
- lit != it->second.end(); ++lit) {
- out << *lit << ", ";
- }
- out << "]";
- }
- out << "}";
- return out;
-}
-
-inline ostream& operator<<(ostream &out, const ObjectCacher::ObjectSet &os)
-{
- return out << "objectset[" << os.ino
- << " ts " << os.truncate_seq << "/" << os.truncate_size
- << " objects " << os.objects.size()
- << " dirty_or_tx " << os.dirty_or_tx
- << "]";
-}
-
-inline ostream& operator<<(ostream &out, const ObjectCacher::Object &ob)
-{
- out << "object["
- << ob.get_soid() << " oset " << ob.oset << dec
- << " wr " << ob.last_write_tid << "/" << ob.last_commit_tid;
-
- if (ob.complete)
- out << " COMPLETE";
- if (!ob.exists)
- out << " !EXISTS";
-
- out << "]";
- return out;
-}
-
-#endif