2 * Uses a two-level B-tree to store a set of key-value pairs.
6 * eleanor.cawthon@inktank.com
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #ifndef KVFLATBTREEASYNC_H_
15 #define KVFLATBTREEASYNC_H_
21 #include "key_value_store/key_value_structure.h"
22 #include "include/utime.h"
23 #include "include/types.h"
24 #include "include/encoding.h"
25 #include "common/Mutex.h"
26 #include "common/Clock.h"
27 #include "common/Formatter.h"
28 #include "global/global_context.h"
29 #include "include/rados/librados.hpp"
36 using ceph::bufferlist;
48 struct rebalance_args;
52 * stores information about a key in the index.
54 * prefix is "0" unless key is "", in which case it is "1". This ensures that
55 * the object with key "" will always be the highest key in the index.
65 * @pre: key is a raw key (does not contain a prefix)
70 raw_key == "" ? prefix = "1" : prefix = "0";
73 bool operator==(key_data k) const {
74 return ((raw_key == k.raw_key) && (prefix == k.prefix));
77 bool operator!=(key_data k) const {
78 return ((raw_key != k.raw_key) || (prefix != k.prefix));
81 bool operator<(key_data k) const {
82 return this->encoded() < k.encoded();
85 bool operator>(key_data k) const {
86 return this->encoded() > k.encoded();
90 * parses the prefix from encoded and stores the data in this.
92 * @pre: encoded has a prefix
94 void parse(string encoded) {
96 raw_key = encoded.substr(1,encoded.length());
100 * returns a string containing the encoded (prefixed) key
102 string encoded() const {
103 return prefix + raw_key;
106 void encode(bufferlist &bl) const {
107 ENCODE_START(1,1,bl);
108 ::encode(raw_key, bl);
109 ::encode(prefix, bl);
112 void decode(bufferlist::iterator &p) {
114 ::decode(raw_key, p);
119 WRITE_CLASS_ENCODER(key_data)
123 * Stores information read from a librados object.
126 key_data min_kdata; //the max key from the previous index entry
127 key_data max_kdata; //the max key, from the index
128 string name; //the object's name
129 map<std::string, bufferlist> omap; // the omap of the object
130 bool unwritable; // an xattr that, if true, means an op is in
131 // progress and other clients should not write to it.
132 uint64_t version; //the version at time of read
133 uint64_t size; //the number of elements in the omap
141 object_data(string the_name)
148 object_data(key_data min, key_data kdat, string the_name)
157 object_data(key_data min, key_data kdat, string the_name,
158 map<std::string, bufferlist> the_omap)
168 object_data(key_data min, key_data kdat, string the_name, int the_version)
173 version(the_version),
177 void encode(bufferlist &bl) const {
178 ENCODE_START(1,1,bl);
179 ::encode(min_kdata, bl);
180 ::encode(max_kdata, bl);
183 ::encode(unwritable, bl);
184 ::encode(version, bl);
188 void decode(bufferlist::iterator &p) {
190 ::decode(min_kdata, p);
191 ::decode(max_kdata, p);
194 ::decode(unwritable, p);
195 ::decode(version, p);
200 WRITE_CLASS_ENCODER(object_data)
203 * information about objects to be created by a split or merge - stored in the
214 create_data(key_data n, key_data x, string o)
220 create_data(object_data o)
226 create_data & operator=(const create_data &c) {
233 void encode(bufferlist &bl) const {
234 ENCODE_START(1,1,bl);
240 void decode(bufferlist::iterator &p) {
248 WRITE_CLASS_ENCODER(create_data)
251 * information about objects to be deleted by a split or merge - stored in the
264 delete_data(key_data n, key_data x, string o, uint64_t v)
271 delete_data & operator=(const delete_data &d) {
280 void encode(bufferlist &bl) const {
281 ENCODE_START(1,1,bl);
285 ::encode(version, bl);
288 void decode(bufferlist::iterator &p) {
293 ::decode(version, p);
297 WRITE_CLASS_ENCODER(delete_data)
300 * The index object is a key value map that stores
301 * the highest key stored in an object as keys, and an index_data
302 * as the corresponding value. The index_data contains the encoded
303 * high and low keys (where keys in this object are > min_kdata and
304 * <= kdata), the name of the librados object where keys containing
305 * that range of keys are located, and information about split and
306 * merge operations that may need to be cleaned up if a client dies.
309 //the encoded key corresponding to the object
312 //"1" if there is a prefix (because a split or merge is
313 //in progress), otherwise ""
316 //the kdata of the previous index entry
319 utime_t ts; //time that a split/merge started
321 //objects to be created
322 vector<create_data > to_create;
324 //objects to be deleted
325 vector<delete_data > to_delete;
327 //the name of the object where the key range is located.
333 index_data(string raw_key)
337 index_data(key_data max, key_data min, string o)
343 index_data(create_data c)
349 bool operator<(const index_data &other) const {
350 return (kdata.encoded() < other.kdata.encoded());
353 //true if there is a prefix and now - ts > timeout.
354 bool is_timed_out(utime_t now, utime_t timeout) const;
356 void encode(bufferlist &bl) const {
357 ENCODE_START(1,1,bl);
358 ::encode(prefix, bl);
359 ::encode(min_kdata, bl);
362 ::encode(to_create, bl);
363 ::encode(to_delete, bl);
367 void decode(bufferlist::iterator &p) {
370 ::decode(min_kdata, p);
373 ::decode(to_create, p);
374 ::decode(to_delete, p);
380 * Prints a string representation of the information, in the following format:
385 * elements of to_create, organized into (high key| obj name)
387 * elements of to_delete, organized into (high key| obj name | version number)
393 strm << '(' << min_kdata.encoded() << "/" << kdata.encoded() << ','
396 strm << ts.sec() << '.' << ts.usec();
397 for(vector<create_data>::const_iterator it = to_create.begin();
398 it != to_create.end(); ++it) {
399 strm << '(' << it->min.encoded() << '/' << it->max.encoded() << '|'
403 for(vector<delete_data >::const_iterator it = to_delete.begin();
404 it != to_delete.end(); ++it) {
405 strm << '(' << it->min.encoded() << '/' << it->max.encoded() << '|'
407 << it->version << ')';
415 WRITE_CLASS_ENCODER(index_data)
418 * Structure to store information read from the index for reuse.
422 map<key_data, pair<index_data, utime_t> > k2itmap;
423 map<utime_t, key_data> t2kmap;
431 * Inserts idata into the cache and removes whatever key mapped to before.
432 * If the cache is full, pops the oldest entry.
434 void push(const string &key, const index_data &idata);
437 * Inserts idata into the cache. If idata.kdata is already in the cache,
438 * replaces the old one. Pops the oldest entry if the cache is full.
440 void push(const index_data &idata);
443 * Removes the oldest entry from the cache
448 * Removes the value associated with kdata from both maps
450 void erase(key_data kdata);
453 * gets the idata where key belongs. If none, returns -ENODATA.
455 int get(const string &key, index_data *idata) const;
458 * Gets the idata where key goes and the one after it. If there are not
459 * valid entries for both of them, returns -ENODATA.
461 int get(const string &key, index_data *idata, index_data * next_idata) const;
465 class KvFlatBtreeAsync;
469 * These are used internally to translate aio operations into useful thread
472 struct aio_set_args {
473 KvFlatBtreeAsync * kvba;
483 KvFlatBtreeAsync * kvba;
490 struct aio_get_args {
491 KvFlatBtreeAsync * kvba;
500 class KvFlatBtreeAsync : public KeyValueStructure {
503 //don't change these once operations start being called - they are not
504 //protected with mutexes!
507 librados::IoCtx io_ctx;
510 librados::Rados rados;
512 injection_t interrupt;
514 utime_t timeout; //declare a client dead if it goes this long without
515 //finishing a split/merge
517 double cache_refresh; //read cache_size / cache_refresh entries each time the
519 bool verbose;//if true, display lots of debug output
521 //shared variables protected with mutexes
522 Mutex client_index_lock;
523 int client_index; //names of new objects are client_name.client_index
526 friend struct index_data;
529 * finds the object in the index with the lowest key value that is greater
530 * than idata.kdata. If idata.kdata is the max key, returns -EOVERFLOW. If
531 * idata has a prefix and has timed out, cleans up.
533 * @param idata: idata for the object to search for.
534 * @param out_data: the idata for the next object.
536 * @pre: idata must contain a key_data.
537 * @post: out_data contains complete information
539 int next(const index_data &idata, index_data * out_data);
542 * finds the object in the index with the highest key value that is less
543 * than idata.kdata. If idata.kdata is the lowest key, returns -ERANGE. If
544 * idata has a prefix and has timed out, cleans up.
546 * @param idata: idata for the object to search for.
547 * @param out_data: the idata for the previous object.
549 * @pre: idata must contain a key_data.
550 * @post: out_data contains complete information
552 int prev(const index_data &idata, index_data * out_data);
555 * finds the index_data where a key belongs, from cache if possible. If it
556 * reads the index object, it will read the first cache_size entries after
557 * key and put them in the cache.
559 * @param key: the key to search for
560 * @param idata: the index_data for the first index value such that idata.key
561 * is greater than key.
562 * @param next_idata: if not NULL, this will be set to the idata after idata
563 * @param force_update: if false, will try to read from cache first.
565 * @pre: key is not encoded
566 * @post: idata contains complete information
569 int read_index(const string &key, index_data * idata,
570 index_data * next_idata, bool force_update);
573 * Reads obj and generates information about it. Iff the object has >= 2k
574 * entries, reads the whole omap and then splits it.
576 * @param idata: index data for the object being split
577 * @pre: idata contains a key and an obj
578 * @post: idata.obj has been split and icache has been updated
579 * @return -EBALANCE if obj does not need to be split, 0 if split successful,
580 * error from read_object or perform_ops if there is one.
582 int split(const index_data &idata);
585 * reads o1 and the next object after o1 and, if necessary, rebalances them.
586 * if hk1 is the highest key in the index, calls rebalance on the next highest
589 * @param idata1: index data for the object being rebalanced
590 * @param next_idata: index data for the next object. If blank, will read.
591 * @pre: idata1 contains a key and an obj
592 * @post: idata1.obj has been rebalanced and icache has been updated
593 * @return -EBALANCE if no change needed, -ENOENT if o1 does not exist,
594 * -ECANCELED if second object does not exist, otherwise, error from
597 int rebalance(const index_data &idata1, const index_data &next_idata);
600 * performs an ObjectReadOperation to populate odata
602 * @post: odata has all information about obj except for key (which is "")
604 int read_object(const string &obj, object_data * odata);
607 * performs a maybe_read_for_balance ObjectOperation so the omap is only
608 * read if the object is out of bounds.
610 int read_object(const string &obj, rebalance_args * args);
613 * sets up owo to change the index in preparation for a split/merge.
615 * @param to_create: vector of object_data to be created.
616 * @param to_delete: vector of object_data to be deleted.
617 * @param owo: the ObjectWriteOperation to set up
618 * @param idata: will be populated by index data for this op.
619 * @param err: error code reference to pass to omap_cmp
620 * @pre: entries in to_create and to_delete must have keys and names.
622 void set_up_prefix_index(
623 const vector<object_data> &to_create,
624 const vector<object_data> &to_delete,
625 librados::ObjectWriteOperation * owo,
630 * sets up all make, mark, restore, and delete ops, as well as the remove
631 * prefix op, based on idata.
633 * @param create_vector: vector of data about the objects to be created.
634 * @pre: entries in create_vector must have names and omaps and be in idata
636 * @param delete_vector: vector of data about the objects to be deleted
637 * @pre: entries in delete_vector must have versions and be in idata order
638 * @param ops: the owos to set up. the pair is a pair of op identifiers
639 * and names of objects - set_up_ops fills these in.
640 * @pre: ops must be the correct size and the ObjectWriteOperation pointers
642 * @param idata: the idata with information about how to set up the ops
643 * @pre: idata has valid to_create and to_delete
644 * @param err: the int to get the error value for omap_cmp
647 const vector<object_data> &create_vector,
648 const vector<object_data> &delete_vector,
649 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > * ops,
650 const index_data &idata,
654 * sets up owo to exclusive create, set omap to to_set, and set
657 void set_up_make_object(
658 const map<std::string, bufferlist> &to_set,
659 librados::ObjectWriteOperation *owo);
662 * sets up owo to assert object version and that object version is
664 * then mark it unwritable.
666 * @param ver: if this is 0, no version is asserted.
668 void set_up_unwrite_object(
669 const int &ver, librados::ObjectWriteOperation *owo);
672 * sets up owo to assert that an object is unwritable and then mark it
675 void set_up_restore_object(
676 librados::ObjectWriteOperation *owo);
679 * sets up owo to assert that the object is unwritable and then remove it
681 void set_up_delete_object(
682 librados::ObjectWriteOperation *owo);
685 * performs the operations in ops and handles errors.
687 * @param debug_prefix: what to print at the beginning of debug output
688 * @param idata: the idata for the object being operated on, to be
689 * passed to cleanup if necessary
690 * @param ops: this contains an int identifying the type of op,
691 * a string that is the name of the object to operate on, and a pointer
692 * to the ObjectWriteOperation to use. All of this must be complete.
693 * @post: all operations are performed and most errors are handled
694 * (e.g., cleans up if an assertion fails). If an unknown error is found,
697 int perform_ops( const string &debug_prefix,
698 const index_data &idata,
699 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > * ops);
702 * Called when a client discovers that another client has died during a
703 * split or a merge. cleans up after that client.
705 * @param idata: the index data parsed from the index entry left by the dead
707 * @param error: the error that caused the client to realize the other client
708 * died (should be -ENOENT or -ETIMEDOUT)
709 * @post: rolls forward if -ENOENT, otherwise rolls back.
711 int cleanup(const index_data &idata, const int &error);
714 * does the ObjectWriteOperation and splits, reads the index, and/or retries
717 int set_op(const string &key, const bufferlist &val,
718 bool update_on_existing, index_data &idata);
721 * does the ObjectWriteOperation and merges, reads the index, and/or retries
724 int remove_op(const string &key, index_data &idata, index_data &next_idata);
727 * does the ObjectWriteOperation and reads the index and/or retries
730 int get_op(const string &key, bufferlist * val, index_data &idata);
733 * does the ObjectWriteOperation and splits, reads the index, and/or retries
736 int handle_set_rm_errors(int &err, string key, string obj,
737 index_data * idata, index_data * next_idata);
740 * called by aio_set, aio_remove, and aio_get, respectively.
742 static void* pset(void *ptr);
743 static void* prm(void *ptr);
744 static void* pget(void *ptr);
748 //interruption methods, for correctness testing
752 int nothing() override;
754 * 10% chance of waiting wait_ms milliseconds
758 * 10% chance of killing the client.
760 int suicide() override;
762 KvFlatBtreeAsync(int k_val, string name, int cache, double cache_r,
765 index_name("index_object"),
767 client_name(string(name).append(".")),
769 interrupt(&KeyValueStructure::nothing),
773 cache_refresh(cache_r),
775 client_index_lock("client_index_lock"),
777 icache_lock("icache_lock"),
782 * creates a string with an int at the end.
784 * @param s: the string on the left
785 * @param i: the int to be appended to the string
788 static string to_string(string s, int i);
793 static bufferlist to_bl(const string &in) {
800 * returns idata encoded;
802 static bufferlist to_bl(const index_data &idata) {
809 * returns the rados_id of this KvFlatBtreeAsync
814 * sets this kvba to call inject before every ObjectWriteOperation.
815 * If inject is wait and wait_time is set, wait will have a 10% chance of
816 * sleeping for wait_time milliseconds.
818 void set_inject(injection_t inject, int wait_time) override;
821 * sets up the rados and io_ctx of this KvFlatBtreeAsync. If they don't already
822 * exist, creates the index and max object.
824 int setup(int argc, const char** argv) override;
826 int set(const string &key, const bufferlist &val,
827 bool update_on_existing) override;
829 int remove(const string &key) override;
832 * returns true if all of the following are true:
834 * all objects are accounted for in the index or a prefix
835 * (i.e., no floating objects)
836 * all objects have k <= size <= 2k
837 * all keys in an object are within the range specified by the index
839 * if any of those fails, states what the problem(s) are, and prints str().
841 * @pre: no operations are in progress
843 bool is_consistent() override;
846 * returns an ASCII representation of the index and sub objects, showing
847 * stats about each object and all omaps. Don't use if you have more than
850 string str() override;
852 int get(const string &key, bufferlist *val) override;
854 //async versions of these methods
855 void aio_get(const string &key, bufferlist *val, callback cb,
856 void *cb_args, int * err) override;
857 void aio_set(const string &key, const bufferlist &val, bool exclusive,
858 callback cb, void * cb_args, int * err) override;
859 void aio_remove(const string &key, callback cb, void *cb_args, int * err) override;
861 //these methods that deal with multiple keys at once are efficient, but make
862 //no guarantees about atomicity!
865 * Removes all objects and resets the store as if setup had just run. Makes no
866 * attempt to do this safely - make sure this is the only operation running
869 int remove_all() override;
872 * This does not add prefixes to the index and therefore DOES NOT guarantee
873 * consistency! It is ONLY safe if there is only one instance at a time.
874 * It follows the same general logic as a rebalance, but
875 * with all objects that contain any of the keys in in_map. It is O(n), where
876 * n is the number of librados objects it has to change. Higher object sizes
877 * (i.e., k values) also decrease the efficiency of this method because it
878 * copies all of the entries in each object it modifies. Writing new objects
879 * is done in parallel.
881 * This is efficient if:
882 * * other clients are very unlikely to be modifying any of the objects while
883 * this operation is in progress
884 * * The entries in in_map are close together
885 * * It is especially efficient for initially entering lots of entries into
886 * an empty structure.
888 * It is very inefficient compared to setting one key and/or will starve if:
889 * * other clients are modifying the objects it tries to modify
890 * * The keys are distributed across the range of keys in the store
891 * * there is a small number of keys compared to k
893 int set_many(const map<string, bufferlist> &in_map) override;
895 int get_all_keys(std::set<string> *keys) override;
896 int get_all_keys_and_values(map<string,bufferlist> *kv_map) override;
900 #endif /* KVFLATBTREEASYNC_H_ */