X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fkey_value_store%2Fkv_flat_btree_async.h;fp=src%2Fceph%2Fsrc%2Fkey_value_store%2Fkv_flat_btree_async.h;h=f16ba6d9a99e479d3d09635e84774e751ac8107a;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/key_value_store/kv_flat_btree_async.h b/src/ceph/src/key_value_store/kv_flat_btree_async.h new file mode 100644 index 0000000..f16ba6d --- /dev/null +++ b/src/ceph/src/key_value_store/kv_flat_btree_async.h @@ -0,0 +1,900 @@ +/* + * Uses a two-level B-tree to store a set of key-value pairs. + * + * September 2, 2012 + * Eleanor Cawthon + * eleanor.cawthon@inktank.com + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef KVFLATBTREEASYNC_H_ +#define KVFLATBTREEASYNC_H_ + +#define ESUICIDE 134 +#define EPREFIX 136 +#define EFIRSTOBJ 138 + +#include "key_value_store/key_value_structure.h" +#include "include/utime.h" +#include "include/types.h" +#include "include/encoding.h" +#include "common/Mutex.h" +#include "common/Clock.h" +#include "common/Formatter.h" +#include "global/global_context.h" +#include "include/rados/librados.hpp" +#include +#include +#include +#include + +using namespace std; +using ceph::bufferlist; + +enum { + ADD_PREFIX = 1, + MAKE_OBJECT = 2, + UNWRITE_OBJECT = 3, + RESTORE_OBJECT = 4, + REMOVE_OBJECT = 5, + REMOVE_PREFIX = 6, + AIO_MAKE_OBJECT = 7 +}; + +struct rebalance_args; + + +/** + * stores information about a key in the index. + * + * prefix is "0" unless key is "", in which case it is "1". This ensures that + * the object with key "" will always be the highest key in the index. + */ +struct key_data { + string raw_key; + string prefix; + + key_data() + {} + + /** + * @pre: key is a raw key (does not contain a prefix) + */ + key_data(string key) + : raw_key(key) + { + raw_key == "" ? prefix = "1" : prefix = "0"; + } + + bool operator==(key_data k) const { + return ((raw_key == k.raw_key) && (prefix == k.prefix)); + } + + bool operator!=(key_data k) const { + return ((raw_key != k.raw_key) || (prefix != k.prefix)); + } + + bool operator<(key_data k) const { + return this->encoded() < k.encoded(); + } + + bool operator>(key_data k) const { + return this->encoded() > k.encoded(); + } + + /** + * parses the prefix from encoded and stores the data in this. + * + * @pre: encoded has a prefix + */ + void parse(string encoded) { + prefix = encoded[0]; + raw_key = encoded.substr(1,encoded.length()); + } + + /** + * returns a string containing the encoded (prefixed) key + */ + string encoded() const { + return prefix + raw_key; + } + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + ::encode(raw_key, bl); + ::encode(prefix, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator &p) { + DECODE_START(1, p); + ::decode(raw_key, p); + ::decode(prefix, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(key_data) + + +/** + * Stores information read from a librados object. + */ +struct object_data { + key_data min_kdata; //the max key from the previous index entry + key_data max_kdata; //the max key, from the index + string name; //the object's name + map omap; // the omap of the object + bool unwritable; // an xattr that, if false, means an op is in + // progress and other clients should not write to it. + uint64_t version; //the version at time of read + uint64_t size; //the number of elements in the omap + + object_data() + : unwritable(false), + version(0), + size(0) + {} + + object_data(string the_name) + : name(the_name), + unwritable(false), + version(0), + size(0) + {} + + object_data(key_data min, key_data kdat, string the_name) + : min_kdata(min), + max_kdata(kdat), + name(the_name), + unwritable(false), + version(0), + size(0) + {} + + object_data(key_data min, key_data kdat, string the_name, + map the_omap) + : min_kdata(min), + max_kdata(kdat), + name(the_name), + omap(the_omap), + unwritable(false), + version(0), + size(0) + {} + + object_data(key_data min, key_data kdat, string the_name, int the_version) + : min_kdata(min), + max_kdata(kdat), + name(the_name), + unwritable(false), + version(the_version), + size(0) + {} + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + ::encode(min_kdata, bl); + ::encode(max_kdata, bl); + ::encode(name, bl); + ::encode(omap, bl); + ::encode(unwritable, bl); + ::encode(version, bl); + ::encode(size, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator &p) { + DECODE_START(1, p); + ::decode(min_kdata, p); + ::decode(max_kdata, p); + ::decode(name, p); + ::decode(omap, p); + ::decode(unwritable, p); + ::decode(version, p); + ::decode(size, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(object_data) + +/** + * information about objects to be created by a split or merge - stored in the + * index_data. + */ +struct create_data { + key_data min; + key_data max; + string obj; + + create_data() + {} + + create_data(key_data n, key_data x, string o) + : min(n), + max(x), + obj(o) + {} + + create_data(object_data o) + : min(o.min_kdata), + max(o.max_kdata), + obj(o.name) + {} + + create_data & operator=(const create_data &c) { + min = c.min; + max = c.max; + obj = c.obj; + return *this; + } + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + ::encode(min, bl); + ::encode(max, bl); + ::encode(obj, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator &p) { + DECODE_START(1, p); + ::decode(min, p); + ::decode(max, p); + ::decode(obj, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(create_data) + +/** + * information about objects to be deleted by a split or merge - stored in the + * index_data. + */ +struct delete_data { + key_data min; + key_data max; + string obj; + uint64_t version; + + delete_data() + : version(0) + {} + + delete_data(key_data n, key_data x, string o, uint64_t v) + : min(n), + max(x), + obj(o), + version(v) + {} + + delete_data & operator=(const delete_data &d) { + min = d.min; + max = d.max; + obj = d.obj; + version = d.version; + return *this; + } + + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + ::encode(min, bl); + ::encode(max, bl); + ::encode(obj, bl); + ::encode(version, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator &p) { + DECODE_START(1, p); + ::decode(min, p); + ::decode(max, p); + ::decode(obj, p); + ::decode(version, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(delete_data) + +/** + * The index object is a key value map that stores + * the highest key stored in an object as keys, and an index_data + * as the corresponding value. The index_data contains the encoded + * high and low keys (where keys in this object are > min_kdata and + * <= kdata), the name of the librados object where keys containing + * that range of keys are located, and information about split and + * merge operations that may need to be cleaned up if a client dies. + */ +struct index_data { + //the encoded key corresponding to the object + key_data kdata; + + //"1" if there is a prefix (because a split or merge is + //in progress), otherwise "" + string prefix; + + //the kdata of the previous index entry + key_data min_kdata; + + utime_t ts; //time that a split/merge started + + //objects to be created + vector to_create; + + //objects to be deleted + vector to_delete; + + //the name of the object where the key range is located. + string obj; + + index_data() + {} + + index_data(string raw_key) + : kdata(raw_key) + {} + + index_data(key_data max, key_data min, string o) + : kdata(max), + min_kdata(min), + obj(o) + {} + + index_data(create_data c) + : kdata(c.max), + min_kdata(c.min), + obj(c.obj) + {} + + bool operator<(const index_data &other) const { + return (kdata.encoded() < other.kdata.encoded()); + } + + //true if there is a prefix and now - ts > timeout. + bool is_timed_out(utime_t now, utime_t timeout) const; + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + ::encode(prefix, bl); + ::encode(min_kdata, bl); + ::encode(kdata, bl); + ::encode(ts, bl); + ::encode(to_create, bl); + ::encode(to_delete, bl); + ::encode(obj, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator &p) { + DECODE_START(1, p); + ::decode(prefix, p); + ::decode(min_kdata, p); + ::decode(kdata, p); + ::decode(ts, p); + ::decode(to_create, p); + ::decode(to_delete, p); + ::decode(obj, p); + DECODE_FINISH(p); + } + + /* + * Prints a string representation of the information, in the following format: + * (min_kdata/ + * kdata, + * prefix + * ts + * elements of to_create, organized into (high key| obj name) + * ; + * elements of to_delete, organized into (high key| obj name | version number) + * : + * val) + */ + string str() const { + stringstream strm; + strm << '(' << min_kdata.encoded() << "/" << kdata.encoded() << ',' + << prefix; + if (prefix == "1") { + strm << ts.sec() << '.' << ts.usec(); + for(vector::const_iterator it = to_create.begin(); + it != to_create.end(); ++it) { + strm << '(' << it->min.encoded() << '/' << it->max.encoded() << '|' + << it->obj << ')'; + } + strm << ';'; + for(vector::const_iterator it = to_delete.begin(); + it != to_delete.end(); ++it) { + strm << '(' << it->min.encoded() << '/' << it->max.encoded() << '|' + << it->obj << '|' + << it->version << ')'; + } + strm << ':'; + } + strm << obj << ')'; + return strm.str(); + } +}; +WRITE_CLASS_ENCODER(index_data) + +/** + * Structure to store information read from the index for reuse. + */ +class IndexCache { +protected: + map > k2itmap; + map t2kmap; + int cache_size; + +public: + IndexCache(int n) + : cache_size(n) + {} + /** + * Inserts idata into the cache and removes whatever key mapped to before. + * If the cache is full, pops the oldest entry. + */ + void push(const string &key, const index_data &idata); + + /** + * Inserts idata into the cache. If idata.kdata is already in the cache, + * replaces the old one. Pops the oldest entry if the cache is full. + */ + void push(const index_data &idata); + + /** + * Removes the oldest entry from the cache + */ + void pop(); + + /** + * Removes the value associated with kdata from both maps + */ + void erase(key_data kdata); + + /** + * gets the idata where key belongs. If none, returns -ENODATA. + */ + int get(const string &key, index_data *idata) const; + + /** + * Gets the idata where key goes and the one after it. If there are not + * valid entries for both of them, returns -ENODATA. + */ + int get(const string &key, index_data *idata, index_data * next_idata) const; + void clear(); +}; + +class KvFlatBtreeAsync; + + +/** + * These are used internally to translate aio operations into useful thread + * arguments. + */ +struct aio_set_args { + KvFlatBtreeAsync * kvba; + string key; + bufferlist val; + bool exc; + callback cb; + void * cb_args; + int * err; +}; + +struct aio_rm_args { + KvFlatBtreeAsync * kvba; + string key; + callback cb; + void * cb_args; + int * err; +}; + +struct aio_get_args { + KvFlatBtreeAsync * kvba; + string key; + bufferlist * val; + bool exc; + callback cb; + void * cb_args; + int * err; +}; + +class KvFlatBtreeAsync : public KeyValueStructure { +protected: + + //don't change these once operations start being called - they are not + //protected with mutexes! + int k; + string index_name; + librados::IoCtx io_ctx; + string rados_id; + string client_name; + librados::Rados rados; + string pool_name; + injection_t interrupt; + int wait_ms; + utime_t timeout; //declare a client dead if it goes this long without + //finishing a split/merge + int cache_size; + double cache_refresh; //read cache_size / cache_refresh entries each time the + //index is read + bool verbose;//if true, display lots of debug output + + //shared variables protected with mutexes + Mutex client_index_lock; + int client_index; //names of new objects are client_name.client_index + Mutex icache_lock; + IndexCache icache; + friend struct index_data; + + /** + * finds the object in the index with the lowest key value that is greater + * than idata.kdata. If idata.kdata is the max key, returns -EOVERFLOW. If + * idata has a prefix and has timed out, cleans up. + * + * @param idata: idata for the object to search for. + * @param out_data: the idata for the next object. + * + * @pre: idata must contain a key_data. + * @post: out_data contains complete information + */ + int next(const index_data &idata, index_data * out_data); + + /** + * finds the object in the index with the lowest key value that is greater + * than idata.kdata. If idata.kdata is the lowest key, returns -ERANGE If + * idata has a prefix and has timed out, cleans up. + * + * @param idata: idata for the object to search for. + * @param out_data: the idata for the next object. + * + * @pre: idata must contain a key_data. + * @post: out_data contains complete information + */ + int prev(const index_data &idata, index_data * out_data); + + /** + * finds the index_data where a key belongs, from cache if possible. If it + * reads the index object, it will read the first cache_size entries after + * key and put them in the cache. + * + * @param key: the key to search for + * @param idata: the index_data for the first index value such that idata.key + * is greater than key. + * @param next_idata: if not NULL, this will be set to the idata after idata + * @param force_update: if false, will try to read from cache first. + * + * @pre: key is not encoded + * @post: idata contains complete information + * stored + */ + int read_index(const string &key, index_data * idata, + index_data * next_idata, bool force_update); + + /** + * Reads obj and generates information about it. Iff the object has >= 2k + * entries, reads the whole omap and then splits it. + * + * @param idata: index data for the object being split + * @pre: idata contains a key and an obj + * @post: idata.obj has been split and icache has been updated + * @return -EBALANCE if obj does not need to be split, 0 if split successful, + * error from read_object or perform_ops if there is one. + */ + int split(const index_data &idata); + + /** + * reads o1 and the next object after o1 and, if necessary, rebalances them. + * if hk1 is the highest key in the index, calls rebalance on the next highest + * key. + * + * @param idata: index data for the object being rebalanced + * @param next_idata: index data for the next object. If blank, will read. + * @pre: idata contains a key and an obj + * @post: idata.obj has been rebalanced and icache has been updated + * @return -EBALANCE if no change needed, -ENOENT if o1 does not exist, + * -ECANCELED if second object does not exist, otherwise, error from + * perform_ops + */ + int rebalance(const index_data &idata1, const index_data &next_idata); + + /** + * performs an ObjectReadOperation to populate odata + * + * @post: odata has all information about obj except for key (which is "") + */ + int read_object(const string &obj, object_data * odata); + + /** + * performs a maybe_read_for_balance ObjectOperation so the omap is only + * read if the object is out of bounds. + */ + int read_object(const string &obj, rebalance_args * args); + + /** + * sets up owo to change the index in preparation for a split/merge. + * + * @param to_create: vector of object_data to be created. + * @param to_delete: vector of object_data to be deleted. + * @param owo: the ObjectWriteOperation to set up + * @param idata: will be populated by index data for this op. + * @param err: error code reference to pass to omap_cmp + * @pre: entries in to_create and to_delete must have keys and names. + */ + void set_up_prefix_index( + const vector &to_create, + const vector &to_delete, + librados::ObjectWriteOperation * owo, + index_data * idata, + int * err); + + /** + * sets up all make, mark, restore, and delete ops, as well as the remove + * prefix op, based on idata. + * + * @param create_vector: vector of data about the objects to be created. + * @pre: entries in create_data must have names and omaps and be in idata + * order + * @param delete_vector: vector of data about the objects to be deleted + * @pre: entries in to_delete must have versions and be in idata order + * @param ops: the owos to set up. the pair is a pair of op identifiers + * and names of objects - set_up_ops fills these in. + * @pre: ops must be the correct size and the ObjectWriteOperation pointers + * must be valid. + * @param idata: the idata with information about how to set up the ops + * @pre: idata has valid to_create and to_delete + * @param err: the int to get the error value for omap_cmp + */ + void set_up_ops( + const vector &create_vector, + const vector &delete_vector, + vector, librados::ObjectWriteOperation*> > * ops, + const index_data &idata, + int * err); + + /** + * sets up owo to exclusive create, set omap to to_set, and set + * unwritable to "0" + */ + void set_up_make_object( + const map &to_set, + librados::ObjectWriteOperation *owo); + + /** + * sets up owo to assert object version and that object version is + * writable, + * then mark it unwritable. + * + * @param ver: if this is 0, no version is asserted. + */ + void set_up_unwrite_object( + const int &ver, librados::ObjectWriteOperation *owo); + + /** + * sets up owo to assert that an object is unwritable and then mark it + * writable + */ + void set_up_restore_object( + librados::ObjectWriteOperation *owo); + + /** + * sets up owo to assert that the object is unwritable and then remove it + */ + void set_up_delete_object( + librados::ObjectWriteOperation *owo); + + /** + * perform the operations in ops and handles errors. + * + * @param debug_prefix: what to print at the beginning of debug output + * @param idata: the idata for the object being operated on, to be + * passed to cleanup if necessary + * @param ops: this contains an int identifying the type of op, + * a string that is the name of the object to operate on, and a pointer + * to the ObjectWriteOperation to use. All of this must be complete. + * @post: all operations are performed and most errors are handled + * (e.g., cleans up if an assertion fails). If an unknown error is found, + * returns it. + */ + int perform_ops( const string &debug_prefix, + const index_data &idata, + vector, librados::ObjectWriteOperation*> > * ops); + + /** + * Called when a client discovers that another client has died during a + * split or a merge. cleans up after that client. + * + * @param idata: the index data parsed from the index entry left by the dead + * client. + * @param error: the error that caused the client to realize the other client + * died (should be -ENOENT or -ETIMEDOUT) + * @post: rolls forward if -ENOENT, otherwise rolls back. + */ + int cleanup(const index_data &idata, const int &error); + + /** + * does the ObjectWriteOperation and splits, reads the index, and/or retries + * until success. + */ + int set_op(const string &key, const bufferlist &val, + bool update_on_existing, index_data &idata); + + /** + * does the ObjectWriteOperation and merges, reads the index, and/or retries + * until success. + */ + int remove_op(const string &key, index_data &idata, index_data &next_idata); + + /** + * does the ObjectWriteOperation and reads the index and/or retries + * until success. + */ + int get_op(const string &key, bufferlist * val, index_data &idata); + + /** + * does the ObjectWriteOperation and splits, reads the index, and/or retries + * until success. + */ + int handle_set_rm_errors(int &err, string key, string obj, + index_data * idata, index_data * next_idata); + + /** + * called by aio_set, aio_remove, and aio_get, respectively. + */ + static void* pset(void *ptr); + static void* prm(void *ptr); + static void* pget(void *ptr); +public: + + + //interruption methods, for correctness testing + /** + * returns 0 + */ + int nothing() override; + /** + * 10% chance of waiting wait_ms seconds + */ + int wait() override; + /** + * 10% chance of killing the client. + */ + int suicide() override; + +KvFlatBtreeAsync(int k_val, string name, int cache, double cache_r, + bool verb) + : k(k_val), + index_name("index_object"), + rados_id(name), + client_name(string(name).append(".")), + pool_name("rbd"), + interrupt(&KeyValueStructure::nothing), + wait_ms(0), + timeout(100000,0), + cache_size(cache), + cache_refresh(cache_r), + verbose(verb), + client_index_lock("client_index_lock"), + client_index(0), + icache_lock("icache_lock"), + icache(cache) + {} + + /** + * creates a string with an int at the end. + * + * @param s: the string on the left + * @param i: the int to be appended to the string + * @return the string + */ + static string to_string(string s, int i); + + /** + * returns in encoded + */ + static bufferlist to_bl(const string &in) { + bufferlist bl; + bl.append(in); + return bl; + } + + /** + * returns idata encoded; + */ + static bufferlist to_bl(const index_data &idata) { + bufferlist bl; + idata.encode(bl); + return bl; + } + + /** + * returns the rados_id of this KvFlatBtreeAsync + */ + string get_name(); + + /** + * sets this kvba to call inject before every ObjectWriteOperation. + * If inject is wait and wait_time is set, wait will have a 10% chance of + * sleeping for waite_time miliseconds. + */ + void set_inject(injection_t inject, int wait_time) override; + + /** + * sets up the rados and io_ctx of this KvFlatBtreeAsync. If the don't already + * exist, creates the index and max object. + */ + int setup(int argc, const char** argv) override; + + int set(const string &key, const bufferlist &val, + bool update_on_existing) override; + + int remove(const string &key) override; + + /** + * returns true if all of the following are true: + * + * all objects are accounted for in the index or a prefix + * (i.e., no floating objects) + * all objects have k <= size <= 2k + * all keys in an object are within the specified predicted by the index + * + * if any of those fails, states that the problem(s) are, and prints str(). + * + * @pre: no operations are in progress + */ + bool is_consistent() override; + + /** + * returns an ASCII representation of the index and sub objects, showing + * stats about each object and all omaps. Don't use if you have more than + * about 10 objects. + */ + string str() override; + + int get(const string &key, bufferlist *val) override; + + //async versions of these methods + void aio_get(const string &key, bufferlist *val, callback cb, + void *cb_args, int * err) override; + void aio_set(const string &key, const bufferlist &val, bool exclusive, + callback cb, void * cb_args, int * err) override; + void aio_remove(const string &key, callback cb, void *cb_args, int * err) override; + + //these methods that deal with multiple keys at once are efficient, but make + //no guarantees about atomicity! + + /** + * Removes all objects and resets the store as if setup had just run. Makes no + * attempt to do this safely - make sure this is the only operation running + * when it is called! + */ + int remove_all() override; + + /** + * This does not add prefixes to the index and therefore DOES NOT guarantee + * consistency! It is ONLY safe if there is only one instance at a time. + * It follows the same general logic as a rebalance, but + * with all objects that contain any of the keys in in_map. It is O(n), where + * n is the number of librados objects it has to change. Higher object sizes + * (i.e., k values) also decrease the efficiency of this method because it + * copies all of the entries in each object it modifies. Writing new objects + * is done in parallel. + * + * This is efficient if: + * * other clients are very unlikely to be modifying any of the objects while + * this operation is in progress + * * The entries in in_map are close together + * * It is especially efficient for initially entering lots of entries into + * an empty structure. + * + * It is very inefficient compared to setting one key and/or will starve if: + * * other clients are modifying the objects it tries to modify + * * The keys are distributed across the range of keys in the store + * * there is a small number of keys compared to k + */ + int set_many(const map &in_map) override; + + int get_all_keys(std::set *keys) override; + int get_all_keys_and_values(map *kv_map) override; + +}; + +#endif /* KVFLATBTREEASYNC_H_ */