Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / FuseStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "FuseStore.h"
6 #include "os/ObjectStore.h"
7 #include "include/stringify.h"
8 #include "common/errno.h"
9
10 #define FUSE_USE_VERSION 30
11 #include <fuse.h>
12
13 #include <sys/types.h>
14 #include <sys/stat.h>
15 #include <unistd.h>
16 #include <fcntl.h>           /* Definition of AT_* constants */
17 #include <sys/stat.h>
18
19 #if defined(DARWIN) || defined(__FreeBSD__)
20 #include <sys/param.h>
21 #include <sys/mount.h>
22 #endif
23
24 #define dout_context store->cct
25 #define dout_subsys ceph_subsys_fuse
26 #include "common/debug.h"
27 #undef dout_prefix
28 #define dout_prefix *_dout << "fuse "
29
30 // some fuse-y bits of state
31 struct fs_info {
32   struct fuse_args args;
33   struct fuse *f;
34   struct fuse_chan *ch;
35   char *mountpoint;
36 };
37
38 int FuseStore::open_file(string p, struct fuse_file_info *fi,
39                          std::function<int(bufferlist *bl)> f)
40 {
41   if (open_files.count(p)) {
42     OpenFile *o = open_files[p];
43     fi->fh = reinterpret_cast<uint64_t>(o);
44     ++o->ref;
45     return 0;
46   }
47   bufferlist bl;
48   int r = f(&bl);
49   if (r < 0) {
50     return r;
51   }
52   OpenFile *o = new OpenFile;
53   o->path = p;
54   o->bl.claim(bl);
55   open_files[p] = o;
56   fi->fh = reinterpret_cast<uint64_t>(o);
57   ++o->ref;
58   return 0;
59 }
60
61 FuseStore::FuseStore(ObjectStore *s, string p)
62   : store(s),
63     mount_point(p),
64     fuse_thread(this)
65 {
66   info = new fs_info();
67 }
68
69 FuseStore::~FuseStore()
70 {
71   delete info;
72 }
73
74 /*
75  * / - root directory
76  * $cid/
77  * $cid/type - objectstore type
78  * $cid/bitwise_hash_start = lowest hash value
79  * $cid/bitwise_hash_end = highest hash value
80  * $cid/bitwise_hash_bits - how many bits are significant
81  * $cid/pgmeta/ - pgmeta object
82  * $cid/all/ - all objects
83  * $cid/all/$obj/
84  * $cid/all/$obj/bitwise_hash
85  * $cid/all/$obj/data
86  * $cid/all/$obj/omap/$key
87  * $cid/all/$obj/attr/$name
88  * $cid/by_bitwise_hash/$hash/$bits/$obj - all objects with this (bitwise) hash (prefix)
89  */
90 enum {
91   FN_ROOT = 1,
92   FN_TYPE,
93   FN_COLLECTION,
94   FN_HASH_START,
95   FN_HASH_END,
96   FN_HASH_BITS,
97   FN_OBJECT,
98   FN_OBJECT_HASH,
99   FN_OBJECT_DATA,
100   FN_OBJECT_OMAP_HEADER,
101   FN_OBJECT_OMAP,
102   FN_OBJECT_OMAP_VAL,
103   FN_OBJECT_ATTR,
104   FN_OBJECT_ATTR_VAL,
105   FN_ALL,
106   FN_HASH_DIR,
107   FN_HASH_VAL,
108 };
109
110 static int parse_fn(CephContext* cct, const char *path, coll_t *cid,
111                     ghobject_t *oid, string *key,
112                     uint32_t *hash, uint32_t *hash_bits)
113 {
114   list<string> v;
115   for (const char *p = path; *p; ++p) {
116     if (*p == '/')
117       continue;
118     const char *e;
119     for (e = p + 1; *e && *e != '/'; e++) ;
120     string c(p, e-p);
121     v.push_back(c);
122     p = e;
123     if (!*p)
124       break;
125   }
126   ldout(cct, 10) << __func__ << " path " << path << " -> " << v << dendl;
127
128   if (v.empty())
129     return FN_ROOT;
130
131   if (v.front() == "type")
132     return FN_TYPE;
133
134   if (!cid->parse(v.front())) {
135     return -ENOENT;
136   }
137   if (v.size() == 1)
138     return FN_COLLECTION;
139   v.pop_front();
140
141   if (v.front() == "bitwise_hash_start")
142     return FN_HASH_START;
143   if (v.front() == "bitwise_hash_end")
144     return FN_HASH_END;
145   if (v.front() == "bitwise_hash_bits")
146     return FN_HASH_BITS;
147   if (v.front() == "pgmeta") {
148     spg_t pgid;
149     if (cid->is_pg(&pgid)) {
150       *oid = pgid.make_pgmeta_oid();
151       v.pop_front();
152       if (v.empty())
153         return FN_OBJECT;
154       goto do_object;
155     }
156     return -ENOENT;
157   }
158   if (v.front() == "all") {
159     v.pop_front();
160     if (v.empty())
161       return FN_ALL;
162     goto do_dir;
163   }
164   if (v.front() == "by_bitwise_hash") {
165     v.pop_front();
166     if (v.empty())
167       return FN_HASH_DIR;
168     unsigned long hv, hm;
169     int r = sscanf(v.front().c_str(), "%lx", &hv);
170     if (r != 1)
171       return -ENOENT;
172     int shift = 32 - v.front().length() * 4;
173     v.pop_front();
174     if (v.empty())
175       return FN_HASH_DIR;
176     r = sscanf(v.front().c_str(), "%ld", &hm);
177     if (r != 1)
178       return -ENOENT;
179     if (hm < 1 || hm > 32)
180       return -ENOENT;
181     v.pop_front();
182     *hash = hv << shift;//hobject_t::_reverse_bits(hv << shift);
183     *hash_bits = hm;
184     if (v.empty())
185       return FN_HASH_VAL;
186     goto do_dir;
187   }
188   return -ENOENT;
189
190  do_dir:
191   {
192     string o = v.front();
193     if (!oid->parse(o)) {
194       return -ENOENT;
195     }
196     v.pop_front();
197     if (v.empty())
198       return FN_OBJECT;
199   }
200
201  do_object:
202   if (v.front() == "data")
203     return FN_OBJECT_DATA;
204   if (v.front() == "omap_header")
205     return FN_OBJECT_OMAP_HEADER;
206   if (v.front() == "omap") {
207     v.pop_front();
208     if (v.empty())
209       return FN_OBJECT_OMAP;
210     *key = v.front();
211     v.pop_front();
212     if (v.empty())
213       return FN_OBJECT_OMAP_VAL;
214     return -ENOENT;
215   }
216   if (v.front() == "attr") {
217     v.pop_front();
218     if (v.empty())
219       return FN_OBJECT_ATTR;
220     *key = v.front();
221     v.pop_front();
222     if (v.empty())
223       return FN_OBJECT_ATTR_VAL;
224     return -ENOENT;
225   }
226   if (v.front() == "bitwise_hash")
227     return FN_OBJECT_HASH;
228   return -ENOENT;
229 }
230
231
232 static int os_getattr(const char *path, struct stat *stbuf)
233 {
234   fuse_context *fc = fuse_get_context();
235   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
236   ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
237   coll_t cid;
238   ghobject_t oid;
239   string key;
240   uint32_t hash_value, hash_bits;
241   int t = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
242                    &hash_bits);
243   if (t < 0)
244     return t;
245
246   std::lock_guard<std::mutex> l(fs->lock);
247
248   stbuf->st_size = 0;
249   stbuf->st_uid = 0;
250   stbuf->st_gid = 0;
251   stbuf->st_mode = S_IFREG | 0700;
252
253   switch (t) {
254   case FN_OBJECT_OMAP:
255   case FN_OBJECT_ATTR:
256   case FN_OBJECT:
257   case FN_OBJECT_DATA:
258   case FN_OBJECT_OMAP_HEADER:
259   case FN_OBJECT_OMAP_VAL:
260     {
261       spg_t pgid;
262       if (cid.is_pg(&pgid)) {
263         int bits = fs->store->collection_bits(cid);
264         if (bits >= 0 && !oid.match(bits, pgid.ps())) {
265           // sorry, not part of this PG
266           return -ENOENT;
267         }
268       }
269     }
270     break;
271   }
272
273   switch (t) {
274   case FN_OBJECT_OMAP:
275   case FN_OBJECT_ATTR:
276   case FN_OBJECT:
277     if (!fs->store->exists(cid, oid))
278       return -ENOENT;
279     // fall-thru
280   case FN_ALL:
281   case FN_HASH_DIR:
282   case FN_HASH_VAL:
283   case FN_COLLECTION:
284     if (!fs->store->collection_exists(cid))
285       return -ENOENT;
286     // fall-thru
287   case FN_ROOT:
288     stbuf->st_mode = S_IFDIR | 0700;
289     return 0;
290
291   case FN_TYPE:
292     stbuf->st_size = fs->store->get_type().length() + 1;
293     break;
294
295   case FN_OBJECT_HASH:
296     if (!fs->store->exists(cid, oid))
297       return -ENOENT;
298     stbuf->st_size = 9;
299     return 0;
300
301   case FN_HASH_END:
302     if (!fs->store->collection_exists(cid))
303       return -ENOENT;
304     if (fs->store->collection_bits(cid) < 0)
305       return -ENOENT;
306     // fall-thru
307   case FN_HASH_START:
308     stbuf->st_size = 9;
309     return 0;
310
311   case FN_HASH_BITS:
312     {
313       if (!fs->store->collection_exists(cid))
314         return -ENOENT;
315       int bits = fs->store->collection_bits(cid);
316       if (bits < 0)
317         return -ENOENT;
318       char buf[8];
319       snprintf(buf, sizeof(buf), "%d\n", bits);
320       stbuf->st_size = strlen(buf);
321     }
322     return 0;
323
324   case FN_OBJECT_DATA:
325     {
326       if (!fs->store->exists(cid, oid))
327         return -ENOENT;
328       int r = fs->store->stat(cid, oid, stbuf);
329       if (r < 0)
330         return r;
331     }
332     break;
333
334   case FN_OBJECT_OMAP_HEADER:
335     {
336       if (!fs->store->exists(cid, oid))
337         return -ENOENT;
338       bufferlist bl;
339       fs->store->omap_get_header(cid, oid, &bl);
340       stbuf->st_size = bl.length();
341     }
342     break;
343
344   case FN_OBJECT_OMAP_VAL:
345     {
346       if (!fs->store->exists(cid, oid))
347         return -ENOENT;
348       set<string> k;
349       k.insert(key);
350       map<string,bufferlist> v;
351       fs->store->omap_get_values(cid, oid, k, &v);
352       if (!v.count(key)) {
353         return -ENOENT;
354       }
355       stbuf->st_size = v[key].length();
356     }
357     break;
358
359   case FN_OBJECT_ATTR_VAL:
360     {
361       if (!fs->store->exists(cid, oid))
362         return -ENOENT;
363       bufferptr v;
364       int r = fs->store->getattr(cid, oid, key.c_str(), v);
365       if (r == -ENODATA)
366         r = -ENOENT;
367       if (r < 0)
368         return r;
369       stbuf->st_size = v.length();
370     }
371     break;
372
373   default:
374     return -ENOENT;
375   }
376
377   return 0;
378 }
379
380 static int os_readdir(const char *path,
381                       void *buf,
382                       fuse_fill_dir_t filler,
383                       off_t offset,
384                       struct fuse_file_info *fi)
385 {
386   fuse_context *fc = fuse_get_context();
387   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
388   ldout(fs->store->cct, 10) << __func__ << " " << path << " offset " << offset
389                      << dendl;
390   coll_t cid;
391   ghobject_t oid;
392   string key;
393   uint32_t hash_value, hash_bits;
394   int t = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
395                    &hash_bits);
396   if (t < 0)
397     return t;
398
399   std::lock_guard<std::mutex> l(fs->lock);
400
401   // we can't shift 32 bits or else off_t will go negative
402   const int hash_shift = 31;
403
404   switch (t) {
405   case FN_ROOT:
406     {
407       filler(buf, "type", NULL, 0);
408       vector<coll_t> cls;
409       fs->store->list_collections(cls);
410       for (auto c : cls) {
411         int r = filler(buf, stringify(c).c_str(), NULL, 0);
412         if (r > 0)
413           break;
414       }
415     }
416     break;
417
418   case FN_COLLECTION:
419     {
420       filler(buf, "bitwise_hash_start", NULL, 0);
421       if (fs->store->collection_bits(cid) >= 0) {
422         filler(buf, "bitwise_hash_end", NULL, 0);
423         filler(buf, "bitwise_hash_bits", NULL, 0);
424       }
425       filler(buf, "all", NULL, 0);
426       filler(buf, "by_bitwise_hash", NULL, 0);
427       spg_t pgid;
428       if (cid.is_pg(&pgid) &&
429           fs->store->exists(cid, pgid.make_pgmeta_oid())) {
430         filler(buf, "pgmeta", NULL, 0);
431       }
432     }
433     break;
434
435   case FN_OBJECT:
436     {
437       filler(buf, "bitwise_hash", NULL, 0);
438       filler(buf, "data", NULL, 0);
439       filler(buf, "omap", NULL, 0);
440       filler(buf, "attr", NULL, 0);
441       filler(buf, "omap_header", NULL, 0);
442     }
443     break;
444
445   case FN_HASH_VAL:
446   case FN_ALL:
447     {
448       uint32_t bitwise_hash = (offset >> hash_shift) & 0xffffffff;
449       uint32_t hashoff = offset - (bitwise_hash << hash_shift);
450       int skip = hashoff;
451       ghobject_t next = cid.get_min_hobj();
452       if (offset) {
453         // obey the offset
454         next.hobj.set_hash(hobject_t::_reverse_bits(bitwise_hash));
455       } else if (t == FN_HASH_VAL) {
456         next.hobj.set_hash(hobject_t::_reverse_bits(hash_value));
457       }
458       ghobject_t last;
459       if (t == FN_HASH_VAL) {
460         last = next;
461         uint64_t rev_end = (hash_value | (0xffffffff >> hash_bits)) + 1;
462         if (rev_end >= 0x100000000)
463           last = ghobject_t::get_max();
464         else
465           last.hobj.set_hash(hobject_t::_reverse_bits(rev_end));
466       } else {
467         last = ghobject_t::get_max();
468       }
469       ldout(fs->store->cct, 10) << __func__ << std::hex
470                          << " offset " << offset << " hash "
471                          << hobject_t::_reverse_bits(hash_value)
472                          << std::dec
473                          << "/" << hash_bits
474                          << " first " << next << " last " << last
475                          << dendl;
476       while (true) {
477         vector<ghobject_t> ls;
478         int r = fs->store->collection_list(
479           cid, next, last, 1000, &ls, &next);
480         if (r < 0)
481           return r;
482         for (auto p : ls) {
483           if (skip) {
484             --skip;
485             continue;
486           }
487           uint32_t cur_bitwise_hash = p.hobj.get_bitwise_key_u32();
488           if (cur_bitwise_hash != bitwise_hash) {
489             bitwise_hash = cur_bitwise_hash;
490             hashoff = 0;
491           }
492           ++hashoff;
493           uint64_t cur_off = ((uint64_t)bitwise_hash << hash_shift) |
494             (uint64_t)hashoff;
495           string s = stringify(p);
496           r = filler(buf, s.c_str(), NULL, cur_off);
497           if (r)
498             break;
499         }
500         if (r)
501           break;
502         if (next == ghobject_t::get_max() || next == last)
503           break;
504       }
505     }
506     break;
507
508   case FN_OBJECT_OMAP:
509     {
510       set<string> keys;
511       fs->store->omap_get_keys(cid, oid, &keys);
512       unsigned skip = offset;
513       for (auto k : keys) {
514         if (skip) {
515           --skip;
516           continue;
517         }
518         ++offset;
519         int r = filler(buf, k.c_str(), NULL, offset);
520         if (r)
521           break;
522       }
523     }
524     break;
525
526   case FN_OBJECT_ATTR:
527     {
528       map<string,bufferptr> aset;
529       fs->store->getattrs(cid, oid, aset);
530       unsigned skip = offset;
531       for (auto a : aset) {
532         if (skip) {
533           --skip;
534           continue;
535         }
536         ++offset;
537         int r = filler(buf, a.first.c_str(), NULL, offset);
538         if (r)
539           break;
540       }
541     }
542     break;
543   }
544   return 0;
545 }
546
547 static int os_open(const char *path, struct fuse_file_info *fi)
548 {
549   fuse_context *fc = fuse_get_context();
550   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
551   ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
552   coll_t cid;
553   ghobject_t oid;
554   string key;
555   uint32_t hash_value, hash_bits;
556   int t = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
557                    &hash_bits);
558   if (t < 0)
559     return t;
560
561   std::lock_guard<std::mutex> l(fs->lock);
562
563   bufferlist *pbl = 0;
564   switch (t) {
565   case FN_TYPE:
566     pbl = new bufferlist;
567     pbl->append(fs->store->get_type());
568     pbl->append("\n");
569     break;
570
571   case FN_HASH_START:
572     {
573       pbl = new bufferlist;
574       spg_t pgid;
575       if (cid.is_pg(&pgid)) {
576         unsigned long h;
577         h = hobject_t::_reverse_bits(pgid.ps());
578         char buf[10];
579         snprintf(buf, sizeof(buf), "%08lx\n", h);
580         pbl->append(buf);
581       } else {
582         pbl->append("00000000\n");
583       }
584     }
585     break;
586
587   case FN_HASH_END:
588     {
589       spg_t pgid;
590       unsigned long h;
591       if (cid.is_pg(&pgid)) {
592         int hash_bits = fs->store->collection_bits(cid);
593         if (hash_bits >= 0) {
594           uint64_t rev_start = hobject_t::_reverse_bits(pgid.ps());
595           uint64_t rev_end = (rev_start | (0xffffffff >> hash_bits));
596           h = rev_end;
597         } else {
598           return -ENOENT;
599         }
600       } else {
601         h = 0xffffffff;
602       }
603       char buf[10];
604       snprintf(buf, sizeof(buf), "%08lx\n", h);
605       pbl = new bufferlist;
606       pbl->append(buf);
607     }
608     break;
609
610   case FN_HASH_BITS:
611     {
612       int r = fs->store->collection_bits(cid);
613       if (r < 0)
614         return r;
615       char buf[8];
616       snprintf(buf, sizeof(buf), "%d\n", r);
617       pbl = new bufferlist;
618       pbl->append(buf);
619     }
620     break;
621
622   case FN_OBJECT_HASH:
623     {
624       pbl = new bufferlist;
625       char buf[10];
626       snprintf(buf, sizeof(buf), "%08x\n",
627                (unsigned)oid.hobj.get_bitwise_key_u32());
628       pbl->append(buf);
629     }
630     break;
631
632   case FN_OBJECT_DATA:
633     {
634       int r = fs->open_file(
635         path, fi,
636         [&](bufferlist *pbl) {
637           return fs->store->read(cid, oid, 0, 0, *pbl);
638         });
639       if (r < 0) {
640         return r;
641       }
642     }
643     break;
644
645   case FN_OBJECT_ATTR_VAL:
646     {
647       int r = fs->open_file(
648         path, fi,
649         [&](bufferlist *pbl) {
650           bufferptr bp;
651           int r = fs->store->getattr(cid, oid, key.c_str(), bp);
652           if (r < 0)
653             return r;
654           pbl->append(bp);
655           return 0;
656         });
657       if (r < 0)
658         return r;
659     }
660     break;
661
662   case FN_OBJECT_OMAP_VAL:
663     {
664       int r = fs->open_file(
665         path, fi,
666         [&](bufferlist *pbl) {
667           set<string> k;
668           k.insert(key);
669           map<string,bufferlist> v;
670           int r = fs->store->omap_get_values(cid, oid, k, &v);
671           if (r < 0)
672             return r;
673           *pbl = v[key];
674           return 0;
675         });
676       if (r < 0)
677         return r;
678     }
679     break;
680
681   case FN_OBJECT_OMAP_HEADER:
682     {
683       int r = fs->open_file(
684         path, fi,
685         [&](bufferlist *pbl) {
686           return fs->store->omap_get_header(cid, oid, pbl);
687         });
688       if (r < 0)
689        return r;
690     }
691     break;
692   }
693
694   if (pbl) {
695     FuseStore::OpenFile *o = new FuseStore::OpenFile;
696     o->bl.claim(*pbl);
697     fi->fh = reinterpret_cast<uint64_t>(o);
698   }
699   return 0;
700 }
701
702 static int os_mkdir(const char *path, mode_t mode)
703 {
704   fuse_context *fc = fuse_get_context();
705   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
706   ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
707   coll_t cid;
708   ghobject_t oid;
709   string key;
710   uint32_t hash_value, hash_bits;
711   int f = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
712                    &hash_bits);
713   if (f < 0)
714     return f;
715
716   std::lock_guard<std::mutex> l(fs->lock);
717
718   ObjectStore::Transaction t;
719   switch (f) {
720   case FN_OBJECT:
721     {
722       spg_t pgid;
723       if (cid.is_pg(&pgid)) {
724         int bits = fs->store->collection_bits(cid);
725         if (bits >= 0 && !oid.match(bits, pgid.ps())) {
726           // sorry, not part of this PG
727           return -EINVAL;
728         }
729       }
730       t.touch(cid, oid);
731     }
732     break;
733
734   case FN_COLLECTION:
735     if (cid.is_pg()) {
736       // use the mode for the bit count.  e.g., mkdir --mode=0003
737       // mnt/0.7_head will create 0.7 with bits = 3.
738       mode &= 0777;
739       if (mode >= 32)
740         return -EINVAL;
741     } else {
742       mode = 0;
743     }
744     t.create_collection(cid, mode);
745     break;
746
747   default:
748     return -EPERM;
749   }
750
751   if (!t.empty()) {
752     ceph::shared_ptr<ObjectStore::Sequencer> osr(
753       new ObjectStore::Sequencer("fuse"));
754     fs->store->apply_transaction(&*osr, std::move(t));
755     C_SaferCond waiter;
756     if (!osr->flush_commit(&waiter))
757       waiter.wait();
758   }
759
760   return 0;
761 }
762
763 static int os_chmod(const char *path, mode_t mode)
764 {
765   fuse_context *fc = fuse_get_context();
766   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
767   ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
768   return 0;
769 }
770
771 static int os_create(const char *path, mode_t mode, struct fuse_file_info *fi)
772 {
773   fuse_context *fc = fuse_get_context();
774   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
775   ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
776   coll_t cid;
777   ghobject_t oid;
778   string key;
779   uint32_t hash_value, hash_bits;
780   int f = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
781                    &hash_bits);
782   if (f < 0)
783     return f;
784
785   std::lock_guard<std::mutex> l(fs->lock);
786
787   ObjectStore::Transaction t;
788   bufferlist *pbl = 0;
789   switch (f) {
790   case FN_OBJECT_DATA:
791     {
792       pbl = new bufferlist;
793       fs->store->read(cid, oid, 0, 0, *pbl);
794     }
795     break;
796
797   case FN_OBJECT_ATTR_VAL:
798     {
799       pbl = new bufferlist;
800       bufferptr bp;
801       int r = fs->store->getattr(cid, oid, key.c_str(), bp);
802       if (r == -ENODATA) {
803         bufferlist empty;
804         t.setattr(cid, oid, key.c_str(), empty);
805       }
806       pbl->append(bp);
807     }
808     break;
809
810   case FN_OBJECT_OMAP_VAL:
811     {
812       pbl = new bufferlist;
813       set<string> k;
814       k.insert(key);
815       map<string,bufferlist> v;
816       fs->store->omap_get_values(cid, oid, k, &v);
817       if (v.count(key) == 0) {
818         map<string,bufferlist> aset;
819         aset[key] = bufferlist();
820         t.omap_setkeys(cid, oid, aset);
821       } else {
822         *pbl = v[key];
823       }
824     }
825     break;
826   }
827
828   if (!t.empty()) {
829     ceph::shared_ptr<ObjectStore::Sequencer> osr(
830       new ObjectStore::Sequencer("fuse"));
831     fs->store->apply_transaction(&*osr, std::move(t));
832     C_SaferCond waiter;
833     if (!osr->flush_commit(&waiter))
834       waiter.wait();
835   }
836
837   if (pbl) {
838     FuseStore::OpenFile *o = new FuseStore::OpenFile;
839     o->bl.claim(*pbl);
840     o->dirty = true;
841     fi->fh = reinterpret_cast<uint64_t>(o);
842   }
843   return 0;
844 }
845
846 static int os_release(const char *path, struct fuse_file_info *fi)
847 {
848   fuse_context *fc = fuse_get_context();
849   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
850   ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
851   std::lock_guard<std::mutex> l(fs->lock);
852   FuseStore::OpenFile *o = reinterpret_cast<FuseStore::OpenFile*>(fi->fh);
853   if (--o->ref == 0) {
854     ldout(fs->store->cct, 10) << __func__ << " closing last " << o->path << dendl;
855     fs->open_files.erase(o->path);
856     delete o;
857   }
858   return 0;
859 }
860
861 static int os_read(const char *path, char *buf, size_t size, off_t offset,
862                    struct fuse_file_info *fi)
863 {
864   fuse_context *fc = fuse_get_context();
865   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
866   ldout(fs->store->cct, 10) << __func__ << " " << path << " offset " << offset
867                      << " size " << size << dendl;
868   std::lock_guard<std::mutex> l(fs->lock);
869   FuseStore::OpenFile *o = reinterpret_cast<FuseStore::OpenFile*>(fi->fh);
870   if (!o)
871     return 0;
872   if (offset >= o->bl.length())
873     return 0;
874   if (offset + size > o->bl.length())
875     size = o->bl.length() - offset;
876   bufferlist r;
877   r.substr_of(o->bl, offset, size);
878   memcpy(buf, r.c_str(), r.length());
879   return r.length();
880 }
881
882 static int os_write(const char *path, const char *buf, size_t size,
883                     off_t offset, struct fuse_file_info *fi)
884 {
885   fuse_context *fc = fuse_get_context();
886   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
887   ldout(fs->store->cct, 10) << __func__ << " " << path << " offset " << offset
888                      << " size " << size << dendl;
889   std::lock_guard<std::mutex> l(fs->lock);
890   FuseStore::OpenFile *o = reinterpret_cast<FuseStore::OpenFile*>(fi->fh);
891   if (!o)
892     return 0;
893
894   bufferlist final;
895   if (offset) {
896     if (offset > o->bl.length()) {
897       final.substr_of(o->bl, 0, offset);
898     } else {
899       final.claim_append(o->bl);
900       size_t zlen = offset - final.length();
901       final.append_zero(zlen);
902     }
903   }
904   final.append(buf, size);
905   if (offset + size < o->bl.length()) {
906     bufferlist rest;
907     rest.substr_of(o->bl, offset + size, o->bl.length() - offset - size);
908     final.claim_append(rest);
909   }
910   o->bl = final;
911   o->dirty = true;
912   return size;
913 }
914
915 int os_flush(const char *path, struct fuse_file_info *fi)
916 {
917   fuse_context *fc = fuse_get_context();
918   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
919   ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
920   coll_t cid;
921   ghobject_t oid;
922   string key;
923   uint32_t hash_value, hash_bits;
924   int f = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
925                    &hash_bits);
926   if (f < 0)
927     return f;
928
929   std::lock_guard<std::mutex> l(fs->lock);
930
931   FuseStore::OpenFile *o = reinterpret_cast<FuseStore::OpenFile*>(fi->fh);
932   if (!o)
933     return 0;
934   if (!o->dirty)
935     return 0;
936
937   ObjectStore::Transaction t;
938
939   switch (f) {
940   case FN_OBJECT_DATA:
941     t.write(cid, oid, 0, o->bl.length(), o->bl);
942     break;
943
944   case FN_OBJECT_ATTR_VAL:
945     t.setattr(cid, oid, key.c_str(), o->bl);
946     break;
947
948   case FN_OBJECT_OMAP_VAL:
949     {
950       map<string,bufferlist> aset;
951       aset[key] = o->bl;
952       t.omap_setkeys(cid, oid, aset);
953       break;
954     }
955
956   case FN_OBJECT_OMAP_HEADER:
957     t.omap_setheader(cid, oid, o->bl);
958     break;
959
960   default:
961     return 0;
962   }
963
964   ceph::shared_ptr<ObjectStore::Sequencer> osr(
965     new ObjectStore::Sequencer("fuse"));
966   fs->store->apply_transaction(&*osr, std::move(t));
967   C_SaferCond waiter;
968   if (!osr->flush_commit(&waiter))
969     waiter.wait();
970
971   return 0;
972 }
973
974 static int os_unlink(const char *path)
975 {
976   fuse_context *fc = fuse_get_context();
977   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
978   ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
979   coll_t cid;
980   ghobject_t oid;
981   string key;
982   uint32_t hash_value, hash_bits;
983   int f = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
984                    &hash_bits);
985   if (f < 0)
986     return f;
987
988   std::lock_guard<std::mutex> l(fs->lock);
989
990   ObjectStore::Transaction t;
991
992   switch (f) {
993   case FN_OBJECT_OMAP_VAL:
994     {
995       set<string> keys;
996       keys.insert(key);
997       t.omap_rmkeys(cid, oid, keys);
998     }
999     break;
1000
1001   case FN_OBJECT_ATTR_VAL:
1002     t.rmattr(cid, oid, key.c_str());
1003     break;
1004
1005   case FN_OBJECT_OMAP_HEADER:
1006     {
1007       bufferlist empty;
1008       t.omap_setheader(cid, oid, empty);
1009     }
1010     break;
1011
1012   case FN_OBJECT:
1013     t.remove(cid, oid);
1014     break;
1015
1016   case FN_COLLECTION:
1017     {
1018       bool empty;
1019       int r = fs->store->collection_empty(cid, &empty);
1020       if (r < 0)
1021         return r;
1022       if (!empty)
1023         return -ENOTEMPTY;
1024       t.remove_collection(cid);
1025     }
1026     break;
1027
1028   case FN_OBJECT_DATA:
1029     t.truncate(cid, oid, 0);
1030     break;
1031
1032   default:
1033     return -EPERM;
1034   }
1035
1036   ceph::shared_ptr<ObjectStore::Sequencer> osr(
1037     new ObjectStore::Sequencer("fuse"));
1038   fs->store->apply_transaction(&*osr, std::move(t));
1039   C_SaferCond waiter;
1040   if (!osr->flush_commit(&waiter))
1041     waiter.wait();
1042
1043   return 0;
1044 }
1045
1046 static int os_truncate(const char *path, off_t size)
1047 {
1048   fuse_context *fc = fuse_get_context();
1049   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
1050   ldout(fs->store->cct, 10) << __func__ << " " << path << " size " << size << dendl;
1051   coll_t cid;
1052   ghobject_t oid;
1053   string key;
1054   uint32_t hash_value, hash_bits;
1055   int f = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
1056                    &hash_bits);
1057   if (f < 0)
1058     return f;
1059
1060   if (f == FN_OBJECT_OMAP_VAL ||
1061       f == FN_OBJECT_ATTR_VAL ||
1062       f == FN_OBJECT_OMAP_HEADER) {
1063     if (size)
1064       return -EPERM;
1065     return 0;
1066   }
1067   if (f != FN_OBJECT_DATA)
1068     return -EPERM;
1069
1070   std::lock_guard<std::mutex> l(fs->lock);
1071
1072   if (fs->open_files.count(path)) {
1073     FuseStore::OpenFile *o = fs->open_files[path];
1074     if (o->bl.length() > size) {
1075       bufferlist t;
1076       t.substr_of(o->bl, 0, size);
1077       o->bl.swap(t);
1078     }
1079   }
1080
1081   ObjectStore::Transaction t;
1082   t.truncate(cid, oid, size);
1083   ceph::shared_ptr<ObjectStore::Sequencer> osr(
1084     new ObjectStore::Sequencer("fuse"));
1085   fs->store->apply_transaction(&*osr, std::move(t));
1086   C_SaferCond waiter;
1087   if (!osr->flush_commit(&waiter))
1088     waiter.wait();
1089   return 0;
1090 }
1091
1092 static int os_statfs(const char *path, struct statvfs *stbuf)
1093 {
1094   fuse_context *fc = fuse_get_context();
1095   FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
1096   ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
1097   std::lock_guard<std::mutex> l(fs->lock);
1098
1099   struct store_statfs_t s;
1100   int r = fs->store->statfs(&s);
1101   if (r < 0)
1102     return r;
1103   stbuf->f_bsize = 4096;   // LIES!
1104   stbuf->f_blocks = s.total / 4096;
1105   stbuf->f_bavail = s.available / 4096;
1106
1107   return 0;
1108 }
1109
1110 static struct fuse_operations fs_oper = {
1111   getattr: os_getattr,
1112   readlink: 0,
1113   getdir: 0,
1114   mknod: 0,
1115   mkdir: os_mkdir,
1116   unlink: os_unlink,
1117   rmdir: os_unlink,
1118   symlink: 0,
1119   rename: 0,
1120   link: 0,
1121   chmod: os_chmod,
1122   chown: 0,
1123   truncate: os_truncate,
1124   utime: 0,
1125   open: os_open,
1126   read: os_read,
1127   write: os_write,
1128   statfs: os_statfs,
1129   flush: os_flush,
1130   release: os_release,
1131   fsync: 0,
1132   setxattr: 0,
1133   getxattr: 0,
1134   listxattr: 0,
1135   removexattr: 0,
1136   opendir: 0,
1137   readdir: os_readdir,
1138   releasedir: 0,
1139   fsyncdir: 0,
1140   init: 0,
1141   destroy: 0,
1142   access: 0,
1143   create: os_create,
1144 };
1145
1146 int FuseStore::main()
1147 {
1148   const char *v[] = {
1149     "foo",
1150     mount_point.c_str(),
1151     "-f",
1152     "-d", // debug
1153   };
1154   int c = 3;
1155   if (store->cct->_conf->fuse_debug)
1156     ++c;
1157   return fuse_main(c, (char**)v, &fs_oper, (void*)this);
1158 }
1159
1160 int FuseStore::start()
1161 {
1162   dout(10) << __func__ << dendl;
1163
1164   memset(&info->args, 0, sizeof(info->args));
1165   const char *v[] = {
1166     "foo",
1167     mount_point.c_str(),
1168     "-f", // foreground
1169     "-d", // debug
1170   };
1171   int c = 3;
1172   if (store->cct->_conf->fuse_debug)
1173     ++c;
1174   fuse_args a = FUSE_ARGS_INIT(c, (char**)v);
1175   info->args = a;
1176   if (fuse_parse_cmdline(&info->args, &info->mountpoint, NULL, NULL) == -1) {
1177     derr << __func__ << " failed to parse args" << dendl;
1178     return -EINVAL;
1179   }
1180
1181   info->ch = fuse_mount(info->mountpoint, &info->args);
1182   if (!info->ch) {
1183     derr << __func__ << " fuse_mount failed" << dendl;
1184     return -EIO;
1185   }
1186
1187   info->f = fuse_new(info->ch, &info->args, &fs_oper, sizeof(fs_oper),
1188                      (void*)this);
1189   if (!info->f) {
1190     fuse_unmount(info->mountpoint, info->ch);
1191     derr << __func__ << " fuse_new failed" << dendl;
1192     return -EIO;
1193   }
1194
1195   fuse_thread.create("fusestore");
1196   dout(10) << __func__ << " done" << dendl;
1197   return 0;
1198 }
1199
1200 int FuseStore::loop()
1201 {
1202   dout(10) << __func__ << " enter" << dendl;
1203   int r = fuse_loop(info->f);
1204   if (r)
1205     derr << __func__ << " got " << cpp_strerror(r) << dendl;
1206   dout(10) << __func__ << " exit" << dendl;
1207   return r;
1208 }
1209
1210 int FuseStore::stop()
1211 {
1212   dout(10) << __func__ << " enter" << dendl;
1213   fuse_unmount(info->mountpoint, info->ch);
1214   fuse_thread.join();
1215   fuse_destroy(info->f);
1216   dout(10) << __func__ << " exit" << dendl;
1217   return 0;
1218 }