Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / filestore / LFNIndex.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <string>
16 #include <map>
17 #include <set>
18 #include <vector>
19 #include <errno.h>
20 #include <string.h>
21
22 #if defined(__FreeBSD__)
23 #include <sys/param.h>
24 #endif
25
26 #include "osd/osd_types.h"
27 #include "include/object.h"
28 #include "common/config.h"
29 #include "common/debug.h"
30 #include "include/buffer.h"
31 #include "common/ceph_crypto.h"
32 #include "include/compat.h"
33 #include "chain_xattr.h"
34
35 #include "LFNIndex.h"
36 using ceph::crypto::SHA1;
37
38 #define dout_context cct
39 #define dout_subsys ceph_subsys_filestore
40 #undef dout_prefix
41 #define dout_prefix *_dout << "LFNIndex(" << get_base_path() << ") "
42
43
44 const string LFNIndex::LFN_ATTR = "user.cephos.lfn";
45 const string LFNIndex::PHASH_ATTR_PREFIX = "user.cephos.phash.";
46 const string LFNIndex::SUBDIR_PREFIX = "DIR_";
47 const string LFNIndex::FILENAME_COOKIE = "long";
48 const int LFNIndex::FILENAME_PREFIX_LEN =  FILENAME_SHORT_LEN - FILENAME_HASH_LEN -
49                                                                 FILENAME_COOKIE.size() -
50                                                                 FILENAME_EXTRA;
51 void LFNIndex::maybe_inject_failure()
52 {
53   if (error_injection_enabled) {
54     if (current_failure > last_failure &&
55         (((double)(rand() % 10000))/((double)(10000))
56          < error_injection_probability)) {
57       last_failure = current_failure;
58       current_failure = 0;
59       throw RetryException();
60     }
61     ++current_failure;
62   }
63 }
64
65 // Helper to close fd's when we leave scope.  This is useful when used
66 // in combination with RetryException, thrown by the above.
67 struct FDCloser {
68   int fd;
69   explicit FDCloser(int f) : fd(f) {}
70   ~FDCloser() {
71     VOID_TEMP_FAILURE_RETRY(::close(fd));
72   }
73 };
74
75
76 /* Public methods */
77
78 uint64_t LFNIndex::get_max_escaped_name_len(const hobject_t &obj)
79 {
80   ghobject_t ghobj(obj);
81   ghobj.shard_id = shard_id_t(0);
82   ghobj.generation = 0;
83   ghobj.hobj.snap = 0;
84   return lfn_generate_object_name_current(ghobj).size();
85 }
86
87 int LFNIndex::init()
88 {
89   return _init();
90 }
91
92 int LFNIndex::created(const ghobject_t &oid, const char *path)
93 {
94   WRAP_RETRY(
95   vector<string> path_comp;
96   string short_name;
97   r = decompose_full_path(path, &path_comp, 0, &short_name);
98   if (r < 0)
99     goto out;
100   r = lfn_created(path_comp, oid, short_name);
101   if (r < 0) {
102     if (failed) {
103       /* This is hacky, but the only way we get ENOENT from lfn_created here is
104        * if we did a failure injection in _created below AND actually started the
105        * split or merge.  In that case, lfn_created already suceeded, and
106        * WRAP_RETRY already cleaned it up and we are actually done.  In a real
107        * failure, the filestore itself would have ended up calling this with
108        * the new path, not the old one, so we'd find it.
109        */
110       r = 0;
111     }
112     goto out;
113   }
114   r = _created(path_comp, oid, short_name);
115   if (r < 0)
116     goto out;
117     );
118 }
119
120 int LFNIndex::unlink(const ghobject_t &oid)
121 {
122   WRAP_RETRY(
123   vector<string> path;
124   string short_name;
125   r = _lookup(oid, &path, &short_name, NULL);
126   if (r < 0) {
127     goto out;
128   }
129   r = _remove(path, oid, short_name);
130   if (r < 0) {
131     goto out;
132   }
133   );
134 }
135
136 int LFNIndex::lookup(const ghobject_t &oid,
137                      IndexedPath *out_path,
138                      int *hardlink)
139 {
140   WRAP_RETRY(
141   vector<string> path;
142   string short_name;
143   r = _lookup(oid, &path, &short_name, hardlink);
144   if (r < 0)
145     goto out;
146   string full_path = get_full_path(path, short_name);
147   *out_path = std::make_shared<Path>(full_path, this);
148   r = 0;
149   );
150 }
151
152 int LFNIndex::pre_hash_collection(uint32_t pg_num, uint64_t expected_num_objs)
153 {
154   return _pre_hash_collection(pg_num, expected_num_objs);
155 }
156
157
158 int LFNIndex::collection_list_partial(const ghobject_t &start,
159                                       const ghobject_t &end,
160                                       int max_count,
161                                       vector<ghobject_t> *ls,
162                                       ghobject_t *next)
163 {
164   return _collection_list_partial(start, end, max_count, ls, next);
165 }
166
167 /* Derived class utility methods */
168
169 int LFNIndex::fsync_dir(const vector<string> &path)
170 {
171   maybe_inject_failure();
172   int fd = ::open(get_full_path_subdir(path).c_str(), O_RDONLY);
173   if (fd < 0)
174     return -errno;
175   FDCloser f(fd);
176   maybe_inject_failure();
177   int r = ::fsync(fd);
178   maybe_inject_failure();
179   if (r < 0)
180     return -errno;
181   else
182     return 0;
183 }
184
185 int LFNIndex::link_object(const vector<string> &from,
186                           const vector<string> &to,
187                           const ghobject_t &oid,
188                           const string &from_short_name)
189 {
190   int r;
191   string from_path = get_full_path(from, from_short_name);
192   string to_path;
193   maybe_inject_failure();
194   r = lfn_get_name(to, oid, 0, &to_path, 0);
195   if (r < 0)
196     return r;
197   maybe_inject_failure();
198   r = ::link(from_path.c_str(), to_path.c_str());
199   maybe_inject_failure();
200   if (r < 0)
201     return -errno;
202   else
203     return 0;
204 }
205
206 int LFNIndex::remove_objects(const vector<string> &dir,
207                              const map<string, ghobject_t> &to_remove,
208                              map<string, ghobject_t> *remaining)
209 {
210   set<string> clean_chains;
211   for (map<string, ghobject_t>::const_iterator to_clean = to_remove.begin();
212        to_clean != to_remove.end();
213        ++to_clean) {
214     if (!lfn_is_hashed_filename(to_clean->first)) {
215       maybe_inject_failure();
216       int r = ::unlink(get_full_path(dir, to_clean->first).c_str());
217       maybe_inject_failure();
218       if (r < 0)
219         return -errno;
220       continue;
221     }
222     if (clean_chains.count(lfn_get_short_name(to_clean->second, 0)))
223       continue;
224     set<int> holes;
225     map<int, pair<string, ghobject_t> > chain;
226     for (int i = 0; ; ++i) {
227       string short_name = lfn_get_short_name(to_clean->second, i);
228       if (remaining->count(short_name)) {
229         chain[i] = *(remaining->find(short_name));
230       } else if (to_remove.count(short_name)) {
231         holes.insert(i);
232       } else {
233         break;
234       }
235     }
236
237     map<int, pair<string, ghobject_t > >::reverse_iterator candidate = chain.rbegin();
238     for (set<int>::iterator i = holes.begin();
239          i != holes.end();
240          ++i) {
241       if (candidate == chain.rend() || *i > candidate->first) {
242         string remove_path_name =
243           get_full_path(dir, lfn_get_short_name(to_clean->second, *i));
244         maybe_inject_failure();
245         int r = ::unlink(remove_path_name.c_str());
246         maybe_inject_failure();
247         if (r < 0)
248           return -errno;
249         continue;
250       }
251       string from = get_full_path(dir, candidate->second.first);
252       string to = get_full_path(dir, lfn_get_short_name(candidate->second.second, *i));
253       maybe_inject_failure();
254       int r = ::rename(from.c_str(), to.c_str());
255       maybe_inject_failure();
256       if (r < 0)
257         return -errno;
258       remaining->erase(candidate->second.first);
259       remaining->insert(pair<string, ghobject_t>(
260                           lfn_get_short_name(candidate->second.second, *i),
261                                              candidate->second.second));
262       ++candidate;
263     }
264     if (!holes.empty())
265       clean_chains.insert(lfn_get_short_name(to_clean->second, 0));
266   }
267   return 0;
268 }
269
270 int LFNIndex::move_objects(const vector<string> &from,
271                            const vector<string> &to)
272 {
273   map<string, ghobject_t> to_move;
274   int r;
275   r = list_objects(from, 0, NULL, &to_move);
276   if (r < 0)
277     return r;
278   for (map<string,ghobject_t>::iterator i = to_move.begin();
279        i != to_move.end();
280        ++i) {
281     string from_path = get_full_path(from, i->first);
282     string to_path, to_name;
283     r = lfn_get_name(to, i->second, &to_name, &to_path, 0);
284     if (r < 0)
285       return r;
286     maybe_inject_failure();
287     r = ::link(from_path.c_str(), to_path.c_str());
288     if (r < 0 && errno != EEXIST)
289       return -errno;
290     maybe_inject_failure();
291     r = lfn_created(to, i->second, to_name);
292     maybe_inject_failure();
293     if (r < 0)
294       return r;
295   }
296   r = fsync_dir(to);
297   if (r < 0)
298     return r;
299   for (map<string,ghobject_t>::iterator i = to_move.begin();
300        i != to_move.end();
301        ++i) {
302     maybe_inject_failure();
303     r = ::unlink(get_full_path(from, i->first).c_str());
304     maybe_inject_failure();
305     if (r < 0)
306       return -errno;
307   }
308   return fsync_dir(from);
309 }
310
311 int LFNIndex::remove_object(const vector<string> &from,
312                             const ghobject_t &oid)
313 {
314   string short_name;
315   int r, exist;
316   maybe_inject_failure();
317   r = get_mangled_name(from, oid, &short_name, &exist);
318   maybe_inject_failure();
319   if (r < 0)
320     return r;
321   if (exist == 0)
322     return -ENOENT;
323   return lfn_unlink(from, oid, short_name);
324 }
325
326 int LFNIndex::get_mangled_name(const vector<string> &from,
327                                const ghobject_t &oid,
328                                string *mangled_name, int *hardlink)
329 {
330   return lfn_get_name(from, oid, mangled_name, 0, hardlink);
331 }
332
333 int LFNIndex::move_subdir(
334   LFNIndex &from,
335   LFNIndex &dest,
336   const vector<string> &path,
337   string dir
338   )
339 {
340   vector<string> sub_path(path.begin(), path.end());
341   sub_path.push_back(dir);
342   string from_path(from.get_full_path_subdir(sub_path));
343   string to_path(dest.get_full_path_subdir(sub_path));
344   int r = ::rename(from_path.c_str(), to_path.c_str());
345   if (r < 0)
346     return -errno;
347   return 0;
348 }
349
350 int LFNIndex::move_object(
351   LFNIndex &from,
352   LFNIndex &dest,
353   const vector<string> &path,
354   const pair<string, ghobject_t> &obj
355   )
356 {
357   string from_path(from.get_full_path(path, obj.first));
358   string to_path;
359   string to_name;
360   int exists;
361   int r = dest.lfn_get_name(path, obj.second, &to_name, &to_path, &exists);
362   if (r < 0)
363     return r;
364   if (!exists) {
365     r = ::link(from_path.c_str(), to_path.c_str());
366     if (r < 0)
367       return r;
368   }
369   r = dest.lfn_created(path, obj.second, to_name);
370   if (r < 0)
371     return r;
372   r = dest.fsync_dir(path);
373   if (r < 0)
374     return r;
375   r = from.remove_object(path, obj.second);
376   if (r < 0)
377     return r;
378   return from.fsync_dir(path);
379 }
380
381
382 static int get_hobject_from_oinfo(const char *dir, const char *file,
383                                   ghobject_t *o)
384 {
385   char path[PATH_MAX];
386   snprintf(path, sizeof(path), "%s/%s", dir, file);
387   // Hack, user.ceph._ is the attribute used to store the object info
388   bufferptr bp;
389   int r = chain_getxattr_buf(
390     path,
391     "user.ceph._",
392     &bp);
393   if (r < 0)
394     return r;
395   bufferlist bl;
396   if (r > 0)
397     bl.push_back(bp);
398   object_info_t oi(bl);
399   *o = ghobject_t(oi.soid);
400   return 0;
401 }
402
403
404 int LFNIndex::list_objects(const vector<string> &to_list, int max_objs,
405                            long *handle, map<string, ghobject_t> *out)
406 {
407   string to_list_path = get_full_path_subdir(to_list);
408   DIR *dir = ::opendir(to_list_path.c_str());
409   if (!dir) {
410     return -errno;
411   }
412
413   if (handle && *handle) {
414     seekdir(dir, *handle);
415   }
416
417   struct dirent *de = nullptr;
418   int r = 0;
419   int listed = 0;
420   bool end = true;
421   while ((de = ::readdir(dir))) {
422     end = false;
423     if (max_objs > 0 && listed >= max_objs) {
424       break;
425     }
426     if (de->d_name[0] == '.')
427       continue;
428     string short_name(de->d_name);
429     ghobject_t obj;
430     if (lfn_is_object(short_name)) {
431       r = lfn_translate(to_list, short_name, &obj);
432       if (r == -EINVAL) {
433         continue;
434       } else if (r < 0) {
435         goto cleanup;
436       } else {
437         string long_name = lfn_generate_object_name(obj);
438         if (!lfn_must_hash(long_name)) {
439           assert(long_name == short_name);
440         }
441         if (index_version == HASH_INDEX_TAG)
442           get_hobject_from_oinfo(to_list_path.c_str(), short_name.c_str(), &obj);
443
444         out->insert(pair<string, ghobject_t>(short_name, obj));
445         ++listed;
446       }
447     }
448   }
449
450   if (handle && !end) {
451     *handle = telldir(dir);
452   }
453
454   r = 0;
455  cleanup:
456   ::closedir(dir);
457   return r;
458 }
459
460 int LFNIndex::list_subdirs(const vector<string> &to_list,
461                            vector<string> *out)
462 {
463   string to_list_path = get_full_path_subdir(to_list);
464   DIR *dir = ::opendir(to_list_path.c_str());
465   if (!dir)
466     return -errno;
467
468   struct dirent *de = nullptr;
469   while ((de = ::readdir(dir))) {
470     string short_name(de->d_name);
471     string demangled_name;
472     if (lfn_is_subdir(short_name, &demangled_name)) {
473       out->push_back(demangled_name);
474     }
475   }
476
477   ::closedir(dir);
478   return 0;
479 }
480
481 int LFNIndex::create_path(const vector<string> &to_create)
482 {
483   maybe_inject_failure();
484   int r = ::mkdir(get_full_path_subdir(to_create).c_str(), 0777);
485   maybe_inject_failure();
486   if (r < 0)
487     return -errno;
488   else
489     return 0;
490 }
491
492 int LFNIndex::remove_path(const vector<string> &to_remove)
493 {
494   maybe_inject_failure();
495   int r = ::rmdir(get_full_path_subdir(to_remove).c_str());
496   maybe_inject_failure();
497   if (r < 0)
498     return -errno;
499   else
500     return 0;
501 }
502
503 int LFNIndex::path_exists(const vector<string> &to_check, int *exists)
504 {
505   string full_path = get_full_path_subdir(to_check);
506   struct stat buf;
507   if (::stat(full_path.c_str(), &buf)) {
508     int r = -errno;
509     if (r == -ENOENT) {
510       *exists = 0;
511       return 0;
512     } else {
513       return r;
514     }
515   } else {
516     *exists = 1;
517     return 0;
518   }
519 }
520
521 int LFNIndex::add_attr_path(const vector<string> &path,
522                             const string &attr_name,
523                             bufferlist &attr_value)
524 {
525   string full_path = get_full_path_subdir(path);
526   maybe_inject_failure();
527   return chain_setxattr<false, true>(
528     full_path.c_str(), mangle_attr_name(attr_name).c_str(),
529     reinterpret_cast<void *>(attr_value.c_str()),
530     attr_value.length());
531 }
532
533 int LFNIndex::get_attr_path(const vector<string> &path,
534                             const string &attr_name,
535                             bufferlist &attr_value)
536 {
537   string full_path = get_full_path_subdir(path);
538   bufferptr bp;
539   int r = chain_getxattr_buf(
540     full_path.c_str(),
541     mangle_attr_name(attr_name).c_str(),
542     &bp);
543   if (r > 0)
544     attr_value.push_back(bp);
545   return r;
546 }
547
548 int LFNIndex::remove_attr_path(const vector<string> &path,
549                                const string &attr_name)
550 {
551   string full_path = get_full_path_subdir(path);
552   string mangled_attr_name = mangle_attr_name(attr_name);
553   maybe_inject_failure();
554   return chain_removexattr(full_path.c_str(), mangled_attr_name.c_str());
555 }
556
557 string LFNIndex::lfn_generate_object_name_keyless(const ghobject_t &oid)
558 {
559   char s[FILENAME_MAX_LEN];
560   char *end = s + sizeof(s);
561   char *t = s;
562
563   assert(oid.generation == ghobject_t::NO_GEN);
564   const char *i = oid.hobj.oid.name.c_str();
565   // Escape subdir prefix
566   if (oid.hobj.oid.name.substr(0, 4) == "DIR_") {
567     *t++ = '\\';
568     *t++ = 'd';
569     i += 4;
570   }
571   while (*i && t < end) {
572     if (*i == '\\') {
573       *t++ = '\\';
574       *t++ = '\\';
575     } else if (*i == '.' && i == oid.hobj.oid.name.c_str()) {  // only escape leading .
576       *t++ = '\\';
577       *t++ = '.';
578     } else if (*i == '/') {
579       *t++ = '\\';
580       *t++ = 's';
581     } else
582       *t++ = *i;
583     i++;
584   }
585
586   if (oid.hobj.snap == CEPH_NOSNAP)
587     t += snprintf(t, end - t, "_head");
588   else if (oid.hobj.snap == CEPH_SNAPDIR)
589     t += snprintf(t, end - t, "_snapdir");
590   else
591     t += snprintf(t, end - t, "_%llx", (long long unsigned)oid.hobj.snap);
592   snprintf(t, end - t, "_%.*X", (int)(sizeof(oid.hobj.get_hash())*2), oid.hobj.get_hash());
593
594   return string(s);
595 }
596
597 static void append_escaped(string::const_iterator begin,
598                            string::const_iterator end,
599                            string *out)
600 {
601   for (string::const_iterator i = begin; i != end; ++i) {
602     if (*i == '\\') {
603       out->append("\\\\");
604     } else if (*i == '/') {
605       out->append("\\s");
606     } else if (*i == '_') {
607       out->append("\\u");
608     } else if (*i == '\0') {
609       out->append("\\n");
610     } else {
611       out->append(i, i+1);
612     }
613   }
614 }
615
616 string LFNIndex::lfn_generate_object_name_current(const ghobject_t &oid)
617 {
618   string full_name;
619   string::const_iterator i = oid.hobj.oid.name.begin();
620   if (oid.hobj.oid.name.substr(0, 4) == "DIR_") {
621     full_name.append("\\d");
622     i += 4;
623   } else if (oid.hobj.oid.name[0] == '.') {
624     full_name.append("\\.");
625     ++i;
626   }
627   append_escaped(i, oid.hobj.oid.name.end(), &full_name);
628   full_name.append("_");
629   append_escaped(oid.hobj.get_key().begin(), oid.hobj.get_key().end(), &full_name);
630   full_name.append("_");
631
632   char buf[PATH_MAX];
633   char *t = buf;
634   const char *end = t + sizeof(buf);
635   if (oid.hobj.snap == CEPH_NOSNAP)
636     t += snprintf(t, end - t, "head");
637   else if (oid.hobj.snap == CEPH_SNAPDIR)
638     t += snprintf(t, end - t, "snapdir");
639   else
640     t += snprintf(t, end - t, "%llx", (long long unsigned)oid.hobj.snap);
641   t += snprintf(t, end - t, "_%.*X", (int)(sizeof(oid.hobj.get_hash())*2), oid.hobj.get_hash());
642   full_name.append(buf, t);
643   full_name.append("_");
644
645   append_escaped(oid.hobj.nspace.begin(), oid.hobj.nspace.end(), &full_name);
646   full_name.append("_");
647
648   t = buf;
649   if (oid.hobj.pool == -1)
650     t += snprintf(t, end - t, "none");
651   else
652     t += snprintf(t, end - t, "%llx", (long long unsigned)oid.hobj.pool);
653   full_name.append(buf, t);
654
655   if (oid.generation != ghobject_t::NO_GEN ||
656       oid.shard_id != shard_id_t::NO_SHARD) {
657     full_name.append("_");
658
659     t = buf;
660     t += snprintf(t, end - buf, "%llx", (long long unsigned)oid.generation);
661     full_name.append(buf, t);
662
663     full_name.append("_");
664
665     t = buf;
666     t += snprintf(t, end - buf, "%x", (int)oid.shard_id);
667     full_name.append(buf, t);
668   }
669
670   return full_name;
671 }
672
673 string LFNIndex::lfn_generate_object_name_poolless(const ghobject_t &oid)
674 {
675   if (index_version == HASH_INDEX_TAG)
676     return lfn_generate_object_name_keyless(oid);
677
678   assert(oid.generation == ghobject_t::NO_GEN);
679   string full_name;
680   string::const_iterator i = oid.hobj.oid.name.begin();
681   if (oid.hobj.oid.name.substr(0, 4) == "DIR_") {
682     full_name.append("\\d");
683     i += 4;
684   } else if (oid.hobj.oid.name[0] == '.') {
685     full_name.append("\\.");
686     ++i;
687   }
688   append_escaped(i, oid.hobj.oid.name.end(), &full_name);
689   full_name.append("_");
690   append_escaped(oid.hobj.get_key().begin(), oid.hobj.get_key().end(), &full_name);
691   full_name.append("_");
692
693   char snap_with_hash[PATH_MAX];
694   char *t = snap_with_hash;
695   char *end = t + sizeof(snap_with_hash);
696   if (oid.hobj.snap == CEPH_NOSNAP)
697     t += snprintf(t, end - t, "head");
698   else if (oid.hobj.snap == CEPH_SNAPDIR)
699     t += snprintf(t, end - t, "snapdir");
700   else
701     t += snprintf(t, end - t, "%llx", (long long unsigned)oid.hobj.snap);
702   snprintf(t, end - t, "_%.*X", (int)(sizeof(oid.hobj.get_hash())*2), oid.hobj.get_hash());
703   full_name += string(snap_with_hash);
704   return full_name;
705 }
706
707 int LFNIndex::lfn_get_name(const vector<string> &path,
708                            const ghobject_t &oid,
709                            string *mangled_name, string *out_path,
710                            int *hardlink)
711 {
712   string full_name = lfn_generate_object_name(oid);
713   int r;
714
715   if (!lfn_must_hash(full_name)) {
716     if (mangled_name)
717       *mangled_name = full_name;
718     if (out_path)
719       *out_path = get_full_path(path, full_name);
720     if (hardlink) {
721       struct stat buf;
722       string full_path = get_full_path(path, full_name);
723       maybe_inject_failure();
724       r = ::stat(full_path.c_str(), &buf);
725       if (r < 0) {
726         if (errno == ENOENT)
727           *hardlink = 0;
728         else
729           return -errno;
730       } else {
731         *hardlink = buf.st_nlink;
732       }
733     }
734     return 0;
735   }
736
737   int i = 0;
738   string candidate;
739   string candidate_path;
740   for ( ; ; ++i) {
741     candidate = lfn_get_short_name(oid, i);
742     candidate_path = get_full_path(path, candidate);
743     bufferptr bp;
744     r = chain_getxattr_buf(
745       candidate_path.c_str(),
746       get_lfn_attr().c_str(),
747       &bp);
748     if (r < 0) {
749       if (errno != ENODATA && errno != ENOENT)
750         return -errno;
751       if (errno == ENODATA) {
752         // Left over from incomplete transaction, it'll be replayed
753         maybe_inject_failure();
754         r = ::unlink(candidate_path.c_str());
755         maybe_inject_failure();
756         if (r < 0)
757           return -errno;
758       }
759       if (mangled_name)
760         *mangled_name = candidate;
761       if (out_path)
762         *out_path = candidate_path;
763       if (hardlink)
764         *hardlink = 0;
765       return 0;
766     }
767     assert(r > 0);
768     string lfn(bp.c_str(), bp.length());
769     if (lfn == full_name) {
770       if (mangled_name)
771         *mangled_name = candidate;
772       if (out_path)
773         *out_path = candidate_path;
774       if (hardlink) {
775         struct stat st;
776         r = ::stat(candidate_path.c_str(), &st);
777         if (r < 0) {
778           if (errno == ENOENT)
779             *hardlink = 0;
780           else
781             return -errno;
782         } else {
783           *hardlink = st.st_nlink;
784         }
785       }
786       return 0;
787     }
788     bp = bufferptr();
789     r = chain_getxattr_buf(
790       candidate_path.c_str(),
791       get_alt_lfn_attr().c_str(),
792       &bp);
793     if (r > 0) {
794       // only consider alt name if nlink > 1
795       struct stat st;
796       int rc = ::stat(candidate_path.c_str(), &st);
797       if (rc < 0)
798         return -errno;
799       if (st.st_nlink <= 1) {
800         // left over from incomplete unlink, remove
801         maybe_inject_failure();
802         dout(20) << __func__ << " found extra alt attr for " << candidate_path
803                  << ", long name " << string(bp.c_str(), bp.length()) << dendl;
804         rc = chain_removexattr(candidate_path.c_str(),
805                                get_alt_lfn_attr().c_str());
806         maybe_inject_failure();
807         if (rc < 0)
808           return rc;
809         continue;
810       }
811       string lfn(bp.c_str(), bp.length());
812       if (lfn == full_name) {
813         dout(20) << __func__ << " used alt attr for " << full_name << dendl;
814         if (mangled_name)
815           *mangled_name = candidate;
816         if (out_path)
817           *out_path = candidate_path;
818         if (hardlink)
819           *hardlink = st.st_nlink;
820         return 0;
821       }
822     }
823   }
824   ceph_abort(); // Unreachable
825   return 0;
826 }
827
828 int LFNIndex::lfn_created(const vector<string> &path,
829                           const ghobject_t &oid,
830                           const string &mangled_name)
831 {
832   if (!lfn_is_hashed_filename(mangled_name))
833     return 0;
834   string full_path = get_full_path(path, mangled_name);
835   string full_name = lfn_generate_object_name(oid);
836   maybe_inject_failure();
837
838   // if the main attr exists and is different, move it to the alt attr.
839   bufferptr bp;
840   int r = chain_getxattr_buf(
841     full_path.c_str(),
842     get_lfn_attr().c_str(),
843     &bp);
844   if (r > 0) {
845     string lfn(bp.c_str(), bp.length());
846     if (lfn != full_name) {
847       dout(20) << __func__ << " " << mangled_name
848                << " moving old name to alt attr "
849                << lfn
850                << ", new name is " << full_name << dendl;
851       r = chain_setxattr<false, true>(
852         full_path.c_str(), get_alt_lfn_attr().c_str(),
853         bp.c_str(), bp.length());
854       if (r < 0)
855         return r;
856     }
857   }
858
859   return chain_setxattr<false, true>(
860     full_path.c_str(), get_lfn_attr().c_str(),
861     full_name.c_str(), full_name.size());
862 }
863
864 int LFNIndex::lfn_unlink(const vector<string> &path,
865                          const ghobject_t &oid,
866                          const string &mangled_name)
867 {
868   if (!lfn_is_hashed_filename(mangled_name)) {
869     string full_path = get_full_path(path, mangled_name);
870     maybe_inject_failure();
871     int r = ::unlink(full_path.c_str());
872     maybe_inject_failure();
873     if (r < 0)
874       return -errno;
875     return 0;
876   }
877
878   int i = 0;
879   for ( ; ; ++i) {
880     string candidate = lfn_get_short_name(oid, i);
881     if (candidate == mangled_name)
882       break;
883   }
884   int removed_index = i;
885   ++i;
886   for ( ; ; ++i) {
887     struct stat buf;
888     string to_check = lfn_get_short_name(oid, i);
889     string to_check_path = get_full_path(path, to_check);
890     int r = ::stat(to_check_path.c_str(), &buf);
891     if (r < 0) {
892       if (errno == ENOENT) {
893         break;
894       } else {
895         return -errno;
896       }
897     }
898   }
899   string full_path = get_full_path(path, mangled_name);
900   int fd = ::open(full_path.c_str(), O_RDONLY);
901   if (fd < 0)
902     return -errno;
903   FDCloser f(fd);
904   if (i == removed_index + 1) {
905     maybe_inject_failure();
906     int r = ::unlink(full_path.c_str());
907     maybe_inject_failure();
908     if (r < 0)
909       return -errno;
910   } else {
911     string& rename_to = full_path;
912     string rename_from = get_full_path(path, lfn_get_short_name(oid, i - 1));
913     maybe_inject_failure();
914     int r = ::rename(rename_from.c_str(), rename_to.c_str());
915     maybe_inject_failure();
916     if (r < 0)
917       return -errno;
918   }
919   struct stat st;
920   int r = ::fstat(fd, &st);
921   if (r == 0 && st.st_nlink > 0) {
922     // remove alt attr
923     dout(20) << __func__ << " removing alt attr from " << full_path << dendl;
924     fsync_dir(path);
925     chain_fremovexattr(fd, get_alt_lfn_attr().c_str());
926   }
927   return r;
928 }
929
930 int LFNIndex::lfn_translate(const vector<string> &path,
931                             const string &short_name,
932                             ghobject_t *out)
933 {
934   if (!lfn_is_hashed_filename(short_name)) {
935     return lfn_parse_object_name(short_name, out);
936   }
937   string full_path = get_full_path(path, short_name);
938   // First, check alt attr
939   bufferptr bp;
940   int r = chain_getxattr_buf(
941     full_path.c_str(),
942     get_alt_lfn_attr().c_str(),
943     &bp);
944   if (r > 0) {
945     // There is an alt attr, does it match?
946     string lfn(bp.c_str(), bp.length());
947     if (short_name_matches(short_name.c_str(), lfn.c_str())) {
948       return lfn_parse_object_name(lfn, out);
949     }
950   }
951
952   // Get lfn_attr
953   bp = bufferptr();
954   r = chain_getxattr_buf(
955     full_path.c_str(),
956     get_lfn_attr().c_str(),
957     &bp);
958   if (r < 0)
959     return r;
960   if (r == 0)
961     return -EINVAL;
962
963   string long_name(bp.c_str(), bp.length());
964   return lfn_parse_object_name(long_name, out);
965 }
966
967 bool LFNIndex::lfn_is_object(const string &short_name)
968 {
969   return lfn_is_hashed_filename(short_name) || !lfn_is_subdir(short_name, 0);
970 }
971
972 bool LFNIndex::lfn_is_subdir(const string &name, string *demangled)
973 {
974   if (name.substr(0, SUBDIR_PREFIX.size()) == SUBDIR_PREFIX) {
975     if (demangled)
976       *demangled = demangle_path_component(name);
977     return 1;
978   }
979   return 0;
980 }
981
982 static int parse_object(const char *s, ghobject_t& o)
983 {
984   const char *hash = s + strlen(s) - 1;
985   while (*hash != '_' &&
986          hash > s)
987     hash--;
988   const char *bar = hash - 1;
989   while (*bar != '_' &&
990          bar > s)
991     bar--;
992   if (*bar == '_') {
993     char buf[bar-s + 1];
994     char *t = buf;
995     const char *i = s;
996     while (i < bar) {
997       if (*i == '\\') {
998         i++;
999         switch (*i) {
1000         case '\\': *t++ = '\\'; break;
1001         case '.': *t++ = '.'; break;
1002         case 's': *t++ = '/'; break;
1003         case 'd': {
1004           *t++ = 'D';
1005           *t++ = 'I';
1006           *t++ = 'R';
1007           *t++ = '_';
1008           break;
1009         }
1010         default: ceph_abort();
1011         }
1012       } else {
1013         *t++ = *i;
1014       }
1015       i++;
1016     }
1017     *t = 0;
1018     o.hobj.oid.name = string(buf, t-buf);
1019     if (strncmp(bar+1, "head", 4) == 0)
1020       o.hobj.snap = CEPH_NOSNAP;
1021     else if (strncmp(bar+1, "snapdir", 7) == 0)
1022       o.hobj.snap = CEPH_SNAPDIR;
1023     else
1024       o.hobj.snap = strtoull(bar+1, NULL, 16);
1025
1026     uint32_t hobject_hash_input;
1027     sscanf(hash, "_%X", &hobject_hash_input);
1028     o.hobj.set_hash(hobject_hash_input);
1029
1030     return 1;
1031   }
1032   return 0;
1033 }
1034
1035 int LFNIndex::lfn_parse_object_name_keyless(const string &long_name, ghobject_t *out)
1036 {
1037   int r = parse_object(long_name.c_str(), *out);
1038   int64_t pool = -1;
1039   spg_t pg;
1040   if (coll().is_pg_prefix(&pg))
1041     pool = (int64_t)pg.pgid.pool();
1042   out->hobj.pool = pool;
1043   if (!r) return -EINVAL;
1044   string temp = lfn_generate_object_name(*out);
1045   return r ? 0 : -EINVAL;
1046 }
1047
1048 static bool append_unescaped(string::const_iterator begin,
1049                              string::const_iterator end,
1050                              string *out)
1051 {
1052   for (string::const_iterator i = begin; i != end; ++i) {
1053     if (*i == '\\') {
1054       ++i;
1055       if (*i == '\\')
1056         out->append("\\");
1057       else if (*i == 's')
1058         out->append("/");
1059       else if (*i == 'n')
1060         (*out) += '\0';
1061       else if (*i == 'u')
1062         out->append("_");
1063       else
1064         return false;
1065     } else {
1066       out->append(i, i+1);
1067     }
1068   }
1069   return true;
1070 }
1071
1072 int LFNIndex::lfn_parse_object_name_poolless(const string &long_name,
1073                                              ghobject_t *out)
1074 {
1075   string name;
1076   string key;
1077   uint32_t hash;
1078   snapid_t snap;
1079
1080   string::const_iterator current = long_name.begin();
1081   if (*current == '\\') {
1082     ++current;
1083     if (current == long_name.end()) {
1084       return -EINVAL;
1085     } else if (*current == 'd') {
1086       name.append("DIR_");
1087       ++current;
1088     } else if (*current == '.') {
1089       name.append(".");
1090       ++current;
1091     } else {
1092       --current;
1093     }
1094   }
1095
1096   string::const_iterator end = current;
1097   for ( ; end != long_name.end() && *end != '_'; ++end) ;
1098   if (end == long_name.end())
1099     return -EINVAL;
1100   if (!append_unescaped(current, end, &name))
1101     return -EINVAL;
1102
1103   current = ++end;
1104   for ( ; end != long_name.end() && *end != '_'; ++end) ;
1105   if (end == long_name.end())
1106     return -EINVAL;
1107   if (!append_unescaped(current, end, &key))
1108     return -EINVAL;
1109
1110   current = ++end;
1111   for ( ; end != long_name.end() && *end != '_'; ++end) ;
1112   if (end == long_name.end())
1113     return -EINVAL;
1114   string snap_str(current, end);
1115
1116   current = ++end;
1117   for ( ; end != long_name.end() && *end != '_'; ++end) ;
1118   if (end != long_name.end())
1119     return -EINVAL;
1120   string hash_str(current, end);
1121
1122   if (snap_str == "head")
1123     snap = CEPH_NOSNAP;
1124   else if (snap_str == "snapdir")
1125     snap = CEPH_SNAPDIR;
1126   else
1127     snap = strtoull(snap_str.c_str(), NULL, 16);
1128   sscanf(hash_str.c_str(), "%X", &hash);
1129
1130
1131   int64_t pool = -1;
1132   spg_t pg;
1133   if (coll().is_pg_prefix(&pg))
1134     pool = (int64_t)pg.pgid.pool();
1135   (*out) = ghobject_t(hobject_t(name, key, snap, hash, pool, ""));
1136   return 0;
1137 }
1138
1139
1140 int LFNIndex::lfn_parse_object_name(const string &long_name, ghobject_t *out)
1141 {
1142   string name;
1143   string key;
1144   string ns;
1145   uint32_t hash;
1146   snapid_t snap;
1147   uint64_t pool;
1148   gen_t generation = ghobject_t::NO_GEN;
1149   shard_id_t shard_id = shard_id_t::NO_SHARD;
1150
1151   if (index_version == HASH_INDEX_TAG)
1152     return lfn_parse_object_name_keyless(long_name, out);
1153   if (index_version == HASH_INDEX_TAG_2)
1154     return lfn_parse_object_name_poolless(long_name, out);
1155
1156   string::const_iterator current = long_name.begin();
1157   if (*current == '\\') {
1158     ++current;
1159     if (current == long_name.end()) {
1160       return -EINVAL;
1161     } else if (*current == 'd') {
1162       name.append("DIR_");
1163       ++current;
1164     } else if (*current == '.') {
1165       name.append(".");
1166       ++current;
1167     } else {
1168       --current;
1169     }
1170   }
1171
1172   string::const_iterator end = current;
1173   for ( ; end != long_name.end() && *end != '_'; ++end) ;
1174   if (end == long_name.end())
1175     return -EINVAL;
1176   if (!append_unescaped(current, end, &name))
1177     return -EINVAL;
1178
1179   current = ++end;
1180   for ( ; end != long_name.end() && *end != '_'; ++end) ;
1181   if (end == long_name.end())
1182     return -EINVAL;
1183   if (!append_unescaped(current, end, &key))
1184     return -EINVAL;
1185
1186   current = ++end;
1187   for ( ; end != long_name.end() && *end != '_'; ++end) ;
1188   if (end == long_name.end())
1189     return -EINVAL;
1190   string snap_str(current, end);
1191
1192   current = ++end;
1193   for ( ; end != long_name.end() && *end != '_'; ++end) ;
1194   if (end == long_name.end())
1195     return -EINVAL;
1196   string hash_str(current, end);
1197
1198   current = ++end;
1199   for ( ; end != long_name.end() && *end != '_'; ++end) ;
1200   if (end == long_name.end())
1201     return -EINVAL;
1202   if (!append_unescaped(current, end, &ns))
1203     return -EINVAL;
1204
1205   current = ++end;
1206   for ( ; end != long_name.end() && *end != '_'; ++end) ;
1207   string pstring(current, end);
1208
1209   // Optional generation/shard_id
1210   string genstring, shardstring;
1211   if (end != long_name.end()) {
1212     current = ++end;
1213     for ( ; end != long_name.end() && *end != '_'; ++end) ;
1214     if (end == long_name.end())
1215       return -EINVAL;
1216     genstring = string(current, end);
1217
1218     generation = (gen_t)strtoull(genstring.c_str(), NULL, 16);
1219
1220     current = ++end;
1221     for ( ; end != long_name.end() && *end != '_'; ++end) ;
1222     if (end != long_name.end())
1223       return -EINVAL;
1224     shardstring = string(current, end);
1225
1226     shard_id = (shard_id_t)strtoul(shardstring.c_str(), NULL, 16);
1227   }
1228
1229   if (snap_str == "head")
1230     snap = CEPH_NOSNAP;
1231   else if (snap_str == "snapdir")
1232     snap = CEPH_SNAPDIR;
1233   else
1234     snap = strtoull(snap_str.c_str(), NULL, 16);
1235   sscanf(hash_str.c_str(), "%X", &hash);
1236
1237   if (pstring == "none")
1238     pool = (uint64_t)-1;
1239   else
1240     pool = strtoull(pstring.c_str(), NULL, 16);
1241
1242   (*out) = ghobject_t(hobject_t(name, key, snap, hash, (int64_t)pool, ns), generation, shard_id);
1243   return 0;
1244 }
1245
1246 bool LFNIndex::lfn_is_hashed_filename(const string &name)
1247 {
1248   if (name.size() < (unsigned)FILENAME_SHORT_LEN) {
1249     return 0;
1250   }
1251   if (name.substr(name.size() - FILENAME_COOKIE.size(), FILENAME_COOKIE.size())
1252       == FILENAME_COOKIE) {
1253     return 1;
1254   } else {
1255     return 0;
1256   }
1257 }
1258
1259 bool LFNIndex::lfn_must_hash(const string &long_name)
1260 {
1261   return (int)long_name.size() >= FILENAME_SHORT_LEN;
1262 }
1263
1264 static inline void buf_to_hex(const unsigned char *buf, int len, char *str)
1265 {
1266   int i;
1267   str[0] = '\0';
1268   for (i = 0; i < len; i++) {
1269     sprintf(&str[i*2], "%02x", (int)buf[i]);
1270   }
1271 }
1272
1273 int LFNIndex::hash_filename(const char *filename, char *hash, int buf_len)
1274 {
1275   if (buf_len < FILENAME_HASH_LEN + 1)
1276     return -EINVAL;
1277
1278   char buf[FILENAME_LFN_DIGEST_SIZE];
1279   char hex[FILENAME_LFN_DIGEST_SIZE * 2];
1280
1281   SHA1 h;
1282   h.Update((const byte *)filename, strlen(filename));
1283   h.Final((byte *)buf);
1284
1285   buf_to_hex((byte *)buf, (FILENAME_HASH_LEN + 1) / 2, hex);
1286   strncpy(hash, hex, FILENAME_HASH_LEN);
1287   hash[FILENAME_HASH_LEN] = '\0';
1288   return 0;
1289 }
1290
1291 void LFNIndex::build_filename(const char *old_filename, int i, char *filename, int len)
1292 {
1293   char hash[FILENAME_HASH_LEN + 1];
1294
1295   assert(len >= FILENAME_SHORT_LEN + 4);
1296
1297   strncpy(filename, old_filename, FILENAME_PREFIX_LEN);
1298   filename[FILENAME_PREFIX_LEN] = '\0';
1299   if ((int)strlen(filename) < FILENAME_PREFIX_LEN)
1300     return;
1301   if (old_filename[FILENAME_PREFIX_LEN] == '\0')
1302     return;
1303
1304   hash_filename(old_filename, hash, sizeof(hash));
1305   int ofs = FILENAME_PREFIX_LEN;
1306   while (1) {
1307     int suffix_len = sprintf(filename + ofs, "_%s_%d_%s", hash, i, FILENAME_COOKIE.c_str());
1308     if (ofs + suffix_len <= FILENAME_SHORT_LEN || !ofs)
1309       break;
1310     ofs--;
1311   }
1312 }
1313
1314 bool LFNIndex::short_name_matches(const char *short_name, const char *cand_long_name)
1315 {
1316   const char *end = short_name;
1317   while (*end) ++end;
1318   const char *suffix = end;
1319   if (suffix > short_name)  --suffix;                   // last char
1320   while (suffix > short_name && *suffix != '_') --suffix; // back to first _
1321   if (suffix > short_name) --suffix;                   // one behind that
1322   while (suffix > short_name && *suffix != '_') --suffix; // back to second _
1323
1324   int index = -1;
1325   char buf[FILENAME_SHORT_LEN + 4];
1326   assert((end - suffix) < (int)sizeof(buf));
1327   int r = sscanf(suffix, "_%d_%s", &index, buf);
1328   if (r < 2)
1329     return false;
1330   if (strcmp(buf, FILENAME_COOKIE.c_str()) != 0)
1331     return false;
1332   build_filename(cand_long_name, index, buf, sizeof(buf));
1333   return strcmp(short_name, buf) == 0;
1334 }
1335
1336 string LFNIndex::lfn_get_short_name(const ghobject_t &oid, int i)
1337 {
1338   string long_name = lfn_generate_object_name(oid);
1339   assert(lfn_must_hash(long_name));
1340   char buf[FILENAME_SHORT_LEN + 4];
1341   build_filename(long_name.c_str(), i, buf, sizeof(buf));
1342   return string(buf);
1343 }
1344
1345 const string &LFNIndex::get_base_path()
1346 {
1347   return base_path;
1348 }
1349
1350 string LFNIndex::get_full_path_subdir(const vector<string> &rel)
1351 {
1352   string retval = get_base_path();
1353   for (vector<string>::const_iterator i = rel.begin();
1354        i != rel.end();
1355        ++i) {
1356     retval += "/";
1357     retval += mangle_path_component(*i);
1358   }
1359   return retval;
1360 }
1361
1362 string LFNIndex::get_full_path(const vector<string> &rel, const string &name)
1363 {
1364   return get_full_path_subdir(rel) + "/" + name;
1365 }
1366
1367 string LFNIndex::mangle_path_component(const string &component)
1368 {
1369   return SUBDIR_PREFIX + component;
1370 }
1371
1372 string LFNIndex::demangle_path_component(const string &component)
1373 {
1374   return component.substr(SUBDIR_PREFIX.size(), component.size() - SUBDIR_PREFIX.size());
1375 }
1376
1377 int LFNIndex::decompose_full_path(const char *in, vector<string> *out,
1378                                   ghobject_t *oid, string *shortname)
1379 {
1380   const char *beginning = in + get_base_path().size();
1381   const char *end = beginning;
1382   while (1) {
1383     end++;
1384     beginning = end++;
1385     for ( ; *end != '\0' && *end != '/'; ++end) ;
1386     if (*end != '\0') {
1387       out->push_back(demangle_path_component(string(beginning, end - beginning)));
1388       continue;
1389     } else {
1390       break;
1391     }
1392   }
1393   *shortname = string(beginning, end - beginning);
1394   if (oid) {
1395     int r = lfn_translate(*out, *shortname, oid);
1396     if (r < 0)
1397       return r;
1398   }
1399   return 0;
1400 }
1401
1402 string LFNIndex::mangle_attr_name(const string &attr)
1403 {
1404   return PHASH_ATTR_PREFIX + attr;
1405 }