remove ceph code
[stor4nfv.git] / src / ceph / src / os / FuseStore.cc
diff --git a/src/ceph/src/os/FuseStore.cc b/src/ceph/src/os/FuseStore.cc
deleted file mode 100644 (file)
index 3ae0638..0000000
+++ /dev/null
@@ -1,1218 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "include/compat.h"
-#include "FuseStore.h"
-#include "os/ObjectStore.h"
-#include "include/stringify.h"
-#include "common/errno.h"
-
-#define FUSE_USE_VERSION 30
-#include <fuse.h>
-
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <fcntl.h>           /* Definition of AT_* constants */
-#include <sys/stat.h>
-
-#if defined(DARWIN) || defined(__FreeBSD__)
-#include <sys/param.h>
-#include <sys/mount.h>
-#endif
-
-#define dout_context store->cct
-#define dout_subsys ceph_subsys_fuse
-#include "common/debug.h"
-#undef dout_prefix
-#define dout_prefix *_dout << "fuse "
-
-// some fuse-y bits of state
-struct fs_info {
-  struct fuse_args args;
-  struct fuse *f;
-  struct fuse_chan *ch;
-  char *mountpoint;
-};
-
-int FuseStore::open_file(string p, struct fuse_file_info *fi,
-                        std::function<int(bufferlist *bl)> f)
-{
-  if (open_files.count(p)) {
-    OpenFile *o = open_files[p];
-    fi->fh = reinterpret_cast<uint64_t>(o);
-    ++o->ref;
-    return 0;
-  }
-  bufferlist bl;
-  int r = f(&bl);
-  if (r < 0) {
-    return r;
-  }
-  OpenFile *o = new OpenFile;
-  o->path = p;
-  o->bl.claim(bl);
-  open_files[p] = o;
-  fi->fh = reinterpret_cast<uint64_t>(o);
-  ++o->ref;
-  return 0;
-}
-
-FuseStore::FuseStore(ObjectStore *s, string p)
-  : store(s),
-    mount_point(p),
-    fuse_thread(this)
-{
-  info = new fs_info();
-}
-
-FuseStore::~FuseStore()
-{
-  delete info;
-}
-
-/*
- * / - root directory
- * $cid/
- * $cid/type - objectstore type
- * $cid/bitwise_hash_start = lowest hash value
- * $cid/bitwise_hash_end = highest hash value
- * $cid/bitwise_hash_bits - how many bits are significant
- * $cid/pgmeta/ - pgmeta object
- * $cid/all/ - all objects
- * $cid/all/$obj/
- * $cid/all/$obj/bitwise_hash
- * $cid/all/$obj/data
- * $cid/all/$obj/omap/$key
- * $cid/all/$obj/attr/$name
- * $cid/by_bitwise_hash/$hash/$bits/$obj - all objects with this (bitwise) hash (prefix)
- */
-enum {
-  FN_ROOT = 1,
-  FN_TYPE,
-  FN_COLLECTION,
-  FN_HASH_START,
-  FN_HASH_END,
-  FN_HASH_BITS,
-  FN_OBJECT,
-  FN_OBJECT_HASH,
-  FN_OBJECT_DATA,
-  FN_OBJECT_OMAP_HEADER,
-  FN_OBJECT_OMAP,
-  FN_OBJECT_OMAP_VAL,
-  FN_OBJECT_ATTR,
-  FN_OBJECT_ATTR_VAL,
-  FN_ALL,
-  FN_HASH_DIR,
-  FN_HASH_VAL,
-};
-
-static int parse_fn(CephContext* cct, const char *path, coll_t *cid,
-                   ghobject_t *oid, string *key,
-                   uint32_t *hash, uint32_t *hash_bits)
-{
-  list<string> v;
-  for (const char *p = path; *p; ++p) {
-    if (*p == '/')
-      continue;
-    const char *e;
-    for (e = p + 1; *e && *e != '/'; e++) ;
-    string c(p, e-p);
-    v.push_back(c);
-    p = e;
-    if (!*p)
-      break;
-  }
-  ldout(cct, 10) << __func__ << " path " << path << " -> " << v << dendl;
-
-  if (v.empty())
-    return FN_ROOT;
-
-  if (v.front() == "type")
-    return FN_TYPE;
-
-  if (!cid->parse(v.front())) {
-    return -ENOENT;
-  }
-  if (v.size() == 1)
-    return FN_COLLECTION;
-  v.pop_front();
-
-  if (v.front() == "bitwise_hash_start")
-    return FN_HASH_START;
-  if (v.front() == "bitwise_hash_end")
-    return FN_HASH_END;
-  if (v.front() == "bitwise_hash_bits")
-    return FN_HASH_BITS;
-  if (v.front() == "pgmeta") {
-    spg_t pgid;
-    if (cid->is_pg(&pgid)) {
-      *oid = pgid.make_pgmeta_oid();
-      v.pop_front();
-      if (v.empty())
-       return FN_OBJECT;
-      goto do_object;
-    }
-    return -ENOENT;
-  }
-  if (v.front() == "all") {
-    v.pop_front();
-    if (v.empty())
-      return FN_ALL;
-    goto do_dir;
-  }
-  if (v.front() == "by_bitwise_hash") {
-    v.pop_front();
-    if (v.empty())
-      return FN_HASH_DIR;
-    unsigned long hv, hm;
-    int r = sscanf(v.front().c_str(), "%lx", &hv);
-    if (r != 1)
-      return -ENOENT;
-    int shift = 32 - v.front().length() * 4;
-    v.pop_front();
-    if (v.empty())
-      return FN_HASH_DIR;
-    r = sscanf(v.front().c_str(), "%ld", &hm);
-    if (r != 1)
-      return -ENOENT;
-    if (hm < 1 || hm > 32)
-      return -ENOENT;
-    v.pop_front();
-    *hash = hv << shift;//hobject_t::_reverse_bits(hv << shift);
-    *hash_bits = hm;
-    if (v.empty())
-      return FN_HASH_VAL;
-    goto do_dir;
-  }
-  return -ENOENT;
-
- do_dir:
-  {
-    string o = v.front();
-    if (!oid->parse(o)) {
-      return -ENOENT;
-    }
-    v.pop_front();
-    if (v.empty())
-      return FN_OBJECT;
-  }
-
- do_object:
-  if (v.front() == "data")
-    return FN_OBJECT_DATA;
-  if (v.front() == "omap_header")
-    return FN_OBJECT_OMAP_HEADER;
-  if (v.front() == "omap") {
-    v.pop_front();
-    if (v.empty())
-      return FN_OBJECT_OMAP;
-    *key = v.front();
-    v.pop_front();
-    if (v.empty())
-      return FN_OBJECT_OMAP_VAL;
-    return -ENOENT;
-  }
-  if (v.front() == "attr") {
-    v.pop_front();
-    if (v.empty())
-      return FN_OBJECT_ATTR;
-    *key = v.front();
-    v.pop_front();
-    if (v.empty())
-      return FN_OBJECT_ATTR_VAL;
-    return -ENOENT;
-  }
-  if (v.front() == "bitwise_hash")
-    return FN_OBJECT_HASH;
-  return -ENOENT;
-}
-
-
-static int os_getattr(const char *path, struct stat *stbuf)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
-  coll_t cid;
-  ghobject_t oid;
-  string key;
-  uint32_t hash_value, hash_bits;
-  int t = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
-                  &hash_bits);
-  if (t < 0)
-    return t;
-
-  std::lock_guard<std::mutex> l(fs->lock);
-
-  stbuf->st_size = 0;
-  stbuf->st_uid = 0;
-  stbuf->st_gid = 0;
-  stbuf->st_mode = S_IFREG | 0700;
-
-  switch (t) {
-  case FN_OBJECT_OMAP:
-  case FN_OBJECT_ATTR:
-  case FN_OBJECT:
-  case FN_OBJECT_DATA:
-  case FN_OBJECT_OMAP_HEADER:
-  case FN_OBJECT_OMAP_VAL:
-    {
-      spg_t pgid;
-      if (cid.is_pg(&pgid)) {
-       int bits = fs->store->collection_bits(cid);
-       if (bits >= 0 && !oid.match(bits, pgid.ps())) {
-         // sorry, not part of this PG
-         return -ENOENT;
-       }
-      }
-    }
-    break;
-  }
-
-  switch (t) {
-  case FN_OBJECT_OMAP:
-  case FN_OBJECT_ATTR:
-  case FN_OBJECT:
-    if (!fs->store->exists(cid, oid))
-      return -ENOENT;
-    // fall-thru
-  case FN_ALL:
-  case FN_HASH_DIR:
-  case FN_HASH_VAL:
-  case FN_COLLECTION:
-    if (!fs->store->collection_exists(cid))
-      return -ENOENT;
-    // fall-thru
-  case FN_ROOT:
-    stbuf->st_mode = S_IFDIR | 0700;
-    return 0;
-
-  case FN_TYPE:
-    stbuf->st_size = fs->store->get_type().length() + 1;
-    break;
-
-  case FN_OBJECT_HASH:
-    if (!fs->store->exists(cid, oid))
-      return -ENOENT;
-    stbuf->st_size = 9;
-    return 0;
-
-  case FN_HASH_END:
-    if (!fs->store->collection_exists(cid))
-      return -ENOENT;
-    if (fs->store->collection_bits(cid) < 0)
-      return -ENOENT;
-    // fall-thru
-  case FN_HASH_START:
-    stbuf->st_size = 9;
-    return 0;
-
-  case FN_HASH_BITS:
-    {
-      if (!fs->store->collection_exists(cid))
-       return -ENOENT;
-      int bits = fs->store->collection_bits(cid);
-      if (bits < 0)
-       return -ENOENT;
-      char buf[8];
-      snprintf(buf, sizeof(buf), "%d\n", bits);
-      stbuf->st_size = strlen(buf);
-    }
-    return 0;
-
-  case FN_OBJECT_DATA:
-    {
-      if (!fs->store->exists(cid, oid))
-       return -ENOENT;
-      int r = fs->store->stat(cid, oid, stbuf);
-      if (r < 0)
-       return r;
-    }
-    break;
-
-  case FN_OBJECT_OMAP_HEADER:
-    {
-      if (!fs->store->exists(cid, oid))
-       return -ENOENT;
-      bufferlist bl;
-      fs->store->omap_get_header(cid, oid, &bl);
-      stbuf->st_size = bl.length();
-    }
-    break;
-
-  case FN_OBJECT_OMAP_VAL:
-    {
-      if (!fs->store->exists(cid, oid))
-       return -ENOENT;
-      set<string> k;
-      k.insert(key);
-      map<string,bufferlist> v;
-      fs->store->omap_get_values(cid, oid, k, &v);
-      if (!v.count(key)) {
-       return -ENOENT;
-      }
-      stbuf->st_size = v[key].length();
-    }
-    break;
-
-  case FN_OBJECT_ATTR_VAL:
-    {
-      if (!fs->store->exists(cid, oid))
-       return -ENOENT;
-      bufferptr v;
-      int r = fs->store->getattr(cid, oid, key.c_str(), v);
-      if (r == -ENODATA)
-       r = -ENOENT;
-      if (r < 0)
-       return r;
-      stbuf->st_size = v.length();
-    }
-    break;
-
-  default:
-    return -ENOENT;
-  }
-
-  return 0;
-}
-
-static int os_readdir(const char *path,
-                     void *buf,
-                     fuse_fill_dir_t filler,
-                     off_t offset,
-                     struct fuse_file_info *fi)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << " offset " << offset
-                    << dendl;
-  coll_t cid;
-  ghobject_t oid;
-  string key;
-  uint32_t hash_value, hash_bits;
-  int t = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
-                  &hash_bits);
-  if (t < 0)
-    return t;
-
-  std::lock_guard<std::mutex> l(fs->lock);
-
-  // we can't shift 32 bits or else off_t will go negative
-  const int hash_shift = 31;
-
-  switch (t) {
-  case FN_ROOT:
-    {
-      filler(buf, "type", NULL, 0);
-      vector<coll_t> cls;
-      fs->store->list_collections(cls);
-      for (auto c : cls) {
-       int r = filler(buf, stringify(c).c_str(), NULL, 0);
-       if (r > 0)
-         break;
-      }
-    }
-    break;
-
-  case FN_COLLECTION:
-    {
-      filler(buf, "bitwise_hash_start", NULL, 0);
-      if (fs->store->collection_bits(cid) >= 0) {
-       filler(buf, "bitwise_hash_end", NULL, 0);
-       filler(buf, "bitwise_hash_bits", NULL, 0);
-      }
-      filler(buf, "all", NULL, 0);
-      filler(buf, "by_bitwise_hash", NULL, 0);
-      spg_t pgid;
-      if (cid.is_pg(&pgid) &&
-         fs->store->exists(cid, pgid.make_pgmeta_oid())) {
-       filler(buf, "pgmeta", NULL, 0);
-      }
-    }
-    break;
-
-  case FN_OBJECT:
-    {
-      filler(buf, "bitwise_hash", NULL, 0);
-      filler(buf, "data", NULL, 0);
-      filler(buf, "omap", NULL, 0);
-      filler(buf, "attr", NULL, 0);
-      filler(buf, "omap_header", NULL, 0);
-    }
-    break;
-
-  case FN_HASH_VAL:
-  case FN_ALL:
-    {
-      uint32_t bitwise_hash = (offset >> hash_shift) & 0xffffffff;
-      uint32_t hashoff = offset - (bitwise_hash << hash_shift);
-      int skip = hashoff;
-      ghobject_t next = cid.get_min_hobj();
-      if (offset) {
-       // obey the offset
-       next.hobj.set_hash(hobject_t::_reverse_bits(bitwise_hash));
-      } else if (t == FN_HASH_VAL) {
-       next.hobj.set_hash(hobject_t::_reverse_bits(hash_value));
-      }
-      ghobject_t last;
-      if (t == FN_HASH_VAL) {
-       last = next;
-       uint64_t rev_end = (hash_value | (0xffffffff >> hash_bits)) + 1;
-       if (rev_end >= 0x100000000)
-         last = ghobject_t::get_max();
-       else
-         last.hobj.set_hash(hobject_t::_reverse_bits(rev_end));
-      } else {
-       last = ghobject_t::get_max();
-      }
-      ldout(fs->store->cct, 10) << __func__ << std::hex
-                        << " offset " << offset << " hash "
-                        << hobject_t::_reverse_bits(hash_value)
-                        << std::dec
-                        << "/" << hash_bits
-                        << " first " << next << " last " << last
-                        << dendl;
-      while (true) {
-       vector<ghobject_t> ls;
-       int r = fs->store->collection_list(
-         cid, next, last, 1000, &ls, &next);
-       if (r < 0)
-         return r;
-       for (auto p : ls) {
-         if (skip) {
-           --skip;
-           continue;
-         }
-         uint32_t cur_bitwise_hash = p.hobj.get_bitwise_key_u32();
-         if (cur_bitwise_hash != bitwise_hash) {
-           bitwise_hash = cur_bitwise_hash;
-           hashoff = 0;
-         }
-         ++hashoff;
-         uint64_t cur_off = ((uint64_t)bitwise_hash << hash_shift) |
-           (uint64_t)hashoff;
-         string s = stringify(p);
-         r = filler(buf, s.c_str(), NULL, cur_off);
-         if (r)
-           break;
-       }
-       if (r)
-         break;
-       if (next == ghobject_t::get_max() || next == last)
-         break;
-      }
-    }
-    break;
-
-  case FN_OBJECT_OMAP:
-    {
-      set<string> keys;
-      fs->store->omap_get_keys(cid, oid, &keys);
-      unsigned skip = offset;
-      for (auto k : keys) {
-       if (skip) {
-         --skip;
-         continue;
-       }
-       ++offset;
-       int r = filler(buf, k.c_str(), NULL, offset);
-       if (r)
-         break;
-      }
-    }
-    break;
-
-  case FN_OBJECT_ATTR:
-    {
-      map<string,bufferptr> aset;
-      fs->store->getattrs(cid, oid, aset);
-      unsigned skip = offset;
-      for (auto a : aset) {
-       if (skip) {
-         --skip;
-         continue;
-       }
-       ++offset;
-       int r = filler(buf, a.first.c_str(), NULL, offset);
-       if (r)
-         break;
-      }
-    }
-    break;
-  }
-  return 0;
-}
-
-static int os_open(const char *path, struct fuse_file_info *fi)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
-  coll_t cid;
-  ghobject_t oid;
-  string key;
-  uint32_t hash_value, hash_bits;
-  int t = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
-                  &hash_bits);
-  if (t < 0)
-    return t;
-
-  std::lock_guard<std::mutex> l(fs->lock);
-
-  bufferlist *pbl = 0;
-  switch (t) {
-  case FN_TYPE:
-    pbl = new bufferlist;
-    pbl->append(fs->store->get_type());
-    pbl->append("\n");
-    break;
-
-  case FN_HASH_START:
-    {
-      pbl = new bufferlist;
-      spg_t pgid;
-      if (cid.is_pg(&pgid)) {
-       unsigned long h;
-       h = hobject_t::_reverse_bits(pgid.ps());
-       char buf[10];
-       snprintf(buf, sizeof(buf), "%08lx\n", h);
-       pbl->append(buf);
-      } else {
-       pbl->append("00000000\n");
-      }
-    }
-    break;
-
-  case FN_HASH_END:
-    {
-      spg_t pgid;
-      unsigned long h;
-      if (cid.is_pg(&pgid)) {
-       int hash_bits = fs->store->collection_bits(cid);
-       if (hash_bits >= 0) {
-         uint64_t rev_start = hobject_t::_reverse_bits(pgid.ps());
-         uint64_t rev_end = (rev_start | (0xffffffff >> hash_bits));
-         h = rev_end;
-       } else {
-         return -ENOENT;
-       }
-      } else {
-       h = 0xffffffff;
-      }
-      char buf[10];
-      snprintf(buf, sizeof(buf), "%08lx\n", h);
-      pbl = new bufferlist;
-      pbl->append(buf);
-    }
-    break;
-
-  case FN_HASH_BITS:
-    {
-      int r = fs->store->collection_bits(cid);
-      if (r < 0)
-        return r;
-      char buf[8];
-      snprintf(buf, sizeof(buf), "%d\n", r);
-      pbl = new bufferlist;
-      pbl->append(buf);
-    }
-    break;
-
-  case FN_OBJECT_HASH:
-    {
-      pbl = new bufferlist;
-      char buf[10];
-      snprintf(buf, sizeof(buf), "%08x\n",
-              (unsigned)oid.hobj.get_bitwise_key_u32());
-      pbl->append(buf);
-    }
-    break;
-
-  case FN_OBJECT_DATA:
-    {
-      int r = fs->open_file(
-       path, fi,
-       [&](bufferlist *pbl) {
-         return fs->store->read(cid, oid, 0, 0, *pbl);
-       });
-      if (r < 0) {
-        return r;
-      }
-    }
-    break;
-
-  case FN_OBJECT_ATTR_VAL:
-    {
-      int r = fs->open_file(
-       path, fi,
-       [&](bufferlist *pbl) {
-         bufferptr bp;
-         int r = fs->store->getattr(cid, oid, key.c_str(), bp);
-         if (r < 0)
-           return r;
-         pbl->append(bp);
-         return 0;
-       });
-      if (r < 0)
-        return r;
-    }
-    break;
-
-  case FN_OBJECT_OMAP_VAL:
-    {
-      int r = fs->open_file(
-       path, fi,
-       [&](bufferlist *pbl) {
-         set<string> k;
-         k.insert(key);
-         map<string,bufferlist> v;
-         int r = fs->store->omap_get_values(cid, oid, k, &v);
-         if (r < 0)
-           return r;
-         *pbl = v[key];
-         return 0;
-       });
-      if (r < 0)
-       return r;
-    }
-    break;
-
-  case FN_OBJECT_OMAP_HEADER:
-    {
-      int r = fs->open_file(
-       path, fi,
-       [&](bufferlist *pbl) {
-         return fs->store->omap_get_header(cid, oid, pbl);
-       });
-      if (r < 0)
-       return r;
-    }
-    break;
-  }
-
-  if (pbl) {
-    FuseStore::OpenFile *o = new FuseStore::OpenFile;
-    o->bl.claim(*pbl);
-    fi->fh = reinterpret_cast<uint64_t>(o);
-  }
-  return 0;
-}
-
-static int os_mkdir(const char *path, mode_t mode)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
-  coll_t cid;
-  ghobject_t oid;
-  string key;
-  uint32_t hash_value, hash_bits;
-  int f = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
-                  &hash_bits);
-  if (f < 0)
-    return f;
-
-  std::lock_guard<std::mutex> l(fs->lock);
-
-  ObjectStore::Transaction t;
-  switch (f) {
-  case FN_OBJECT:
-    {
-      spg_t pgid;
-      if (cid.is_pg(&pgid)) {
-       int bits = fs->store->collection_bits(cid);
-       if (bits >= 0 && !oid.match(bits, pgid.ps())) {
-         // sorry, not part of this PG
-         return -EINVAL;
-       }
-      }
-      t.touch(cid, oid);
-    }
-    break;
-
-  case FN_COLLECTION:
-    if (cid.is_pg()) {
-      // use the mode for the bit count.  e.g., mkdir --mode=0003
-      // mnt/0.7_head will create 0.7 with bits = 3.
-      mode &= 0777;
-      if (mode >= 32)
-       return -EINVAL;
-    } else {
-      mode = 0;
-    }
-    t.create_collection(cid, mode);
-    break;
-
-  default:
-    return -EPERM;
-  }
-
-  if (!t.empty()) {
-    ceph::shared_ptr<ObjectStore::Sequencer> osr(
-      new ObjectStore::Sequencer("fuse"));
-    fs->store->apply_transaction(&*osr, std::move(t));
-    C_SaferCond waiter;
-    if (!osr->flush_commit(&waiter))
-      waiter.wait();
-  }
-
-  return 0;
-}
-
-static int os_chmod(const char *path, mode_t mode)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
-  return 0;
-}
-
-static int os_create(const char *path, mode_t mode, struct fuse_file_info *fi)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
-  coll_t cid;
-  ghobject_t oid;
-  string key;
-  uint32_t hash_value, hash_bits;
-  int f = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
-                  &hash_bits);
-  if (f < 0)
-    return f;
-
-  std::lock_guard<std::mutex> l(fs->lock);
-
-  ObjectStore::Transaction t;
-  bufferlist *pbl = 0;
-  switch (f) {
-  case FN_OBJECT_DATA:
-    {
-      pbl = new bufferlist;
-      fs->store->read(cid, oid, 0, 0, *pbl);
-    }
-    break;
-
-  case FN_OBJECT_ATTR_VAL:
-    {
-      pbl = new bufferlist;
-      bufferptr bp;
-      int r = fs->store->getattr(cid, oid, key.c_str(), bp);
-      if (r == -ENODATA) {
-       bufferlist empty;
-       t.setattr(cid, oid, key.c_str(), empty);
-      }
-      pbl->append(bp);
-    }
-    break;
-
-  case FN_OBJECT_OMAP_VAL:
-    {
-      pbl = new bufferlist;
-      set<string> k;
-      k.insert(key);
-      map<string,bufferlist> v;
-      fs->store->omap_get_values(cid, oid, k, &v);
-      if (v.count(key) == 0) {
-       map<string,bufferlist> aset;
-       aset[key] = bufferlist();
-       t.omap_setkeys(cid, oid, aset);
-      } else {
-       *pbl = v[key];
-      }
-    }
-    break;
-  }
-
-  if (!t.empty()) {
-    ceph::shared_ptr<ObjectStore::Sequencer> osr(
-      new ObjectStore::Sequencer("fuse"));
-    fs->store->apply_transaction(&*osr, std::move(t));
-    C_SaferCond waiter;
-    if (!osr->flush_commit(&waiter))
-      waiter.wait();
-  }
-
-  if (pbl) {
-    FuseStore::OpenFile *o = new FuseStore::OpenFile;
-    o->bl.claim(*pbl);
-    o->dirty = true;
-    fi->fh = reinterpret_cast<uint64_t>(o);
-  }
-  return 0;
-}
-
-static int os_release(const char *path, struct fuse_file_info *fi)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
-  std::lock_guard<std::mutex> l(fs->lock);
-  FuseStore::OpenFile *o = reinterpret_cast<FuseStore::OpenFile*>(fi->fh);
-  if (--o->ref == 0) {
-    ldout(fs->store->cct, 10) << __func__ << " closing last " << o->path << dendl;
-    fs->open_files.erase(o->path);
-    delete o;
-  }
-  return 0;
-}
-
-static int os_read(const char *path, char *buf, size_t size, off_t offset,
-                  struct fuse_file_info *fi)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << " offset " << offset
-                    << " size " << size << dendl;
-  std::lock_guard<std::mutex> l(fs->lock);
-  FuseStore::OpenFile *o = reinterpret_cast<FuseStore::OpenFile*>(fi->fh);
-  if (!o)
-    return 0;
-  if (offset >= o->bl.length())
-    return 0;
-  if (offset + size > o->bl.length())
-    size = o->bl.length() - offset;
-  bufferlist r;
-  r.substr_of(o->bl, offset, size);
-  memcpy(buf, r.c_str(), r.length());
-  return r.length();
-}
-
-static int os_write(const char *path, const char *buf, size_t size,
-                   off_t offset, struct fuse_file_info *fi)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << " offset " << offset
-                    << " size " << size << dendl;
-  std::lock_guard<std::mutex> l(fs->lock);
-  FuseStore::OpenFile *o = reinterpret_cast<FuseStore::OpenFile*>(fi->fh);
-  if (!o)
-    return 0;
-
-  bufferlist final;
-  if (offset) {
-    if (offset > o->bl.length()) {
-      final.substr_of(o->bl, 0, offset);
-    } else {
-      final.claim_append(o->bl);
-      size_t zlen = offset - final.length();
-      final.append_zero(zlen);
-    }
-  }
-  final.append(buf, size);
-  if (offset + size < o->bl.length()) {
-    bufferlist rest;
-    rest.substr_of(o->bl, offset + size, o->bl.length() - offset - size);
-    final.claim_append(rest);
-  }
-  o->bl = final;
-  o->dirty = true;
-  return size;
-}
-
-int os_flush(const char *path, struct fuse_file_info *fi)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
-  coll_t cid;
-  ghobject_t oid;
-  string key;
-  uint32_t hash_value, hash_bits;
-  int f = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
-                  &hash_bits);
-  if (f < 0)
-    return f;
-
-  std::lock_guard<std::mutex> l(fs->lock);
-
-  FuseStore::OpenFile *o = reinterpret_cast<FuseStore::OpenFile*>(fi->fh);
-  if (!o)
-    return 0;
-  if (!o->dirty)
-    return 0;
-
-  ObjectStore::Transaction t;
-
-  switch (f) {
-  case FN_OBJECT_DATA:
-    t.write(cid, oid, 0, o->bl.length(), o->bl);
-    break;
-
-  case FN_OBJECT_ATTR_VAL:
-    t.setattr(cid, oid, key.c_str(), o->bl);
-    break;
-
-  case FN_OBJECT_OMAP_VAL:
-    {
-      map<string,bufferlist> aset;
-      aset[key] = o->bl;
-      t.omap_setkeys(cid, oid, aset);
-      break;
-    }
-
-  case FN_OBJECT_OMAP_HEADER:
-    t.omap_setheader(cid, oid, o->bl);
-    break;
-
-  default:
-    return 0;
-  }
-
-  ceph::shared_ptr<ObjectStore::Sequencer> osr(
-    new ObjectStore::Sequencer("fuse"));
-  fs->store->apply_transaction(&*osr, std::move(t));
-  C_SaferCond waiter;
-  if (!osr->flush_commit(&waiter))
-    waiter.wait();
-
-  return 0;
-}
-
-static int os_unlink(const char *path)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
-  coll_t cid;
-  ghobject_t oid;
-  string key;
-  uint32_t hash_value, hash_bits;
-  int f = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
-                  &hash_bits);
-  if (f < 0)
-    return f;
-
-  std::lock_guard<std::mutex> l(fs->lock);
-
-  ObjectStore::Transaction t;
-
-  switch (f) {
-  case FN_OBJECT_OMAP_VAL:
-    {
-      set<string> keys;
-      keys.insert(key);
-      t.omap_rmkeys(cid, oid, keys);
-    }
-    break;
-
-  case FN_OBJECT_ATTR_VAL:
-    t.rmattr(cid, oid, key.c_str());
-    break;
-
-  case FN_OBJECT_OMAP_HEADER:
-    {
-      bufferlist empty;
-      t.omap_setheader(cid, oid, empty);
-    }
-    break;
-
-  case FN_OBJECT:
-    t.remove(cid, oid);
-    break;
-
-  case FN_COLLECTION:
-    {
-      bool empty;
-      int r = fs->store->collection_empty(cid, &empty);
-      if (r < 0)
-        return r;
-      if (!empty)
-        return -ENOTEMPTY;
-      t.remove_collection(cid);
-    }
-    break;
-
-  case FN_OBJECT_DATA:
-    t.truncate(cid, oid, 0);
-    break;
-
-  default:
-    return -EPERM;
-  }
-
-  ceph::shared_ptr<ObjectStore::Sequencer> osr(
-    new ObjectStore::Sequencer("fuse"));
-  fs->store->apply_transaction(&*osr, std::move(t));
-  C_SaferCond waiter;
-  if (!osr->flush_commit(&waiter))
-    waiter.wait();
-
-  return 0;
-}
-
-static int os_truncate(const char *path, off_t size)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << " size " << size << dendl;
-  coll_t cid;
-  ghobject_t oid;
-  string key;
-  uint32_t hash_value, hash_bits;
-  int f = parse_fn(fs->store->cct, path, &cid, &oid, &key, &hash_value,
-                  &hash_bits);
-  if (f < 0)
-    return f;
-
-  if (f == FN_OBJECT_OMAP_VAL ||
-      f == FN_OBJECT_ATTR_VAL ||
-      f == FN_OBJECT_OMAP_HEADER) {
-    if (size)
-      return -EPERM;
-    return 0;
-  }
-  if (f != FN_OBJECT_DATA)
-    return -EPERM;
-
-  std::lock_guard<std::mutex> l(fs->lock);
-
-  if (fs->open_files.count(path)) {
-    FuseStore::OpenFile *o = fs->open_files[path];
-    if (o->bl.length() > size) {
-      bufferlist t;
-      t.substr_of(o->bl, 0, size);
-      o->bl.swap(t);
-    }
-  }
-
-  ObjectStore::Transaction t;
-  t.truncate(cid, oid, size);
-  ceph::shared_ptr<ObjectStore::Sequencer> osr(
-    new ObjectStore::Sequencer("fuse"));
-  fs->store->apply_transaction(&*osr, std::move(t));
-  C_SaferCond waiter;
-  if (!osr->flush_commit(&waiter))
-    waiter.wait();
-  return 0;
-}
-
-static int os_statfs(const char *path, struct statvfs *stbuf)
-{
-  fuse_context *fc = fuse_get_context();
-  FuseStore *fs = static_cast<FuseStore*>(fc->private_data);
-  ldout(fs->store->cct, 10) << __func__ << " " << path << dendl;
-  std::lock_guard<std::mutex> l(fs->lock);
-
-  struct store_statfs_t s;
-  int r = fs->store->statfs(&s);
-  if (r < 0)
-    return r;
-  stbuf->f_bsize = 4096;   // LIES!
-  stbuf->f_blocks = s.total / 4096;
-  stbuf->f_bavail = s.available / 4096;
-
-  return 0;
-}
-
-static struct fuse_operations fs_oper = {
-  getattr: os_getattr,
-  readlink: 0,
-  getdir: 0,
-  mknod: 0,
-  mkdir: os_mkdir,
-  unlink: os_unlink,
-  rmdir: os_unlink,
-  symlink: 0,
-  rename: 0,
-  link: 0,
-  chmod: os_chmod,
-  chown: 0,
-  truncate: os_truncate,
-  utime: 0,
-  open: os_open,
-  read: os_read,
-  write: os_write,
-  statfs: os_statfs,
-  flush: os_flush,
-  release: os_release,
-  fsync: 0,
-  setxattr: 0,
-  getxattr: 0,
-  listxattr: 0,
-  removexattr: 0,
-  opendir: 0,
-  readdir: os_readdir,
-  releasedir: 0,
-  fsyncdir: 0,
-  init: 0,
-  destroy: 0,
-  access: 0,
-  create: os_create,
-};
-
-int FuseStore::main()
-{
-  const char *v[] = {
-    "foo",
-    mount_point.c_str(),
-    "-f",
-    "-d", // debug
-  };
-  int c = 3;
-  if (store->cct->_conf->fuse_debug)
-    ++c;
-  return fuse_main(c, (char**)v, &fs_oper, (void*)this);
-}
-
-int FuseStore::start()
-{
-  dout(10) << __func__ << dendl;
-
-  memset(&info->args, 0, sizeof(info->args));
-  const char *v[] = {
-    "foo",
-    mount_point.c_str(),
-    "-f", // foreground
-    "-d", // debug
-  };
-  int c = 3;
-  if (store->cct->_conf->fuse_debug)
-    ++c;
-  fuse_args a = FUSE_ARGS_INIT(c, (char**)v);
-  info->args = a;
-  if (fuse_parse_cmdline(&info->args, &info->mountpoint, NULL, NULL) == -1) {
-    derr << __func__ << " failed to parse args" << dendl;
-    return -EINVAL;
-  }
-
-  info->ch = fuse_mount(info->mountpoint, &info->args);
-  if (!info->ch) {
-    derr << __func__ << " fuse_mount failed" << dendl;
-    return -EIO;
-  }
-
-  info->f = fuse_new(info->ch, &info->args, &fs_oper, sizeof(fs_oper),
-                    (void*)this);
-  if (!info->f) {
-    fuse_unmount(info->mountpoint, info->ch);
-    derr << __func__ << " fuse_new failed" << dendl;
-    return -EIO;
-  }
-
-  fuse_thread.create("fusestore");
-  dout(10) << __func__ << " done" << dendl;
-  return 0;
-}
-
-int FuseStore::loop()
-{
-  dout(10) << __func__ << " enter" << dendl;
-  int r = fuse_loop(info->f);
-  if (r)
-    derr << __func__ << " got " << cpp_strerror(r) << dendl;
-  dout(10) << __func__ << " exit" << dendl;
-  return r;
-}
-
-int FuseStore::stop()
-{
-  dout(10) << __func__ << " enter" << dendl;
-  fuse_unmount(info->mountpoint, info->ch);
-  fuse_thread.join();
-  fuse_destroy(info->f);
-  dout(10) << __func__ << " exit" << dendl;
-  return 0;
-}