X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fos%2Fmemstore%2FMemStore.cc;fp=src%2Fceph%2Fsrc%2Fos%2Fmemstore%2FMemStore.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=08be76e65b4b61303abe7a452c27f422326d44ff;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/os/memstore/MemStore.cc b/src/ceph/src/os/memstore/MemStore.cc deleted file mode 100644 index 08be76e..0000000 --- a/src/ceph/src/os/memstore/MemStore.cc +++ /dev/null @@ -1,1823 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2013 Inktank - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ -#include "acconfig.h" - -#ifdef HAVE_SYS_MOUNT_H -#include -#endif - -#ifdef HAVE_SYS_PARAM_H -#include -#endif - -#include "include/types.h" -#include "include/stringify.h" -#include "include/unordered_map.h" -#include "include/memory.h" -#include "common/errno.h" -#include "MemStore.h" -#include "include/compat.h" - -#define dout_context cct -#define dout_subsys ceph_subsys_filestore -#undef dout_prefix -#define dout_prefix *_dout << "memstore(" << path << ") " - -// for comparing collections for lock ordering -bool operator>(const MemStore::CollectionRef& l, - const MemStore::CollectionRef& r) -{ - return (unsigned long)l.get() > (unsigned long)r.get(); -} - - -int MemStore::mount() -{ - int r = _load(); - if (r < 0) - return r; - finisher.start(); - return 0; -} - -int MemStore::umount() -{ - finisher.wait_for_empty(); - finisher.stop(); - return _save(); -} - -int MemStore::_save() -{ - dout(10) << __func__ << dendl; - dump_all(); - set collections; - for (ceph::unordered_map::iterator p = coll_map.begin(); - p != coll_map.end(); - ++p) { - dout(20) << __func__ << " coll " << p->first << " " << p->second << dendl; - collections.insert(p->first); - bufferlist bl; - assert(p->second); - p->second->encode(bl); - string fn = path + "/" + stringify(p->first); - int r = bl.write_file(fn.c_str()); - if (r < 0) - return r; - } - - string fn = path + "/collections"; - bufferlist bl; - ::encode(collections, bl); - int r = bl.write_file(fn.c_str()); - if (r < 0) - return r; - - return 0; -} - -void MemStore::dump_all() -{ - Formatter *f = Formatter::create("json-pretty"); - f->open_object_section("store"); - dump(f); - f->close_section(); - dout(0) << "dump:"; - f->flush(*_dout); - *_dout << dendl; - delete f; -} - -void MemStore::dump(Formatter *f) -{ - f->open_array_section("collections"); - for (ceph::unordered_map::iterator p = coll_map.begin(); - p != coll_map.end(); - ++p) { - f->open_object_section("collection"); - f->dump_string("name", stringify(p->first)); - - f->open_array_section("xattrs"); - for (map::iterator q = p->second->xattr.begin(); - q != p->second->xattr.end(); - ++q) { - f->open_object_section("xattr"); - f->dump_string("name", q->first); - f->dump_int("length", q->second.length()); - f->close_section(); - } - f->close_section(); - - f->open_array_section("objects"); - for (map::iterator q = p->second->object_map.begin(); - q != p->second->object_map.end(); - ++q) { - f->open_object_section("object"); - f->dump_string("name", stringify(q->first)); - if (q->second) - q->second->dump(f); - f->close_section(); - } - f->close_section(); - - f->close_section(); - } - f->close_section(); -} - -int MemStore::_load() -{ - dout(10) << __func__ << dendl; - bufferlist bl; - string fn = path + "/collections"; - string err; - int r = bl.read_file(fn.c_str(), &err); - if (r < 0) - return r; - - set collections; - bufferlist::iterator p = bl.begin(); - ::decode(collections, p); - - for (set::iterator q = collections.begin(); - q != collections.end(); - ++q) { - string fn = path + "/" + stringify(*q); - bufferlist cbl; - int r = cbl.read_file(fn.c_str(), &err); - if (r < 0) - return r; - CollectionRef c(new Collection(cct, *q)); - bufferlist::iterator p = cbl.begin(); - c->decode(p); - coll_map[*q] = c; - used_bytes += c->used_bytes(); - } - - dump_all(); - - return 0; -} - -void MemStore::set_fsid(uuid_d u) -{ - int r = write_meta("fs_fsid", stringify(u)); - assert(r >= 0); -} - -uuid_d MemStore::get_fsid() -{ - string fsid_str; - int r = read_meta("fs_fsid", &fsid_str); - assert(r >= 0); - uuid_d uuid; - bool b = uuid.parse(fsid_str.c_str()); - assert(b); - return uuid; -} - -int MemStore::mkfs() -{ - string fsid_str; - int r = read_meta("fs_fsid", &fsid_str); - if (r == -ENOENT) { - uuid_d fsid; - fsid.generate_random(); - fsid_str = stringify(fsid); - r = write_meta("fs_fsid", fsid_str); - if (r < 0) - return r; - dout(1) << __func__ << " new fsid " << fsid_str << dendl; - } else if (r < 0) { - return r; - } else { - dout(1) << __func__ << " had fsid " << fsid_str << dendl; - } - - string fn = path + "/collections"; - derr << path << dendl; - bufferlist bl; - set collections; - ::encode(collections, bl); - r = bl.write_file(fn.c_str()); - if (r < 0) - return r; - - r = write_meta("type", "memstore"); - if (r < 0) - return r; - - return 0; -} - -int MemStore::statfs(struct store_statfs_t *st) -{ - dout(10) << __func__ << dendl; - st->reset(); - st->total = cct->_conf->memstore_device_bytes; - st->available = MAX(int64_t(st->total) - int64_t(used_bytes), 0ll); - dout(10) << __func__ << ": used_bytes: " << used_bytes - << "/" << cct->_conf->memstore_device_bytes << dendl; - return 0; -} - -objectstore_perf_stat_t MemStore::get_cur_stats() -{ - // fixme - return objectstore_perf_stat_t(); -} - -MemStore::CollectionRef MemStore::get_collection(const coll_t& cid) -{ - RWLock::RLocker l(coll_lock); - ceph::unordered_map::iterator cp = coll_map.find(cid); - if (cp == coll_map.end()) - return CollectionRef(); - return cp->second; -} - - -// --------------- -// read operations - -bool MemStore::exists(const coll_t& cid, const ghobject_t& oid) -{ - CollectionHandle c = get_collection(cid); - if (!c) - return false; - return exists(c, oid); -} - -bool MemStore::exists(CollectionHandle &c_, const ghobject_t& oid) -{ - Collection *c = static_cast(c_.get()); - dout(10) << __func__ << " " << c->get_cid() << " " << oid << dendl; - if (!c->exists) - return false; - - // Perform equivalent of c->get_object_(oid) != NULL. In C++11 the - // shared_ptr needs to be compared to nullptr. - return (bool)c->get_object(oid); -} - -int MemStore::stat( - const coll_t& cid, - const ghobject_t& oid, - struct stat *st, - bool allow_eio) -{ - CollectionHandle c = get_collection(cid); - if (!c) - return -ENOENT; - return stat(c, oid, st, allow_eio); -} - -int MemStore::stat( - CollectionHandle &c_, - const ghobject_t& oid, - struct stat *st, - bool allow_eio) -{ - Collection *c = static_cast(c_.get()); - dout(10) << __func__ << " " << c->cid << " " << oid << dendl; - if (!c->exists) - return -ENOENT; - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - st->st_size = o->get_size(); - st->st_blksize = 4096; - st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize; - st->st_nlink = 1; - return 0; -} - -int MemStore::set_collection_opts( - const coll_t& cid, - const pool_opts_t& opts) -{ - return -EOPNOTSUPP; -} - -int MemStore::read( - const coll_t& cid, - const ghobject_t& oid, - uint64_t offset, - size_t len, - bufferlist& bl, - uint32_t op_flags) -{ - CollectionHandle c = get_collection(cid); - if (!c) - return -ENOENT; - return read(c, oid, offset, len, bl, op_flags); -} - -int MemStore::read( - CollectionHandle &c_, - const ghobject_t& oid, - uint64_t offset, - size_t len, - bufferlist& bl, - uint32_t op_flags) -{ - Collection *c = static_cast(c_.get()); - dout(10) << __func__ << " " << c->cid << " " << oid << " " - << offset << "~" << len << dendl; - if (!c->exists) - return -ENOENT; - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - if (offset >= o->get_size()) - return 0; - size_t l = len; - if (l == 0 && offset == 0) // note: len == 0 means read the entire object - l = o->get_size(); - else if (offset + l > o->get_size()) - l = o->get_size() - offset; - bl.clear(); - return o->read(offset, l, bl); -} - -int MemStore::fiemap(const coll_t& cid, const ghobject_t& oid, - uint64_t offset, size_t len, bufferlist& bl) -{ - map destmap; - int r = fiemap(cid, oid, offset, len, destmap); - if (r >= 0) - ::encode(destmap, bl); - return r; -} - -int MemStore::fiemap(const coll_t& cid, const ghobject_t& oid, - uint64_t offset, size_t len, map& destmap) -{ - dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~" - << len << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - size_t l = len; - if (offset + l > o->get_size()) - l = o->get_size() - offset; - if (offset >= o->get_size()) - goto out; - destmap[offset] = l; - out: - return 0; -} - -int MemStore::getattr(const coll_t& cid, const ghobject_t& oid, - const char *name, bufferptr& value) -{ - CollectionHandle c = get_collection(cid); - if (!c) - return -ENOENT; - return getattr(c, oid, name, value); -} - -int MemStore::getattr(CollectionHandle &c_, const ghobject_t& oid, - const char *name, bufferptr& value) -{ - Collection *c = static_cast(c_.get()); - dout(10) << __func__ << " " << c->cid << " " << oid << " " << name << dendl; - if (!c->exists) - return -ENOENT; - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - string k(name); - std::lock_guard lock(o->xattr_mutex); - if (!o->xattr.count(k)) { - return -ENODATA; - } - value = o->xattr[k]; - return 0; -} - -int MemStore::getattrs(const coll_t& cid, const ghobject_t& oid, - map& aset) -{ - CollectionHandle c = get_collection(cid); - if (!c) - return -ENOENT; - return getattrs(c, oid, aset); -} - -int MemStore::getattrs(CollectionHandle &c_, const ghobject_t& oid, - map& aset) -{ - Collection *c = static_cast(c_.get()); - dout(10) << __func__ << " " << c->cid << " " << oid << dendl; - if (!c->exists) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->xattr_mutex); - aset = o->xattr; - return 0; -} - -int MemStore::list_collections(vector& ls) -{ - dout(10) << __func__ << dendl; - RWLock::RLocker l(coll_lock); - for (ceph::unordered_map::iterator p = coll_map.begin(); - p != coll_map.end(); - ++p) { - ls.push_back(p->first); - } - return 0; -} - -bool MemStore::collection_exists(const coll_t& cid) -{ - dout(10) << __func__ << " " << cid << dendl; - RWLock::RLocker l(coll_lock); - return coll_map.count(cid); -} - -int MemStore::collection_empty(const coll_t& cid, bool *empty) -{ - dout(10) << __func__ << " " << cid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - RWLock::RLocker l(c->lock); - *empty = c->object_map.empty(); - return 0; -} - -int MemStore::collection_bits(const coll_t& cid) -{ - dout(10) << __func__ << " " << cid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - RWLock::RLocker l(c->lock); - return c->bits; -} - -int MemStore::collection_list(const coll_t& cid, - const ghobject_t& start, - const ghobject_t& end, - int max, - vector *ls, ghobject_t *next) -{ - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - RWLock::RLocker l(c->lock); - - dout(10) << __func__ << " cid " << cid << " start " << start - << " end " << end << dendl; - map::iterator p = c->object_map.lower_bound(start); - while (p != c->object_map.end() && - ls->size() < (unsigned)max && - p->first < end) { - ls->push_back(p->first); - ++p; - } - if (next != NULL) { - if (p == c->object_map.end()) - *next = ghobject_t::get_max(); - else - *next = p->first; - } - dout(10) << __func__ << " cid " << cid << " got " << ls->size() << dendl; - return 0; -} - -int MemStore::omap_get( - const coll_t& cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object containing omap - bufferlist *header, ///< [out] omap header - map *out /// < [out] Key to value map - ) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->omap_mutex); - *header = o->omap_header; - *out = o->omap; - return 0; -} - -int MemStore::omap_get_header( - const coll_t& cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object containing omap - bufferlist *header, ///< [out] omap header - bool allow_eio ///< [in] don't assert on eio - ) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->omap_mutex); - *header = o->omap_header; - return 0; -} - -int MemStore::omap_get_keys( - const coll_t& cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object containing omap - set *keys ///< [out] Keys defined on oid - ) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->omap_mutex); - for (map::iterator p = o->omap.begin(); - p != o->omap.end(); - ++p) - keys->insert(p->first); - return 0; -} - -int MemStore::omap_get_values( - const coll_t& cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object containing omap - const set &keys, ///< [in] Keys to get - map *out ///< [out] Returned keys and values - ) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->omap_mutex); - for (set::const_iterator p = keys.begin(); - p != keys.end(); - ++p) { - map::iterator q = o->omap.find(*p); - if (q != o->omap.end()) - out->insert(*q); - } - return 0; -} - -int MemStore::omap_check_keys( - const coll_t& cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object containing omap - const set &keys, ///< [in] Keys to check - set *out ///< [out] Subset of keys defined on oid - ) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->omap_mutex); - for (set::const_iterator p = keys.begin(); - p != keys.end(); - ++p) { - map::iterator q = o->omap.find(*p); - if (q != o->omap.end()) - out->insert(*p); - } - return 0; -} - -class MemStore::OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl { - CollectionRef c; - ObjectRef o; - map::iterator it; -public: - OmapIteratorImpl(CollectionRef c, ObjectRef o) - : c(c), o(o), it(o->omap.begin()) {} - - int seek_to_first() override { - std::lock_guard(o->omap_mutex); - it = o->omap.begin(); - return 0; - } - int upper_bound(const string &after) override { - std::lock_guard(o->omap_mutex); - it = o->omap.upper_bound(after); - return 0; - } - int lower_bound(const string &to) override { - std::lock_guard(o->omap_mutex); - it = o->omap.lower_bound(to); - return 0; - } - bool valid() override { - std::lock_guard(o->omap_mutex); - return it != o->omap.end(); - } - int next(bool validate=true) override { - std::lock_guard(o->omap_mutex); - ++it; - return 0; - } - string key() override { - std::lock_guard(o->omap_mutex); - return it->first; - } - bufferlist value() override { - std::lock_guard(o->omap_mutex); - return it->second; - } - int status() override { - return 0; - } -}; - -ObjectMap::ObjectMapIterator MemStore::get_omap_iterator(const coll_t& cid, - const ghobject_t& oid) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return ObjectMap::ObjectMapIterator(); - - ObjectRef o = c->get_object(oid); - if (!o) - return ObjectMap::ObjectMapIterator(); - return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o)); -} - - -// --------------- -// write operations - -int MemStore::queue_transactions(Sequencer *osr, - vector& tls, - TrackedOpRef op, - ThreadPool::TPHandle *handle) -{ - // because memstore operations are synchronous, we can implement the - // Sequencer with a mutex. this guarantees ordering on a given sequencer, - // while allowing operations on different sequencers to happen in parallel - struct OpSequencer : public Sequencer_impl { - OpSequencer(CephContext* cct) : - Sequencer_impl(cct) {} - std::mutex mutex; - void flush() override {} - bool flush_commit(Context*) override { return true; } - }; - - std::unique_lock lock; - if (osr) { - if (!osr->p) { - osr->p = new OpSequencer(cct); - } - auto seq = static_cast(osr->p.get()); - lock = std::unique_lock(seq->mutex); - } - - for (vector::iterator p = tls.begin(); p != tls.end(); ++p) { - // poke the TPHandle heartbeat just to exercise that code path - if (handle) - handle->reset_tp_timeout(); - - _do_transaction(*p); - } - - Context *on_apply = NULL, *on_apply_sync = NULL, *on_commit = NULL; - ObjectStore::Transaction::collect_contexts(tls, &on_apply, &on_commit, - &on_apply_sync); - if (on_apply_sync) - on_apply_sync->complete(0); - if (on_apply) - finisher.queue(on_apply); - if (on_commit) - finisher.queue(on_commit); - return 0; -} - -void MemStore::_do_transaction(Transaction& t) -{ - Transaction::iterator i = t.begin(); - int pos = 0; - - while (i.have_op()) { - Transaction::Op *op = i.decode_op(); - int r = 0; - - switch (op->op) { - case Transaction::OP_NOP: - break; - case Transaction::OP_TOUCH: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - r = _touch(cid, oid); - } - break; - - case Transaction::OP_WRITE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - uint64_t off = op->off; - uint64_t len = op->len; - uint32_t fadvise_flags = i.get_fadvise_flags(); - bufferlist bl; - i.decode_bl(bl); - r = _write(cid, oid, off, len, bl, fadvise_flags); - } - break; - - case Transaction::OP_ZERO: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - uint64_t off = op->off; - uint64_t len = op->len; - r = _zero(cid, oid, off, len); - } - break; - - case Transaction::OP_TRIMCACHE: - { - // deprecated, no-op - } - break; - - case Transaction::OP_TRUNCATE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - uint64_t off = op->off; - r = _truncate(cid, oid, off); - } - break; - - case Transaction::OP_REMOVE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - r = _remove(cid, oid); - } - break; - - case Transaction::OP_SETATTR: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - string name = i.decode_string(); - bufferlist bl; - i.decode_bl(bl); - map to_set; - to_set[name] = bufferptr(bl.c_str(), bl.length()); - r = _setattrs(cid, oid, to_set); - } - break; - - case Transaction::OP_SETATTRS: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - map aset; - i.decode_attrset(aset); - r = _setattrs(cid, oid, aset); - } - break; - - case Transaction::OP_RMATTR: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - string name = i.decode_string(); - r = _rmattr(cid, oid, name.c_str()); - } - break; - - case Transaction::OP_RMATTRS: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - r = _rmattrs(cid, oid); - } - break; - - case Transaction::OP_CLONE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - ghobject_t noid = i.get_oid(op->dest_oid); - r = _clone(cid, oid, noid); - } - break; - - case Transaction::OP_CLONERANGE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - ghobject_t noid = i.get_oid(op->dest_oid); - uint64_t off = op->off; - uint64_t len = op->len; - r = _clone_range(cid, oid, noid, off, len, off); - } - break; - - case Transaction::OP_CLONERANGE2: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - ghobject_t noid = i.get_oid(op->dest_oid); - uint64_t srcoff = op->off; - uint64_t len = op->len; - uint64_t dstoff = op->dest_off; - r = _clone_range(cid, oid, noid, srcoff, len, dstoff); - } - break; - - case Transaction::OP_MKCOLL: - { - coll_t cid = i.get_cid(op->cid); - r = _create_collection(cid, op->split_bits); - } - break; - - case Transaction::OP_COLL_HINT: - { - coll_t cid = i.get_cid(op->cid); - uint32_t type = op->hint_type; - bufferlist hint; - i.decode_bl(hint); - bufferlist::iterator hiter = hint.begin(); - if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) { - uint32_t pg_num; - uint64_t num_objs; - ::decode(pg_num, hiter); - ::decode(num_objs, hiter); - r = _collection_hint_expected_num_objs(cid, pg_num, num_objs); - } else { - // Ignore the hint - dout(10) << "Unrecognized collection hint type: " << type << dendl; - } - } - break; - - case Transaction::OP_RMCOLL: - { - coll_t cid = i.get_cid(op->cid); - r = _destroy_collection(cid); - } - break; - - case Transaction::OP_COLL_ADD: - { - coll_t ocid = i.get_cid(op->cid); - coll_t ncid = i.get_cid(op->dest_cid); - ghobject_t oid = i.get_oid(op->oid); - r = _collection_add(ncid, ocid, oid); - } - break; - - case Transaction::OP_COLL_REMOVE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - r = _remove(cid, oid); - } - break; - - case Transaction::OP_COLL_MOVE: - assert(0 == "deprecated"); - break; - - case Transaction::OP_COLL_MOVE_RENAME: - { - coll_t oldcid = i.get_cid(op->cid); - ghobject_t oldoid = i.get_oid(op->oid); - coll_t newcid = i.get_cid(op->dest_cid); - ghobject_t newoid = i.get_oid(op->dest_oid); - r = _collection_move_rename(oldcid, oldoid, newcid, newoid); - if (r == -ENOENT) - r = 0; - } - break; - - case Transaction::OP_TRY_RENAME: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oldoid = i.get_oid(op->oid); - ghobject_t newoid = i.get_oid(op->dest_oid); - r = _collection_move_rename(cid, oldoid, cid, newoid); - if (r == -ENOENT) - r = 0; - } - break; - - case Transaction::OP_COLL_SETATTR: - { - assert(0 == "not implemented"); - } - break; - - case Transaction::OP_COLL_RMATTR: - { - assert(0 == "not implemented"); - } - break; - - case Transaction::OP_COLL_RENAME: - { - assert(0 == "not implemented"); - } - break; - - case Transaction::OP_OMAP_CLEAR: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - r = _omap_clear(cid, oid); - } - break; - case Transaction::OP_OMAP_SETKEYS: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - bufferlist aset_bl; - i.decode_attrset_bl(&aset_bl); - r = _omap_setkeys(cid, oid, aset_bl); - } - break; - case Transaction::OP_OMAP_RMKEYS: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - bufferlist keys_bl; - i.decode_keyset_bl(&keys_bl); - r = _omap_rmkeys(cid, oid, keys_bl); - } - break; - case Transaction::OP_OMAP_RMKEYRANGE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - string first, last; - first = i.decode_string(); - last = i.decode_string(); - r = _omap_rmkeyrange(cid, oid, first, last); - } - break; - case Transaction::OP_OMAP_SETHEADER: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - bufferlist bl; - i.decode_bl(bl); - r = _omap_setheader(cid, oid, bl); - } - break; - case Transaction::OP_SPLIT_COLLECTION: - assert(0 == "deprecated"); - break; - case Transaction::OP_SPLIT_COLLECTION2: - { - coll_t cid = i.get_cid(op->cid); - uint32_t bits = op->split_bits; - uint32_t rem = op->split_rem; - coll_t dest = i.get_cid(op->dest_cid); - r = _split_collection(cid, bits, rem, dest); - } - break; - - case Transaction::OP_SETALLOCHINT: - { - r = 0; - } - break; - - default: - derr << "bad op " << op->op << dendl; - ceph_abort(); - } - - if (r < 0) { - bool ok = false; - - if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE || - op->op == Transaction::OP_CLONE || - op->op == Transaction::OP_CLONERANGE2 || - op->op == Transaction::OP_COLL_ADD)) - // -ENOENT is usually okay - ok = true; - if (r == -ENODATA) - ok = true; - - if (!ok) { - const char *msg = "unexpected error code"; - - if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE || - op->op == Transaction::OP_CLONE || - op->op == Transaction::OP_CLONERANGE2)) - msg = "ENOENT on clone suggests osd bug"; - - if (r == -ENOSPC) - // For now, if we hit _any_ ENOSPC, crash, before we do any damage - // by partially applying transactions. - msg = "ENOSPC from MemStore, misconfigured cluster or insufficient memory"; - - if (r == -ENOTEMPTY) { - msg = "ENOTEMPTY suggests garbage data in osd data dir"; - dump_all(); - } - - derr << " error " << cpp_strerror(r) << " not handled on operation " << op->op - << " (op " << pos << ", counting from 0)" << dendl; - dout(0) << msg << dendl; - dout(0) << " transaction dump:\n"; - JSONFormatter f(true); - f.open_object_section("transaction"); - t.dump(&f); - f.close_section(); - f.flush(*_dout); - *_dout << dendl; - assert(0 == "unexpected error"); - } - } - - ++pos; - } -} - -int MemStore::_touch(const coll_t& cid, const ghobject_t& oid) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - c->get_or_create_object(oid); - return 0; -} - -int MemStore::_write(const coll_t& cid, const ghobject_t& oid, - uint64_t offset, size_t len, const bufferlist& bl, - uint32_t fadvise_flags) -{ - dout(10) << __func__ << " " << cid << " " << oid << " " - << offset << "~" << len << dendl; - assert(len == bl.length()); - - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_or_create_object(oid); - if (len > 0) { - const ssize_t old_size = o->get_size(); - o->write(offset, bl); - used_bytes += (o->get_size() - old_size); - } - - return 0; -} - -int MemStore::_zero(const coll_t& cid, const ghobject_t& oid, - uint64_t offset, size_t len) -{ - dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~" - << len << dendl; - bufferlist bl; - bl.append_zero(len); - return _write(cid, oid, offset, len, bl); -} - -int MemStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size) -{ - dout(10) << __func__ << " " << cid << " " << oid << " " << size << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - const ssize_t old_size = o->get_size(); - int r = o->truncate(size); - used_bytes += (o->get_size() - old_size); - return r; -} - -int MemStore::_remove(const coll_t& cid, const ghobject_t& oid) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - RWLock::WLocker l(c->lock); - - auto i = c->object_hash.find(oid); - if (i == c->object_hash.end()) - return -ENOENT; - used_bytes -= i->second->get_size(); - c->object_hash.erase(i); - c->object_map.erase(oid); - - return 0; -} - -int MemStore::_setattrs(const coll_t& cid, const ghobject_t& oid, - map& aset) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->xattr_mutex); - for (map::const_iterator p = aset.begin(); p != aset.end(); ++p) - o->xattr[p->first] = p->second; - return 0; -} - -int MemStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name) -{ - dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->xattr_mutex); - auto i = o->xattr.find(name); - if (i == o->xattr.end()) - return -ENODATA; - o->xattr.erase(i); - return 0; -} - -int MemStore::_rmattrs(const coll_t& cid, const ghobject_t& oid) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->xattr_mutex); - o->xattr.clear(); - return 0; -} - -int MemStore::_clone(const coll_t& cid, const ghobject_t& oldoid, - const ghobject_t& newoid) -{ - dout(10) << __func__ << " " << cid << " " << oldoid - << " -> " << newoid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef oo = c->get_object(oldoid); - if (!oo) - return -ENOENT; - ObjectRef no = c->get_or_create_object(newoid); - used_bytes += oo->get_size() - no->get_size(); - no->clone(oo.get(), 0, oo->get_size(), 0); - - // take xattr and omap locks with std::lock() - std::unique_lock - ox_lock(oo->xattr_mutex, std::defer_lock), - nx_lock(no->xattr_mutex, std::defer_lock), - oo_lock(oo->omap_mutex, std::defer_lock), - no_lock(no->omap_mutex, std::defer_lock); - std::lock(ox_lock, nx_lock, oo_lock, no_lock); - - no->omap_header = oo->omap_header; - no->omap = oo->omap; - no->xattr = oo->xattr; - return 0; -} - -int MemStore::_clone_range(const coll_t& cid, const ghobject_t& oldoid, - const ghobject_t& newoid, - uint64_t srcoff, uint64_t len, uint64_t dstoff) -{ - dout(10) << __func__ << " " << cid << " " - << oldoid << " " << srcoff << "~" << len << " -> " - << newoid << " " << dstoff << "~" << len - << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef oo = c->get_object(oldoid); - if (!oo) - return -ENOENT; - ObjectRef no = c->get_or_create_object(newoid); - if (srcoff >= oo->get_size()) - return 0; - if (srcoff + len >= oo->get_size()) - len = oo->get_size() - srcoff; - - const ssize_t old_size = no->get_size(); - no->clone(oo.get(), srcoff, len, dstoff); - used_bytes += (no->get_size() - old_size); - - return len; -} - -int MemStore::_omap_clear(const coll_t& cid, const ghobject_t &oid) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->omap_mutex); - o->omap.clear(); - o->omap_header.clear(); - return 0; -} - -int MemStore::_omap_setkeys(const coll_t& cid, const ghobject_t &oid, - bufferlist& aset_bl) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->omap_mutex); - bufferlist::iterator p = aset_bl.begin(); - __u32 num; - ::decode(num, p); - while (num--) { - string key; - ::decode(key, p); - ::decode(o->omap[key], p); - } - return 0; -} - -int MemStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &oid, - bufferlist& keys_bl) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->omap_mutex); - bufferlist::iterator p = keys_bl.begin(); - __u32 num; - ::decode(num, p); - while (num--) { - string key; - ::decode(key, p); - o->omap.erase(key); - } - return 0; -} - -int MemStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &oid, - const string& first, const string& last) -{ - dout(10) << __func__ << " " << cid << " " << oid << " " << first - << " " << last << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->omap_mutex); - map::iterator p = o->omap.lower_bound(first); - map::iterator e = o->omap.lower_bound(last); - o->omap.erase(p, e); - return 0; -} - -int MemStore::_omap_setheader(const coll_t& cid, const ghobject_t &oid, - const bufferlist &bl) -{ - dout(10) << __func__ << " " << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - - ObjectRef o = c->get_object(oid); - if (!o) - return -ENOENT; - std::lock_guard lock(o->omap_mutex); - o->omap_header = bl; - return 0; -} - -int MemStore::_create_collection(const coll_t& cid, int bits) -{ - dout(10) << __func__ << " " << cid << dendl; - RWLock::WLocker l(coll_lock); - auto result = coll_map.insert(std::make_pair(cid, CollectionRef())); - if (!result.second) - return -EEXIST; - result.first->second.reset(new Collection(cct, cid)); - result.first->second->bits = bits; - return 0; -} - -int MemStore::_destroy_collection(const coll_t& cid) -{ - dout(10) << __func__ << " " << cid << dendl; - RWLock::WLocker l(coll_lock); - ceph::unordered_map::iterator cp = coll_map.find(cid); - if (cp == coll_map.end()) - return -ENOENT; - { - RWLock::RLocker l2(cp->second->lock); - if (!cp->second->object_map.empty()) - return -ENOTEMPTY; - cp->second->exists = false; - } - used_bytes -= cp->second->used_bytes(); - coll_map.erase(cp); - return 0; -} - -int MemStore::_collection_add(const coll_t& cid, const coll_t& ocid, const ghobject_t& oid) -{ - dout(10) << __func__ << " " << cid << " " << ocid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - CollectionRef oc = get_collection(ocid); - if (!oc) - return -ENOENT; - RWLock::WLocker l1(MIN(&(*c), &(*oc))->lock); - RWLock::WLocker l2(MAX(&(*c), &(*oc))->lock); - - if (c->object_hash.count(oid)) - return -EEXIST; - if (oc->object_hash.count(oid) == 0) - return -ENOENT; - ObjectRef o = oc->object_hash[oid]; - c->object_map[oid] = o; - c->object_hash[oid] = o; - return 0; -} - -int MemStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid, - coll_t cid, const ghobject_t& oid) -{ - dout(10) << __func__ << " " << oldcid << " " << oldoid << " -> " - << cid << " " << oid << dendl; - CollectionRef c = get_collection(cid); - if (!c) - return -ENOENT; - CollectionRef oc = get_collection(oldcid); - if (!oc) - return -ENOENT; - - // note: c and oc may be the same - assert(&(*c) == &(*oc)); - c->lock.get_write(); - - int r = -EEXIST; - if (c->object_hash.count(oid)) - goto out; - r = -ENOENT; - if (oc->object_hash.count(oldoid) == 0) - goto out; - { - ObjectRef o = oc->object_hash[oldoid]; - c->object_map[oid] = o; - c->object_hash[oid] = o; - oc->object_map.erase(oldoid); - oc->object_hash.erase(oldoid); - } - r = 0; - out: - c->lock.put_write(); - return r; -} - -int MemStore::_split_collection(const coll_t& cid, uint32_t bits, uint32_t match, - coll_t dest) -{ - dout(10) << __func__ << " " << cid << " " << bits << " " << match << " " - << dest << dendl; - CollectionRef sc = get_collection(cid); - if (!sc) - return -ENOENT; - CollectionRef dc = get_collection(dest); - if (!dc) - return -ENOENT; - RWLock::WLocker l1(MIN(&(*sc), &(*dc))->lock); - RWLock::WLocker l2(MAX(&(*sc), &(*dc))->lock); - - map::iterator p = sc->object_map.begin(); - while (p != sc->object_map.end()) { - if (p->first.match(bits, match)) { - dout(20) << " moving " << p->first << dendl; - dc->object_map.insert(make_pair(p->first, p->second)); - dc->object_hash.insert(make_pair(p->first, p->second)); - sc->object_hash.erase(p->first); - sc->object_map.erase(p++); - } else { - ++p; - } - } - - sc->bits = bits; - assert(dc->bits == (int)bits); - - return 0; -} -namespace { -struct BufferlistObject : public MemStore::Object { - Spinlock mutex; - bufferlist data; - - size_t get_size() const override { return data.length(); } - - int read(uint64_t offset, uint64_t len, bufferlist &bl) override; - int write(uint64_t offset, const bufferlist &bl) override; - int clone(Object *src, uint64_t srcoff, uint64_t len, - uint64_t dstoff) override; - int truncate(uint64_t offset) override; - - void encode(bufferlist& bl) const override { - ENCODE_START(1, 1, bl); - ::encode(data, bl); - encode_base(bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::iterator& p) override { - DECODE_START(1, p); - ::decode(data, p); - decode_base(p); - DECODE_FINISH(p); - } -}; -} -// BufferlistObject -int BufferlistObject::read(uint64_t offset, uint64_t len, - bufferlist &bl) -{ - std::lock_guard lock(mutex); - bl.substr_of(data, offset, len); - return bl.length(); -} - -int BufferlistObject::write(uint64_t offset, const bufferlist &src) -{ - unsigned len = src.length(); - - std::lock_guard lock(mutex); - - // before - bufferlist newdata; - if (get_size() >= offset) { - newdata.substr_of(data, 0, offset); - } else { - if (get_size()) { - newdata.substr_of(data, 0, get_size()); - } - newdata.append_zero(offset - get_size()); - } - - newdata.append(src); - - // after - if (get_size() > offset + len) { - bufferlist tail; - tail.substr_of(data, offset + len, get_size() - (offset + len)); - newdata.append(tail); - } - - data.claim(newdata); - return 0; -} - -int BufferlistObject::clone(Object *src, uint64_t srcoff, - uint64_t len, uint64_t dstoff) -{ - auto srcbl = dynamic_cast(src); - if (srcbl == nullptr) - return -ENOTSUP; - - bufferlist bl; - { - std::lock_guard lock(srcbl->mutex); - if (srcoff == dstoff && len == src->get_size()) { - data = srcbl->data; - return 0; - } - bl.substr_of(srcbl->data, srcoff, len); - } - return write(dstoff, bl); -} - -int BufferlistObject::truncate(uint64_t size) -{ - std::lock_guard lock(mutex); - if (get_size() > size) { - bufferlist bl; - bl.substr_of(data, 0, size); - data.claim(bl); - } else if (get_size() == size) { - // do nothing - } else { - data.append_zero(size - get_size()); - } - return 0; -} - -// PageSetObject - -struct MemStore::PageSetObject : public Object { - PageSet data; - uint64_t data_len; -#if defined(__GLIBCXX__) - // use a thread-local vector for the pages returned by PageSet, so we - // can avoid allocations in read/write() - static thread_local PageSet::page_vector tls_pages; -#endif - - explicit PageSetObject(size_t page_size) : data(page_size), data_len(0) {} - - size_t get_size() const override { return data_len; } - - int read(uint64_t offset, uint64_t len, bufferlist &bl) override; - int write(uint64_t offset, const bufferlist &bl) override; - int clone(Object *src, uint64_t srcoff, uint64_t len, - uint64_t dstoff) override; - int truncate(uint64_t offset) override; - - void encode(bufferlist& bl) const override { - ENCODE_START(1, 1, bl); - ::encode(data_len, bl); - data.encode(bl); - encode_base(bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::iterator& p) override { - DECODE_START(1, p); - ::decode(data_len, p); - data.decode(p); - decode_base(p); - DECODE_FINISH(p); - } -}; - -#if defined(__GLIBCXX__) -// use a thread-local vector for the pages returned by PageSet, so we -// can avoid allocations in read/write() -thread_local PageSet::page_vector MemStore::PageSetObject::tls_pages; -#define DEFINE_PAGE_VECTOR(name) -#else -#define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name; -#endif - -int MemStore::PageSetObject::read(uint64_t offset, uint64_t len, bufferlist& bl) -{ - const auto start = offset; - const auto end = offset + len; - auto remaining = len; - - DEFINE_PAGE_VECTOR(tls_pages); - data.get_range(offset, len, tls_pages); - - // allocate a buffer for the data - buffer::ptr buf(len); - - auto p = tls_pages.begin(); - while (remaining) { - // no more pages in range - if (p == tls_pages.end() || (*p)->offset >= end) { - buf.zero(offset - start, remaining); - break; - } - auto page = *p; - - // fill any holes between pages with zeroes - if (page->offset > offset) { - const auto count = std::min(remaining, page->offset - offset); - buf.zero(offset - start, count); - remaining -= count; - offset = page->offset; - if (!remaining) - break; - } - - // read from page - const auto page_offset = offset - page->offset; - const auto count = min(remaining, data.get_page_size() - page_offset); - - buf.copy_in(offset - start, count, page->data + page_offset); - - remaining -= count; - offset += count; - - ++p; - } - - tls_pages.clear(); // drop page refs - - bl.append(std::move(buf)); - return len; -} - -int MemStore::PageSetObject::write(uint64_t offset, const bufferlist &src) -{ - unsigned len = src.length(); - - DEFINE_PAGE_VECTOR(tls_pages); - // make sure the page range is allocated - data.alloc_range(offset, src.length(), tls_pages); - - auto page = tls_pages.begin(); - - auto p = src.begin(); - while (len > 0) { - unsigned page_offset = offset - (*page)->offset; - unsigned pageoff = data.get_page_size() - page_offset; - unsigned count = min(len, pageoff); - p.copy(count, (*page)->data + page_offset); - offset += count; - len -= count; - if (count == pageoff) - ++page; - } - if (data_len < offset) - data_len = offset; - tls_pages.clear(); // drop page refs - return 0; -} - -int MemStore::PageSetObject::clone(Object *src, uint64_t srcoff, - uint64_t len, uint64_t dstoff) -{ - const int64_t delta = dstoff - srcoff; - - auto &src_data = static_cast(src)->data; - const uint64_t src_page_size = src_data.get_page_size(); - - auto &dst_data = data; - const auto dst_page_size = dst_data.get_page_size(); - - DEFINE_PAGE_VECTOR(tls_pages); - PageSet::page_vector dst_pages; - - while (len) { - // limit to 16 pages at a time so tls_pages doesn't balloon in size - auto count = std::min(len, (uint64_t)src_page_size * 16); - src_data.get_range(srcoff, count, tls_pages); - - // allocate the destination range - // TODO: avoid allocating pages for holes in the source range - dst_data.alloc_range(srcoff + delta, count, dst_pages); - auto dst_iter = dst_pages.begin(); - - for (auto &src_page : tls_pages) { - auto sbegin = std::max(srcoff, src_page->offset); - auto send = std::min(srcoff + count, src_page->offset + src_page_size); - - // zero-fill holes before src_page - if (srcoff < sbegin) { - while (dst_iter != dst_pages.end()) { - auto &dst_page = *dst_iter; - auto dbegin = std::max(srcoff + delta, dst_page->offset); - auto dend = std::min(sbegin + delta, dst_page->offset + dst_page_size); - std::fill(dst_page->data + dbegin - dst_page->offset, - dst_page->data + dend - dst_page->offset, 0); - if (dend < dst_page->offset + dst_page_size) - break; - ++dst_iter; - } - const auto c = sbegin - srcoff; - count -= c; - len -= c; - } - - // copy data from src page to dst pages - while (dst_iter != dst_pages.end()) { - auto &dst_page = *dst_iter; - auto dbegin = std::max(sbegin + delta, dst_page->offset); - auto dend = std::min(send + delta, dst_page->offset + dst_page_size); - - std::copy(src_page->data + (dbegin - delta) - src_page->offset, - src_page->data + (dend - delta) - src_page->offset, - dst_page->data + dbegin - dst_page->offset); - if (dend < dst_page->offset + dst_page_size) - break; - ++dst_iter; - } - - const auto c = send - sbegin; - count -= c; - len -= c; - srcoff = send; - dstoff = send + delta; - } - tls_pages.clear(); // drop page refs - - // zero-fill holes after the last src_page - if (count > 0) { - while (dst_iter != dst_pages.end()) { - auto &dst_page = *dst_iter; - auto dbegin = std::max(dstoff, dst_page->offset); - auto dend = std::min(dstoff + count, dst_page->offset + dst_page_size); - std::fill(dst_page->data + dbegin - dst_page->offset, - dst_page->data + dend - dst_page->offset, 0); - ++dst_iter; - } - srcoff += count; - dstoff += count; - len -= count; - } - dst_pages.clear(); // drop page refs - } - - // update object size - if (data_len < dstoff) - data_len = dstoff; - return 0; -} - -int MemStore::PageSetObject::truncate(uint64_t size) -{ - data.free_pages_after(size); - data_len = size; - - const auto page_size = data.get_page_size(); - const auto page_offset = size & ~(page_size-1); - if (page_offset == size) - return 0; - - DEFINE_PAGE_VECTOR(tls_pages); - // write zeroes to the rest of the last page - data.get_range(page_offset, page_size, tls_pages); - if (tls_pages.empty()) - return 0; - - auto page = tls_pages.begin(); - auto data = (*page)->data; - std::fill(data + (size - page_offset), data + page_size, 0); - tls_pages.clear(); // drop page ref - return 0; -} - - -MemStore::ObjectRef MemStore::Collection::create_object() const { - if (use_page_set) - return new PageSetObject(cct->_conf->memstore_page_size); - return new BufferlistObject(); -}