/* * Uses a two-level B-tree to store a set of key-value pairs. * * September 2, 2012 * Eleanor Cawthon * eleanor.cawthon@inktank.com * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. */ #ifndef KVFLATBTREEASYNC_H_ #define KVFLATBTREEASYNC_H_ #define ESUICIDE 134 #define EPREFIX 136 #define EFIRSTOBJ 138 #include "key_value_store/key_value_structure.h" #include "include/utime.h" #include "include/types.h" #include "include/encoding.h" #include "common/Mutex.h" #include "common/Clock.h" #include "common/Formatter.h" #include "global/global_context.h" #include "include/rados/librados.hpp" #include #include #include #include using namespace std; using ceph::bufferlist; enum { ADD_PREFIX = 1, MAKE_OBJECT = 2, UNWRITE_OBJECT = 3, RESTORE_OBJECT = 4, REMOVE_OBJECT = 5, REMOVE_PREFIX = 6, AIO_MAKE_OBJECT = 7 }; struct rebalance_args; /** * stores information about a key in the index. * * prefix is "0" unless key is "", in which case it is "1". This ensures that * the object with key "" will always be the highest key in the index. */ struct key_data { string raw_key; string prefix; key_data() {} /** * @pre: key is a raw key (does not contain a prefix) */ key_data(string key) : raw_key(key) { raw_key == "" ? prefix = "1" : prefix = "0"; } bool operator==(key_data k) const { return ((raw_key == k.raw_key) && (prefix == k.prefix)); } bool operator!=(key_data k) const { return ((raw_key != k.raw_key) || (prefix != k.prefix)); } bool operator<(key_data k) const { return this->encoded() < k.encoded(); } bool operator>(key_data k) const { return this->encoded() > k.encoded(); } /** * parses the prefix from encoded and stores the data in this. * * @pre: encoded has a prefix */ void parse(string encoded) { prefix = encoded[0]; raw_key = encoded.substr(1,encoded.length()); } /** * returns a string containing the encoded (prefixed) key */ string encoded() const { return prefix + raw_key; } void encode(bufferlist &bl) const { ENCODE_START(1,1,bl); ::encode(raw_key, bl); ::encode(prefix, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator &p) { DECODE_START(1, p); ::decode(raw_key, p); ::decode(prefix, p); DECODE_FINISH(p); } }; WRITE_CLASS_ENCODER(key_data) /** * Stores information read from a librados object. */ struct object_data { key_data min_kdata; //the max key from the previous index entry key_data max_kdata; //the max key, from the index string name; //the object's name map omap; // the omap of the object bool unwritable; // an xattr that, if false, means an op is in // progress and other clients should not write to it. uint64_t version; //the version at time of read uint64_t size; //the number of elements in the omap object_data() : unwritable(false), version(0), size(0) {} object_data(string the_name) : name(the_name), unwritable(false), version(0), size(0) {} object_data(key_data min, key_data kdat, string the_name) : min_kdata(min), max_kdata(kdat), name(the_name), unwritable(false), version(0), size(0) {} object_data(key_data min, key_data kdat, string the_name, map the_omap) : min_kdata(min), max_kdata(kdat), name(the_name), omap(the_omap), unwritable(false), version(0), size(0) {} object_data(key_data min, key_data kdat, string the_name, int the_version) : min_kdata(min), max_kdata(kdat), name(the_name), unwritable(false), version(the_version), size(0) {} void encode(bufferlist &bl) const { ENCODE_START(1,1,bl); ::encode(min_kdata, bl); ::encode(max_kdata, bl); ::encode(name, bl); ::encode(omap, bl); ::encode(unwritable, bl); ::encode(version, bl); ::encode(size, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator &p) { DECODE_START(1, p); ::decode(min_kdata, p); ::decode(max_kdata, p); ::decode(name, p); ::decode(omap, p); ::decode(unwritable, p); ::decode(version, p); ::decode(size, p); DECODE_FINISH(p); } }; WRITE_CLASS_ENCODER(object_data) /** * information about objects to be created by a split or merge - stored in the * index_data. */ struct create_data { key_data min; key_data max; string obj; create_data() {} create_data(key_data n, key_data x, string o) : min(n), max(x), obj(o) {} create_data(object_data o) : min(o.min_kdata), max(o.max_kdata), obj(o.name) {} create_data & operator=(const create_data &c) { min = c.min; max = c.max; obj = c.obj; return *this; } void encode(bufferlist &bl) const { ENCODE_START(1,1,bl); ::encode(min, bl); ::encode(max, bl); ::encode(obj, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator &p) { DECODE_START(1, p); ::decode(min, p); ::decode(max, p); ::decode(obj, p); DECODE_FINISH(p); } }; WRITE_CLASS_ENCODER(create_data) /** * information about objects to be deleted by a split or merge - stored in the * index_data. */ struct delete_data { key_data min; key_data max; string obj; uint64_t version; delete_data() : version(0) {} delete_data(key_data n, key_data x, string o, uint64_t v) : min(n), max(x), obj(o), version(v) {} delete_data & operator=(const delete_data &d) { min = d.min; max = d.max; obj = d.obj; version = d.version; return *this; } void encode(bufferlist &bl) const { ENCODE_START(1,1,bl); ::encode(min, bl); ::encode(max, bl); ::encode(obj, bl); ::encode(version, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator &p) { DECODE_START(1, p); ::decode(min, p); ::decode(max, p); ::decode(obj, p); ::decode(version, p); DECODE_FINISH(p); } }; WRITE_CLASS_ENCODER(delete_data) /** * The index object is a key value map that stores * the highest key stored in an object as keys, and an index_data * as the corresponding value. The index_data contains the encoded * high and low keys (where keys in this object are > min_kdata and * <= kdata), the name of the librados object where keys containing * that range of keys are located, and information about split and * merge operations that may need to be cleaned up if a client dies. */ struct index_data { //the encoded key corresponding to the object key_data kdata; //"1" if there is a prefix (because a split or merge is //in progress), otherwise "" string prefix; //the kdata of the previous index entry key_data min_kdata; utime_t ts; //time that a split/merge started //objects to be created vector to_create; //objects to be deleted vector to_delete; //the name of the object where the key range is located. string obj; index_data() {} index_data(string raw_key) : kdata(raw_key) {} index_data(key_data max, key_data min, string o) : kdata(max), min_kdata(min), obj(o) {} index_data(create_data c) : kdata(c.max), min_kdata(c.min), obj(c.obj) {} bool operator<(const index_data &other) const { return (kdata.encoded() < other.kdata.encoded()); } //true if there is a prefix and now - ts > timeout. bool is_timed_out(utime_t now, utime_t timeout) const; void encode(bufferlist &bl) const { ENCODE_START(1,1,bl); ::encode(prefix, bl); ::encode(min_kdata, bl); ::encode(kdata, bl); ::encode(ts, bl); ::encode(to_create, bl); ::encode(to_delete, bl); ::encode(obj, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator &p) { DECODE_START(1, p); ::decode(prefix, p); ::decode(min_kdata, p); ::decode(kdata, p); ::decode(ts, p); ::decode(to_create, p); ::decode(to_delete, p); ::decode(obj, p); DECODE_FINISH(p); } /* * Prints a string representation of the information, in the following format: * (min_kdata/ * kdata, * prefix * ts * elements of to_create, organized into (high key| obj name) * ; * elements of to_delete, organized into (high key| obj name | version number) * : * val) */ string str() const { stringstream strm; strm << '(' << min_kdata.encoded() << "/" << kdata.encoded() << ',' << prefix; if (prefix == "1") { strm << ts.sec() << '.' << ts.usec(); for(vector::const_iterator it = to_create.begin(); it != to_create.end(); ++it) { strm << '(' << it->min.encoded() << '/' << it->max.encoded() << '|' << it->obj << ')'; } strm << ';'; for(vector::const_iterator it = to_delete.begin(); it != to_delete.end(); ++it) { strm << '(' << it->min.encoded() << '/' << it->max.encoded() << '|' << it->obj << '|' << it->version << ')'; } strm << ':'; } strm << obj << ')'; return strm.str(); } }; WRITE_CLASS_ENCODER(index_data) /** * Structure to store information read from the index for reuse. */ class IndexCache { protected: map > k2itmap; map t2kmap; int cache_size; public: IndexCache(int n) : cache_size(n) {} /** * Inserts idata into the cache and removes whatever key mapped to before. * If the cache is full, pops the oldest entry. */ void push(const string &key, const index_data &idata); /** * Inserts idata into the cache. If idata.kdata is already in the cache, * replaces the old one. Pops the oldest entry if the cache is full. */ void push(const index_data &idata); /** * Removes the oldest entry from the cache */ void pop(); /** * Removes the value associated with kdata from both maps */ void erase(key_data kdata); /** * gets the idata where key belongs. If none, returns -ENODATA. */ int get(const string &key, index_data *idata) const; /** * Gets the idata where key goes and the one after it. If there are not * valid entries for both of them, returns -ENODATA. */ int get(const string &key, index_data *idata, index_data * next_idata) const; void clear(); }; class KvFlatBtreeAsync; /** * These are used internally to translate aio operations into useful thread * arguments. */ struct aio_set_args { KvFlatBtreeAsync * kvba; string key; bufferlist val; bool exc; callback cb; void * cb_args; int * err; }; struct aio_rm_args { KvFlatBtreeAsync * kvba; string key; callback cb; void * cb_args; int * err; }; struct aio_get_args { KvFlatBtreeAsync * kvba; string key; bufferlist * val; bool exc; callback cb; void * cb_args; int * err; }; class KvFlatBtreeAsync : public KeyValueStructure { protected: //don't change these once operations start being called - they are not //protected with mutexes! int k; string index_name; librados::IoCtx io_ctx; string rados_id; string client_name; librados::Rados rados; string pool_name; injection_t interrupt; int wait_ms; utime_t timeout; //declare a client dead if it goes this long without //finishing a split/merge int cache_size; double cache_refresh; //read cache_size / cache_refresh entries each time the //index is read bool verbose;//if true, display lots of debug output //shared variables protected with mutexes Mutex client_index_lock; int client_index; //names of new objects are client_name.client_index Mutex icache_lock; IndexCache icache; friend struct index_data; /** * finds the object in the index with the lowest key value that is greater * than idata.kdata. If idata.kdata is the max key, returns -EOVERFLOW. If * idata has a prefix and has timed out, cleans up. * * @param idata: idata for the object to search for. * @param out_data: the idata for the next object. * * @pre: idata must contain a key_data. * @post: out_data contains complete information */ int next(const index_data &idata, index_data * out_data); /** * finds the object in the index with the lowest key value that is greater * than idata.kdata. If idata.kdata is the lowest key, returns -ERANGE If * idata has a prefix and has timed out, cleans up. * * @param idata: idata for the object to search for. * @param out_data: the idata for the next object. * * @pre: idata must contain a key_data. * @post: out_data contains complete information */ int prev(const index_data &idata, index_data * out_data); /** * finds the index_data where a key belongs, from cache if possible. If it * reads the index object, it will read the first cache_size entries after * key and put them in the cache. * * @param key: the key to search for * @param idata: the index_data for the first index value such that idata.key * is greater than key. * @param next_idata: if not NULL, this will be set to the idata after idata * @param force_update: if false, will try to read from cache first. * * @pre: key is not encoded * @post: idata contains complete information * stored */ int read_index(const string &key, index_data * idata, index_data * next_idata, bool force_update); /** * Reads obj and generates information about it. Iff the object has >= 2k * entries, reads the whole omap and then splits it. * * @param idata: index data for the object being split * @pre: idata contains a key and an obj * @post: idata.obj has been split and icache has been updated * @return -EBALANCE if obj does not need to be split, 0 if split successful, * error from read_object or perform_ops if there is one. */ int split(const index_data &idata); /** * reads o1 and the next object after o1 and, if necessary, rebalances them. * if hk1 is the highest key in the index, calls rebalance on the next highest * key. * * @param idata: index data for the object being rebalanced * @param next_idata: index data for the next object. If blank, will read. * @pre: idata contains a key and an obj * @post: idata.obj has been rebalanced and icache has been updated * @return -EBALANCE if no change needed, -ENOENT if o1 does not exist, * -ECANCELED if second object does not exist, otherwise, error from * perform_ops */ int rebalance(const index_data &idata1, const index_data &next_idata); /** * performs an ObjectReadOperation to populate odata * * @post: odata has all information about obj except for key (which is "") */ int read_object(const string &obj, object_data * odata); /** * performs a maybe_read_for_balance ObjectOperation so the omap is only * read if the object is out of bounds. */ int read_object(const string &obj, rebalance_args * args); /** * sets up owo to change the index in preparation for a split/merge. * * @param to_create: vector of object_data to be created. * @param to_delete: vector of object_data to be deleted. * @param owo: the ObjectWriteOperation to set up * @param idata: will be populated by index data for this op. * @param err: error code reference to pass to omap_cmp * @pre: entries in to_create and to_delete must have keys and names. */ void set_up_prefix_index( const vector &to_create, const vector &to_delete, librados::ObjectWriteOperation * owo, index_data * idata, int * err); /** * sets up all make, mark, restore, and delete ops, as well as the remove * prefix op, based on idata. * * @param create_vector: vector of data about the objects to be created. * @pre: entries in create_data must have names and omaps and be in idata * order * @param delete_vector: vector of data about the objects to be deleted * @pre: entries in to_delete must have versions and be in idata order * @param ops: the owos to set up. the pair is a pair of op identifiers * and names of objects - set_up_ops fills these in. * @pre: ops must be the correct size and the ObjectWriteOperation pointers * must be valid. * @param idata: the idata with information about how to set up the ops * @pre: idata has valid to_create and to_delete * @param err: the int to get the error value for omap_cmp */ void set_up_ops( const vector &create_vector, const vector &delete_vector, vector, librados::ObjectWriteOperation*> > * ops, const index_data &idata, int * err); /** * sets up owo to exclusive create, set omap to to_set, and set * unwritable to "0" */ void set_up_make_object( const map &to_set, librados::ObjectWriteOperation *owo); /** * sets up owo to assert object version and that object version is * writable, * then mark it unwritable. * * @param ver: if this is 0, no version is asserted. */ void set_up_unwrite_object( const int &ver, librados::ObjectWriteOperation *owo); /** * sets up owo to assert that an object is unwritable and then mark it * writable */ void set_up_restore_object( librados::ObjectWriteOperation *owo); /** * sets up owo to assert that the object is unwritable and then remove it */ void set_up_delete_object( librados::ObjectWriteOperation *owo); /** * perform the operations in ops and handles errors. * * @param debug_prefix: what to print at the beginning of debug output * @param idata: the idata for the object being operated on, to be * passed to cleanup if necessary * @param ops: this contains an int identifying the type of op, * a string that is the name of the object to operate on, and a pointer * to the ObjectWriteOperation to use. All of this must be complete. * @post: all operations are performed and most errors are handled * (e.g., cleans up if an assertion fails). If an unknown error is found, * returns it. */ int perform_ops( const string &debug_prefix, const index_data &idata, vector, librados::ObjectWriteOperation*> > * ops); /** * Called when a client discovers that another client has died during a * split or a merge. cleans up after that client. * * @param idata: the index data parsed from the index entry left by the dead * client. * @param error: the error that caused the client to realize the other client * died (should be -ENOENT or -ETIMEDOUT) * @post: rolls forward if -ENOENT, otherwise rolls back. */ int cleanup(const index_data &idata, const int &error); /** * does the ObjectWriteOperation and splits, reads the index, and/or retries * until success. */ int set_op(const string &key, const bufferlist &val, bool update_on_existing, index_data &idata); /** * does the ObjectWriteOperation and merges, reads the index, and/or retries * until success. */ int remove_op(const string &key, index_data &idata, index_data &next_idata); /** * does the ObjectWriteOperation and reads the index and/or retries * until success. */ int get_op(const string &key, bufferlist * val, index_data &idata); /** * does the ObjectWriteOperation and splits, reads the index, and/or retries * until success. */ int handle_set_rm_errors(int &err, string key, string obj, index_data * idata, index_data * next_idata); /** * called by aio_set, aio_remove, and aio_get, respectively. */ static void* pset(void *ptr); static void* prm(void *ptr); static void* pget(void *ptr); public: //interruption methods, for correctness testing /** * returns 0 */ int nothing() override; /** * 10% chance of waiting wait_ms seconds */ int wait() override; /** * 10% chance of killing the client. */ int suicide() override; KvFlatBtreeAsync(int k_val, string name, int cache, double cache_r, bool verb) : k(k_val), index_name("index_object"), rados_id(name), client_name(string(name).append(".")), pool_name("rbd"), interrupt(&KeyValueStructure::nothing), wait_ms(0), timeout(100000,0), cache_size(cache), cache_refresh(cache_r), verbose(verb), client_index_lock("client_index_lock"), client_index(0), icache_lock("icache_lock"), icache(cache) {} /** * creates a string with an int at the end. * * @param s: the string on the left * @param i: the int to be appended to the string * @return the string */ static string to_string(string s, int i); /** * returns in encoded */ static bufferlist to_bl(const string &in) { bufferlist bl; bl.append(in); return bl; } /** * returns idata encoded; */ static bufferlist to_bl(const index_data &idata) { bufferlist bl; idata.encode(bl); return bl; } /** * returns the rados_id of this KvFlatBtreeAsync */ string get_name(); /** * sets this kvba to call inject before every ObjectWriteOperation. * If inject is wait and wait_time is set, wait will have a 10% chance of * sleeping for waite_time miliseconds. */ void set_inject(injection_t inject, int wait_time) override; /** * sets up the rados and io_ctx of this KvFlatBtreeAsync. If the don't already * exist, creates the index and max object. */ int setup(int argc, const char** argv) override; int set(const string &key, const bufferlist &val, bool update_on_existing) override; int remove(const string &key) override; /** * returns true if all of the following are true: * * all objects are accounted for in the index or a prefix * (i.e., no floating objects) * all objects have k <= size <= 2k * all keys in an object are within the specified predicted by the index * * if any of those fails, states that the problem(s) are, and prints str(). * * @pre: no operations are in progress */ bool is_consistent() override; /** * returns an ASCII representation of the index and sub objects, showing * stats about each object and all omaps. Don't use if you have more than * about 10 objects. */ string str() override; int get(const string &key, bufferlist *val) override; //async versions of these methods void aio_get(const string &key, bufferlist *val, callback cb, void *cb_args, int * err) override; void aio_set(const string &key, const bufferlist &val, bool exclusive, callback cb, void * cb_args, int * err) override; void aio_remove(const string &key, callback cb, void *cb_args, int * err) override; //these methods that deal with multiple keys at once are efficient, but make //no guarantees about atomicity! /** * Removes all objects and resets the store as if setup had just run. Makes no * attempt to do this safely - make sure this is the only operation running * when it is called! */ int remove_all() override; /** * This does not add prefixes to the index and therefore DOES NOT guarantee * consistency! It is ONLY safe if there is only one instance at a time. * It follows the same general logic as a rebalance, but * with all objects that contain any of the keys in in_map. It is O(n), where * n is the number of librados objects it has to change. Higher object sizes * (i.e., k values) also decrease the efficiency of this method because it * copies all of the entries in each object it modifies. Writing new objects * is done in parallel. * * This is efficient if: * * other clients are very unlikely to be modifying any of the objects while * this operation is in progress * * The entries in in_map are close together * * It is especially efficient for initially entering lots of entries into * an empty structure. * * It is very inefficient compared to setting one key and/or will starve if: * * other clients are modifying the objects it tries to modify * * The keys are distributed across the range of keys in the store * * there is a small number of keys compared to k */ int set_many(const map &in_map) override; int get_all_keys(std::set *keys) override; int get_all_keys_and_values(map *kv_map) override; }; #endif /* KVFLATBTREEASYNC_H_ */