Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / filestore / HashIndex.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "include/compat.h"
16 #include "include/types.h"
17 #include "include/buffer.h"
18 #include "osd/osd_types.h"
19 #include <errno.h>
20
21 #include "HashIndex.h"
22
23 #include "common/errno.h"
24 #include "common/debug.h"
25 #define dout_context cct
26 #define dout_subsys ceph_subsys_filestore
27
28 const string HashIndex::SUBDIR_ATTR = "contents";
29 const string HashIndex::SETTINGS_ATTR = "settings";
30 const string HashIndex::IN_PROGRESS_OP_TAG = "in_progress_op";
31
32 /// hex digit to integer value
33 int hex_to_int(char c)
34 {
35   if (c >= '0' && c <= '9')
36     return c - '0';
37   if (c >= 'A' && c <= 'F')
38     return c - 'A' + 10;
39   ceph_abort();
40 }
41
42 /// int value to hex digit
43 char int_to_hex(int v)
44 {
45   assert(v < 16);
46   if (v < 10)
47     return '0' + v;
48   return 'A' + v - 10;
49 }
50
51 /// reverse bits in a nibble (0..15)
52 int reverse_nibble_bits(int in)
53 {
54   assert(in < 16);
55   return
56     ((in & 8) >> 3) |
57     ((in & 4) >> 1) |
58     ((in & 2) << 1) |
59     ((in & 1) << 3);
60 }
61
62 /// reverse nibble bits in a hex digit
63 char reverse_hexdigit_bits(char c)
64 {
65   return int_to_hex(reverse_nibble_bits(hex_to_int(c)));
66 }
67
68 /// reverse nibble bits in a hex string
69 string reverse_hexdigit_bits_string(string s)
70 {
71   for (unsigned i=0; i<s.size(); ++i)
72     s[i] = reverse_hexdigit_bits(s[i]);
73   return s;
74 }
75
76 /// compare hex digit (as length 1 string) bitwise
77 bool cmp_hexdigit_bitwise(const string& l, const string& r)
78 {
79   assert(l.length() == 1 && r.length() == 1);
80   int lv = hex_to_int(l[0]);
81   int rv = hex_to_int(r[0]);
82   assert(lv < 16);
83   assert(rv < 16);
84   return reverse_nibble_bits(lv) < reverse_nibble_bits(rv);
85 }
86
87 /// compare hex digit string bitwise
88 bool cmp_hexdigit_string_bitwise(const string& l, const string& r)
89 {
90   string ll = reverse_hexdigit_bits_string(l);
91   string rr = reverse_hexdigit_bits_string(r);
92   return ll < rr;
93 }
94
95 int HashIndex::cleanup() {
96   bufferlist bl;
97   int r = get_attr_path(vector<string>(), IN_PROGRESS_OP_TAG, bl);
98   if (r < 0) {
99     // No in progress operations!
100     return 0;
101   }
102   bufferlist::iterator i = bl.begin();
103   InProgressOp in_progress(i);
104   subdir_info_s info;
105   r = get_info(in_progress.path, &info);
106   if (r == -ENOENT) {
107     return end_split_or_merge(in_progress.path);
108   } else if (r < 0) {
109     return r;
110   }
111
112   if (in_progress.is_split())
113     return complete_split(in_progress.path, info);
114   else if (in_progress.is_merge())
115     return complete_merge(in_progress.path, info);
116   else if (in_progress.is_col_split()) {
117     for (vector<string>::iterator i = in_progress.path.begin();
118          i != in_progress.path.end();
119          ++i) {
120       vector<string> path(in_progress.path.begin(), i);
121       int r = reset_attr(path);
122       if (r < 0)
123         return r;
124     }
125     return 0;
126   }
127   else
128     return -EINVAL;
129 }
130
131 int HashIndex::reset_attr(
132   const vector<string> &path)
133 {
134   int exists = 0;
135   int r = path_exists(path, &exists);
136   if (r < 0)
137     return r;
138   if (!exists)
139     return 0;
140   map<string, ghobject_t> objects;
141   vector<string> subdirs;
142   r = list_objects(path, 0, 0, &objects);
143   if (r < 0)
144     return r;
145   r = list_subdirs(path, &subdirs);
146   if (r < 0)
147     return r;
148
149   subdir_info_s info;
150   info.hash_level = path.size();
151   info.objs = objects.size();
152   info.subdirs = subdirs.size();
153   return set_info(path, info);
154 }
155
156 int HashIndex::col_split_level(
157   HashIndex &from,
158   HashIndex &to,
159   const vector<string> &path,
160   uint32_t inbits,
161   uint32_t match,
162   unsigned *mkdirred)
163 {
164   /* For each subdir, move, recurse, or ignore based on comparing the low order
165    * bits of the hash represented by the subdir path with inbits, match passed
166    * in.
167    */
168   vector<string> subdirs;
169   int r = from.list_subdirs(path, &subdirs);
170   if (r < 0)
171     return r;
172   map<string, ghobject_t> objects;
173   r = from.list_objects(path, 0, 0, &objects);
174   if (r < 0)
175     return r;
176
177   set<string> to_move;
178   for (vector<string>::iterator i = subdirs.begin();
179        i != subdirs.end();
180        ++i) {
181     uint32_t bits = 0;
182     uint32_t hash = 0;
183     vector<string> sub_path(path.begin(), path.end());
184     sub_path.push_back(*i);
185     path_to_hobject_hash_prefix(sub_path, &bits, &hash);
186     if (bits < inbits) {
187       if (hobject_t::match_hash(hash, bits, match)) {
188         r = col_split_level(
189           from,
190           to,
191           sub_path,
192           inbits,
193           match,
194           mkdirred);
195         if (r < 0)
196           return r;
197         if (*mkdirred > path.size())
198           *mkdirred = path.size();
199       } // else, skip, doesn't need to be moved or recursed into
200     } else {
201       if (hobject_t::match_hash(hash, inbits, match)) {
202         to_move.insert(*i);
203       }
204     } // else, skip, doesn't need to be moved or recursed into
205   }
206
207   /* Then, do the same for each object */
208   map<string, ghobject_t> objs_to_move;
209   for (map<string, ghobject_t>::iterator i = objects.begin();
210        i != objects.end();
211        ++i) {
212     if (i->second.match(inbits, match)) {
213       objs_to_move.insert(*i);
214     }
215   }
216
217   if (objs_to_move.empty() && to_move.empty())
218     return 0;
219
220   // Make parent directories as needed
221   while (*mkdirred < path.size()) {
222     ++*mkdirred;
223     int exists = 0;
224     vector<string> creating_path(path.begin(), path.begin()+*mkdirred);
225     r = to.path_exists(creating_path, &exists);
226     if (r < 0)
227       return r;
228     if (exists)
229       continue;
230     subdir_info_s info;
231     info.objs = 0;
232     info.subdirs = 0;
233     info.hash_level = creating_path.size();
234     if (*mkdirred < path.size() - 1)
235       info.subdirs = 1;
236     r = to.start_col_split(creating_path);
237     if (r < 0)
238       return r;
239     r = to.create_path(creating_path);
240     if (r < 0)
241       return r;
242     r = to.set_info(creating_path, info);
243     if (r < 0)
244       return r;
245     r = to.end_split_or_merge(creating_path);
246     if (r < 0)
247       return r;
248   }
249
250   subdir_info_s from_info;
251   subdir_info_s to_info;
252   r = from.get_info(path, &from_info);
253   if (r < 0)
254     return r;
255   r = to.get_info(path, &to_info);
256   if (r < 0)
257     return r;
258
259   from.start_col_split(path);
260   to.start_col_split(path);
261
262   // Do subdir moves
263   for (set<string>::iterator i = to_move.begin();
264        i != to_move.end();
265        ++i) {
266     from_info.subdirs--;
267     to_info.subdirs++;
268     r = move_subdir(from, to, path, *i);
269     if (r < 0)
270       return r;
271   }
272
273   for (map<string, ghobject_t>::iterator i = objs_to_move.begin();
274        i != objs_to_move.end();
275        ++i) {
276     from_info.objs--;
277     to_info.objs++;
278     r = move_object(from, to, path, *i);
279     if (r < 0)
280       return r;
281   }
282
283
284   r = to.set_info(path, to_info);
285   if (r < 0)
286     return r;
287   r = from.set_info(path, from_info);
288   if (r < 0)
289     return r;
290   from.end_split_or_merge(path);
291   to.end_split_or_merge(path);
292   return 0;
293 }
294
295 int HashIndex::_split(
296   uint32_t match,
297   uint32_t bits,
298   CollectionIndex* dest) {
299   assert(collection_version() == dest->collection_version());
300   unsigned mkdirred = 0;
301   return col_split_level(
302     *this,
303     *static_cast<HashIndex*>(dest),
304     vector<string>(),
305     bits,
306     match,
307     &mkdirred);
308 }
309
310 int HashIndex::split_dirs(const vector<string> &path) {
311   dout(20) << __func__ << " " << path << dendl;
312   subdir_info_s info;
313   int r = get_info(path, &info);
314   if (r < 0) {
315     dout(10) << "error looking up info for " << path << ": "
316              << cpp_strerror(r) << dendl;
317     return r;
318   }
319
320   if (must_split(info)) {
321     dout(1) << __func__ << " " << path << " has " << info.objs
322             << " objects, starting split." << dendl;
323     r = initiate_split(path, info);
324     if (r < 0) {
325       dout(10) << "error initiating split on " << path << ": "
326                << cpp_strerror(r) << dendl;
327       return r;
328     }
329
330     r = complete_split(path, info);
331     dout(1) << __func__ << " " << path << " split completed."
332             << dendl;
333     if (r < 0) {
334       dout(10) << "error completing split on " << path << ": "
335                << cpp_strerror(r) << dendl;
336       return r;
337     }
338   }
339
340   vector<string> subdirs;
341   r = list_subdirs(path, &subdirs);
342   if (r < 0) {
343     dout(10) << "error listing subdirs of " << path << ": "
344              << cpp_strerror(r) << dendl;
345     return r;
346   }
347   for (vector<string>::const_iterator it = subdirs.begin();
348        it != subdirs.end(); ++it) {
349     vector<string> subdir_path(path);
350     subdir_path.push_back(*it);
351     r = split_dirs(subdir_path);
352     if (r < 0) {
353       return r;
354     }
355   }
356
357   return r;
358 }
359
360 int HashIndex::apply_layout_settings() {
361   vector<string> path;
362   dout(10) << __func__ << " split multiple = " << split_multiplier
363            << " merge threshold = " << merge_threshold
364            << " split rand factor = " << cct->_conf->filestore_split_rand_factor
365            << dendl;
366   int r = write_settings();
367   if (r < 0)
368     return r;
369   return split_dirs(path);
370 }
371
372 int HashIndex::_init() {
373   subdir_info_s info;
374   vector<string> path;
375   int r = set_info(path, info);
376   if (r < 0)
377     return r;
378   return write_settings();
379 }
380
381 int HashIndex::write_settings() {
382   if (cct->_conf->filestore_split_rand_factor > 0) {
383     settings.split_rand_factor = rand() % cct->_conf->filestore_split_rand_factor;
384   } else {
385     settings.split_rand_factor = 0;
386   }
387   vector<string> path;
388   bufferlist bl;
389   settings.encode(bl);
390   return add_attr_path(path, SETTINGS_ATTR, bl);
391 }
392
393 int HashIndex::read_settings() {
394   vector<string> path;
395   bufferlist bl;
396   int r = get_attr_path(path, SETTINGS_ATTR, bl);
397   if (r == -ENODATA)
398     return 0;
399   if (r < 0) {
400     derr << __func__ << " error reading settings: " << cpp_strerror(r) << dendl;
401     return r;
402   }
403   bufferlist::iterator it = bl.begin();
404   settings.decode(it);
405   dout(20) << __func__ << " split_rand_factor = " << settings.split_rand_factor << dendl;
406   return 0;
407 }
408
409 /* LFNIndex virtual method implementations */
410 int HashIndex::_created(const vector<string> &path,
411                         const ghobject_t &oid,
412                         const string &mangled_name) {
413   subdir_info_s info;
414   int r;
415   r = get_info(path, &info);
416   if (r < 0)
417     return r;
418   info.objs++;
419   r = set_info(path, info);
420   if (r < 0)
421     return r;
422
423   if (must_split(info)) {
424     dout(1) << __func__ << " " << path << " has " << info.objs
425             << " objects, starting split." << dendl;
426     int r = initiate_split(path, info);
427     if (r < 0)
428       return r;
429     r = complete_split(path, info);
430     dout(1) << __func__ << " " << path << " split completed."
431             << dendl;
432     return r;
433   } else {
434     return 0;
435   }
436 }
437
438 int HashIndex::_remove(const vector<string> &path,
439                        const ghobject_t &oid,
440                        const string &mangled_name) {
441   int r;
442   r = remove_object(path, oid);
443   if (r < 0)
444     return r;
445   subdir_info_s info;
446   r = get_info(path, &info);
447   if (r < 0)
448     return r;
449   info.objs--;
450   r = set_info(path, info);
451   if (r < 0)
452     return r;
453   if (must_merge(info)) {
454     r = initiate_merge(path, info);
455     if (r < 0)
456       return r;
457     return complete_merge(path, info);
458   } else {
459     return 0;
460   }
461 }
462
463 int HashIndex::_lookup(const ghobject_t &oid,
464                        vector<string> *path,
465                        string *mangled_name,
466                        int *hardlink) {
467   vector<string> path_comp;
468   get_path_components(oid, &path_comp);
469   vector<string>::iterator next = path_comp.begin();
470   int exists;
471   while (1) {
472     int r = path_exists(*path, &exists);
473     if (r < 0)
474       return r;
475     if (!exists) {
476       if (path->empty())
477         return -ENOENT;
478       path->pop_back();
479       break;
480     }
481     if (next == path_comp.end())
482       break;
483     path->push_back(*(next++));
484   }
485   return get_mangled_name(*path, oid, mangled_name, hardlink);
486 }
487
488 int HashIndex::_collection_list_partial(const ghobject_t &start,
489                                         const ghobject_t &end,
490                                         int max_count,
491                                         vector<ghobject_t> *ls,
492                                         ghobject_t *next) {
493   vector<string> path;
494   ghobject_t _next;
495   if (!next)
496     next = &_next;
497   *next = start;
498   dout(20) << __func__ << " start:" << start << " end:" << end << "-" << max_count << " ls.size " << ls->size() << dendl;
499   return list_by_hash(path, end, max_count, next, ls);
500 }
501
502 int HashIndex::prep_delete() {
503   return recursive_remove(vector<string>());
504 }
505
506 int HashIndex::_pre_hash_collection(uint32_t pg_num, uint64_t expected_num_objs) {
507   int ret;
508   vector<string> path;
509   subdir_info_s root_info;
510   // Make sure there is neither objects nor sub-folders
511   // in this collection
512   ret = get_info(path, &root_info);
513   if (ret < 0)
514     return ret;
515
516   // Do the folder splitting first
517   ret = pre_split_folder(pg_num, expected_num_objs);
518   if (ret < 0)
519     return ret;
520   // Initialize the folder info starting from root
521   return init_split_folder(path, 0);
522 }
523
524 int HashIndex::pre_split_folder(uint32_t pg_num, uint64_t expected_num_objs)
525 {
526   // If folder merging is enabled (by setting the threshold positive),
527   // no need to split
528   if (merge_threshold > 0)
529     return 0;
530   const coll_t c = coll();
531   // Do not split if the expected number of objects in this collection is zero (by default)
532   if (expected_num_objs == 0)
533     return 0;
534
535   // Calculate the number of leaf folders (which actually store files)
536   // need to be created
537   const uint64_t objs_per_folder = ((uint64_t)(abs(merge_threshold)) * (uint64_t)split_multiplier + settings.split_rand_factor) * 16;
538   uint64_t leavies = expected_num_objs / objs_per_folder ;
539   // No need to split
540   if (leavies == 0 || expected_num_objs == objs_per_folder)
541     return 0;
542
543   spg_t spgid;
544   if (!c.is_pg_prefix(&spgid))
545     return -EINVAL;
546   const ps_t ps = spgid.pgid.ps();
547
548   // the most significant bits of pg_num
549   const int pg_num_bits = calc_num_bits(pg_num - 1);
550   ps_t tmp_id = ps;
551   // calculate the number of levels we only create one sub folder
552   int num = pg_num_bits / 4;
553   // pg num's hex value is like 1xxx,xxxx,xxxx but not 1111,1111,1111,
554   // so that splitting starts at level 3
555   if (pg_num_bits % 4 == 0 && pg_num < ((uint32_t)1 << pg_num_bits)) {
556     --num;
557   }
558
559   int ret;
560   // Start with creation that only has one subfolder
561   vector<string> paths;
562   int dump_num = num;
563   while (num-- > 0) {
564     ps_t v = tmp_id & 0x0000000f;
565     paths.push_back(to_hex(v));
566     ret = create_path(paths);
567     if (ret < 0 && ret != -EEXIST)
568       return ret;
569     tmp_id = tmp_id >> 4;
570   }
571
572   // Starting from here, we can split by creating multiple subfolders
573   const int left_bits = pg_num_bits - dump_num * 4;
574   // this variable denotes how many bits (for this level) that can be
575   // used for sub folder splitting
576   int split_bits = 4 - left_bits;
577   // the below logic is inspired by rados.h#ceph_stable_mod,
578   // it basically determines how many sub-folders should we
579   // create for splitting
580   assert(pg_num_bits > 0); // otherwise BAD_SHIFT
581   if (((1 << (pg_num_bits - 1)) | ps) >= pg_num) {
582     ++split_bits;
583   }
584   const uint32_t subs = (1 << split_bits);
585   // Calculate how many levels we create starting from here
586   int level  = 0;
587   leavies /= subs;
588   while (leavies > 1) {
589     ++level;
590     leavies = leavies >> 4;
591   }
592   for (uint32_t i = 0; i < subs; ++i) {
593     assert(split_bits <= 4); // otherwise BAD_SHIFT
594     int v = tmp_id | (i << ((4 - split_bits) % 4));
595     paths.push_back(to_hex(v));
596     ret = create_path(paths);
597     if (ret < 0 && ret != -EEXIST)
598       return ret;
599     ret = recursive_create_path(paths, level);
600     if (ret < 0)
601       return ret;
602     paths.pop_back();
603   }
604   return 0;
605 }
606
607 int HashIndex::init_split_folder(vector<string> &path, uint32_t hash_level)
608 {
609   // Get the number of sub directories for the current path
610   vector<string> subdirs;
611   int ret = list_subdirs(path, &subdirs);
612   if (ret < 0)
613     return ret;
614   subdir_info_s info;
615   info.subdirs = subdirs.size();
616   info.hash_level = hash_level;
617   ret = set_info(path, info);
618   if (ret < 0)
619     return ret;
620   ret = fsync_dir(path);
621   if (ret < 0)
622     return ret;
623
624   // Do the same for subdirs
625   vector<string>::const_iterator iter;
626   for (iter = subdirs.begin(); iter != subdirs.end(); ++iter) {
627     path.push_back(*iter);
628     ret = init_split_folder(path, hash_level + 1);
629     if (ret < 0)
630       return ret;
631     path.pop_back();
632   }
633   return 0;
634 }
635
636 int HashIndex::recursive_create_path(vector<string>& path, int level)
637 {
638   if (level == 0)
639     return 0;
640   for (int i = 0; i < 16; ++i) {
641     path.push_back(to_hex(i));
642     int ret = create_path(path);
643     if (ret < 0 && ret != -EEXIST)
644       return ret;
645     ret = recursive_create_path(path, level - 1);
646     if (ret < 0)
647       return ret;
648     path.pop_back();
649   }
650   return 0;
651 }
652
653 int HashIndex::recursive_remove(const vector<string> &path) {
654   return _recursive_remove(path, true);
655 }
656
657 int HashIndex::_recursive_remove(const vector<string> &path, bool top) {
658   vector<string> subdirs;
659   dout(20) << __func__ << " path=" << path << dendl;
660   int r = list_subdirs(path, &subdirs);
661   if (r < 0)
662     return r;
663   map<string, ghobject_t> objects;
664   r = list_objects(path, 0, 0, &objects);
665   if (r < 0)
666     return r;
667   if (!objects.empty())
668     return -ENOTEMPTY;
669   vector<string> subdir(path);
670   for (vector<string>::iterator i = subdirs.begin();
671        i != subdirs.end();
672        ++i) {
673     subdir.push_back(*i);
674     r = _recursive_remove(subdir, false);
675     if (r < 0)
676       return r;
677     subdir.pop_back();
678   }
679   if (top)
680     return 0;
681   else
682     return remove_path(path);
683 }
684
685 int HashIndex::start_col_split(const vector<string> &path) {
686   bufferlist bl;
687   InProgressOp op_tag(InProgressOp::COL_SPLIT, path);
688   op_tag.encode(bl);
689   int r = add_attr_path(vector<string>(), IN_PROGRESS_OP_TAG, bl);
690   if (r < 0)
691     return r;
692   return fsync_dir(vector<string>());
693 }
694
695 int HashIndex::start_split(const vector<string> &path) {
696   bufferlist bl;
697   InProgressOp op_tag(InProgressOp::SPLIT, path);
698   op_tag.encode(bl);
699   int r = add_attr_path(vector<string>(), IN_PROGRESS_OP_TAG, bl);
700   if (r < 0)
701     return r;
702   return fsync_dir(vector<string>());
703 }
704
705 int HashIndex::start_merge(const vector<string> &path) {
706   bufferlist bl;
707   InProgressOp op_tag(InProgressOp::MERGE, path);
708   op_tag.encode(bl);
709   int r = add_attr_path(vector<string>(), IN_PROGRESS_OP_TAG, bl);
710   if (r < 0)
711     return r;
712   return fsync_dir(vector<string>());
713 }
714
715 int HashIndex::end_split_or_merge(const vector<string> &path) {
716   return remove_attr_path(vector<string>(), IN_PROGRESS_OP_TAG);
717 }
718
719 int HashIndex::get_info(const vector<string> &path, subdir_info_s *info) {
720   bufferlist buf;
721   int r = get_attr_path(path, SUBDIR_ATTR, buf);
722   if (r < 0)
723     return r;
724   bufferlist::iterator bufiter = buf.begin();
725   info->decode(bufiter);
726   assert(path.size() == (unsigned)info->hash_level);
727   return 0;
728 }
729
730 int HashIndex::set_info(const vector<string> &path, const subdir_info_s &info) {
731   bufferlist buf;
732   assert(path.size() == (unsigned)info.hash_level);
733   info.encode(buf);
734   return add_attr_path(path, SUBDIR_ATTR, buf);
735 }
736
737 bool HashIndex::must_merge(const subdir_info_s &info) {
738   return (info.hash_level > 0 &&
739           merge_threshold > 0 &&
740           info.objs < (unsigned)merge_threshold &&
741           info.subdirs == 0);
742 }
743
744 bool HashIndex::must_split(const subdir_info_s &info) {
745   return (info.hash_level < (unsigned)MAX_HASH_LEVEL &&
746           info.objs > ((unsigned)(abs(merge_threshold) * split_multiplier + settings.split_rand_factor) * 16));
747
748 }
749
750 int HashIndex::initiate_merge(const vector<string> &path, subdir_info_s info) {
751   return start_merge(path);
752 }
753
754 int HashIndex::complete_merge(const vector<string> &path, subdir_info_s info) {
755   vector<string> dst = path;
756   dst.pop_back();
757   subdir_info_s dstinfo;
758   int r, exists;
759   r = path_exists(path, &exists);
760   if (r < 0)
761     return r;
762   r = get_info(dst, &dstinfo);
763   if (r < 0)
764     return r;
765   if (exists) {
766     r = move_objects(path, dst);
767     if (r < 0)
768       return r;
769     r = reset_attr(dst);
770     if (r < 0)
771       return r;
772     r = remove_path(path);
773     if (r < 0)
774       return r;
775   }
776   if (must_merge(dstinfo)) {
777     r = initiate_merge(dst, dstinfo);
778     if (r < 0)
779       return r;
780     r = fsync_dir(dst);
781     if (r < 0)
782       return r;
783     return complete_merge(dst, dstinfo);
784   }
785   r = fsync_dir(dst);
786   if (r < 0)
787     return r;
788   return end_split_or_merge(path);
789 }
790
791 int HashIndex::initiate_split(const vector<string> &path, subdir_info_s info) {
792   return start_split(path);
793 }
794
795 int HashIndex::complete_split(const vector<string> &path, subdir_info_s info) {
796   int level = info.hash_level;
797   map<string, ghobject_t> objects;
798   vector<string> dst = path;
799   int r;
800   dst.push_back("");
801   r = list_objects(path, 0, 0, &objects);
802   if (r < 0)
803     return r;
804   vector<string> subdirs_vec;
805   r = list_subdirs(path, &subdirs_vec);
806   if (r < 0)
807     return r;
808   set<string> subdirs;
809   subdirs.insert(subdirs_vec.begin(), subdirs_vec.end());
810   map<string, map<string, ghobject_t> > mapped;
811   map<string, ghobject_t> moved;
812   int num_moved = 0;
813   for (map<string, ghobject_t>::iterator i = objects.begin();
814        i != objects.end();
815        ++i) {
816     vector<string> new_path;
817     get_path_components(i->second, &new_path);
818     mapped[new_path[level]][i->first] = i->second;
819   }
820   for (map<string, map<string, ghobject_t> >::iterator i = mapped.begin();
821        i != mapped.end();
822        ) {
823     dst[level] = i->first;
824     /* If the info already exists, it must be correct,
825      * we may be picking up a partially finished split */
826     subdir_info_s temp;
827     // subdir has already been fully copied
828     if (subdirs.count(i->first) && !get_info(dst, &temp)) {
829       for (map<string, ghobject_t>::iterator j = i->second.begin();
830            j != i->second.end();
831            ++j) {
832         moved[j->first] = j->second;
833         num_moved++;
834         objects.erase(j->first);
835       }
836       ++i;
837       continue;
838     }
839
840     subdir_info_s info_new;
841     info_new.objs = i->second.size();
842     info_new.subdirs = 0;
843     info_new.hash_level = level + 1;
844     if (must_merge(info_new) && !subdirs.count(i->first)) {
845       mapped.erase(i++);
846       continue;
847     }
848
849     // Subdir doesn't yet exist
850     if (!subdirs.count(i->first)) {
851       info.subdirs += 1;
852       r = create_path(dst);
853       if (r < 0)
854         return r;
855     } // else subdir has been created but only partially copied
856
857     for (map<string, ghobject_t>::iterator j = i->second.begin();
858          j != i->second.end();
859          ++j) {
860       moved[j->first] = j->second;
861       num_moved++;
862       objects.erase(j->first);
863       r = link_object(path, dst, j->second, j->first);
864       // May be a partially finished split
865       if (r < 0 && r != -EEXIST) {
866         return r;
867       }
868     }
869
870     r = fsync_dir(dst);
871     if (r < 0)
872       return r;
873
874     // Presence of info must imply that all objects have been copied
875     r = set_info(dst, info_new);
876     if (r < 0)
877       return r;
878
879     r = fsync_dir(dst);
880     if (r < 0)
881       return r;
882
883     ++i;
884   }
885   r = remove_objects(path, moved, &objects);
886   if (r < 0)
887     return r;
888   info.objs = objects.size();
889   r = reset_attr(path);
890   if (r < 0)
891     return r;
892   r = fsync_dir(path);
893   if (r < 0)
894     return r;
895   return end_split_or_merge(path);
896 }
897
898 void HashIndex::get_path_components(const ghobject_t &oid,
899                                     vector<string> *path) {
900   char buf[MAX_HASH_LEVEL + 1];
901   snprintf(buf, sizeof(buf), "%.*X", MAX_HASH_LEVEL, (uint32_t)oid.hobj.get_nibblewise_key());
902
903   // Path components are the hex characters of oid.hobj.hash, least
904   // significant first
905   for (int i = 0; i < MAX_HASH_LEVEL; ++i) {
906     path->push_back(string(&buf[i], 1));
907   }
908 }
909
910 string HashIndex::get_hash_str(uint32_t hash) {
911   char buf[MAX_HASH_LEVEL + 1];
912   snprintf(buf, sizeof(buf), "%.*X", MAX_HASH_LEVEL, hash);
913   string retval;
914   for (int i = 0; i < MAX_HASH_LEVEL; ++i) {
915     retval.push_back(buf[MAX_HASH_LEVEL - 1 - i]);
916   }
917   return retval;
918 }
919
920 string HashIndex::get_path_str(const ghobject_t &oid) {
921   assert(!oid.is_max());
922   return get_hash_str(oid.hobj.get_hash());
923 }
924
925 uint32_t HashIndex::hash_prefix_to_hash(string prefix) {
926   while (prefix.size() < sizeof(uint32_t) * 2) {
927     prefix.push_back('0');
928   }
929   uint32_t hash;
930   sscanf(prefix.c_str(), "%x", &hash);
931   // nibble reverse
932   hash = ((hash & 0x0f0f0f0f) << 4) | ((hash & 0xf0f0f0f0) >> 4);
933   hash = ((hash & 0x00ff00ff) << 8) | ((hash & 0xff00ff00) >> 8);
934   hash = ((hash & 0x0000ffff) << 16) | ((hash & 0xffff0000) >> 16);
935   return hash;
936 }
937
938 int HashIndex::get_path_contents_by_hash_bitwise(
939   const vector<string> &path,
940   const ghobject_t *next_object,
941   set<string, CmpHexdigitStringBitwise> *hash_prefixes,
942   set<pair<string, ghobject_t>, CmpPairBitwise> *objects)
943 {
944   map<string, ghobject_t> rev_objects;
945   int r;
946   r = list_objects(path, 0, 0, &rev_objects);
947   if (r < 0)
948     return r;
949   // bitwise sort
950   for (map<string, ghobject_t>::iterator i = rev_objects.begin();
951        i != rev_objects.end();
952        ++i) {
953     if (next_object && i->second < *next_object)
954       continue;
955     string hash_prefix = get_path_str(i->second);
956     hash_prefixes->insert(hash_prefix);
957     objects->insert(pair<string, ghobject_t>(hash_prefix, i->second));
958   }
959   vector<string> subdirs;
960   r = list_subdirs(path, &subdirs);
961   if (r < 0)
962     return r;
963
964   // sort subdirs bitwise (by reversing hex digit nibbles)
965   std::sort(subdirs.begin(), subdirs.end(), cmp_hexdigit_bitwise);
966
967   // Local to this function, we will convert the prefix strings
968   // (previously simply the reversed hex digits) to also have each
969   // digit's nibbles reversed.  This will make the strings sort
970   // bitwise.
971   string cur_prefix;
972   for (vector<string>::const_iterator i = path.begin();
973        i != path.end();
974        ++i) {
975     cur_prefix.append(reverse_hexdigit_bits_string(*i));
976   }
977   string next_object_string;
978   if (next_object)
979     next_object_string = reverse_hexdigit_bits_string(get_path_str(*next_object));
980   for (vector<string>::iterator i = subdirs.begin();
981        i != subdirs.end();
982        ++i) {
983     string candidate = cur_prefix + reverse_hexdigit_bits_string(*i);
984     if (next_object) {
985       if (next_object->is_max())
986         continue;
987       if (candidate < next_object_string.substr(0, candidate.size()))
988         continue;
989     }
990     // re-reverse the hex digit nibbles for the caller
991     hash_prefixes->insert(reverse_hexdigit_bits_string(candidate));
992   }
993   return 0;
994 }
995
996 int HashIndex::list_by_hash(const vector<string> &path,
997                             const ghobject_t &end,
998                             int max_count,
999                             ghobject_t *next,
1000                             vector<ghobject_t> *out)
1001 {
1002   assert(out);
1003   return list_by_hash_bitwise(path, end, max_count, next, out);
1004 }
1005
1006 int HashIndex::list_by_hash_bitwise(
1007   const vector<string> &path,
1008   const ghobject_t& end,
1009   int max_count,
1010   ghobject_t *next,
1011   vector<ghobject_t> *out)
1012 {
1013   vector<string> next_path = path;
1014   next_path.push_back("");
1015   set<string, CmpHexdigitStringBitwise> hash_prefixes;
1016   set<pair<string, ghobject_t>, CmpPairBitwise> objects;
1017   int r = get_path_contents_by_hash_bitwise(path,
1018                                             next,
1019                                             &hash_prefixes,
1020                                             &objects);
1021   if (r < 0)
1022     return r;
1023   for (set<string, CmpHexdigitStringBitwise>::iterator i = hash_prefixes.begin();
1024        i != hash_prefixes.end();
1025        ++i) {
1026     dout(20) << __func__ << " prefix " << *i << dendl;
1027     set<pair<string, ghobject_t>, CmpPairBitwise>::iterator j = objects.lower_bound(
1028       make_pair(*i, ghobject_t()));
1029     if (j == objects.end() || j->first != *i) {
1030       *(next_path.rbegin()) = *(i->rbegin());
1031       ghobject_t next_recurse;
1032       if (next)
1033         next_recurse = *next;
1034       r = list_by_hash_bitwise(next_path,
1035                                end,
1036                                max_count,
1037                                &next_recurse,
1038                                out);
1039
1040       if (r < 0)
1041         return r;
1042       if (!next_recurse.is_max()) {
1043         if (next)
1044           *next = next_recurse;
1045         return 0;
1046       }
1047     } else {
1048       while (j != objects.end() && j->first == *i) {
1049         if (max_count > 0 && out->size() == (unsigned)max_count) {
1050           if (next)
1051             *next = j->second;
1052           return 0;
1053         }
1054         if (j->second >= end) {
1055           if (next)
1056             *next = j->second;
1057           return 0;
1058         }
1059         if (!next || j->second >= *next) {
1060           dout(20) << __func__ << " prefix " << *i << " ob " << j->second << dendl;
1061           out->push_back(j->second);
1062         }
1063         ++j;
1064       }
1065     }
1066   }
1067   if (next)
1068     *next = ghobject_t::get_max();
1069   return 0;
1070 }
1071
1072