Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / key_value_store / kv_flat_btree_async.cc
1 /*
2  * Key-value store using librados
3  *
4  * September 2, 2012
5  * Eleanor Cawthon
6  * eleanor.cawthon@inktank.com
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  */
13
14 #include "key_value_store/key_value_structure.h"
15 #include "key_value_store/kv_flat_btree_async.h"
16 #include "key_value_store/kvs_arg_types.h"
17 #include "include/rados/librados.hpp"
18 #include "/usr/include/asm-generic/errno.h"
19 #include "/usr/include/asm-generic/errno-base.h"
20 #include "common/ceph_context.h"
21 #include "common/Clock.h"
22 #include "include/types.h"
23
24
25 #include <string>
26 #include <iostream>
27 #include <cassert>
28 #include <climits>
29 #include <cmath>
30 #include <sstream>
31 #include <stdlib.h>
32 #include <iterator>
33
34 using namespace std;
35 using ceph::bufferlist;
36
37 bool index_data::is_timed_out(utime_t now, utime_t timeout) const {
38   return prefix != "" && now - ts > timeout;
39 }
40
41 void IndexCache::clear() {
42   k2itmap.clear();
43   t2kmap.clear();
44 }
45
46 void IndexCache::push(const string &key, const index_data &idata) {
47   if (cache_size == 0) {
48     return;
49   }
50   index_data old_idata;
51   map<key_data, pair<index_data, utime_t> >::iterator old_it =
52       k2itmap.lower_bound(key_data(key));
53   if (old_it != k2itmap.end()) {
54     t2kmap.erase(old_it->second.second);
55     k2itmap.erase(old_it);
56   }
57   map<key_data, pair<index_data, utime_t> >::iterator new_it =
58       k2itmap.find(idata.kdata);
59   if (new_it != k2itmap.end()) {
60     utime_t old_time = new_it->second.second;
61     t2kmap.erase(old_time);
62   }
63   utime_t time = ceph_clock_now();
64   k2itmap[idata.kdata] = make_pair(idata, time);
65   t2kmap[time] = idata.kdata;
66   if ((int)k2itmap.size() > cache_size) {
67     pop();
68   }
69
70 }
71
72 void IndexCache::push(const index_data &idata) {
73   if (cache_size == 0) {
74     return;
75   }
76   if (k2itmap.count(idata.kdata) > 0) {
77     utime_t old_time = k2itmap[idata.kdata].second;
78     t2kmap.erase(old_time);
79     k2itmap.erase(idata.kdata);
80   }
81   utime_t time = ceph_clock_now();
82   k2itmap[idata.kdata] = make_pair(idata, time);
83   t2kmap[time] = idata.kdata;
84   if ((int)k2itmap.size() > cache_size) {
85     pop();
86   }
87 }
88
89 void IndexCache::pop() {
90   if (cache_size == 0) {
91     return;
92   }
93   map<utime_t, key_data>::iterator it = t2kmap.begin();
94   utime_t time = it->first;
95   key_data kdata = it->second;
96   k2itmap.erase(kdata);
97   t2kmap.erase(time);
98 }
99
100 void IndexCache::erase(key_data kdata) {
101   if (cache_size == 0) {
102     return;
103   }
104   if (k2itmap.count(kdata) > 0) {
105     utime_t c = k2itmap[kdata].second;
106     k2itmap.erase(kdata);
107     t2kmap.erase(c);
108   }
109 }
110
111 int IndexCache::get(const string &key, index_data *idata) const {
112   if (cache_size == 0) {
113     return -ENODATA;
114   }
115   if ((int)k2itmap.size() == 0) {
116     return -ENODATA;
117   }
118   map<key_data, pair<index_data, utime_t> >::const_iterator it =
119       k2itmap.lower_bound(key_data(key));
120   if (it == k2itmap.end() || !(it->second.first.min_kdata < key_data(key))) {
121     return -ENODATA;
122   } else {
123     *idata = it->second.first;
124   }
125   return 0;
126 }
127
128 int IndexCache::get(const string &key, index_data *idata,
129     index_data *next_idata) const {
130   if (cache_size == 0) {
131     return -ENODATA;
132   }
133   map<key_data, pair<index_data, utime_t> >::const_iterator it =
134       k2itmap.lower_bound(key_data(key));
135   if (it == k2itmap.end() || ++it == k2itmap.end()) {
136     return -ENODATA;
137   } else {
138     --it;
139     if (!(it->second.first.min_kdata < key_data(key))){
140       //stale, should be reread.
141       return -ENODATA;
142     } else {
143       *idata = it->second.first;
144       ++it;
145       if (it != k2itmap.end()) {
146         *next_idata = it->second.first;
147       }
148     }
149   }
150   return 0;
151 }
152
153 int KvFlatBtreeAsync::nothing() {
154   return 0;
155 }
156
157 int KvFlatBtreeAsync::wait() {
158   if (rand() % 10 == 0) {
159     usleep(wait_ms);
160   }
161   return 0;
162 }
163
164 int KvFlatBtreeAsync::suicide() {
165   if (rand() % 10 == 0) {
166     if (verbose) cout << client_name << " is suiciding" << std::endl;
167     return 1;
168   }
169   return 0;
170 }
171
172 int KvFlatBtreeAsync::next(const index_data &idata, index_data * out_data)
173 {
174   if (verbose) cout << "\t\t" << client_name << "-next: finding next of "
175       << idata.str()
176       << std::endl;
177   int err = 0;
178   librados::ObjectReadOperation oro;
179   std::map<std::string, bufferlist> kvs;
180   oro.omap_get_vals2(idata.kdata.encoded(),1,&kvs, nullptr, &err);
181   err = io_ctx.operate(index_name, &oro, NULL);
182   if (err < 0){
183     if (verbose) cout << "\t\t\t" << client_name
184         << "-next: getting index failed with error "
185         << err << std::endl;
186     return err;
187   }
188   if (!kvs.empty()) {
189     out_data->kdata.parse(kvs.begin()->first);
190     bufferlist::iterator b = kvs.begin()->second.begin();
191     out_data->decode(b);
192     if (idata.is_timed_out(ceph_clock_now(), timeout)) {
193       if (verbose) cout << client_name << " THINKS THE OTHER CLIENT DIED."
194           << std::endl;
195       //the client died after deleting the object. clean up.
196       cleanup(idata, err);
197     }
198   } else {
199     err = -EOVERFLOW;
200   }
201   return err;
202 }
203
204 int KvFlatBtreeAsync::prev(const index_data &idata, index_data * out_data)
205 {
206   if (verbose) cout << "\t\t" << client_name << "-prev: finding prev of "
207       << idata.str() << std::endl;
208   int err = 0;
209   bufferlist inbl;
210   idata_from_idata_args in_args;
211   in_args.idata = idata;
212   in_args.encode(inbl);
213   bufferlist outbl;
214   err = io_ctx.exec(index_name,"kvs", "get_prev_idata", inbl, outbl);
215   if (err < 0){
216     if (verbose) cout << "\t\t\t" << client_name
217         << "-prev: getting index failed with error "
218         << err << std::endl;
219     if (idata.is_timed_out(ceph_clock_now(), timeout)) {
220       if (verbose) cout << client_name << " THINKS THE OTHER CLIENT DIED."
221           << std::endl;
222       //the client died after deleting the object. clean up.
223       err = cleanup(idata, err);
224       if (err == -ESUICIDE) {
225         return err;
226       } else {
227         err = 0;
228       }
229     }
230     return err;
231   }
232   bufferlist::iterator it = outbl.begin();
233   in_args.decode(it);
234   *out_data = in_args.next_idata;
235   if (verbose) cout << "\t\t" << client_name << "-prev: prev is "
236       << out_data->str()
237       << std::endl;
238   return err;
239 }
240
241 int KvFlatBtreeAsync::read_index(const string &key, index_data * idata,
242     index_data * next_idata, bool force_update) {
243   int err = 0;
244   if (!force_update) {
245     if (verbose) cout << "\t" << client_name
246         << "-read_index: getting index_data for " << key
247         << " from cache" << std::endl;
248     icache_lock.Lock();
249     if (next_idata != NULL) {
250       err = icache.get(key, idata, next_idata);
251     } else {
252       err = icache.get(key, idata);
253     }
254     icache_lock.Unlock();
255
256     if (err == 0) {
257       //if (verbose) cout << "CACHE SUCCESS" << std::endl;
258       return err;
259     } else {
260       if (verbose) cout << "NOT IN CACHE" << std::endl;
261     }
262   }
263
264   if (verbose) cout << "\t" << client_name
265       << "-read_index: getting index_data for " << key
266       << " from object" << std::endl;
267   librados::ObjectReadOperation oro;
268   bufferlist raw_val;
269   std::set<std::string> key_set;
270   key_set.insert(key_data(key).encoded());
271   std::map<std::string, bufferlist> kvmap;
272   std::map<std::string, bufferlist> dupmap;
273   oro.omap_get_vals_by_keys(key_set, &dupmap, &err);
274   oro.omap_get_vals2(key_data(key).encoded(),
275       (cache_size / cache_refresh >= 2? cache_size / cache_refresh: 2),
276       &kvmap, nullptr, &err);
277   err = io_ctx.operate(index_name, &oro, NULL);
278   utime_t mytime = ceph_clock_now();
279   if (err < 0){
280     cerr << "\t" << client_name
281         << "-read_index: getting keys failed with "
282         << err << std::endl;
283     assert(0 == client_name + "-read_index: reading index failed");
284     return err;
285   }
286   kvmap.insert(dupmap.begin(), dupmap.end());
287   for (map<string, bufferlist>::iterator it = ++kvmap.begin();
288       it != kvmap.end();
289       ++it) {
290     bufferlist bl = it->second;
291     bufferlist::iterator blit = bl.begin();
292     index_data this_idata;
293     this_idata.decode(blit);
294     if (this_idata.is_timed_out(mytime, timeout)) {
295       if (verbose) cout << client_name
296           << " THINKS THE OTHER CLIENT DIED. (mytime is "
297         << mytime.sec() << "." << mytime.usec() << ", idata.ts is "
298         << this_idata.ts.sec() << "." << this_idata.ts.usec()
299         << ", it has been " << (mytime - this_idata.ts).sec()
300         << '.' << (mytime - this_idata.ts).usec()
301         << ", timeout is " << timeout << ")" << std::endl;
302       //the client died after deleting the object. clean up.
303       if (cleanup(this_idata, -EPREFIX) == -ESUICIDE) {
304         return -ESUICIDE;
305       }
306       return read_index(key, idata, next_idata, force_update);
307     }
308     icache_lock.Lock();
309     icache.push(this_idata);
310     icache_lock.Unlock();
311   }
312   bufferlist::iterator b = kvmap.begin()->second.begin();
313   idata->decode(b);
314   idata->kdata.parse(kvmap.begin()->first);
315   if (verbose) cout << "\t" << client_name << "-read_index: kvmap_size is "
316       << kvmap.size()
317       << ", idata is " << idata->str() << std::endl;
318
319   assert(idata->obj != "");
320   icache_lock.Lock();
321   icache.push(key, *idata);
322   icache_lock.Unlock();
323
324   if (next_idata != NULL && idata->kdata.prefix != "1") {
325     next_idata->kdata.parse((++kvmap.begin())->first);
326     bufferlist::iterator nb = (++kvmap.begin())->second.begin();
327     next_idata->decode(nb);
328     icache_lock.Lock();
329     icache.push(*next_idata);
330     icache_lock.Unlock();
331   }
332   return err;
333 }
334
335 int KvFlatBtreeAsync::split(const index_data &idata) {
336   int err = 0;
337   opmap['l']++;
338
339   if (idata.prefix != "") {
340     return -EPREFIX;
341   }
342
343   rebalance_args args;
344   args.bound = 2 * k - 1;
345   args.comparator = CEPH_OSD_CMPXATTR_OP_GT;
346   err = read_object(idata.obj, &args);
347   args.odata.max_kdata = idata.kdata;
348   if (err < 0) {
349     if (verbose) cout << "\t\t" << client_name << "-split: read object "
350         << args.odata.name
351         << " got " << err << std::endl;
352     return err;
353   }
354
355   if (verbose) cout << "\t\t" << client_name << "-split: splitting "
356       << idata.obj
357       << ", which has size " << args.odata.size
358       << " and actual size " << args.odata.omap.size() << std::endl;
359
360   ///////preparations that happen outside the critical section
361   //for prefix index
362   vector<object_data> to_create;
363   vector<object_data> to_delete;
364   to_delete.push_back(object_data(idata.min_kdata,
365       args.odata.max_kdata, args.odata.name, args.odata.version));
366
367   //for lower half object
368   map<std::string, bufferlist>::const_iterator it = args.odata.omap.begin();
369   client_index_lock.Lock();
370   to_create.push_back(object_data(to_string(client_name, client_index++)));
371   client_index_lock.Unlock();
372   for (int i = 0; i < k; i++) {
373     to_create[0].omap.insert(*it);
374     ++it;
375   }
376   to_create[0].min_kdata = idata.min_kdata;
377   to_create[0].max_kdata = key_data(to_create[0].omap.rbegin()->first);
378
379   //for upper half object
380   client_index_lock.Lock();
381   to_create.push_back(object_data(to_create[0].max_kdata,
382         args.odata.max_kdata,
383         to_string(client_name, client_index++)));
384   client_index_lock.Unlock();
385   to_create[1].omap.insert(
386       ++args.odata.omap.find(to_create[0].omap.rbegin()->first),
387       args.odata.omap.end());
388
389   //setting up operations
390   librados::ObjectWriteOperation owos[6];
391   vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
392   index_data out_data;
393   set_up_prefix_index(to_create, to_delete, &owos[0], &out_data, &err);
394   ops.push_back(make_pair(
395       pair<int, string>(ADD_PREFIX, index_name),
396       &owos[0]));
397   for (int i = 1; i < 6; i++) {
398     ops.push_back(make_pair(make_pair(0,""), &owos[i]));
399   }
400   set_up_ops(to_create, to_delete, &ops, out_data, &err);
401
402   /////BEGIN CRITICAL SECTION/////
403   //put prefix on index entry for idata.val
404   err = perform_ops("\t\t" + client_name + "-split:", out_data, &ops);
405   if (err < 0) {
406     return err;
407   }
408   if (verbose) cout << "\t\t" << client_name << "-split: done splitting."
409       << std::endl;
410   /////END CRITICAL SECTION/////
411   icache_lock.Lock();
412   for (vector<delete_data>::iterator it = out_data.to_delete.begin();
413       it != out_data.to_delete.end(); ++it) {
414     icache.erase(it->max);
415   }
416   for (vector<create_data>::iterator it = out_data.to_create.begin();
417       it != out_data.to_create.end(); ++it) {
418     icache.push(index_data(*it));
419   }
420   icache_lock.Unlock();
421   return err;
422 }
423
424 int KvFlatBtreeAsync::rebalance(const index_data &idata1,
425     const index_data &next_idata){
426   opmap['m']++;
427   int err = 0;
428
429   if (idata1.prefix != "") {
430     return -EPREFIX;
431   }
432
433   rebalance_args args1;
434   args1.bound = k + 1;
435   args1.comparator = CEPH_OSD_CMPXATTR_OP_LT;
436   index_data idata2 = next_idata;
437
438   rebalance_args args2;
439   args2.bound = k + 1;
440   args2.comparator = CEPH_OSD_CMPXATTR_OP_LT;
441
442   if (idata1.kdata.prefix == "1") {
443     //this is the highest key in the index, so it doesn't have a next.
444
445     //read the index for the previous entry
446     err = prev(idata1, &idata2);
447     if (err == -ERANGE) {
448       if (verbose) cout << "\t\t" << client_name
449           << "-rebalance: this is the only node, "
450           << "so aborting" << std::endl;
451       return -EUCLEAN;
452     } else if (err < 0) {
453       return err;
454     }
455
456     //read the first object
457     err = read_object(idata1.obj, &args2);
458     if (err < 0) {
459       if (verbose) cout << "reading " << idata1.obj << " failed with " << err
460           << std::endl;
461       if (err == -ENOENT) {
462         return -ECANCELED;
463       }
464       return err;
465     }
466     args2.odata.min_kdata = idata1.min_kdata;
467     args2.odata.max_kdata = idata1.kdata;
468
469     //read the second object
470     args1.bound = 2 * k + 1;
471     err = read_object(idata2.obj, &args1);
472     if (err < 0) {
473       if (verbose) cout << "reading " << idata1.obj << " failed with " << err
474           << std::endl;
475       return err;
476     }
477     args1.odata.min_kdata = idata2.min_kdata;
478     args1.odata.max_kdata = idata2.kdata;
479
480     if (verbose) cout << "\t\t" << client_name << "-rebalance: read "
481         << idata2.obj
482         << ". size: " << args1.odata.size << " version: "
483         << args1.odata.version
484         << std::endl;
485   } else {
486     assert (next_idata.obj != "");
487     //there is a next key, so get it.
488     err = read_object(idata1.obj, &args1);
489     if (err < 0) {
490       if (verbose) cout << "reading " << idata1.obj << " failed with " << err
491           << std::endl;
492       return err;
493     }
494     args1.odata.min_kdata = idata1.min_kdata;
495     args1.odata.max_kdata = idata1.kdata;
496
497     args2.bound = 2 * k + 1;
498     err = read_object(idata2.obj, &args2);
499     if (err < 0) {
500       if (verbose) cout << "reading " << idata1.obj << " failed with " << err
501           << std::endl;
502       if (err == -ENOENT) {
503         return -ECANCELED;
504       }
505       return err;
506     }
507     args2.odata.min_kdata = idata2.min_kdata;
508     args2.odata.max_kdata = idata2.kdata;
509
510     if (verbose) cout << "\t\t" << client_name << "-rebalance: read "
511         << idata2.obj
512         << ". size: " << args2.odata.size << " version: "
513         << args2.odata.version
514         << std::endl;
515   }
516
517   if (verbose) cout << "\t\t" << client_name << "-rebalance: o1 is "
518       << args1.odata.max_kdata.encoded() << ","
519       << args1.odata.name  << " with size " << args1.odata.size
520       << " , o2 is " << args2.odata.max_kdata.encoded()
521       << "," << args2.odata.name  << " with size " << args2.odata.size
522       << std::endl;
523
524   //calculations
525   if ((int)args1.odata.size > k && (int)args1.odata.size <= 2*k
526       && (int)args2.odata.size > k
527       && (int)args2.odata.size <= 2*k) {
528     //nothing to do
529     if (verbose) cout << "\t\t" << client_name
530         << "-rebalance: both sizes in range, so"
531         << " aborting " << std::endl;
532     return -EBALANCE;
533   } else if (idata1.prefix != "" || idata2.prefix != "") {
534     return -EPREFIX;
535   }
536
537   //this is the high object. it gets created regardless of rebalance or merge.
538   client_index_lock.Lock();
539   string o2w = to_string(client_name, client_index++);
540   client_index_lock.Unlock();
541   index_data idata;
542   vector<object_data> to_create;
543   vector<object_data> to_delete;
544   librados::ObjectWriteOperation create[2];//possibly only 1 will be used
545   librados::ObjectWriteOperation other_ops[6];
546   vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
547   ops.push_back(make_pair(
548       pair<int, string>(ADD_PREFIX, index_name),
549       &other_ops[0]));
550
551   if ((int)args1.odata.size + (int)args2.odata.size <= 2*k) {
552     //merge
553     if (verbose) cout << "\t\t" << client_name << "-rebalance: merging "
554         << args1.odata.name
555         << " and " << args2.odata.name << " to get " << o2w
556         << std::endl;
557     map<string, bufferlist> write2_map;
558     write2_map.insert(args1.odata.omap.begin(), args1.odata.omap.end());
559     write2_map.insert(args2.odata.omap.begin(), args2.odata.omap.end());
560     to_create.push_back(object_data(args1.odata.min_kdata,
561         args2.odata.max_kdata, o2w, write2_map));
562     ops.push_back(make_pair(
563         pair<int, string>(MAKE_OBJECT, o2w),
564         &create[0]));
565     assert((int)write2_map.size() <= 2*k);
566   } else {
567     //rebalance
568     if (verbose) cout << "\t\t" << client_name << "-rebalance: rebalancing "
569         << args1.odata.name
570         << " and " << args2.odata.name << std::endl;
571     map<std::string, bufferlist> write1_map;
572     map<std::string, bufferlist> write2_map;
573     map<std::string, bufferlist>::iterator it;
574     client_index_lock.Lock();
575     string o1w = to_string(client_name, client_index++);
576     client_index_lock.Unlock();
577     int target_size_1 = ceil(((int)args1.odata.size + (int)args2.odata.size)
578         / 2.0);
579     if (args1.odata.max_kdata != idata1.kdata) {
580       //this should be true if idata1 is the high object
581       target_size_1 = floor(((int)args1.odata.size + (int)args2.odata.size)
582           / 2.0);
583     }
584     for (it = args1.odata.omap.begin();
585         it != args1.odata.omap.end() && (int)write1_map.size()
586             < target_size_1;
587         ++it) {
588       write1_map.insert(*it);
589     }
590     if (it != args1.odata.omap.end()){
591       //write1_map is full, so put the rest in write2_map
592       write2_map.insert(it, args1.odata.omap.end());
593       write2_map.insert(args2.odata.omap.begin(), args2.odata.omap.end());
594     } else {
595       //args1.odata.omap was small, and write2_map still needs more
596       map<std::string, bufferlist>::iterator it2;
597       for(it2 = args2.odata.omap.begin();
598           (it2 != args2.odata.omap.end()) && ((int)write1_map.size()
599               < target_size_1);
600           ++it2) {
601         write1_map.insert(*it2);
602       }
603       write2_map.insert(it2, args2.odata.omap.end());
604     }
605     if (verbose) cout << "\t\t" << client_name
606         << "-rebalance: write1_map has size "
607         << write1_map.size() << ", write2_map.size() is " << write2_map.size()
608         << std::endl;
609     //at this point, write1_map and write2_map should have the correct pairs
610     to_create.push_back(object_data(args1.odata.min_kdata,
611         key_data(write1_map.rbegin()->first),
612         o1w,write1_map));
613     to_create.push_back(object_data( key_data(write1_map.rbegin()->first),
614         args2.odata.max_kdata, o2w, write2_map));
615     ops.push_back(make_pair(
616         pair<int, string>(MAKE_OBJECT, o1w),
617         &create[0]));
618     ops.push_back(make_pair(
619         pair<int, string>(MAKE_OBJECT, o2w),
620         &create[1]));
621   }
622
623   to_delete.push_back(object_data(args1.odata.min_kdata,
624       args1.odata.max_kdata, args1.odata.name, args1.odata.version));
625   to_delete.push_back(object_data(args2.odata.min_kdata,
626       args2.odata.max_kdata, args2.odata.name, args2.odata.version));
627   for (int i = 1; i < 6; i++) {
628     ops.push_back(make_pair(make_pair(0,""), &other_ops[i]));
629   }
630
631   index_data out_data;
632   set_up_prefix_index(to_create, to_delete, &other_ops[0], &out_data, &err);
633   set_up_ops(to_create, to_delete, &ops, out_data, &err);
634
635   //at this point, all operations should be completely set up.
636   /////BEGIN CRITICAL SECTION/////
637   err = perform_ops("\t\t" + client_name + "-rebalance:", out_data, &ops);
638   if (err < 0) {
639     return err;
640   }
641   icache_lock.Lock();
642   for (vector<delete_data>::iterator it = out_data.to_delete.begin();
643       it != out_data.to_delete.end(); ++it) {
644     icache.erase(it->max);
645   }
646   for (vector<create_data>::iterator it = out_data.to_create.begin();
647       it != out_data.to_create.end(); ++it) {
648     icache.push(index_data(*it));
649   }
650   icache_lock.Unlock();
651   if (verbose) cout << "\t\t" << client_name << "-rebalance: done rebalancing."
652       << std::endl;
653   /////END CRITICAL SECTION/////
654   return err;
655 }
656
657 int KvFlatBtreeAsync::read_object(const string &obj, object_data * odata) {
658   librados::ObjectReadOperation get_obj;
659   librados::AioCompletion * obj_aioc = rados.aio_create_completion();
660   int err;
661   bufferlist unw_bl;
662   odata->name = obj;
663   get_obj.omap_get_vals2("", LONG_MAX, &odata->omap, nullptr, &err);
664   get_obj.getxattr("unwritable", &unw_bl, &err);
665   io_ctx.aio_operate(obj, obj_aioc, &get_obj, NULL);
666   obj_aioc->wait_for_safe();
667   err = obj_aioc->get_return_value();
668   if (err < 0){
669     //possibly -ENOENT, meaning someone else deleted it.
670     obj_aioc->release();
671     return err;
672   }
673   odata->unwritable = string(unw_bl.c_str(), unw_bl.length()) == "1";
674   odata->version = obj_aioc->get_version64();
675   odata->size = odata->omap.size();
676   obj_aioc->release();
677   return 0;
678 }
679
680 int KvFlatBtreeAsync::read_object(const string &obj, rebalance_args * args) {
681   bufferlist inbl;
682   args->encode(inbl);
683   bufferlist outbl;
684   int err;
685   librados::AioCompletion * a = rados.aio_create_completion();
686   io_ctx.aio_exec(obj, a, "kvs", "maybe_read_for_balance", inbl, &outbl);
687   a->wait_for_safe();
688   err = a->get_return_value();
689   if (err < 0) {
690     if (verbose) cout << "\t\t" << client_name
691         << "-read_object: reading failed with "
692         << err << std::endl;
693     a->release();
694     return err;
695   }
696   bufferlist::iterator it = outbl.begin();
697   args->decode(it);
698   args->odata.name = obj;
699   args->odata.version = a->get_version64();
700   a->release();
701   return err;
702 }
703
704 void KvFlatBtreeAsync::set_up_prefix_index(
705     const vector<object_data> &to_create,
706     const vector<object_data> &to_delete,
707     librados::ObjectWriteOperation * owo,
708     index_data * idata,
709     int * err) {
710   std::map<std::string, pair<bufferlist, int> > assertions;
711   map<string, bufferlist> to_insert;
712   idata->prefix = "1";
713   idata->ts = ceph_clock_now();
714   for(vector<object_data>::const_iterator it = to_create.begin();
715       it != to_create.end();
716       ++it) {
717     create_data c(it->min_kdata, it->max_kdata, it->name);
718     idata->to_create.push_back(c);
719   }
720   for(vector<object_data>::const_iterator it = to_delete.begin();
721       it != to_delete.end();
722       ++it) {
723     delete_data d(it->min_kdata, it->max_kdata, it->name, it->version);
724     idata->to_delete.push_back(d);
725   }
726   for(vector<object_data>::const_iterator it = to_delete.begin();
727       it != to_delete.end();
728       ++it) {
729     idata->obj = it->name;
730     idata->min_kdata = it->min_kdata;
731     idata->kdata = it->max_kdata;
732     bufferlist insert;
733     idata->encode(insert);
734     to_insert[it->max_kdata.encoded()] = insert;
735     index_data this_entry;
736     this_entry.min_kdata = idata->min_kdata;
737     this_entry.kdata = idata->kdata;
738     this_entry.obj = idata->obj;
739     assertions[it->max_kdata.encoded()] = pair<bufferlist, int>
740     (to_bl(this_entry), CEPH_OSD_CMPXATTR_OP_EQ);
741     if (verbose) cout << "\t\t\t" << client_name
742         << "-setup_prefix: will assert "
743         << this_entry.str() << std::endl;
744   }
745   assert(*err == 0);
746   owo->omap_cmp(assertions, err);
747   if (to_create.size() <= 2) {
748     owo->omap_set(to_insert);
749   }
750 }
751
752 //some args can be null if there are no corresponding entries in p
753 void KvFlatBtreeAsync::set_up_ops(
754     const vector<object_data> &create_vector,
755     const vector<object_data> &delete_vector,
756     vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > * ops,
757     const index_data &idata,
758     int * err) {
759   vector<pair<pair<int, string>,
760     librados::ObjectWriteOperation* > >::iterator it;
761
762   //skip the prefixing part
763   for(it = ops->begin(); it->first.first == ADD_PREFIX; ++it) {}
764   map<string, bufferlist> to_insert;
765   std::set<string> to_remove;
766   map<string, pair<bufferlist, int> > assertions;
767   if (create_vector.size() > 0) {
768     for (int i = 0; i < (int)idata.to_delete.size(); ++i) {
769       it->first = pair<int, string>(UNWRITE_OBJECT, idata.to_delete[i].obj);
770       set_up_unwrite_object(delete_vector[i].version, it->second);
771       ++it;
772     }
773   }
774   for (int i = 0; i < (int)idata.to_create.size(); ++i) {
775     index_data this_entry(idata.to_create[i].max, idata.to_create[i].min,
776         idata.to_create[i].obj);
777     to_insert[idata.to_create[i].max.encoded()] = to_bl(this_entry);
778     if (idata.to_create.size() <= 2) {
779       it->first = pair<int, string>(MAKE_OBJECT, idata.to_create[i].obj);
780     } else {
781       it->first = pair<int, string>(AIO_MAKE_OBJECT, idata.to_create[i].obj);
782     }
783     set_up_make_object(create_vector[i].omap, it->second);
784     ++it;
785   }
786   for (int i = 0; i < (int)idata.to_delete.size(); ++i) {
787     index_data this_entry = idata;
788     this_entry.obj = idata.to_delete[i].obj;
789     this_entry.min_kdata = idata.to_delete[i].min;
790     this_entry.kdata = idata.to_delete[i].max;
791     if (verbose) cout << "\t\t\t" << client_name << "-setup_ops: will assert "
792         << this_entry.str() << std::endl;
793     assertions[idata.to_delete[i].max.encoded()] = pair<bufferlist, int>(
794         to_bl(this_entry), CEPH_OSD_CMPXATTR_OP_EQ);
795     to_remove.insert(idata.to_delete[i].max.encoded());
796     it->first = pair<int, string>(REMOVE_OBJECT, idata.to_delete[i].obj);
797     set_up_delete_object(it->second);
798     ++it;
799   }
800   if ((int)idata.to_create.size() <= 2) {
801     it->second->omap_cmp(assertions, err);
802   }
803   it->second->omap_rm_keys(to_remove);
804   it->second->omap_set(to_insert);
805
806
807   it->first = pair<int, string>(REMOVE_PREFIX, index_name);
808 }
809
810 void KvFlatBtreeAsync::set_up_make_object(
811     const map<std::string, bufferlist> &to_set,
812     librados::ObjectWriteOperation *owo) {
813   bufferlist inbl;
814   ::encode(to_set, inbl);
815   owo->exec("kvs", "create_with_omap", inbl);
816 }
817
818 void KvFlatBtreeAsync::set_up_unwrite_object(
819     const int &ver, librados::ObjectWriteOperation *owo) {
820   if (ver > 0) {
821     owo->assert_version(ver);
822   }
823   owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("0"));
824   owo->setxattr("unwritable", to_bl("1"));
825 }
826
827 void KvFlatBtreeAsync::set_up_restore_object(
828     librados::ObjectWriteOperation *owo) {
829   owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("1"));
830   owo->setxattr("unwritable", to_bl("0"));
831 }
832
833 void KvFlatBtreeAsync::set_up_delete_object(
834     librados::ObjectWriteOperation *owo) {
835   owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("1"));
836   owo->remove();
837 }
838
839 int KvFlatBtreeAsync::perform_ops(const string &debug_prefix,
840     const index_data &idata,
841     vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > *ops) {
842   int err = 0;
843   vector<librados::AioCompletion*> aiocs(idata.to_create.size());
844   int count = 0;
845   for (vector<pair<pair<int, string>,
846       librados::ObjectWriteOperation*> >::iterator it = ops->begin();
847       it != ops->end(); ++it) {
848     if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
849       return -ESUICIDE;
850     }
851     switch (it->first.first) {
852     case ADD_PREFIX://prefixing
853       if (verbose) cout << debug_prefix << " adding prefix" << std::endl;
854       err = io_ctx.operate(index_name, it->second);
855       if (err < 0) {
856         if (verbose) cout << debug_prefix << " prefixing the index failed with "
857             << err << std::endl;
858         return -EPREFIX;
859       }
860       if (verbose) cout << debug_prefix << " prefix added." << std::endl;
861       break;
862     case UNWRITE_OBJECT://marking
863       if (verbose) cout << debug_prefix << " marking " << it->first.second
864       << std::endl;
865       err = io_ctx.operate(it->first.second, it->second);
866       if (err < 0) {
867         //most likely because it changed, in which case it will be -ERANGE
868         if (verbose) cout << debug_prefix << " marking " << it->first.second
869             << "failed with code" << err << std::endl;
870         if (it->first.second == (*idata.to_delete.begin()).max.encoded()) {
871           if (cleanup(idata, -EFIRSTOBJ) == -ESUICIDE) {
872             return -ESUICIDE;
873           }
874         } else {
875           if (cleanup(idata, -ERANGE) == -ESUICIDE) {
876             return -ESUICIDE;
877           }
878         }
879         return err;
880       }
881       if (verbose) cout << debug_prefix << " marked " << it->first.second
882           << std::endl;
883       break;
884     case MAKE_OBJECT://creating
885       if (verbose) cout << debug_prefix << " creating " << it->first.second
886          << std::endl;
887       err = io_ctx.operate(it->first.second, it->second);
888       if (err < 0) {
889         //this can happen if someone else was cleaning up after us.
890         if (verbose) cout << debug_prefix << " creating " << it->first.second
891             << " failed"
892             << " with code " << err << std::endl;
893         if (err == -EEXIST) {
894           //someone thinks we died, so die
895           if (verbose) cout << client_name << " is suiciding!" << std::endl;
896           return -ESUICIDE;
897         } else {
898           assert(false);
899         }
900         return err;
901       }
902       if (verbose || idata.to_create.size() > 2) {
903         cout << debug_prefix << " created object " << it->first.second
904           << std::endl;
905       }
906       break;
907     case AIO_MAKE_OBJECT:
908       cout << debug_prefix << " launching asynchronous create "
909           << it->first.second << std::endl;
910       aiocs[count] = rados.aio_create_completion();
911       io_ctx.aio_operate(it->first.second, aiocs[count], it->second);
912       count++;
913       if ((int)idata.to_create.size() == count) {
914         cout << "starting aiowrite waiting loop" << std::endl;
915           for (count -= 1; count >= 0; count--) {
916             aiocs[count]->wait_for_safe();
917             err = aiocs[count]->get_return_value();
918             if (err < 0) {
919               //this can happen if someone else was cleaning up after us.
920               cerr << debug_prefix << " a create failed"
921                   << " with code " << err << std::endl;
922               if (err == -EEXIST) {
923                 //someone thinks we died, so die
924                 cerr << client_name << " is suiciding!" << std::endl;
925                 return -ESUICIDE;
926               } else {
927                 assert(false);
928               }
929               return err;
930             }
931             if (verbose || idata.to_create.size() > 2) {
932               cout << debug_prefix << " completed aio " << aiocs.size() - count
933                   << "/" << aiocs.size() << std::endl;
934             }
935           }
936       }
937       break;
938     case REMOVE_OBJECT://deleting
939       if (verbose) cout << debug_prefix << " deleting " << it->first.second
940       << std::endl;
941       err = io_ctx.operate(it->first.second, it->second);
942       if (err < 0) {
943         //if someone else called cleanup on this prefix first
944         if (verbose) cout << debug_prefix << " deleting " << it->first.second
945             << "failed with code" << err << std::endl;
946       }
947       if (verbose) cout << debug_prefix << " deleted " << it->first.second
948           << std::endl;
949       break;
950     case REMOVE_PREFIX://rewriting index
951       if (verbose) cout << debug_prefix << " updating index " << std::endl;
952       err = io_ctx.operate(index_name, it->second);
953       if (err < 0) {
954         if (verbose) cout << debug_prefix
955         << " rewriting the index failed with code " << err
956         << ". someone else must have thought we died, so dying" << std::endl;
957         return -ETIMEDOUT;
958       }
959       if (verbose) cout << debug_prefix << " updated index." << std::endl;
960       break;
961     case RESTORE_OBJECT:
962       if (verbose) cout << debug_prefix << " restoring " << it->first.second
963       << std::endl;
964       err = io_ctx.operate(it->first.second, it->second);
965       if (err < 0) {
966         if (verbose) cout << debug_prefix << "restoring " << it->first.second
967             << " failed"
968             << " with " << err << std::endl;
969         return err;
970       }
971       if (verbose) cout << debug_prefix << " restored " << it->first.second
972           << std::endl;
973       break;
974     default:
975       if (verbose) cout << debug_prefix << " performing unknown op on "
976       << it->first.second
977         << std::endl;
978       err = io_ctx.operate(index_name, it->second);
979       if (err < 0) {
980         if (verbose) cout << debug_prefix << " unknown op on "
981             << it->first.second
982             << " failed with " << err << std::endl;
983         return err;
984       }
985       if (verbose) cout << debug_prefix << " unknown op on "
986           << it->first.second
987           << " succeeded." << std::endl;
988       break;
989     }
990   }
991
992   return err;
993 }
994
995 int KvFlatBtreeAsync::cleanup(const index_data &idata, const int &error) {
996   if (verbose) cout << "\t\t" << client_name << ": cleaning up after "
997       << idata.str()
998       << std::endl;
999   int err = 0;
1000   assert(idata.prefix != "");
1001   map<std::string,bufferlist> new_index;
1002   map<std::string, pair<bufferlist, int> > assertions;
1003   switch (error) {
1004   case -EFIRSTOBJ: {
1005     //this happens if the split or rebalance failed to mark the first object,
1006     //meaning only the index needs to be changed.
1007     //restore objects that had been marked unwritable.
1008     for(vector<delete_data >::const_iterator it =
1009         idata.to_delete.begin();
1010         it != idata.to_delete.end(); ++it) {
1011       index_data this_entry;
1012       this_entry.obj = (*it).obj;
1013       this_entry.min_kdata = it->min;
1014       this_entry.kdata = it->max;
1015       new_index[it->max.encoded()] = to_bl(this_entry);
1016       this_entry = idata;
1017       this_entry.obj = it->obj;
1018       this_entry.min_kdata = it->min;
1019       this_entry.kdata = it->max;
1020       if (verbose) cout << "\t\t\t" << client_name
1021           << "-cleanup: will assert index contains "
1022         << this_entry.str() << std::endl;
1023       assertions[it->max.encoded()] =
1024           pair<bufferlist, int>(to_bl(this_entry),
1025               CEPH_OSD_CMPXATTR_OP_EQ);
1026     }
1027
1028     //update the index
1029     librados::ObjectWriteOperation update_index;
1030     update_index.omap_cmp(assertions, &err);
1031     update_index.omap_set(new_index);
1032     if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
1033         << std::endl;
1034     if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1035       return -ESUICIDE;
1036     }
1037     err = io_ctx.operate(index_name, &update_index);
1038     if (err < 0) {
1039       if (verbose) cout << "\t\t\t" << client_name
1040           << "-cleanup: rewriting failed with "
1041           << err << ". returning -ECANCELED" << std::endl;
1042       return -ECANCELED;
1043     }
1044     if (verbose) cout << "\t\t\t" << client_name
1045         << "-cleanup: updated index. cleanup done."
1046         << std::endl;
1047     break;
1048   }
1049   case -ERANGE: {
1050     //this happens if a split or rebalance fails to mark an object. It is a
1051     //special case of rolling back that does not have to deal with new objects.
1052
1053     //restore objects that had been marked unwritable.
1054     vector<delete_data >::const_iterator it;
1055     for(it = idata.to_delete.begin();
1056         it != idata.to_delete.end(); ++it) {
1057       index_data this_entry;
1058       this_entry.obj = (*it).obj;
1059       this_entry.min_kdata = it->min;
1060       this_entry.kdata = it->max;
1061       new_index[it->max.encoded()] = to_bl(this_entry);
1062       this_entry = idata;
1063       this_entry.obj = it->obj;
1064       this_entry.min_kdata = it->min;
1065       this_entry.kdata = it->max;
1066       if (verbose) cout << "\t\t\t" << client_name
1067           << "-cleanup: will assert index contains "
1068         << this_entry.str() << std::endl;
1069       assertions[it->max.encoded()] =
1070           pair<bufferlist, int>(to_bl(this_entry),
1071               CEPH_OSD_CMPXATTR_OP_EQ);
1072     }
1073     it = idata.to_delete.begin();
1074     librados::ObjectWriteOperation restore;
1075     set_up_restore_object(&restore);
1076     if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1077       return -ESUICIDE;
1078     }
1079     if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
1080         << it->obj
1081         << std::endl;
1082     err = io_ctx.operate(it->obj, &restore);
1083     if (err < 0) {
1084       //i.e., -ECANCELED because the object was already restored by someone
1085       //else
1086         if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
1087             << it->obj
1088           << " failed with " << err << std::endl;
1089     } else {
1090       if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
1091           << it->obj
1092            << std::endl;
1093     }
1094
1095     //update the index
1096     librados::ObjectWriteOperation update_index;
1097     update_index.omap_cmp(assertions, &err);
1098     update_index.omap_set(new_index);
1099     if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
1100         << std::endl;
1101     if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1102       return -ESUICIDE;
1103     }
1104     err = io_ctx.operate(index_name, &update_index);
1105     if (err < 0) {
1106       if (verbose) cout << "\t\t\t" << client_name
1107           << "-cleanup: rewriting failed with "
1108           << err << ". returning -ECANCELED" << std::endl;
1109       return -ECANCELED;
1110     }
1111     if (verbose) cout << "\t\t\t" << client_name
1112         << "-cleanup: updated index. cleanup done."
1113         << std::endl;
1114     break;
1115   }
1116   case -ENOENT: {
1117     if (verbose) cout << "\t\t" << client_name << "-cleanup: rolling forward"
1118         << std::endl;
1119     //all changes were created except for updating the index and possibly
1120     //deleting the objects. roll forward.
1121     vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
1122     vector<librados::ObjectWriteOperation> owos(idata.to_delete.size() + 1);
1123     for (int i = 0; i <= (int)idata.to_delete.size(); ++i) {
1124       ops.push_back(make_pair(pair<int, string>(0, ""), &owos[i]));
1125     }
1126     set_up_ops(vector<object_data>(),
1127         vector<object_data>(), &ops, idata, &err);
1128     err = perform_ops("\t\t" + client_name + "-cleanup:", idata, &ops);
1129     if (err < 0) {
1130       if (err == -ESUICIDE) {
1131         return -ESUICIDE;
1132       }
1133       if (verbose) cout << "\t\t\t" << client_name
1134           << "-cleanup: rewriting failed with "
1135           << err << ". returning -ECANCELED" << std::endl;
1136       return -ECANCELED;
1137     }
1138     if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updated index"
1139         << std::endl;
1140     break;
1141   }
1142   default: {
1143     //roll back all changes.
1144     if (verbose) cout << "\t\t" << client_name << "-cleanup: rolling back"
1145         << std::endl;
1146     map<std::string,bufferlist> new_index;
1147     std::set<string> to_remove;
1148     map<std::string, pair<bufferlist, int> > assertions;
1149
1150     //mark the objects to be created. if someone else already has, die.
1151     for(vector<create_data >::const_reverse_iterator it =
1152         idata.to_create.rbegin();
1153         it != idata.to_create.rend(); ++it) {
1154       librados::ObjectWriteOperation rm;
1155       set_up_unwrite_object(0, &rm);
1156       if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
1157       {
1158         return -ESUICIDE;
1159       }
1160       if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marking "
1161           << it->obj
1162         << std::endl;
1163       err = io_ctx.operate(it->obj, &rm);
1164       if (err < 0) {
1165         if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marking "
1166             << it->obj
1167             << " failed with " << err << std::endl;
1168       } else {
1169       if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marked "
1170           << it->obj
1171         << std::endl;
1172       }
1173     }
1174
1175     //restore objects that had been marked unwritable.
1176     for(vector<delete_data >::const_iterator it =
1177         idata.to_delete.begin();
1178         it != idata.to_delete.end(); ++it) {
1179       index_data this_entry;
1180       this_entry.obj = (*it).obj;
1181       this_entry.min_kdata = it->min;
1182       this_entry.kdata = it->max;
1183       new_index[it->max.encoded()] = to_bl(this_entry);
1184       this_entry = idata;
1185       this_entry.obj = it->obj;
1186       this_entry.min_kdata = it->min;
1187       this_entry.kdata = it->max;
1188       if (verbose) cout << "\t\t\t" << client_name
1189           << "-cleanup: will assert index contains "
1190         << this_entry.str() << std::endl;
1191       assertions[it->max.encoded()] =
1192           pair<bufferlist, int>(to_bl(this_entry),
1193               CEPH_OSD_CMPXATTR_OP_EQ);
1194       librados::ObjectWriteOperation restore;
1195       set_up_restore_object(&restore);
1196       if (verbose) cout << "\t\t\t" << client_name
1197           << "-cleanup: will assert index contains "
1198           << this_entry.str() << std::endl;
1199       if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
1200       {
1201         return -ESUICIDE;
1202       }
1203       if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
1204           << it->obj
1205           << std::endl;
1206       err = io_ctx.operate(it->obj, &restore);
1207       if (err == -ENOENT) {
1208         //it had gotten far enough to be rolled forward - unmark the objects
1209         //and roll forward.
1210         if (verbose) cout << "\t\t\t" << client_name
1211             << "-cleanup: roll forward instead"
1212             << std::endl;
1213         for(vector<create_data >::const_iterator cit =
1214             idata.to_create.begin();
1215             cit != idata.to_create.end(); ++cit) {
1216           librados::ObjectWriteOperation res;
1217           set_up_restore_object(&res);
1218           if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)()
1219               == 1 ) {
1220             return -ECANCELED;
1221           }
1222           if (verbose) cout << "\t\t\t" << client_name
1223               << "-cleanup: restoring " << cit->obj
1224             << std::endl;
1225           err = io_ctx.operate(cit->obj, &res);
1226           if (err < 0) {
1227             if (verbose) cout << "\t\t\t" << client_name
1228                 << "-cleanup: restoring "
1229                 << cit->obj << " failed with " << err << std::endl;
1230           }
1231           if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
1232               << cit->obj
1233             << std::endl;
1234         }
1235         return cleanup(idata, -ENOENT);
1236       } else if (err < 0) {
1237         //i.e., -ECANCELED because the object was already restored by someone
1238         //else
1239           if (verbose) cout << "\t\t\t" << client_name
1240               << "-cleanup: restoring " << it->obj
1241             << " failed with " << err << std::endl;
1242       } else {
1243         if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
1244             << it->obj
1245              << std::endl;
1246       }
1247     }
1248
1249     //remove the new objects
1250     for(vector<create_data >::const_reverse_iterator it =
1251         idata.to_create.rbegin();
1252         it != idata.to_create.rend(); ++it) {
1253       to_remove.insert(it->max.encoded());
1254       librados::ObjectWriteOperation rm;
1255       rm.remove();
1256       if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
1257       {
1258         return -ESUICIDE;
1259       }
1260       if (verbose) cout << "\t\t\t" << client_name << "-cleanup: removing "
1261           << it->obj
1262           << std::endl;
1263       err = io_ctx.operate(it->obj, &rm);
1264       if (err < 0) {
1265         if (verbose) cout << "\t\t\t" << client_name
1266             << "-cleanup: failed to remove "
1267             << it->obj << std::endl;
1268       } else {
1269         if (verbose) cout << "\t\t\t" << client_name << "-cleanup: removed "
1270             << it->obj
1271             << std::endl;
1272       }
1273     }
1274
1275     //update the index
1276     librados::ObjectWriteOperation update_index;
1277     update_index.omap_cmp(assertions, &err);
1278     update_index.omap_rm_keys(to_remove);
1279     update_index.omap_set(new_index);
1280     if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
1281         << std::endl;
1282     if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1283       return -ESUICIDE;
1284     }
1285     err = io_ctx.operate(index_name, &update_index);
1286     if (err < 0) {
1287       if (verbose) cout << "\t\t\t" << client_name
1288           << "-cleanup: rewriting failed with "
1289           << err << ". returning -ECANCELED" << std::endl;
1290       return -ECANCELED;
1291     }
1292     if (verbose) cout << "\t\t\t" << client_name
1293         << "-cleanup: updated index. cleanup done."
1294         << std::endl;
1295     break;
1296   }
1297   }
1298   return err;
1299 }
1300
1301 string KvFlatBtreeAsync::to_string(string s, int i) {
1302   stringstream ret;
1303   ret << s << i;
1304   return ret.str();
1305 }
1306
1307 string KvFlatBtreeAsync::get_name() {
1308   return rados_id;
1309 }
1310
1311 void KvFlatBtreeAsync::set_inject(injection_t inject, int wait_time) {
1312   interrupt = inject;
1313   wait_ms = wait_time;
1314 }
1315
1316 int KvFlatBtreeAsync::setup(int argc, const char** argv) {
1317   int r = rados.init(rados_id.c_str());
1318   if (r < 0) {
1319     cerr << "error during init" << r << std::endl;
1320     return r;
1321   }
1322   r = rados.conf_parse_argv(argc, argv);
1323   if (r < 0) {
1324     cerr << "error during parsing args" << r << std::endl;
1325     return r;
1326   }
1327   r = rados.conf_parse_env(NULL);
1328   if (r < 0) {
1329     cerr << "error during parsing env" << r << std::endl;
1330     return r;
1331   }
1332   r = rados.conf_read_file(NULL);
1333   if (r < 0) {
1334     cerr << "error during read file: " << r << std::endl;
1335     return r;
1336   }
1337   r = rados.connect();
1338   if (r < 0) {
1339     cerr << "error during connect: " << r << std::endl;
1340     return r;
1341   }
1342   r = rados.ioctx_create(pool_name.c_str(), io_ctx);
1343   if (r < 0) {
1344     cerr << "error creating io ctx: " << r << std::endl;
1345     rados.shutdown();
1346     return r;
1347   }
1348
1349   librados::ObjectWriteOperation make_index;
1350   make_index.create(true);
1351   map<std::string,bufferlist> index_map;
1352   index_data idata;
1353   idata.obj = client_name;
1354   idata.min_kdata.raw_key = "";
1355   idata.kdata = key_data("");
1356   index_map["1"] = to_bl(idata);
1357   make_index.omap_set(index_map);
1358   r = io_ctx.operate(index_name, &make_index);
1359   if (r < 0) {
1360     if (verbose) cout << client_name << ": Making the index failed with code "
1361         << r
1362         << std::endl;
1363     return 0;
1364   }
1365   if (verbose) cout << client_name << ": created index object" << std::endl;
1366
1367   librados::ObjectWriteOperation make_max_obj;
1368   make_max_obj.create(true);
1369   make_max_obj.setxattr("unwritable", to_bl("0"));
1370   make_max_obj.setxattr("size", to_bl("0"));
1371   r = io_ctx.operate(client_name, &make_max_obj);
1372   if (r < 0) {
1373     if (verbose) cout << client_name << ": Setting xattr failed with code "
1374         << r
1375         << std::endl;
1376   }
1377
1378   return 0;
1379 }
1380
1381 int KvFlatBtreeAsync::set(const string &key, const bufferlist &val,
1382     bool update_on_existing) {
1383   if (verbose) cout << client_name << " is "
1384       << (update_on_existing? "updating " : "setting ")
1385       << key << std::endl;
1386   int err = 0;
1387   utime_t mytime;
1388   index_data idata(key);
1389
1390   if (verbose) cout << "\t" << client_name << ": finding oid" << std::endl;
1391   err = read_index(key, &idata, NULL, false);
1392   if (err < 0) {
1393     if (verbose) cout << "\t" << client_name
1394         << ": getting oid failed with code "
1395         << err << std::endl;
1396     return err;
1397   }
1398   if (verbose) cout << "\t" << client_name << ": index data is " << idata.str()
1399       << ", object is " << idata.obj << std::endl;
1400
1401   err = set_op(key, val, update_on_existing, idata);
1402
1403   if (verbose) cout << "\t" << client_name << ": finished set with " << err
1404       << std::endl;
1405   return err;
1406 }
1407
1408 int KvFlatBtreeAsync::set_op(const string &key, const bufferlist &val,
1409     bool update_on_existing, index_data &idata) {
1410   //write
1411
1412   bufferlist inbl;
1413   omap_set_args args;
1414   args.bound = 2 * k;
1415   args.exclusive = !update_on_existing;
1416   args.omap[key] = val;
1417   args.encode(inbl);
1418
1419   librados::ObjectWriteOperation owo;
1420   owo.exec("kvs", "omap_insert", inbl);
1421   if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1422     if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1423     return -ESUICIDE;
1424   }
1425   if (verbose) cout << "\t" << client_name << ": inserting " << key
1426       << " into object "
1427       << idata.obj << std::endl;
1428   int err = io_ctx.operate(idata.obj, &owo);
1429   if (err < 0) {
1430     switch (err) {
1431     case -EEXIST: {
1432       //the key already exists and this is an exclusive insert.
1433       cerr << "\t" << client_name << ": writing key failed with "
1434         << err << std::endl;
1435       return err;
1436     }
1437     case -EKEYREJECTED: {
1438       //the object needs to be split.
1439       do {
1440         if (verbose) cout << "\t" << client_name << ": running split on "
1441             << idata.obj
1442             << std::endl;
1443         err = read_index(key, &idata, NULL, true);
1444         if (err < 0) {
1445           if (verbose) cout << "\t" << client_name
1446               << ": getting oid failed with code "
1447               << err << std::endl;
1448           return err;
1449         }
1450         err = split(idata);
1451         if (err < 0 && err != -ENOENT && err != -EBALANCE) {
1452           if (verbose) cerr << "\t" << client_name << ": split failed with "
1453               << err << std::endl;
1454           int ret = handle_set_rm_errors(err, idata.obj, key, &idata, NULL);
1455           switch (ret) {
1456           case -ESUICIDE:
1457             if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1458             return ret;
1459             break;
1460           case 1:
1461             return set_op(key, val, update_on_existing, idata);
1462             break;
1463           case 2:
1464             return err;
1465             break;
1466           }
1467         }
1468       } while (err < 0 && err != -EBALANCE && err != -ENOENT);
1469       err = read_index(key, &idata, NULL, true);
1470       if (err < 0) {
1471         if (verbose) cout << "\t" << client_name
1472             << ": getting oid failed with code "
1473             << err << std::endl;
1474         return err;
1475       }
1476       return set_op(key, val, update_on_existing, idata);
1477     }
1478     default:
1479       if (verbose) cerr << "\t" << client_name << ": writing obj failed with "
1480         << err << std::endl;
1481       if (err == -ENOENT || err == -EACCES) {
1482         if (err == -ENOENT) {
1483           if (verbose) cout << "CACHE FAILURE" << std::endl;
1484         }
1485         err = read_index(key, &idata, NULL, true);
1486         if (err < 0) {
1487           if (verbose) cout << "\t" << client_name
1488               << ": getting oid failed with code "
1489               << err << std::endl;
1490           return err;
1491         }
1492         if (verbose) cout << "\t" << client_name << ": index data is "
1493             << idata.str()
1494             << ", object is " << idata.obj << std::endl;
1495         return set_op(key, val, update_on_existing, idata);
1496       } else {
1497         return err;
1498       }
1499     }
1500   }
1501   return 0;
1502 }
1503
1504 int KvFlatBtreeAsync::remove(const string &key) {
1505   if (verbose) cout << client_name << ": removing " << key << std::endl;
1506   int err = 0;
1507   string obj;
1508   utime_t mytime;
1509   index_data idata;
1510   index_data next_idata;
1511
1512   if (verbose) cout << "\t" << client_name << ": finding oid" << std::endl;
1513   err = read_index(key, &idata, &next_idata, false);
1514   if (err < 0) {
1515     if (verbose) cout << "getting oid failed with code " << err << std::endl;
1516     return err;
1517   }
1518   obj = idata.obj;
1519   if (verbose) cout << "\t" << client_name << ": idata is " << idata.str()
1520       << ", next_idata is " << next_idata.str()
1521       << ", obj is " << obj << std::endl;
1522
1523   err = remove_op(key, idata, next_idata);
1524
1525   if (verbose) cout << "\t" << client_name << ": finished remove with " << err
1526       << " and exiting" << std::endl;
1527   return err;
1528 }
1529
1530 int KvFlatBtreeAsync::remove_op(const string &key, index_data &idata,
1531     index_data &next_idata) {
1532   //write
1533   bufferlist inbl;
1534   omap_rm_args args;
1535   args.bound = k;
1536   args.omap.insert(key);
1537   args.encode(inbl);
1538
1539   librados::ObjectWriteOperation owo;
1540   owo.exec("kvs", "omap_remove", inbl);
1541   if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1542     if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1543     return -ESUICIDE;
1544   }
1545   if (verbose) cout << "\t" << client_name << ": removing " << key << " from "
1546       << idata.obj
1547       << std::endl;
1548   int err = io_ctx.operate(idata.obj, &owo);
1549   if (err < 0) {
1550     if (verbose) cout << "\t" << client_name << ": writing obj failed with "
1551         << err << std::endl;
1552     switch (err) {
1553     case -ENODATA: {
1554       //the key does not exist in the object
1555       return err;
1556     }
1557     case -EKEYREJECTED: {
1558       //the object needs to be split.
1559       do {
1560         if (verbose) cerr << "\t" << client_name << ": running rebalance on "
1561             << idata.obj << std::endl;
1562         err = read_index(key, &idata, &next_idata, true);
1563         if (err < 0) {
1564           if (verbose) cout << "\t" << client_name
1565               << ": getting oid failed with code "
1566               << err << std::endl;
1567           return err;
1568         }
1569         err = rebalance(idata, next_idata);
1570         if (err < 0 && err != -ENOENT && err != -EBALANCE) {
1571           if (verbose) cerr << "\t" << client_name << ": rebalance returned "
1572               << err << std::endl;
1573           int ret = handle_set_rm_errors(err, idata.obj, key, &idata,
1574               &next_idata);
1575           switch (ret) {
1576           case -ESUICIDE:
1577             if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1578             return err;
1579             break;
1580           case 1:
1581             return remove_op(key, idata, next_idata);
1582             break;
1583           case 2:
1584             return err;
1585             break;
1586           case -EUCLEAN:
1587             //this is the only node, so it's ok to go below k.
1588             librados::ObjectWriteOperation owo;
1589             bufferlist inbl;
1590             omap_rm_args args;
1591             args.bound = 0;
1592             args.omap.insert(key);
1593             args.encode(inbl);
1594             owo.exec("kvs", "omap_remove", inbl);
1595             if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)()
1596                 == 1 ) {
1597               if (verbose) cout << client_name << " IS SUICIDING!"
1598                   << std::endl;
1599               return -ESUICIDE;
1600             }
1601             if (verbose) cout << "\t" << client_name << ": removing " << key
1602                 << " from "
1603                 << idata.obj
1604                 << std::endl;
1605             int err = io_ctx.operate(idata.obj, &owo);
1606             if (err == 0) {
1607               return 0;
1608             }
1609           }
1610         }
1611       } while (err < 0 && err != -EBALANCE && err != -ENOENT);
1612       err = read_index(key, &idata, &next_idata, true);
1613       if (err < 0) {
1614         if (verbose) cout << "\t" << client_name
1615             << ": getting oid failed with code "
1616             << err << std::endl;
1617         return err;
1618       }
1619       return remove(key);
1620     }
1621     default:
1622       if (err == -ENOENT || err == -EACCES) {
1623         err = read_index(key, &idata, &next_idata, true);
1624         if (err < 0) {
1625           if (verbose) cout << "\t" << client_name
1626               << ": getting oid failed with code "
1627               << err << std::endl;
1628           return err;
1629         }
1630         if (verbose) cout << "\t" << client_name << ": index data is "
1631             << idata.str()
1632             << ", object is " << idata.obj << std::endl;
1633         //idea: we read the time every time we read the index anyway - store it.
1634         return remove_op(key, idata, next_idata);
1635       } else {
1636         return err;
1637       }
1638     }
1639   }
1640   return 0;
1641 }
1642
1643 int KvFlatBtreeAsync::handle_set_rm_errors(int &err, string obj,
1644     string key,
1645     index_data * idata, index_data * next_idata) {
1646   if (err == -ESUICIDE) {
1647     return err;
1648   } else if (err == -ECANCELED //if an object was unwritable or index changed
1649       || err == -EPREFIX //if there is currently a prefix
1650       || err == -ETIMEDOUT// if the index changes during the op - i.e. cleanup
1651       || err == -EACCES) //possible if we were acting on old index data
1652   {
1653     err = read_index(key, idata, next_idata, true);
1654     if (err < 0) {
1655       return err;
1656     }
1657     if (verbose) cout << "\t" << client_name << ": prefix is " << idata->str()
1658         << std::endl;
1659     if (idata->obj != obj) {
1660       //someone else has split or cleaned up or something. start over.
1661       return 1;//meaning repeat
1662     }
1663   } else if (err != -ETIMEDOUT && err != -ERANGE && err != -EACCES
1664       && err != -EUCLEAN){
1665     if (verbose) cout << "\t" << client_name
1666         << ": split encountered an unexpected error: " << err
1667         << std::endl;
1668     return 2;
1669   }
1670   return err;
1671 }
1672
1673 int KvFlatBtreeAsync::get(const string &key, bufferlist *val) {
1674   opmap['g']++;
1675   if (verbose) cout << client_name << ": getting " << key << std::endl;
1676   int err = 0;
1677   index_data idata;
1678   utime_t mytime;
1679
1680   if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1681     return -ESUICIDE;
1682   }
1683   err = read_index(key, &idata, NULL, false);
1684   mytime = ceph_clock_now();
1685   if (err < 0) {
1686     if (verbose) cout << "getting oid failed with code " << err << std::endl;
1687     return err;
1688   }
1689
1690   err = get_op(key, val, idata);
1691
1692   if (verbose) cout << client_name << ": got " << key << " with " << err
1693       << std::endl;
1694
1695   return err;
1696 }
1697
1698 int KvFlatBtreeAsync::get_op(const string &key, bufferlist *val,
1699     index_data &idata) {
1700   int err = 0;
1701   std::set<std::string> key_set;
1702   key_set.insert(key);
1703   map<std::string,bufferlist> omap;
1704   librados::ObjectReadOperation read;
1705   read.omap_get_vals_by_keys(key_set, &omap, &err);
1706   err = io_ctx.operate(idata.obj, &read, NULL);
1707   if (err < 0) {
1708     if (err == -ENOENT) {
1709         err = read_index(key, &idata, NULL, true);
1710         if (err < 0) {
1711           if (verbose) cout << "\t" << client_name
1712               << ": getting oid failed with code "
1713               << err << std::endl;
1714           return err;
1715         }
1716         if (verbose) cout << "\t" << client_name << ": index data is "
1717             << idata.str()
1718             << ", object is " << idata.obj << std::endl;
1719         return get_op(key, val, idata);
1720     } else {
1721       if (verbose) cout << client_name
1722           << ": get encountered an unexpected error: " << err
1723           << std::endl;
1724       return err;
1725     }
1726   }
1727
1728   *val = omap[key];
1729   return err;
1730 }
1731
1732 void *KvFlatBtreeAsync::pset(void *ptr) {
1733   struct aio_set_args *args = (struct aio_set_args *)ptr;
1734   *args->err =
1735       args->kvba->KvFlatBtreeAsync::set((string)args->key,
1736           (bufferlist)args->val, (bool)args->exc);
1737   args->cb(args->err, args->cb_args);
1738   delete args;
1739   return NULL;
1740 }
1741
1742 void KvFlatBtreeAsync::aio_set(const string &key, const bufferlist &val,
1743     bool exclusive, callback cb, void * cb_args, int * err) {
1744   aio_set_args *args = new aio_set_args();
1745   args->kvba = this;
1746   args->key = key;
1747   args->val = val;
1748   args->exc = exclusive;
1749   args->cb = cb;
1750   args->cb_args = cb_args;
1751   args->err = err;
1752   pthread_t t;
1753   int r = pthread_create(&t, NULL, pset, (void*)args);
1754   if (r < 0) {
1755     *args->err = r;
1756     return;
1757   }
1758   pthread_detach(t);
1759 }
1760
1761 void *KvFlatBtreeAsync::prm(void *ptr) {
1762   struct aio_rm_args *args = (struct aio_rm_args *)ptr;
1763   *args->err =
1764       args->kvba->KvFlatBtreeAsync::remove((string)args->key);
1765   args->cb(args->err, args->cb_args);
1766   delete args;
1767   return NULL;
1768 }
1769
1770 void KvFlatBtreeAsync::aio_remove(const string &key,
1771     callback cb, void * cb_args, int * err) {
1772   aio_rm_args * args = new aio_rm_args();
1773   args->kvba = this;
1774   args->key = key;
1775   args->cb = cb;
1776   args->cb_args = cb_args;
1777   args->err = err;
1778   pthread_t t;
1779   int r = pthread_create(&t, NULL, prm, (void*)args);
1780   if (r < 0) {
1781     *args->err = r;
1782     return;
1783   }
1784   pthread_detach(t);
1785 }
1786
1787 void *KvFlatBtreeAsync::pget(void *ptr) {
1788   struct aio_get_args *args = (struct aio_get_args *)ptr;
1789   *args->err =
1790       args->kvba->KvFlatBtreeAsync::get((string)args->key,
1791           (bufferlist *)args->val);
1792   args->cb(args->err, args->cb_args);
1793   delete args;
1794   return NULL;
1795 }
1796
1797 void KvFlatBtreeAsync::aio_get(const string &key, bufferlist *val,
1798     callback cb, void * cb_args, int * err) {
1799   aio_get_args * args = new aio_get_args();
1800   args->kvba = this;
1801   args->key = key;
1802   args->val = val;
1803   args->cb = cb;
1804   args->cb_args = cb_args;
1805   args->err = err;
1806   pthread_t t;
1807   int r = pthread_create(&t, NULL, pget, (void*)args);
1808   if (r < 0) {
1809     *args->err = r;
1810     return;
1811   }
1812   pthread_detach(t);
1813 }
1814
1815 int KvFlatBtreeAsync::set_many(const map<string, bufferlist> &in_map) {
1816   int err = 0;
1817   bufferlist inbl;
1818   bufferlist outbl;
1819   std::set<string> keys;
1820
1821   map<string, bufferlist> big_map;
1822   for (map<string, bufferlist>::const_iterator it = in_map.begin();
1823       it != in_map.end(); ++it) {
1824     keys.insert(it->first);
1825     big_map.insert(*it);
1826   }
1827
1828   if (verbose) cout << "created key set and big_map" << std::endl;
1829
1830   ::encode(keys, inbl);
1831   librados::AioCompletion * aioc = rados.aio_create_completion();
1832   io_ctx.aio_exec(index_name, aioc,  "kvs", "read_many", inbl, &outbl);
1833   aioc->wait_for_safe();
1834   err = aioc->get_return_value();
1835   aioc->release();
1836   if (err < 0) {
1837     cerr << "getting index failed with " << err << std::endl;
1838     return err;
1839   }
1840
1841   map<string, bufferlist> imap;//read from the index
1842   bufferlist::iterator blit = outbl.begin();
1843   ::decode(imap, blit);
1844
1845   if (verbose) cout << "finished reading index for objects. there are "
1846       << imap.size() << " entries that need to be changed. " << std::endl;
1847
1848
1849   vector<object_data> to_delete;
1850
1851   vector<object_data> to_create;
1852
1853   if (verbose) cout << "setting up to_delete and to_create vectors from index "
1854       << "map" << std::endl;
1855   //set up to_delete from index map
1856   for (map<string, bufferlist>::iterator it = imap.begin(); it != imap.end();
1857       ++it){
1858     index_data idata;
1859     blit = it->second.begin();
1860     idata.decode(blit);
1861     to_delete.push_back(object_data(idata.min_kdata, idata.kdata, idata.obj));
1862     err = read_object(idata.obj, &to_delete[to_delete.size() - 1]);
1863     if (err < 0) {
1864       if (verbose) cout << "reading " << idata.obj << " failed with " << err
1865           << std::endl;
1866       return set_many(in_map);
1867     }
1868
1869     big_map.insert(to_delete[to_delete.size() - 1].omap.begin(),
1870         to_delete[to_delete.size() - 1].omap.end());
1871   }
1872
1873   to_create.push_back(object_data(
1874         to_string(client_name, client_index++)));
1875   to_create[0].min_kdata = to_delete[0].min_kdata;
1876
1877   for(map<string, bufferlist>::iterator it = big_map.begin();
1878       it != big_map.end(); ++it) {
1879     if (to_create[to_create.size() - 1].omap.size() == 1.5 * k) {
1880       to_create[to_create.size() - 1].max_kdata =
1881           key_data(to_create[to_create.size() - 1]
1882                                  .omap.rbegin()->first);
1883
1884       to_create.push_back(object_data(
1885         to_string(client_name, client_index++)));
1886       to_create[to_create.size() - 1].min_kdata =
1887           to_create[to_create.size() - 2].max_kdata;
1888     }
1889
1890     to_create[to_create.size() - 1].omap.insert(*it);
1891   }
1892   to_create[to_create.size() - 1].max_kdata =
1893       to_delete[to_delete.size() - 1].max_kdata;
1894
1895   vector<librados::ObjectWriteOperation> owos(2 + 2 * to_delete.size()
1896                                               + to_create.size());
1897   vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
1898
1899
1900   index_data idata;
1901   set_up_prefix_index(to_create, to_delete, &owos[0], &idata, &err);
1902
1903   if (verbose) cout << "finished making to_create and to_delete. "
1904       << std::endl;
1905
1906   ops.push_back(make_pair(
1907       pair<int, string>(ADD_PREFIX, index_name),
1908       &owos[0]));
1909   for (int i = 1; i < 2 + 2 * (int)to_delete.size() + (int)to_create.size();
1910       i++) {
1911     ops.push_back(make_pair(make_pair(0,""), &owos[i]));
1912   }
1913
1914   set_up_ops(to_create, to_delete, &ops, idata, &err);
1915
1916   cout << "finished setting up ops. Starting critical section..." << std::endl;
1917
1918   /////BEGIN CRITICAL SECTION/////
1919   //put prefix on index entry for idata.val
1920   err = perform_ops("\t\t" + client_name + "-set_many:", idata, &ops);
1921   if (err < 0) {
1922     return set_many(in_map);
1923   }
1924   if (verbose) cout << "\t\t" << client_name << "-split: done splitting."
1925       << std::endl;
1926   /////END CRITICAL SECTION/////
1927   icache_lock.Lock();
1928   for (vector<delete_data>::iterator it = idata.to_delete.begin();
1929       it != idata.to_delete.end(); ++it) {
1930     icache.erase(it->max);
1931   }
1932   for (vector<create_data>::iterator it = idata.to_create.begin();
1933       it != idata.to_create.end(); ++it) {
1934     icache.push(index_data(*it));
1935   }
1936   icache_lock.Unlock();
1937   return err;
1938 }
1939
1940 int KvFlatBtreeAsync::remove_all() {
1941   if (verbose) cout << client_name << ": removing all" << std::endl;
1942   int err = 0;
1943   librados::ObjectReadOperation oro;
1944   librados::AioCompletion * oro_aioc = rados.aio_create_completion();
1945   std::map<std::string, bufferlist> index_set;
1946   oro.omap_get_vals2("",LONG_MAX,&index_set, nullptr, &err);
1947   err = io_ctx.aio_operate(index_name, oro_aioc, &oro, NULL);
1948   if (err < 0){
1949     if (err == -ENOENT) {
1950       return 0;
1951     }
1952     if (verbose) cout << "getting keys failed with error " << err << std::endl;
1953     return err;
1954   }
1955   oro_aioc->wait_for_safe();
1956   oro_aioc->release();
1957
1958   librados::ObjectWriteOperation rm_index;
1959   librados::AioCompletion * rm_index_aioc = rados.aio_create_completion();
1960   map<std::string,bufferlist> new_index;
1961   new_index["1"] = index_set["1"];
1962   rm_index.omap_clear();
1963   rm_index.omap_set(new_index);
1964   io_ctx.aio_operate(index_name, rm_index_aioc, &rm_index);
1965   err = rm_index_aioc->get_return_value();
1966   rm_index_aioc->release();
1967   if (err < 0) {
1968     if (verbose) cout << "rm index aioc failed with " << err
1969         << std::endl;
1970     return err;
1971   }
1972
1973   if (!index_set.empty()) {
1974     for (std::map<std::string,bufferlist>::iterator it = index_set.begin();
1975         it != index_set.end(); ++it){
1976       librados::ObjectWriteOperation sub;
1977       if (it->first == "1") {
1978         sub.omap_clear();
1979       } else {
1980         sub.remove();
1981       }
1982       index_data idata;
1983       bufferlist::iterator b = it->second.begin();
1984       idata.decode(b);
1985       io_ctx.operate(idata.obj, &sub);
1986     }
1987   }
1988
1989   icache.clear();
1990
1991   return 0;
1992 }
1993
1994 int KvFlatBtreeAsync::get_all_keys(std::set<std::string> *keys) {
1995   if (verbose) cout << client_name << ": getting all keys" << std::endl;
1996   int err = 0;
1997   librados::ObjectReadOperation oro;
1998   std::map<std::string,bufferlist> index_set;
1999   oro.omap_get_vals2("",LONG_MAX,&index_set, nullptr, &err);
2000   io_ctx.operate(index_name, &oro, NULL);
2001   if (err < 0){
2002     if (verbose) cout << "getting keys failed with error " << err << std::endl;
2003     return err;
2004   }
2005   for (std::map<std::string,bufferlist>::iterator it = index_set.begin();
2006       it != index_set.end(); ++it){
2007     librados::ObjectReadOperation sub;
2008     std::set<std::string> ret;
2009     sub.omap_get_keys2("",LONG_MAX,&ret, nullptr, &err);
2010     index_data idata;
2011     bufferlist::iterator b = it->second.begin();
2012     idata.decode(b);
2013     io_ctx.operate(idata.obj, &sub, NULL);
2014     keys->insert(ret.begin(), ret.end());
2015   }
2016   return err;
2017 }
2018
2019 int KvFlatBtreeAsync::get_all_keys_and_values(
2020     map<std::string,bufferlist> *kv_map) {
2021   if (verbose) cout << client_name << ": getting all keys and values"
2022       << std::endl;
2023   int err = 0;
2024   librados::ObjectReadOperation first_read;
2025   std::set<std::string> index_set;
2026   first_read.omap_get_keys2("",LONG_MAX,&index_set, nullptr, &err);
2027   io_ctx.operate(index_name, &first_read, NULL);
2028   if (err < 0){
2029     if (verbose) cout << "getting keys failed with error " << err << std::endl;
2030     return err;
2031   }
2032   for (std::set<std::string>::iterator it = index_set.begin();
2033       it != index_set.end(); ++it){
2034     librados::ObjectReadOperation sub;
2035     map<std::string, bufferlist> ret;
2036     sub.omap_get_vals2("",LONG_MAX,&ret, nullptr, &err);
2037     io_ctx.operate(*it, &sub, NULL);
2038     kv_map->insert(ret.begin(), ret.end());
2039   }
2040   return err;
2041 }
2042
2043 bool KvFlatBtreeAsync::is_consistent() {
2044   int err;
2045   bool ret = true;
2046   if (verbose) cout << client_name << ": checking consistency" << std::endl;
2047   std::map<std::string,bufferlist> index;
2048   map<std::string, std::set<std::string> > sub_objs;
2049   librados::ObjectReadOperation oro;
2050   oro.omap_get_vals2("",LONG_MAX,&index, nullptr, &err);
2051   io_ctx.operate(index_name, &oro, NULL);
2052   if (err < 0){
2053     //probably because the index doesn't exist - this might be ok.
2054     for (librados::NObjectIterator oit = io_ctx.nobjects_begin();
2055         oit != io_ctx.nobjects_end(); ++oit) {
2056       //if this executes, there are floating objects.
2057       cerr << "Not consistent! found floating object " << oit->get_oid()
2058              << std::endl;
2059       ret = false;
2060     }
2061     return ret;
2062   }
2063
2064   std::map<std::string, string> parsed_index;
2065   std::set<std::string> onames;
2066   std::set<std::string> special_names;
2067   for (map<std::string,bufferlist>::iterator it = index.begin();
2068       it != index.end(); ++it) {
2069     if (it->first != "") {
2070       index_data idata;
2071       bufferlist::iterator b = it->second.begin();
2072       idata.decode(b);
2073       if (idata.prefix != "") {
2074         for(vector<delete_data>::iterator dit = idata.to_delete.begin();
2075             dit != idata.to_delete.end(); ++dit) {
2076           librados::ObjectReadOperation oro;
2077           librados::AioCompletion * aioc = rados.aio_create_completion();
2078           bufferlist un;
2079           oro.getxattr("unwritable", &un, &err);
2080           io_ctx.aio_operate(dit->obj, aioc, &oro, NULL);
2081           aioc->wait_for_safe();
2082           err = aioc->get_return_value();
2083           if (ceph_clock_now() - idata.ts > timeout) {
2084             if (err < 0) {
2085               aioc->release();
2086               if (err == -ENOENT) {
2087                 continue;
2088               } else {
2089                 cerr << "Not consistent! reading object " << dit->obj
2090                 << "returned " << err << std::endl;
2091                 ret = false;
2092                 break;
2093               }
2094             }
2095             if (atoi(string(un.c_str(), un.length()).c_str()) != 1 &&
2096                 aioc->get_version64() != dit->version) {
2097               cerr << "Not consistent! object " << dit->obj << " has been "
2098                   << " modified since the client died was not cleaned up."
2099                   << std::endl;
2100               ret = false;
2101             }
2102           }
2103           special_names.insert(dit->obj);
2104           aioc->release();
2105         }
2106         for(vector<create_data >::iterator cit = idata.to_create.begin();
2107             cit != idata.to_create.end(); ++cit) {
2108           special_names.insert(cit->obj);
2109         }
2110       }
2111       parsed_index.insert(make_pair(it->first, idata.obj));
2112       onames.insert(idata.obj);
2113     }
2114   }
2115
2116   //make sure that an object exists iff it either is the index
2117   //or is listed in the index
2118   for (librados::NObjectIterator oit = io_ctx.nobjects_begin();
2119       oit != io_ctx.nobjects_end(); ++oit) {
2120     string name = oit->get_oid();
2121     if (name != index_name && onames.count(name) == 0
2122         && special_names.count(name) == 0) {
2123       cerr << "Not consistent! found floating object " << name << std::endl;
2124       ret = false;
2125     }
2126   }
2127
2128   //check objects
2129   string prev = "";
2130   for (std::map<std::string, string>::iterator it = parsed_index.begin();
2131       it != parsed_index.end();
2132       ++it) {
2133     librados::ObjectReadOperation read;
2134     read.omap_get_keys2("", LONG_MAX, &sub_objs[it->second], nullptr, &err);
2135     err = io_ctx.operate(it->second, &read, NULL);
2136     int size_int = (int)sub_objs[it->second].size();
2137
2138     //check that size is in the right range
2139     if (it->first != "1" && special_names.count(it->second) == 0 &&
2140         err != -ENOENT && (size_int > 2*k|| size_int < k)
2141         && parsed_index.size() > 1) {
2142       cerr << "Not consistent! Object " << *it << " has size " << size_int
2143           << ", which is outside the acceptable range." << std::endl;
2144       ret = false;
2145     }
2146
2147     //check that all keys belong in that object
2148     for(std::set<std::string>::iterator subit = sub_objs[it->second].begin();
2149         subit != sub_objs[it->second].end(); ++subit) {
2150       if ((it->first != "1"
2151           && *subit > it->first.substr(1,it->first.length()))
2152           || *subit <= prev) {
2153         cerr << "Not consistent! key " << *subit << " does not belong in "
2154             << *it << std::endl;
2155         cerr << "not last element, i.e. " << it->first << " not equal to 1? "
2156             << (it->first != "1") << std::endl
2157             << "greater than " << it->first.substr(1,it->first.length())
2158             <<"? " << (*subit > it->first.substr(1,it->first.length()))
2159             << std::endl
2160             << "less than or equal to " << prev << "? "
2161             << (*subit <= prev) << std::endl;
2162         ret = false;
2163       }
2164     }
2165
2166     prev = it->first.substr(1,it->first.length());
2167   }
2168
2169   if (!ret) {
2170     if (verbose) cout << "failed consistency test - see error log"
2171         << std::endl;
2172     cerr << str();
2173   } else {
2174     if (verbose) cout << "passed consistency test" << std::endl;
2175   }
2176   return ret;
2177 }
2178
2179 string KvFlatBtreeAsync::str() {
2180   stringstream ret;
2181   ret << "Top-level map:" << std::endl;
2182   int err = 0;
2183   std::set<std::string> keys;
2184   std::map<std::string,bufferlist> index;
2185   librados::ObjectReadOperation oro;
2186   librados::AioCompletion * top_aioc = rados.aio_create_completion();
2187   oro.omap_get_vals2("",LONG_MAX,&index, nullptr, &err);
2188   io_ctx.aio_operate(index_name, top_aioc, &oro, NULL);
2189   top_aioc->wait_for_safe();
2190   err = top_aioc->get_return_value();
2191   top_aioc->release();
2192   if (err < 0 && err != -5){
2193     if (verbose) cout << "getting keys failed with error " << err << std::endl;
2194     return ret.str();
2195   }
2196   if(index.empty()) {
2197     ret << "There are no objects!" << std::endl;
2198     return ret.str();
2199   }
2200
2201   for (map<std::string,bufferlist>::iterator it = index.begin();
2202       it != index.end(); ++it) {
2203     keys.insert(string(it->second.c_str(), it->second.length())
2204         .substr(1,it->second.length()));
2205   }
2206
2207   vector<std::string> all_names;
2208   vector<int> all_sizes(index.size());
2209   vector<int> all_versions(index.size());
2210   vector<bufferlist> all_unwrit(index.size());
2211   vector<map<std::string,bufferlist> > all_maps(keys.size());
2212   vector<map<std::string,bufferlist>::iterator> its(keys.size());
2213   unsigned done = 0;
2214   vector<bool> dones(keys.size());
2215   ret << std::endl << string(150,'-') << std::endl;
2216
2217   for (map<std::string,bufferlist>::iterator it = index.begin();
2218       it != index.end(); ++it){
2219     index_data idata;
2220     bufferlist::iterator b = it->second.begin();
2221     idata.decode(b);
2222     string s = idata.str();
2223     ret << "|" << string((148 -
2224         ((*it).first.length()+s.length()+3))/2,' ');
2225     ret << (*it).first;
2226     ret << " | ";
2227     ret << string(idata.str());
2228     ret << string((148 -
2229         ((*it).first.length()+s.length()+3))/2,' ');
2230     ret << "|\t";
2231     all_names.push_back(idata.obj);
2232     ret << std::endl << string(150,'-') << std::endl;
2233   }
2234
2235   int indexer = 0;
2236
2237   //get the object names and sizes
2238   for(vector<std::string>::iterator it = all_names.begin(); it
2239   != all_names.end();
2240       ++it) {
2241     librados::ObjectReadOperation oro;
2242     librados::AioCompletion *aioc = rados.aio_create_completion();
2243     oro.omap_get_vals2("", LONG_MAX, &all_maps[indexer], nullptr, &err);
2244     oro.getxattr("unwritable", &all_unwrit[indexer], &err);
2245     io_ctx.aio_operate(*it, aioc, &oro, NULL);
2246     aioc->wait_for_safe();
2247     if (aioc->get_return_value() < 0) {
2248       ret << "reading" << *it << "failed: " << err << std::endl;
2249       //return ret.str();
2250     }
2251     all_sizes[indexer] = all_maps[indexer].size();
2252     all_versions[indexer] = aioc->get_version64();
2253     indexer++;
2254     aioc->release();
2255   }
2256
2257   ret << "///////////////////OBJECT NAMES////////////////" << std::endl;
2258   //HEADERS
2259   ret << std::endl;
2260   for (int i = 0; i < indexer; i++) {
2261    ret << "---------------------------\t";
2262   }
2263   ret << std::endl;
2264   for (int i = 0; i < indexer; i++) {
2265     ret << "|" << string((25 -
2266         (string("Bucket: ").length() + all_names[i].length()))/2, ' ');
2267     ret << "Bucket: " << all_names[i];
2268     ret << string((25 -
2269         (string("Bucket: ").length() + all_names[i].length()))/2, ' ') << "|\t";
2270   }
2271   ret << std::endl;
2272   for (int i = 0; i < indexer; i++) {
2273     its[i] = all_maps[i].begin();
2274     ret << "|" << string((25 - (string("size: ").length()
2275         + to_string("",all_sizes[i]).length()))/2, ' ');
2276     ret << "size: " << all_sizes[i];
2277     ret << string((25 - (string("size: ").length()
2278           + to_string("",all_sizes[i]).length()))/2, ' ') << "|\t";
2279   }
2280   ret << std::endl;
2281   for (int i = 0; i < indexer; i++) {
2282     its[i] = all_maps[i].begin();
2283     ret << "|" << string((25 - (string("version: ").length()
2284         + to_string("",all_versions[i]).length()))/2, ' ');
2285     ret << "version: " << all_versions[i];
2286     ret << string((25 - (string("version: ").length()
2287           + to_string("",all_versions[i]).length()))/2, ' ') << "|\t";
2288   }
2289   ret << std::endl;
2290   for (int i = 0; i < indexer; i++) {
2291     its[i] = all_maps[i].begin();
2292     ret << "|" << string((25 - (string("unwritable? ").length()
2293         + 1))/2, ' ');
2294     ret << "unwritable? " << string(all_unwrit[i].c_str(),
2295         all_unwrit[i].length());
2296     ret << string((25 - (string("unwritable? ").length()
2297           + 1))/2, ' ') << "|\t";
2298   }
2299   ret << std::endl;
2300   for (int i = 0; i < indexer; i++) {
2301     ret << "---------------------------\t";
2302   }
2303   ret << std::endl;
2304   ret << "///////////////////THE ACTUAL BLOCKS////////////////" << std::endl;
2305
2306
2307   ret << std::endl;
2308   for (int i = 0; i < indexer; i++) {
2309     ret << "---------------------------\t";
2310   }
2311   ret << std::endl;
2312   //each time through this part is two lines
2313   while(done < keys.size()) {
2314     for(int i = 0; i < indexer; i++) {
2315       if(dones[i]){
2316         ret << "                          \t";
2317       } else {
2318         if (its[i] == all_maps[i].end()){
2319           done++;
2320           dones[i] = true;
2321           ret << "                          \t";
2322         } else {
2323           ret << "|" << string((25 -
2324               ((*its[i]).first.length()+its[i]->second.length()+3))/2,' ');
2325           ret << (*its[i]).first;
2326           ret << " | ";
2327           ret << string(its[i]->second.c_str(), its[i]->second.length());
2328           ret << string((25 -
2329               ((*its[i]).first.length()+its[i]->second.length()+3))/2,' ');
2330           ret << "|\t";
2331           ++(its[i]);
2332         }
2333
2334       }
2335     }
2336     ret << std::endl;
2337     for (int i = 0; i < indexer; i++) {
2338       if(dones[i]){
2339         ret << "                          \t";
2340       } else {
2341         ret << "---------------------------\t";
2342       }
2343     }
2344     ret << std::endl;
2345
2346   }
2347   return ret.str();
2348 }