1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include <sys/types.h>
23 #include "osd/osd_types.h"
25 #include "include/compat.h"
26 #include "include/stringify.h"
27 #include "common/errno.h"
28 #include "common/safe_io.h"
29 #include "common/Formatter.h"
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_kstore
39 * superblock, features
40 * refcounted extents (for efficient clone)
44 const string PREFIX_SUPER = "S"; // field -> value
45 const string PREFIX_COLL = "C"; // collection name -> (nothing)
46 const string PREFIX_OBJ = "O"; // object name -> onode
47 const string PREFIX_DATA = "D"; // nid + offset -> data
48 const string PREFIX_OMAP = "M"; // u64 + keyname -> value
51 * object name key structure
53 * 2 chars: shard (-- for none, or hex digit, so that we sort properly)
54 * encoded u64: poolid + 2^63 (so that it sorts properly)
55 * encoded u32: hash (bit reversed)
59 * escaped string: namespace
61 * 1 char: '<', '=', or '>'. if =, then object key == object name, and
62 * we are followed just by the key. otherwise, we are followed by
63 * the key and then the object name.
65 * escaped string: object name (unless '=' above)
68 * encoded u64: generation
72 * string encoding in the key
74 * The key string needs to lexicographically sort the same way that
75 * ghobject_t does. We do this by escaping anything <= to '#' with #
76 * plus a 2 digit hex string, and anything >= '~' with ~ plus the two
79 * We use ! as a terminator for strings; this works because it is < #
80 * and will get escaped if it is present in the string.
84 static void append_escaped(const string &in, string *out)
87 for (string::const_iterator i = in.begin(); i != in.end(); ++i) {
89 snprintf(hexbyte, sizeof(hexbyte), "#%02x", (uint8_t)*i);
91 } else if (*i >= '~') {
92 snprintf(hexbyte, sizeof(hexbyte), "~%02x", (uint8_t)*i);
101 static int decode_escaped(const char *p, string *out)
103 const char *orig_p = p;
104 while (*p && *p != '!') {
105 if (*p == '#' || *p == '~') {
107 int r = sscanf(++p, "%2x", &hex);
110 out->push_back((char)hex);
113 out->push_back(*p++);
119 // some things we encode in binary (as le32 or le64); print the
120 // resulting key strings nicely
121 static string pretty_binary_string(const string& in)
125 out.reserve(in.length() * 3);
126 enum { NONE, HEX, STRING } mode = NONE;
127 unsigned from = 0, i;
128 for (i=0; i < in.length(); ++i) {
129 if ((in[i] < 32 || (unsigned char)in[i] > 126) ||
130 (mode == HEX && in.length() - i >= 4 &&
131 ((in[i] < 32 || (unsigned char)in[i] > 126) ||
132 (in[i+1] < 32 || (unsigned char)in[i+1] > 126) ||
133 (in[i+2] < 32 || (unsigned char)in[i+2] > 126) ||
134 (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) {
135 if (mode == STRING) {
136 out.append(in.substr(from, i - from));
143 if (in.length() - i >= 4) {
144 // print a whole u32 at once
145 snprintf(buf, sizeof(buf), "%08x",
146 (uint32_t)(((unsigned char)in[i] << 24) |
147 ((unsigned char)in[i+1] << 16) |
148 ((unsigned char)in[i+2] << 8) |
149 ((unsigned char)in[i+3] << 0)));
152 snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]);
156 if (mode != STRING) {
163 if (mode == STRING) {
164 out.append(in.substr(from, i - from));
170 static void _key_encode_shard(shard_id_t shard, string *key)
172 // make field ordering match with ghobject_t compare operations
173 if (shard == shard_id_t::NO_SHARD) {
174 // otherwise ff will sort *after* 0, not before.
178 snprintf(buf, sizeof(buf), "%02x", (int)shard);
182 static const char *_key_decode_shard(const char *key, shard_id_t *pshard)
185 *pshard = shard_id_t::NO_SHARD;
188 int r = sscanf(key, "%x", &shard);
191 *pshard = shard_id_t(shard);
196 static void get_coll_key_range(const coll_t& cid, int bits,
197 string *temp_start, string *temp_end,
198 string *start, string *end)
206 if (cid.is_pg(&pgid)) {
207 _key_encode_shard(pgid.shard, start);
209 *temp_start = *start;
212 _key_encode_u64(pgid.pool() + 0x8000000000000000ull, start);
213 _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_start);
214 _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), start);
215 _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), temp_start);
217 temp_start->append(".");
219 _key_encode_u64(pgid.pool() + 0x8000000000000000ull, end);
220 _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_end);
223 hobject_t::_reverse_bits(pgid.ps()) + (1ull << (32-bits));
224 if (end_hash <= 0xffffffffull) {
225 _key_encode_u32(end_hash, end);
226 _key_encode_u32(end_hash, temp_end);
228 temp_end->append(".");
230 _key_encode_u32(0xffffffff, end);
231 _key_encode_u32(0xffffffff, temp_end);
233 temp_end->append(":");
236 _key_encode_shard(shard_id_t::NO_SHARD, start);
237 _key_encode_u64(-1ull + 0x8000000000000000ull, start);
239 _key_encode_u32(0, start);
241 _key_encode_u32(0xffffffff, end);
244 // no separate temp section
250 static int get_key_object(const string& key, ghobject_t *oid);
252 static void get_object_key(CephContext* cct, const ghobject_t& oid,
257 _key_encode_shard(oid.shard_id, key);
258 _key_encode_u64(oid.hobj.pool + 0x8000000000000000ull, key);
259 _key_encode_u32(oid.hobj.get_bitwise_key_u32(), key);
262 append_escaped(oid.hobj.nspace, key);
264 if (oid.hobj.get_key().length()) {
265 // is a key... could be < = or >.
266 // (ASCII chars < = and > sort in that order, yay)
267 if (oid.hobj.get_key() < oid.hobj.oid.name) {
269 append_escaped(oid.hobj.get_key(), key);
270 append_escaped(oid.hobj.oid.name, key);
271 } else if (oid.hobj.get_key() > oid.hobj.oid.name) {
273 append_escaped(oid.hobj.get_key(), key);
274 append_escaped(oid.hobj.oid.name, key);
278 append_escaped(oid.hobj.oid.name, key);
283 append_escaped(oid.hobj.oid.name, key);
286 _key_encode_u64(oid.hobj.snap, key);
287 _key_encode_u64(oid.generation, key);
292 int r = get_key_object(*key, &t);
294 derr << " r " << r << dendl;
295 derr << "key " << pretty_binary_string(*key) << dendl;
296 derr << "oid " << oid << dendl;
297 derr << " t " << t << dendl;
303 static int get_key_object(const string& key, ghobject_t *oid)
306 const char *p = key.c_str();
308 p = _key_decode_shard(p, &oid->shard_id);
311 p = _key_decode_u64(p, &pool);
312 oid->hobj.pool = pool - 0x8000000000000000ull;
315 p = _key_decode_u32(p, &hash);
316 oid->hobj.set_bitwise_key_u32(hash);
321 r = decode_escaped(p, &oid->hobj.nspace);
329 r = decode_escaped(p, &oid->hobj.oid.name);
333 } else if (*p == '<' || *p == '>') {
337 r = decode_escaped(p, &okey);
341 r = decode_escaped(p, &oid->hobj.oid.name);
345 oid->hobj.set_key(okey);
351 p = _key_decode_u64(p, &oid->hobj.snap.val);
352 p = _key_decode_u64(p, &oid->generation);
354 // if we get something other than a null terminator here,
355 // something goes wrong.
363 static void get_data_key(uint64_t nid, uint64_t offset, string *out)
365 _key_encode_u64(nid, out);
366 _key_encode_u64(offset, out);
370 static void get_omap_header(uint64_t id, string *out)
372 _key_encode_u64(id, out);
376 // hmm, I don't think there's any need to escape the user key since we
377 // have a clean prefix.
378 static void get_omap_key(uint64_t id, const string& key, string *out)
380 _key_encode_u64(id, out);
385 static void rewrite_omap_key(uint64_t id, string old, string *out)
387 _key_encode_u64(id, out);
388 out->append(old.substr(out->length()));
391 static void decode_omap_key(const string& key, string *user_key)
393 *user_key = key.substr(sizeof(uint64_t) + 1);
396 static void get_omap_tail(uint64_t id, string *out)
398 _key_encode_u64(id, out);
407 #define dout_prefix *_dout << "kstore.onode(" << this << ") "
409 void KStore::Onode::flush()
411 std::unique_lock<std::mutex> l(flush_lock);
412 dout(20) << __func__ << " " << flush_txns << dendl;
413 while (!flush_txns.empty())
415 dout(20) << __func__ << " done" << dendl;
421 #define dout_prefix *_dout << "kstore.lru(" << this << ") "
423 void KStore::OnodeHashLRU::_touch(OnodeRef o)
425 lru_list_t::iterator p = lru.iterator_to(*o);
430 void KStore::OnodeHashLRU::add(const ghobject_t& oid, OnodeRef o)
432 std::lock_guard<std::mutex> l(lock);
433 dout(30) << __func__ << " " << oid << " " << o << dendl;
434 assert(onode_map.count(oid) == 0);
439 KStore::OnodeRef KStore::OnodeHashLRU::lookup(const ghobject_t& oid)
441 std::lock_guard<std::mutex> l(lock);
442 dout(30) << __func__ << dendl;
443 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
444 if (p == onode_map.end()) {
445 dout(30) << __func__ << " " << oid << " miss" << dendl;
448 dout(30) << __func__ << " " << oid << " hit " << p->second << dendl;
453 void KStore::OnodeHashLRU::clear()
455 std::lock_guard<std::mutex> l(lock);
456 dout(10) << __func__ << dendl;
461 void KStore::OnodeHashLRU::rename(const ghobject_t& old_oid,
462 const ghobject_t& new_oid)
464 std::lock_guard<std::mutex> l(lock);
465 dout(30) << __func__ << " " << old_oid << " -> " << new_oid << dendl;
466 ceph::unordered_map<ghobject_t,OnodeRef>::iterator po, pn;
467 po = onode_map.find(old_oid);
468 pn = onode_map.find(new_oid);
470 assert(po != onode_map.end());
471 if (pn != onode_map.end()) {
472 lru_list_t::iterator p = lru.iterator_to(*pn->second);
476 OnodeRef o = po->second;
478 // install a non-existent onode it its place
479 po->second.reset(new Onode(cct, old_oid, o->key));
480 lru.push_back(*po->second);
483 onode_map.insert(make_pair(new_oid, o));
486 get_object_key(cct, new_oid, &o->key);
489 bool KStore::OnodeHashLRU::get_next(
490 const ghobject_t& after,
491 pair<ghobject_t,OnodeRef> *next)
493 std::lock_guard<std::mutex> l(lock);
494 dout(20) << __func__ << " after " << after << dendl;
496 if (after == ghobject_t()) {
500 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.begin();
501 assert(p != onode_map.end());
502 next->first = p->first;
503 next->second = p->second;
507 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(after);
508 assert(p != onode_map.end()); // for now
509 lru_list_t::iterator pi = lru.iterator_to(*p->second);
511 if (pi == lru.end()) {
514 next->first = pi->oid;
515 next->second = onode_map[pi->oid];
519 int KStore::OnodeHashLRU::trim(int max)
521 std::lock_guard<std::mutex> l(lock);
522 dout(20) << __func__ << " max " << max
523 << " size " << onode_map.size() << dendl;
525 int num = onode_map.size() - max;
526 if (onode_map.size() == 0 || num <= 0)
527 return 0; // don't even try
529 lru_list_t::iterator p = lru.end();
534 int refs = o->nref.load();
536 dout(20) << __func__ << " " << o->oid << " has " << refs
537 << " refs; stopping with " << num << " left to trim" << dendl;
540 dout(30) << __func__ << " trim " << o->oid << dendl;
541 if (p != lru.begin()) {
547 o->get(); // paranoia
548 onode_map.erase(o->oid);
556 // =======================================================
561 #define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") "
563 KStore::Collection::Collection(KStore *ns, coll_t c)
566 lock("KStore::Collection::lock", true, false),
567 onode_map(store->cct)
571 KStore::OnodeRef KStore::Collection::get_onode(
572 const ghobject_t& oid,
575 assert(create ? lock.is_wlocked() : lock.is_locked());
578 if (cid.is_pg(&pgid)) {
579 if (!oid.match(cnode.bits, pgid.ps())) {
580 lderr(store->cct) << __func__ << " oid " << oid << " not part of "
581 << pgid << " bits " << cnode.bits << dendl;
586 OnodeRef o = onode_map.lookup(oid);
591 get_object_key(store->cct, oid, &key);
593 ldout(store->cct, 20) << __func__ << " oid " << oid << " key "
594 << pretty_binary_string(key) << dendl;
597 int r = store->db->get(PREFIX_OBJ, key, &v);
598 ldout(store->cct, 20) << " r " << r << " v.len " << v.length() << dendl;
600 if (v.length() == 0) {
601 assert(r == -ENOENT);
606 on = new Onode(store->cct, oid, key);
611 on = new Onode(store->cct, oid, key);
613 bufferlist::iterator p = v.begin();
614 ::decode(on->onode, p);
617 onode_map.add(oid, o);
623 // =======================================================
626 #define dout_prefix *_dout << "kstore(" << path << ") "
628 KStore::KStore(CephContext *cct, const string& path)
629 : ObjectStore(cct, path),
634 coll_lock("KStore::coll_lock"),
637 throttle_ops(cct, "kstore_max_ops", cct->_conf->kstore_max_ops),
638 throttle_bytes(cct, "kstore_max_bytes", cct->_conf->kstore_max_bytes),
640 kv_sync_thread(this),
655 void KStore::_init_logger()
658 PerfCountersBuilder b(cct, "KStore",
659 l_kstore_first, l_kstore_last);
660 b.add_time_avg(l_kstore_state_prepare_lat, "state_prepare_lat", "Average prepare state latency");
661 b.add_time_avg(l_kstore_state_kv_queued_lat, "state_kv_queued_lat", "Average kv_queued state latency");
662 b.add_time_avg(l_kstore_state_kv_done_lat, "state_kv_done_lat", "Average kv_done state latency");
663 b.add_time_avg(l_kstore_state_finishing_lat, "state_finishing_lat", "Average finishing state latency");
664 b.add_time_avg(l_kstore_state_done_lat, "state_done_lat", "Average done state latency");
665 logger = b.create_perf_counters();
666 cct->get_perfcounters_collection()->add(logger);
669 void KStore::_shutdown_logger()
672 cct->get_perfcounters_collection()->remove(logger);
676 int KStore::_open_path()
679 path_fd = ::open(path.c_str(), O_DIRECTORY);
682 derr << __func__ << " unable to open " << path << ": " << cpp_strerror(r)
689 void KStore::_close_path()
691 VOID_TEMP_FAILURE_RETRY(::close(path_fd));
695 int KStore::_open_fsid(bool create)
701 fsid_fd = ::openat(path_fd, "fsid", flags, 0644);
704 derr << __func__ << " " << cpp_strerror(err) << dendl;
710 int KStore::_read_fsid(uuid_d *uuid)
713 memset(fsid_str, 0, sizeof(fsid_str));
714 int ret = safe_read(fsid_fd, fsid_str, sizeof(fsid_str));
716 derr << __func__ << " failed: " << cpp_strerror(ret) << dendl;
723 if (!uuid->parse(fsid_str)) {
724 derr << __func__ << " unparsable uuid " << fsid_str << dendl;
730 int KStore::_write_fsid()
732 int r = ::ftruncate(fsid_fd, 0);
735 derr << __func__ << " fsid truncate failed: " << cpp_strerror(r) << dendl;
738 string str = stringify(fsid) + "\n";
739 r = safe_write(fsid_fd, str.c_str(), str.length());
741 derr << __func__ << " fsid write failed: " << cpp_strerror(r) << dendl;
744 r = ::fsync(fsid_fd);
747 derr << __func__ << " fsid fsync failed: " << cpp_strerror(r) << dendl;
753 void KStore::_close_fsid()
755 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
759 int KStore::_lock_fsid()
762 memset(&l, 0, sizeof(l));
764 l.l_whence = SEEK_SET;
767 int r = ::fcntl(fsid_fd, F_SETLK, &l);
770 derr << __func__ << " failed to lock " << path << "/fsid"
771 << " (is another ceph-osd still running?)"
772 << cpp_strerror(err) << dendl;
778 bool KStore::test_mount_in_use()
780 // most error conditions mean the mount is not in use (e.g., because
781 // it doesn't exist). only if we fail to lock do we conclude it is
784 int r = _open_path();
787 r = _open_fsid(false);
792 ret = true; // if we can't lock, it is in use
799 int KStore::_open_db(bool create)
804 snprintf(fn, sizeof(fn), "%s/db", path.c_str());
808 kv_backend = cct->_conf->kstore_backend;
810 r = read_meta("kv_backend", &kv_backend);
812 derr << __func__ << " uanble to read 'kv_backend' meta" << dendl;
816 dout(10) << __func__ << " kv_backend = " << kv_backend << dendl;
819 int r = ::mkdir(fn, 0755);
822 if (r < 0 && r != -EEXIST) {
823 derr << __func__ << " failed to create " << fn << ": " << cpp_strerror(r)
829 char walfn[PATH_MAX];
830 snprintf(walfn, sizeof(walfn), "%s/db.wal", path.c_str());
831 r = ::mkdir(walfn, 0755);
834 if (r < 0 && r != -EEXIST) {
835 derr << __func__ << " failed to create " << walfn
836 << ": " << cpp_strerror(r)
842 db = KeyValueDB::create(cct, kv_backend, fn);
844 derr << __func__ << " error creating db" << dendl;
848 if (kv_backend == "rocksdb")
849 options = cct->_conf->kstore_rocksdb_options;
853 r = db->create_and_open(err);
857 derr << __func__ << " erroring opening db: " << err.str() << dendl;
862 dout(1) << __func__ << " opened " << kv_backend
863 << " path " << fn << " options " << options << dendl;
867 void KStore::_close_db()
874 int KStore::_open_collections(int *errors)
876 assert(coll_map.empty());
877 KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
878 for (it->upper_bound(string());
882 if (cid.parse(it->key())) {
883 CollectionRef c(new Collection(this, cid));
884 bufferlist bl = it->value();
885 bufferlist::iterator p = bl.begin();
887 ::decode(c->cnode, p);
888 } catch (buffer::error& e) {
889 derr << __func__ << " failed to decode cnode, key:"
890 << pretty_binary_string(it->key()) << dendl;
893 dout(20) << __func__ << " opened " << cid << dendl;
896 derr << __func__ << " unrecognized collection " << it->key() << dendl;
906 dout(1) << __func__ << " path " << path << dendl;
914 r = _open_fsid(true);
922 r = _read_fsid(&old_fsid);
923 if (r < 0 || old_fsid.is_zero()) {
924 if (fsid.is_zero()) {
925 fsid.generate_random();
926 dout(1) << __func__ << " generated fsid " << fsid << dendl;
928 dout(1) << __func__ << " using provided fsid " << fsid << dendl;
930 // we'll write it last.
932 if (!fsid.is_zero() && fsid != old_fsid) {
933 derr << __func__ << " on-disk fsid " << old_fsid
934 << " != provided " << fsid << dendl;
939 dout(1) << __func__ << " already created, fsid is " << fsid << dendl;
947 r = write_meta("kv_backend", cct->_conf->kstore_backend);
951 r = write_meta("type", "kstore");
955 // indicate mkfs completion/success by writing the fsid file
958 dout(10) << __func__ << " success" << dendl;
960 derr << __func__ << " error writing fsid: " << cpp_strerror(r) << dendl;
973 dout(1) << __func__ << " path " << path << dendl;
975 if (cct->_conf->kstore_fsck_on_mount) {
976 int rc = fsck(cct->_conf->kstore_fsck_on_mount_deep);
981 int r = _open_path();
984 r = _open_fsid(false);
988 r = _read_fsid(&fsid);
1000 r = _open_super_meta();
1004 r = _open_collections();
1009 kv_sync_thread.create("kstore_kv_sync");
1023 int KStore::umount()
1026 dout(1) << __func__ << dendl;
1029 _reap_collections();
1032 dout(20) << __func__ << " stopping kv thread" << dendl;
1034 dout(20) << __func__ << " draining finisher" << dendl;
1035 finisher.wait_for_empty();
1036 dout(20) << __func__ << " stopping finisher" << dendl;
1038 dout(20) << __func__ << " closing" << dendl;
1047 int KStore::fsck(bool deep)
1049 dout(1) << __func__ << dendl;
1051 dout(1) << __func__ << " finish with " << errors << " errors" << dendl;
1055 void KStore::_sync()
1057 dout(10) << __func__ << dendl;
1059 std::unique_lock<std::mutex> l(kv_lock);
1060 while (!kv_committing.empty() ||
1061 !kv_queue.empty()) {
1062 dout(20) << " waiting for kv to commit" << dendl;
1063 kv_sync_cond.wait(l);
1066 dout(10) << __func__ << " done" << dendl;
1069 int KStore::statfs(struct store_statfs_t* buf)
1071 return db->get_statfs(buf);
1077 KStore::CollectionRef KStore::_get_collection(coll_t cid)
1079 RWLock::RLocker l(coll_lock);
1080 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1081 if (cp == coll_map.end())
1082 return CollectionRef();
1086 void KStore::_queue_reap_collection(CollectionRef& c)
1088 dout(10) << __func__ << " " << c->cid << dendl;
1089 std::lock_guard<std::mutex> l(reap_lock);
1090 removed_collections.push_back(c);
1093 void KStore::_reap_collections()
1095 list<CollectionRef> removed_colls;
1096 std::lock_guard<std::mutex> l(reap_lock);
1097 removed_colls.swap(removed_collections);
1099 for (list<CollectionRef>::iterator p = removed_colls.begin();
1100 p != removed_colls.end();
1102 CollectionRef c = *p;
1103 dout(10) << __func__ << " " << c->cid << dendl;
1105 pair<ghobject_t,OnodeRef> next;
1106 while (c->onode_map.get_next(next.first, &next)) {
1107 assert(!next.second->exists);
1108 if (!next.second->flush_txns.empty()) {
1109 dout(10) << __func__ << " " << c->cid << " " << next.second->oid
1110 << " flush_txns " << next.second->flush_txns << dendl;
1115 c->onode_map.clear();
1116 dout(10) << __func__ << " " << c->cid << " done" << dendl;
1119 dout(10) << __func__ << " all reaped" << dendl;
1125 bool KStore::exists(const coll_t& cid, const ghobject_t& oid)
1127 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1128 CollectionRef c = _get_collection(cid);
1131 RWLock::RLocker l(c->lock);
1132 OnodeRef o = c->get_onode(oid, false);
1133 if (!o || !o->exists)
1140 const ghobject_t& oid,
1144 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1145 CollectionRef c = _get_collection(cid);
1148 RWLock::RLocker l(c->lock);
1149 OnodeRef o = c->get_onode(oid, false);
1150 if (!o || !o->exists)
1152 st->st_size = o->onode.size;
1153 st->st_blksize = 4096;
1154 st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
1159 int KStore::set_collection_opts(
1161 const pool_opts_t& opts)
1168 const ghobject_t& oid,
1174 dout(15) << __func__ << " " << cid << " " << oid
1175 << " " << offset << "~" << length
1178 CollectionRef c = _get_collection(cid);
1181 RWLock::RLocker l(c->lock);
1185 OnodeRef o = c->get_onode(oid, false);
1186 if (!o || !o->exists) {
1191 if (offset == length && offset == 0)
1192 length = o->onode.size;
1194 r = _do_read(o, offset, length, bl, op_flags);
1197 dout(10) << __func__ << " " << cid << " " << oid
1198 << " " << offset << "~" << length
1199 << " = " << r << dendl;
1203 int KStore::_do_read(
1211 uint64_t stripe_size = o->onode.stripe_size;
1212 uint64_t stripe_off;
1214 dout(20) << __func__ << " " << offset << "~" << length << " size "
1215 << o->onode.size << " nid " << o->onode.nid << dendl;
1218 if (offset > o->onode.size) {
1221 if (offset + length > o->onode.size) {
1222 length = o->onode.size - offset;
1224 if (stripe_size == 0) {
1225 bl.append_zero(length);
1232 stripe_off = offset % stripe_size;
1233 while (length > 0) {
1235 _do_read_stripe(o, offset - stripe_off, &stripe);
1236 dout(30) << __func__ << " stripe " << offset - stripe_off << " got "
1237 << stripe.length() << dendl;
1238 unsigned swant = MIN(stripe_size - stripe_off, length);
1239 if (stripe.length()) {
1240 if (swant == stripe.length()) {
1241 bl.claim_append(stripe);
1242 dout(30) << __func__ << " taking full stripe" << dendl;
1245 if (stripe_off < stripe.length()) {
1246 l = MIN(stripe.length() - stripe_off, swant);
1248 t.substr_of(stripe, stripe_off, l);
1250 dout(30) << __func__ << " taking " << stripe_off << "~" << l << dendl;
1253 bl.append_zero(swant - l);
1254 dout(30) << __func__ << " adding " << swant - l << " zeros" << dendl;
1258 dout(30) << __func__ << " generating " << swant << " zeros" << dendl;
1259 bl.append_zero(swant);
1266 dout(30) << " result:\n";
1276 const ghobject_t& oid,
1281 map<uint64_t, uint64_t> m;
1282 int r = fiemap(cid, oid, offset, len, m);
1292 const ghobject_t& oid,
1295 map<uint64_t, uint64_t>& destmap)
1297 CollectionRef c = _get_collection(cid);
1300 RWLock::RLocker l(c->lock);
1302 OnodeRef o = c->get_onode(oid, false);
1303 if (!o || !o->exists) {
1307 if (offset > o->onode.size)
1310 if (offset + len > o->onode.size) {
1311 len = o->onode.size - offset;
1314 dout(20) << __func__ << " " << offset << "~" << len << " size "
1315 << o->onode.size << dendl;
1317 // FIXME: do something smarter here
1318 destmap[0] = o->onode.size;
1321 dout(20) << __func__ << " " << offset << "~" << len
1322 << " size = 0 (" << destmap << ")" << dendl;
1326 int KStore::getattr(
1328 const ghobject_t& oid,
1332 dout(15) << __func__ << " " << cid << " " << oid << " " << name << dendl;
1333 CollectionRef c = _get_collection(cid);
1336 RWLock::RLocker l(c->lock);
1340 OnodeRef o = c->get_onode(oid, false);
1341 if (!o || !o->exists) {
1346 if (!o->onode.attrs.count(k)) {
1350 value = o->onode.attrs[k];
1353 dout(10) << __func__ << " " << cid << " " << oid << " " << name
1354 << " = " << r << dendl;
1358 int KStore::getattrs(
1360 const ghobject_t& oid,
1361 map<string,bufferptr>& aset)
1363 dout(15) << __func__ << " " << cid << " " << oid << dendl;
1364 CollectionRef c = _get_collection(cid);
1367 RWLock::RLocker l(c->lock);
1370 OnodeRef o = c->get_onode(oid, false);
1371 if (!o || !o->exists) {
1375 aset = o->onode.attrs;
1378 dout(10) << __func__ << " " << cid << " " << oid
1379 << " = " << r << dendl;
1383 int KStore::list_collections(vector<coll_t>& ls)
1385 RWLock::RLocker l(coll_lock);
1386 for (ceph::unordered_map<coll_t, CollectionRef>::iterator p = coll_map.begin();
1387 p != coll_map.end();
1389 ls.push_back(p->first);
1393 bool KStore::collection_exists(const coll_t& c)
1395 RWLock::RLocker l(coll_lock);
1396 return coll_map.count(c);
1399 int KStore::collection_empty(const coll_t& cid, bool *empty)
1401 dout(15) << __func__ << " " << cid << dendl;
1402 vector<ghobject_t> ls;
1404 int r = collection_list(cid, ghobject_t(), ghobject_t::get_max(), 1,
1407 derr << __func__ << " collection_list returned: " << cpp_strerror(r)
1411 *empty = ls.empty();
1412 dout(10) << __func__ << " " << cid << " = " << (int)(*empty) << dendl;
1416 int KStore::collection_bits(const coll_t& cid)
1418 dout(15) << __func__ << " " << cid << dendl;
1419 CollectionHandle ch = _get_collection(cid);
1422 Collection *c = static_cast<Collection*>(ch.get());
1423 RWLock::RLocker l(c->lock);
1424 dout(10) << __func__ << " " << cid << " = " << c->cnode.bits << dendl;
1425 return c->cnode.bits;
1428 int KStore::collection_list(
1429 const coll_t& cid, const ghobject_t& start, const ghobject_t& end, int max,
1430 vector<ghobject_t> *ls, ghobject_t *pnext)
1432 CollectionHandle c = _get_collection(cid);
1435 return collection_list(c, start, end, max, ls, pnext);
1438 int KStore::collection_list(
1439 CollectionHandle &c_, const ghobject_t& start, const ghobject_t& end, int max,
1440 vector<ghobject_t> *ls, ghobject_t *pnext)
1443 Collection *c = static_cast<Collection*>(c_.get());
1444 dout(15) << __func__ << " " << c->cid
1445 << " start " << start << " end " << end << " max " << max << dendl;
1448 RWLock::RLocker l(c->lock);
1449 r = _collection_list(c, start, end, max, ls, pnext);
1452 dout(10) << __func__ << " " << c->cid
1453 << " start " << start << " end " << end << " max " << max
1454 << " = " << r << ", ls.size() = " << ls->size()
1455 << ", next = " << (pnext ? *pnext : ghobject_t()) << dendl;
1459 int KStore::_collection_list(
1460 Collection* c, const ghobject_t& start, const ghobject_t& end, int max,
1461 vector<ghobject_t> *ls, ghobject_t *pnext)
1464 KeyValueDB::Iterator it;
1465 string temp_start_key, temp_end_key;
1466 string start_key, end_key;
1467 bool set_next = false;
1471 ghobject_t static_next;
1473 pnext = &static_next;
1475 if (start == ghobject_t::get_max() ||
1476 start.hobj.is_max()) {
1479 get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
1480 &start_key, &end_key);
1481 dout(20) << __func__
1482 << " range " << pretty_binary_string(temp_start_key)
1483 << " to " << pretty_binary_string(temp_end_key)
1484 << " and " << pretty_binary_string(start_key)
1485 << " to " << pretty_binary_string(end_key)
1486 << " start " << start << dendl;
1487 it = db->get_iterator(PREFIX_OBJ);
1488 if (start == ghobject_t() || start == c->cid.get_min_hobj()) {
1489 it->upper_bound(temp_start_key);
1493 get_object_key(cct, start, &k);
1494 if (start.hobj.is_temp()) {
1496 assert(k >= temp_start_key && k < temp_end_key);
1499 assert(k >= start_key && k < end_key);
1501 dout(20) << " start from " << pretty_binary_string(k)
1502 << " temp=" << (int)temp << dendl;
1505 if (end.hobj.is_max()) {
1506 pend = temp ? temp_end_key : end_key;
1508 get_object_key(cct, end, &end_key);
1509 if (end.hobj.is_temp()) {
1515 pend = temp ? temp_end_key : end_key;
1518 dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
1520 if (!it->valid() || it->key() >= pend) {
1522 dout(20) << __func__ << " iterator not valid (end of db?)" << dendl;
1524 dout(20) << __func__ << " key " << pretty_binary_string(it->key())
1525 << " > " << end << dendl;
1527 if (end.hobj.is_temp()) {
1530 dout(30) << __func__ << " switch to non-temp namespace" << dendl;
1532 it->upper_bound(start_key);
1534 dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
1539 dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl;
1541 int r = get_key_object(it->key(), &oid);
1543 if (ls->size() >= (unsigned)max) {
1544 dout(20) << __func__ << " reached max " << max << dendl;
1554 *pnext = ghobject_t::get_max();
1561 KStore::OmapIteratorImpl::OmapIteratorImpl(
1562 CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
1563 : c(c), o(o), it(it)
1565 RWLock::RLocker l(c->lock);
1566 if (o->onode.omap_head) {
1567 get_omap_key(o->onode.omap_head, string(), &head);
1568 get_omap_tail(o->onode.omap_head, &tail);
1569 it->lower_bound(head);
1573 int KStore::OmapIteratorImpl::seek_to_first()
1575 RWLock::RLocker l(c->lock);
1576 if (o->onode.omap_head) {
1577 it->lower_bound(head);
1579 it = KeyValueDB::Iterator();
1584 int KStore::OmapIteratorImpl::upper_bound(const string& after)
1586 RWLock::RLocker l(c->lock);
1587 if (o->onode.omap_head) {
1589 get_omap_key(o->onode.omap_head, after, &key);
1590 it->upper_bound(key);
1592 it = KeyValueDB::Iterator();
1597 int KStore::OmapIteratorImpl::lower_bound(const string& to)
1599 RWLock::RLocker l(c->lock);
1600 if (o->onode.omap_head) {
1602 get_omap_key(o->onode.omap_head, to, &key);
1603 it->lower_bound(key);
1605 it = KeyValueDB::Iterator();
1610 bool KStore::OmapIteratorImpl::valid()
1612 RWLock::RLocker l(c->lock);
1613 if (o->onode.omap_head && it->valid() && it->raw_key().second <= tail) {
1620 int KStore::OmapIteratorImpl::next(bool validate)
1622 RWLock::RLocker l(c->lock);
1623 if (o->onode.omap_head) {
1631 string KStore::OmapIteratorImpl::key()
1633 RWLock::RLocker l(c->lock);
1634 assert(it->valid());
1635 string db_key = it->raw_key().second;
1637 decode_omap_key(db_key, &user_key);
1641 bufferlist KStore::OmapIteratorImpl::value()
1643 RWLock::RLocker l(c->lock);
1644 assert(it->valid());
1648 int KStore::omap_get(
1649 const coll_t& cid, ///< [in] Collection containing oid
1650 const ghobject_t &oid, ///< [in] Object containing omap
1651 bufferlist *header, ///< [out] omap header
1652 map<string, bufferlist> *out /// < [out] Key to value map
1655 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1656 CollectionRef c = _get_collection(cid);
1659 RWLock::RLocker l(c->lock);
1661 OnodeRef o = c->get_onode(oid, false);
1662 if (!o || !o->exists) {
1666 if (!o->onode.omap_head)
1670 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1672 get_omap_header(o->onode.omap_head, &head);
1673 get_omap_tail(o->onode.omap_head, &tail);
1674 it->lower_bound(head);
1675 while (it->valid()) {
1676 if (it->key() == head) {
1677 dout(30) << __func__ << " got header" << dendl;
1678 *header = it->value();
1679 } else if (it->key() >= tail) {
1680 dout(30) << __func__ << " reached tail" << dendl;
1684 decode_omap_key(it->key(), &user_key);
1685 dout(30) << __func__ << " got " << pretty_binary_string(it->key())
1686 << " -> " << user_key << dendl;
1687 assert(it->key() < tail);
1688 (*out)[user_key] = it->value();
1694 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
// omap_get_header(): fetch only an object's omap header, using a single
// point lookup of its header key.
//
// Bails out when the object does not exist or has no omap_head. A missing
// header key is logged as "no header".
// NOTE(review): allow_eio is not referenced in any of the lines visible
// here; confirm whether it is intentionally ignored.
1698 int KStore::omap_get_header(
1699 const coll_t& cid, ///< [in] Collection containing oid
1700 const ghobject_t &oid, ///< [in] Object containing omap
1701 bufferlist *header, ///< [out] omap header
1702 bool allow_eio ///< [in] don't assert on eio
1705 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1706 CollectionRef c = _get_collection(cid);
1709 RWLock::RLocker l(c->lock);
1711 OnodeRef o = c->get_onode(oid, false);
1712 if (!o || !o->exists) {
1716 if (!o->onode.omap_head)
1721 get_omap_header(o->onode.omap_head, &head);
1722 if (db->get(PREFIX_OMAP, head, header) >= 0) {
1723 dout(30) << __func__ << " got header" << dendl;
1725 dout(30) << __func__ << " no header" << dendl;
1729 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
// omap_get_keys(): collect every user key of an object's omap into *keys.
// Values are not returned.
//
// Bails out when the object does not exist or has no omap_head. The scan
// starts at the encoding of the empty user key rather than at the header
// key -- presumably so that the header entry is skipped; confirm against
// get_omap_key()/get_omap_header() -- and ends at the object's tail key.
1733 int KStore::omap_get_keys(
1734 const coll_t& cid, ///< [in] Collection containing oid
1735 const ghobject_t &oid, ///< [in] Object containing omap
1736 set<string> *keys ///< [out] Keys defined on oid
1739 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1740 CollectionRef c = _get_collection(cid);
1743 RWLock::RLocker l(c->lock);
1745 OnodeRef o = c->get_onode(oid, false);
1746 if (!o || !o->exists) {
1750 if (!o->onode.omap_head)
1754 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1756 get_omap_key(o->onode.omap_head, string(), &head);
1757 get_omap_tail(o->onode.omap_head, &tail);
1758 it->lower_bound(head);
1759 while (it->valid()) {
1760 if (it->key() >= tail) {
1761 dout(30) << __func__ << " reached tail" << dendl;
1765 decode_omap_key(it->key(), &user_key);
1766 dout(30) << __func__ << " got " << pretty_binary_string(it->key())
1767 << " -> " << user_key << dendl;
1768 assert(it->key() < tail);
1769 keys->insert(user_key);
1774 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
// omap_get_values(): look up each requested key in the object's omap and
// copy the ones that exist into *out.
//
// Each key is probed with an individual db->get(). Only keys whose lookup
// succeeds (>= 0) are inserted, so keys that are not present are simply
// absent from *out. Bails out when the object does not exist or has no
// omap_head.
1778 int KStore::omap_get_values(
1779 const coll_t& cid, ///< [in] Collection containing oid
1780 const ghobject_t &oid, ///< [in] Object containing omap
1781 const set<string> &keys, ///< [in] Keys to get
1782 map<string, bufferlist> *out ///< [out] Returned keys and values
1785 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1786 CollectionRef c = _get_collection(cid);
1789 RWLock::RLocker l(c->lock);
1791 OnodeRef o = c->get_onode(oid, false);
1792 if (!o || !o->exists) {
1796 if (!o->onode.omap_head)
1799 for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p) {
1801 get_omap_key(o->onode.omap_head, *p, &key);
1803 if (db->get(PREFIX_OMAP, key, &val) >= 0) {
1804 dout(30) << __func__ << " got " << pretty_binary_string(key)
1805 << " -> " << *p << dendl;
1806 out->insert(make_pair(*p, val));
1810 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
// omap_check_keys(): report which of the given keys exist in the object's
// omap (*out receives the subset that is present).
//
// Each key is probed with a point lookup; hits ("have") and misses
// ("miss") are both logged at level 30. Note that the probe fetches the
// full value just to test existence.
1814 int KStore::omap_check_keys(
1815 const coll_t& cid, ///< [in] Collection containing oid
1816 const ghobject_t &oid, ///< [in] Object containing omap
1817 const set<string> &keys, ///< [in] Keys to check
1818 set<string> *out ///< [out] Subset of keys defined on oid
1821 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1822 CollectionRef c = _get_collection(cid);
1825 RWLock::RLocker l(c->lock);
1827 OnodeRef o = c->get_onode(oid, false);
1828 if (!o || !o->exists) {
1832 if (!o->onode.omap_head)
1835 for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p) {
1837 get_omap_key(o->onode.omap_head, *p, &key);
1839 if (db->get(PREFIX_OMAP, key, &val) >= 0) {
1840 dout(30) << __func__ << " have " << pretty_binary_string(key)
1841 << " -> " << *p << dendl;
1844 dout(30) << __func__ << " miss " << pretty_binary_string(key)
1845 << " -> " << *p << dendl;
1849 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
// get_omap_iterator(): return an iterator over an object's omap.
//
// Returns an empty (default-constructed) ObjectMapIterator when the
// collection or the object does not exist. Otherwise it builds an
// OmapIteratorImpl from the collection ref, the onode ref and a fresh
// PREFIX_OMAP KeyValueDB iterator.
// NOTE(review): both "doesn't exist" log messages lack a leading space, so
// they print e.g. "<cid>doesn't exist"; worth fixing in the string literal.
1853 ObjectMap::ObjectMapIterator KStore::get_omap_iterator(
1854 const coll_t& cid, ///< [in] collection
1855 const ghobject_t &oid ///< [in] object
1859 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1860 CollectionRef c = _get_collection(cid);
1862 dout(10) << __func__ << " " << cid << "doesn't exist" <<dendl;
1863 return ObjectMap::ObjectMapIterator();
1865 RWLock::RLocker l(c->lock);
1866 OnodeRef o = c->get_onode(oid, false);
1867 if (!o || !o->exists) {
1868 dout(10) << __func__ << " " << oid << "doesn't exist" <<dendl;
1869 return ObjectMap::ObjectMapIterator();
1872 dout(10) << __func__ << " header = " << o->onode.omap_head <<dendl;
1873 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1874 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o, it));
1878 // -----------------
// _open_super_meta(): load persisted superblock metadata. In the lines shown
// this reads the "nid_max" field from PREFIX_SUPER and decodes it into
// nid_max. A buffer::error raised while decoding is caught, not propagated.
// NOTE(review): db->get()'s return value is not checked in the visible
// lines. An absent key presumably leaves bl empty, so the decode throws and
// the catch below handles it -- confirm that this is the intended path.
1881 int KStore::_open_super_meta()
1887 db->get(PREFIX_SUPER, "nid_max", &bl);
1888 bufferlist::iterator p = bl.begin();
1890 ::decode(nid_max, p);
1891 } catch (buffer::error& e) {
1893 dout(10) << __func__ << " old nid_max " << nid_max << dendl;
// _assign_nid(): give onode o the next node id (nid) from the in-memory
// counter nid_last. The counter is updated under nid_lock.
//
// nid_max is a persisted high-water mark. Whenever nid_last passes it,
// nid_max is advanced by kstore_nid_prealloc and written to PREFIX_SUPER
// "nid_max" in this transaction. As a result the superblock is rewritten
// only once per preallocated batch of ids, not on every assignment.
1899 void KStore::_assign_nid(TransContext *txc, OnodeRef o)
1903 std::lock_guard<std::mutex> l(nid_lock);
1904 o->onode.nid = ++nid_last;
1905 dout(20) << __func__ << " " << o->oid << " nid " << o->onode.nid << dendl;
1906 if (nid_last > nid_max) {
1907 nid_max += cct->_conf->kstore_nid_prealloc;
1909 ::encode(nid_max, bl);
1910 txc->t->set(PREFIX_SUPER, "nid_max", bl);
1911 dout(10) << __func__ << " nid_max now " << nid_max << dendl;
// _txc_create(): allocate a new TransContext for sequencer osr and attach a
// fresh KeyValueDB transaction to it. The txc is then appended to osr's
// queue; _osr_reap_done() later retires entries from the front of that
// queue in order.
1915 KStore::TransContext *KStore::_txc_create(OpSequencer *osr)
1917 TransContext *txc = new TransContext(osr);
1918 txc->t = db->get_transaction();
1919 osr->queue_new(txc);
1920 dout(20) << __func__ << " osr " << osr << " = " << txc << dendl;
// _txc_state_proc(): advance a transaction through its state machine,
// recording per-state latency in the perf logger at each step.
//
//   PREPARE   -> KV_QUEUED: submit the KV transaction.
//                - Without kstore_sync_transaction: under kv_lock, the
//                  transaction is optionally submitted right away (when
//                  kstore_sync_submit_transaction is set), then pushed on
//                  kv_queue and _kv_sync_thread is woken.
//                - With kstore_sync_transaction: it is committed inline
//                  via submit_transaction_sync().
//   KV_QUEUED -> KV_DONE:   _txc_finish_kv() dispatches the completions.
//   KV_DONE   -> FINISHING.
//   FINISHING:              final step (presumably hands off to
//                           _txc_finish(), which asserts this state).
//   Any other state is a programming error and asserts.
1924 void KStore::_txc_state_proc(TransContext *txc)
1927 dout(10) << __func__ << " txc " << txc
1928 << " " << txc->get_state_name() << dendl;
1929 switch (txc->state) {
1930 case TransContext::STATE_PREPARE:
1931 txc->log_state_latency(logger, l_kstore_state_prepare_lat);
1932 txc->state = TransContext::STATE_KV_QUEUED;
1933 if (!cct->_conf->kstore_sync_transaction) {
1934 std::lock_guard<std::mutex> l(kv_lock);
1935 if (cct->_conf->kstore_sync_submit_transaction) {
1936 int r = db->submit_transaction(txc->t);
1939 kv_queue.push_back(txc);
1940 kv_cond.notify_one();
1944 int r = db->submit_transaction_sync(txc->t);
1949 case TransContext::STATE_KV_QUEUED:
1950 txc->log_state_latency(logger, l_kstore_state_kv_queued_lat);
1951 txc->state = TransContext::STATE_KV_DONE;
1952 _txc_finish_kv(txc);
1955 case TransContext::STATE_KV_DONE:
1956 txc->log_state_latency(logger, l_kstore_state_kv_done_lat);
1957 txc->state = TransContext::STATE_FINISHING;
// NOTE(review): the doubled "TransContext::TransContext::" qualifier below
// still compiles (it names the class via its injected-class-name), but it
// should read "TransContext::STATE_FINISHING" like the other case labels.
1960 case TransContext::TransContext::STATE_FINISHING:
1961 txc->log_state_latency(logger, l_kstore_state_finishing_lat);
1966 derr << __func__ << " unexpected txc " << txc
1967 << " state " << txc->get_state_name() << dendl;
1968 assert(0 == "unexpected txc state");
// _txc_finalize(): for every onode dirtied by the transaction:
//  - encode it into the KV transaction under PREFIX_OBJ, using the onode's
//    current key;
//  - add txc to the onode's in-flight set (flush_txns, guarded by
//    flush_lock).
// _txc_finish() later removes txc from that set and wakes flush_cond
// waiters once the set is empty.
1974 void KStore::_txc_finalize(OpSequencer *osr, TransContext *txc)
1976 dout(20) << __func__ << " osr " << osr << " txc " << txc
1977 << " onodes " << txc->onodes << dendl;
1980 for (set<OnodeRef>::iterator p = txc->onodes.begin();
1981 p != txc->onodes.end();
1984 ::encode((*p)->onode, bl);
1985 dout(20) << " onode size is " << bl.length() << dendl;
1986 txc->t->set(PREFIX_OBJ, (*p)->key, bl);
1988 std::lock_guard<std::mutex> l((*p)->flush_lock);
1989 (*p)->flush_txns.insert(txc);
// _txc_finish_kv(): called from _txc_state_proc() on the KV_QUEUED ->
// KV_DONE transition. It:
//  - completes onreadable_sync inline;
//  - hands onreadable, oncommit and the oncommits list to the finisher;
//  - returns the transaction's ops and bytes to the submission throttles.
// Each single callback pointer is cleared after it is dispatched so it
// cannot fire twice.
1993 void KStore::_txc_finish_kv(TransContext *txc)
1995 dout(20) << __func__ << " txc " << txc << dendl;
1997 // warning: we're calling onreadable_sync inside the sequencer lock
1998 if (txc->onreadable_sync) {
1999 txc->onreadable_sync->complete(0);
2000 txc->onreadable_sync = NULL;
2002 if (txc->onreadable) {
2003 finisher.queue(txc->onreadable);
2004 txc->onreadable = NULL;
2006 if (txc->oncommit) {
2007 finisher.queue(txc->oncommit);
2008 txc->oncommit = NULL;
2010 if (!txc->oncommits.empty()) {
2011 finisher.queue(txc->oncommits);
2014 throttle_ops.put(txc->ops);
2015 throttle_bytes.put(txc->bytes);
// _txc_finish(): final step of a transaction (state must be FINISHING).
//  - Drops txc from each touched onode's flush_txns. When an onode has no
//    more in-flight transactions, its flush_cond waiters are woken and its
//    pending_stripes cache is cleared; the data is then read from the DB.
//  - Queues any collections removed by this transaction for reaping.
//  - Marks the txc DONE under the sequencer's qlock, then reaps finished
//    transactions from the sequencer queue.
2018 void KStore::_txc_finish(TransContext *txc)
2020 dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
2021 assert(txc->state == TransContext::STATE_FINISHING);
2023 for (set<OnodeRef>::iterator p = txc->onodes.begin();
2024 p != txc->onodes.end();
2026 std::lock_guard<std::mutex> l((*p)->flush_lock);
2027 dout(20) << __func__ << " onode " << *p << " had " << (*p)->flush_txns
2029 assert((*p)->flush_txns.count(txc));
2030 (*p)->flush_txns.erase(txc);
2031 if ((*p)->flush_txns.empty()) {
2032 (*p)->flush_cond.notify_all();
2033 (*p)->clear_pending_stripes();
2038 txc->onodes.clear();
2040 while (!txc->removed_collections.empty()) {
2041 _queue_reap_collection(txc->removed_collections.front());
2042 txc->removed_collections.pop_front();
// Keep a local ref to the sequencer: reaping below may retire txc itself,
// presumably destroying the object that holds txc->osr.
2045 OpSequencerRef osr = txc->osr;
2047 std::lock_guard<std::mutex> l(osr->qlock);
2048 txc->state = TransContext::STATE_DONE;
2051 _osr_reap_done(osr.get());
// _osr_reap_done(): holding osr->qlock, walk the sequencer queue from the
// front and retire transactions that have reached STATE_DONE. The walk
// stops at the first one that has not, so transactions retire strictly in
// submission order.
//
// For each retired txc:
//  - the first collection it touched has its onode cache trimmed to
//    kstore_onode_map_size;
//  - the DONE-state latency is recorded;
//  - qcond waiters are notified.
2054 void KStore::_osr_reap_done(OpSequencer *osr)
2056 std::lock_guard<std::mutex> l(osr->qlock);
2057 dout(20) << __func__ << " osr " << osr << dendl;
2058 while (!osr->q.empty()) {
2059 TransContext *txc = &osr->q.front();
2060 dout(20) << __func__ << " txc " << txc << " " << txc->get_state_name()
2062 if (txc->state != TransContext::STATE_DONE) {
2066 if (txc->first_collection) {
2067 txc->first_collection->onode_map.trim(cct->_conf->kstore_onode_map_size);
2071 txc->log_state_latency(logger, l_kstore_state_done_lat);
2073 osr->qcond.notify_all();
2075 dout(20) << __func__ << " osr " << osr << " q now empty" << dendl;
// _kv_sync_thread(): background loop that batches queued transactions into
// one durable commit.
//
// While kv_queue is empty, it notifies kv_sync_cond (so anyone waiting for
// the queue to drain can proceed) and logs "sleep"/"wake" around waiting
// for work. Otherwise it:
//  - moves the whole queue into kv_committing;
//  - submits each txc's transaction, unless they were already submitted at
//    queue time (kstore_sync_submit_transaction);
//  - commits a fresh transaction with submit_transaction_sync(), so one
//    sync makes the whole batch durable;
//  - advances each committed txc with _txc_state_proc();
//  - reaps removed collections.
2079 void KStore::_kv_sync_thread()
2081 dout(10) << __func__ << " start" << dendl;
2082 std::unique_lock<std::mutex> l(kv_lock);
2084 assert(kv_committing.empty());
2085 if (kv_queue.empty()) {
2088 dout(20) << __func__ << " sleep" << dendl;
2089 kv_sync_cond.notify_all();
2091 dout(20) << __func__ << " wake" << dendl;
2093 dout(20) << __func__ << " committing " << kv_queue.size() << dendl;
2094 kv_committing.swap(kv_queue);
2095 utime_t start = ceph_clock_now();
2098 dout(30) << __func__ << " committing txc " << kv_committing << dendl;
2100 // one transaction to force a sync
2101 KeyValueDB::Transaction t = db->get_transaction();
2102 if (!cct->_conf->kstore_sync_submit_transaction) {
2103 for (std::deque<TransContext *>::iterator it = kv_committing.begin();
2104 it != kv_committing.end();
2106 int r = db->submit_transaction((*it)->t);
2110 int r = db->submit_transaction_sync(t);
2112 utime_t finish = ceph_clock_now();
2113 utime_t dur = finish - start;
2114 dout(20) << __func__ << " committed " << kv_committing.size()
2115 << " in " << dur << dendl;
2116 while (!kv_committing.empty()) {
2117 TransContext *txc = kv_committing.front();
2118 _txc_state_proc(txc);
2119 kv_committing.pop_front();
2122 // this is as good a place as any ...
2123 _reap_collections();
2128 dout(10) << __func__ << " finish" << dendl;
2132 // ---------------------------
// queue_transactions(): ObjectStore entry point for applying a batch of
// transactions. Steps:
//  1. collect the batch's completion contexts;
//  2. attach to the caller's sequencer, or create a new OpSequencer;
//  3. build one TransContext covering every transaction in tls, summing
//     their op and byte counts;
//  4. serialize dirty onodes (_txc_finalize);
//  5. take ops/bytes throttle budget;
//  6. start the state machine.
2135 int KStore::queue_transactions(
2137 vector<Transaction>& tls,
2139 ThreadPool::TPHandle *handle)
2141 Context *onreadable;
2143 Context *onreadable_sync;
2144 ObjectStore::Transaction::collect_contexts(
2145 tls, &onreadable, &ondisk, &onreadable_sync);
2147 // set up the sequencer
2151 osr = static_cast<OpSequencer *>(posr->p.get());
2152 dout(10) << __func__ << " existing " << osr << " " << *osr << dendl;
2154 osr = new OpSequencer(cct);
2157 dout(10) << __func__ << " new " << osr << " " << *osr << dendl;
2161 TransContext *txc = _txc_create(osr);
2162 txc->onreadable = onreadable;
2163 txc->onreadable_sync = onreadable_sync;
2164 txc->oncommit = ondisk;
2166 for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
2168 txc->ops += (*p).get_num_ops();
2169 txc->bytes += (*p).get_num_bytes();
2170 _txc_add_transaction(txc, &(*p));
2173 _txc_finalize(osr, txc);
// The throttle budget taken here is returned by _txc_finish_kv().
2175 throttle_ops.get(txc->ops);
2176 throttle_bytes.get(txc->bytes);
2179 _txc_state_proc(txc);
// _txc_add_transaction(): decode one ObjectStore::Transaction and apply each
// of its ops to txc.
//
// Collections referenced by the transaction are resolved up front. The
// first one is remembered on txc; _osr_reap_done() later uses it to trim
// that collection's onode cache. Collection-level ops are dispatched
// first. Object ops then run under the collection's write lock, and
// TOUCH/WRITE/ZERO create the onode if it is missing.
//
// Error policy:
//  - -ENOENT is tolerated, except for the clone ops and COLL_ADD;
//  - any other failure logs the op index, a reason and a JSON dump of the
//    transaction, then asserts. In particular, ENOSPC crashes before a
//    partially applied transaction can do damage.
2183 void KStore::_txc_add_transaction(TransContext *txc, Transaction *t)
2185 Transaction::iterator i = t->begin();
2187 dout(30) << __func__ << " transaction dump:\n";
2188 JSONFormatter f(true);
2189 f.open_object_section("transaction");
// Resolve every collection the transaction references, indexed like i.colls.
2195 vector<CollectionRef> cvec(i.colls.size());
2197 for (vector<coll_t>::iterator p = i.colls.begin(); p != i.colls.end();
2199 cvec[j] = _get_collection(*p);
2201 // note first collection we reference
2202 if (!j && !txc->first_collection)
2203 txc->first_collection = cvec[j];
// Onode refs per transaction object slot, filled in lazily as ops touch them.
2205 vector<OnodeRef> ovec(i.objects.size());
// pos is the op index reported in error messages ("counting from 0").
2207 for (int pos = 0; i.have_op(); ++pos) {
2208 Transaction::Op *op = i.decode_op();
2212 if (op->op == Transaction::OP_NOP)
2215 // collection operations
2216 CollectionRef &c = cvec[op->cid];
2218 case Transaction::OP_RMCOLL:
2220 coll_t cid = i.get_cid(op->cid);
2221 r = _remove_collection(txc, cid, &c);
2227 case Transaction::OP_MKCOLL:
2230 coll_t cid = i.get_cid(op->cid);
2231 r = _create_collection(txc, cid, op->split_bits, &c);
2237 case Transaction::OP_SPLIT_COLLECTION:
2238 assert(0 == "deprecated");
2241 case Transaction::OP_SPLIT_COLLECTION2:
2243 uint32_t bits = op->split_bits;
2244 uint32_t rem = op->split_rem;
2245 r = _split_collection(txc, c, cvec[op->dest_cid], bits, rem);
2251 case Transaction::OP_COLL_HINT:
2253 uint32_t type = op->hint_type;
2256 bufferlist::iterator hiter = hint.begin();
2257 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2260 ::decode(pg_num, hiter);
2261 ::decode(num_objs, hiter);
2262 dout(10) << __func__ << " collection hint objects is a no-op, "
2263 << " pg_num " << pg_num << " num_objects " << num_objs
2267 dout(10) << __func__ << " unknown collection hint " << type << dendl;
2273 case Transaction::OP_COLL_SETATTR:
2277 case Transaction::OP_COLL_RMATTR:
2281 case Transaction::OP_COLL_RENAME:
2282 assert(0 == "not implemented");
2286 derr << " error " << cpp_strerror(r)
2287 << " not handled on operation " << op->op
2288 << " (op " << pos << ", counting from 0)" << dendl;
2289 dout(0) << " transaction dump:\n";
2290 JSONFormatter f(true);
2291 f.open_object_section("transaction");
2296 assert(0 == "unexpected error");
2299 // object operations
// The collection's write lock is held while the object op executes.
2300 RWLock::WLocker l(c->lock);
2301 OnodeRef &o = ovec[op->oid];
2303 // these operations implicitly create the object
2304 bool create = false;
2305 if (op->op == Transaction::OP_TOUCH ||
2306 op->op == Transaction::OP_WRITE ||
2307 op->op == Transaction::OP_ZERO) {
2310 ghobject_t oid = i.get_oid(op->oid);
2311 o = c->get_onode(oid, create);
2313 if (!o || !o->exists) {
2314 dout(10) << __func__ << " op " << op->op << " got ENOENT on "
2323 case Transaction::OP_TOUCH:
2324 r = _touch(txc, c, o);
2327 case Transaction::OP_WRITE:
2329 uint64_t off = op->off;
2330 uint64_t len = op->len;
2331 uint32_t fadvise_flags = i.get_fadvise_flags();
2334 r = _write(txc, c, o, off, len, bl, fadvise_flags);
2338 case Transaction::OP_ZERO:
2340 uint64_t off = op->off;
2341 uint64_t len = op->len;
2342 r = _zero(txc, c, o, off, len);
2346 case Transaction::OP_TRIMCACHE:
2348 // deprecated, no-op
2352 case Transaction::OP_TRUNCATE:
2354 uint64_t off = op->off;
2355 r = _truncate(txc, c, o, off);
2359 case Transaction::OP_REMOVE:
2360 r = _remove(txc, c, o);
2363 case Transaction::OP_SETATTR:
2365 string name = i.decode_string();
2368 map<string, bufferptr> to_set;
2369 to_set[name] = bufferptr(bl.c_str(), bl.length());
2370 r = _setattrs(txc, c, o, to_set);
2374 case Transaction::OP_SETATTRS:
2376 map<string, bufferptr> aset;
2377 i.decode_attrset(aset);
2378 r = _setattrs(txc, c, o, aset);
2382 case Transaction::OP_RMATTR:
2384 string name = i.decode_string();
2385 r = _rmattr(txc, c, o, name);
2389 case Transaction::OP_RMATTRS:
2391 r = _rmattrs(txc, c, o);
2395 case Transaction::OP_CLONE:
2397 const ghobject_t& noid = i.get_oid(op->dest_oid);
2398 OnodeRef no = c->get_onode(noid, true);
2399 r = _clone(txc, c, o, no);
2403 case Transaction::OP_CLONERANGE:
2404 assert(0 == "deprecated");
2407 case Transaction::OP_CLONERANGE2:
2409 const ghobject_t& noid = i.get_oid(op->dest_oid);
2410 OnodeRef no = c->get_onode(noid, true);
2411 uint64_t srcoff = op->off;
2412 uint64_t len = op->len;
2413 uint64_t dstoff = op->dest_off;
2414 r = _clone_range(txc, c, o, no, srcoff, len, dstoff);
2418 case Transaction::OP_COLL_ADD:
2419 assert(0 == "not implemented");
2422 case Transaction::OP_COLL_REMOVE:
2423 assert(0 == "not implemented");
2426 case Transaction::OP_COLL_MOVE:
2427 assert(0 == "deprecated");
2430 case Transaction::OP_COLL_MOVE_RENAME:
// Only renames within a single collection are supported here.
2432 assert(op->cid == op->dest_cid);
2433 const ghobject_t& noid = i.get_oid(op->dest_oid);
2434 OnodeRef no = c->get_onode(noid, true);
2435 r = _rename(txc, c, o, no, noid);
2440 case Transaction::OP_TRY_RENAME:
2442 const ghobject_t& noid = i.get_oid(op->dest_oid);
2443 OnodeRef no = c->get_onode(noid, true);
2444 r = _rename(txc, c, o, no, noid);
2451 case Transaction::OP_OMAP_CLEAR:
2453 r = _omap_clear(txc, c, o);
2456 case Transaction::OP_OMAP_SETKEYS:
2459 i.decode_attrset_bl(&aset_bl);
2460 r = _omap_setkeys(txc, c, o, aset_bl);
2463 case Transaction::OP_OMAP_RMKEYS:
2466 i.decode_keyset_bl(&keys_bl);
2467 r = _omap_rmkeys(txc, c, o, keys_bl);
2470 case Transaction::OP_OMAP_RMKEYRANGE:
2473 first = i.decode_string();
2474 last = i.decode_string();
2475 r = _omap_rmkey_range(txc, c, o, first, last);
2478 case Transaction::OP_OMAP_SETHEADER:
2482 r = _omap_setheader(txc, c, o, bl);
2486 case Transaction::OP_SETALLOCHINT:
2488 uint64_t expected_object_size = op->expected_object_size;
2489 uint64_t expected_write_size = op->expected_write_size;
2490 uint32_t flags = op->alloc_hint_flags;
2491 r = _setallochint(txc, c, o,
2492 expected_object_size,
2493 expected_write_size,
2499 derr << "bad op " << op->op << dendl;
2507 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
2508 op->op == Transaction::OP_CLONE ||
2509 op->op == Transaction::OP_CLONERANGE2 ||
2510 op->op == Transaction::OP_COLL_ADD))
2511 // -ENOENT is usually okay
2517 const char *msg = "unexpected error code";
2519 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
2520 op->op == Transaction::OP_CLONE ||
2521 op->op == Transaction::OP_CLONERANGE2))
2522 msg = "ENOENT on clone suggests osd bug";
2525 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
2526 // by partially applying transactions.
2527 msg = "ENOSPC from key value store, misconfigured cluster";
2529 if (r == -ENOTEMPTY) {
2530 msg = "ENOTEMPTY suggests garbage data in osd data dir";
2533 dout(0) << " error " << cpp_strerror(r) << " not handled on operation " << op->op
2534 << " (op " << pos << ", counting from 0)" << dendl;
2535 dout(0) << msg << dendl;
2536 dout(0) << " transaction dump:\n";
2537 JSONFormatter f(true);
2538 f.open_object_section("transaction");
2543 assert(0 == "unexpected error");
2551 // -----------------
// _touch(): make sure object o has a nid and queue its onode for writing
// (txc->write_onode). No data is written.
2554 int KStore::_touch(TransContext *txc,
2558 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2561 _assign_nid(txc, o);
2562 txc->write_onode(o);
2563 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
// _dump_onode(): debug helper that logs, at level 30, the onode's nid, size
// and allocation hints, followed by one line per xattr (name and length).
2567 void KStore::_dump_onode(OnodeRef o)
2569 dout(30) << __func__ << " " << o
2570 << " nid " << o->onode.nid
2571 << " size " << o->onode.size
2572 << " expected_object_size " << o->onode.expected_object_size
2573 << " expected_write_size " << o->onode.expected_write_size
2575 for (map<string,bufferptr>::iterator p = o->onode.attrs.begin();
2576 p != o->onode.attrs.end();
2578 dout(30) << __func__ << " attr " << p->first
2579 << " len " << p->second.length() << dendl;
// _do_read_stripe(): fill *pbl with the stripe of o that starts at offset.
//
// On a miss in o->pending_stripes, the stripe is read from PREFIX_DATA and
// cached in pending_stripes, so later reads see the same contents until
// the cache is cleared in _txc_finish(). A hit is presumably served from
// the cache. db->get()'s result is not checked, so a stripe absent from
// the DB is presumably returned (and cached) as an empty bufferlist.
2583 void KStore::_do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl)
2585 map<uint64_t,bufferlist>::iterator p = o->pending_stripes.find(offset);
2586 if (p == o->pending_stripes.end()) {
2588 get_data_key(o->onode.nid, offset, &key);
2589 db->get(PREFIX_DATA, key, pbl);
2590 o->pending_stripes[offset] = *pbl;
// _do_write_stripe(): write the stripe of o at offset in two places:
//  - record the new contents in o->pending_stripes, so reads issued before
//    the transaction commits see the new data;
//  - set the corresponding PREFIX_DATA key in txc's KV transaction.
2596 void KStore::_do_write_stripe(TransContext *txc, OnodeRef o,
2597 uint64_t offset, bufferlist& bl)
2599 o->pending_stripes[offset] = bl;
2601 get_data_key(o->onode.nid, offset, &key);
2602 txc->t->set(PREFIX_DATA, key, bl);
// _do_remove_stripe(): delete the stripe of o at offset. The entry is
// dropped from o->pending_stripes and the PREFIX_DATA key is removed in
// txc's KV transaction.
// NOTE(review): this erases the cache entry instead of caching an empty
// stripe. A later _do_read_stripe() of the same offset before this rmkey
// commits (e.g. truncate-to-0 followed by an unaligned write in the same
// transaction) misses the cache and falls through to db->get(), which
// presumably still returns the old, committed stripe. Caching an empty
// bufferlist here would avoid that -- verify and fix.
2605 void KStore::_do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset)
2607 o->pending_stripes.erase(offset);
2609 get_data_key(o->onode.nid, offset, &key);
2610 txc->t->rmkey(PREFIX_DATA, key);
// _do_write(): write length bytes of orig_bl at offset into o's striped
// data. Partial stripes are handled by read-modify-write.
//
// If the onode has no stripe size yet, it is set to
// kstore_default_stripe_size. Whole, stripe-aligned chunks are written
// directly. For a partial stripe, the existing stripe is read and the new
// stripe is assembled from three parts:
//  - the preserved leading bytes (zero-filled if the old stripe was
//    shorter);
//  - the new data;
//  - any preserved trailing bytes.
// Finally, the object size is extended if the write ended past it.
// fadvise_flags is not used in the visible lines.
2613 int KStore::_do_write(TransContext *txc,
2615 uint64_t offset, uint64_t length,
2616 bufferlist& orig_bl,
2617 uint32_t fadvise_flags)
2621 dout(20) << __func__
2622 << " " << o->oid << " " << offset << "~" << length
2623 << " - have " << o->onode.size
2624 << " bytes, nid " << o->onode.nid << dendl;
2632 uint64_t stripe_size = o->onode.stripe_size;
2634 o->onode.stripe_size = cct->_conf->kstore_default_stripe_size;
2635 stripe_size = o->onode.stripe_size;
// bl_off: how much of orig_bl has been consumed so far.
2638 unsigned bl_off = 0;
2639 while (length > 0) {
2640 uint64_t offset_rem = offset % stripe_size;
2641 uint64_t end_rem = (offset + length) % stripe_size;
2642 if (offset_rem == 0 && end_rem == 0) {
2644 bl.substr_of(orig_bl, bl_off, stripe_size);
2645 dout(30) << __func__ << " full stripe " << offset << dendl;
2646 _do_write_stripe(txc, o, offset, bl);
2647 offset += stripe_size;
2648 length -= stripe_size;
2649 bl_off += stripe_size;
2652 uint64_t stripe_off = offset - offset_rem;
2654 _do_read_stripe(o, stripe_off, &prev);
2655 dout(20) << __func__ << " read previous stripe " << stripe_off
2656 << ", got " << prev.length() << dendl;
2659 unsigned p = MIN(prev.length(), offset_rem);
2661 dout(20) << __func__ << " reuse leading " << p << " bytes" << dendl;
2662 bl.substr_of(prev, 0, p);
2664 if (p < offset_rem) {
2665 dout(20) << __func__ << " add leading " << offset_rem - p << " zeros" << dendl;
2666 bl.append_zero(offset_rem - p);
2669 unsigned use = stripe_size - offset_rem;
2671 use -= stripe_size - end_rem;
2672 dout(20) << __func__ << " using " << use << " for this stripe" << dendl;
2674 t.substr_of(orig_bl, bl_off, use);
// NOTE(review): end_rem describes where the *whole* write ends, so trailing
// bytes should only be preserved on the final stripe. Nothing visible here
// limits this branch to that case. If the write continues past this stripe
// (use == stripe_size - offset_rem) with a non-zero end_rem, prev's bytes
// from end_rem would be appended to an already-full stripe, making it
// longer than stripe_size. Confirm that a guard on the surrounding lines
// restricts this branch to the last stripe.
2678 if (end_rem < prev.length()) {
2679 unsigned l = prev.length() - end_rem;
2680 dout(20) << __func__ << " reuse trailing " << l << " bytes" << dendl;
2682 t.substr_of(prev, end_rem, l);
2686 dout(30) << " writing:\n";
2689 _do_write_stripe(txc, o, stripe_off, bl);
// The loop runs until length reaches 0, so here offset is the end of the
// write and "offset + length" in the log equals offset.
2694 if (offset > o->onode.size) {
2695 dout(20) << __func__ << " extending size to " << offset + length
2697 o->onode.size = offset;
// _write(): OP_WRITE handler. It ensures o has a nid, writes the data via
// _do_write(), and queues the onode (which _do_write may have changed,
// e.g. size or stripe size) for writing. The result r is logged.
2703 int KStore::_write(TransContext *txc,
2706 uint64_t offset, size_t length,
2708 uint32_t fadvise_flags)
2710 dout(15) << __func__ << " " << c->cid << " " << o->oid
2711 << " " << offset << "~" << length
2713 _assign_nid(txc, o);
2714 int r = _do_write(txc, o, offset, length, bl, fadvise_flags);
2715 txc->write_onode(o);
2717 dout(10) << __func__ << " " << c->cid << " " << o->oid
2718 << " " << offset << "~" << length
2719 << " = " << r << dendl;
// _zero(): OP_ZERO handler; zeroes the byte range [offset, offset+length).
//
// Works stripe by stripe:
//  - A partially covered stripe is read and rewritten. Only its leading
//    bytes are kept when the zeroed range reaches the stripe's end or the
//    object's end. Otherwise the range is filled with zeros and the
//    stripe's remaining tail is kept.
//  - A fully covered stripe is deleted outright. The read paths treat
//    missing/short stripe data as zeros (see the zero-fill in _do_write).
// The object size is extended if the range ends past it, and the onode is
// queued for writing.
2723 int KStore::_zero(TransContext *txc,
2726 uint64_t offset, size_t length)
2728 dout(15) << __func__ << " " << c->cid << " " << o->oid
2729 << " " << offset << "~" << length
2735 _assign_nid(txc, o);
2737 uint64_t stripe_size = o->onode.stripe_size;
2739 uint64_t end = offset + length;
2740 uint64_t pos = offset;
// stripe_off: position of pos within its stripe.
2741 uint64_t stripe_off = pos % stripe_size;
2742 while (pos < offset + length) {
2743 if (stripe_off || end - pos < stripe_size) {
2745 _do_read_stripe(o, pos - stripe_off, &stripe);
2746 dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
2747 << stripe.length() << dendl;
2749 bl.substr_of(stripe, 0, MIN(stripe.length(), stripe_off));
2750 if (end >= pos - stripe_off + stripe_size ||
2751 end >= o->onode.size) {
2752 dout(20) << __func__ << " truncated stripe " << pos - stripe_off
2753 << " to " << bl.length() << dendl;
2755 auto len = end - (pos - stripe_off + bl.length());
2756 bl.append_zero(len);
2757 dout(20) << __func__ << " adding " << len << " of zeros" << dendl;
2758 if (stripe.length() > bl.length()) {
2759 unsigned l = stripe.length() - bl.length();
2761 t.substr_of(stripe, stripe.length() - l, l);
2762 dout(20) << __func__ << " keeping tail " << l << " of stripe" << dendl;
2766 _do_write_stripe(txc, o, pos - stripe_off, bl);
2767 pos += stripe_size - stripe_off;
2770 dout(20) << __func__ << " rm stripe " << pos << dendl;
2771 _do_remove_stripe(txc, o, pos - stripe_off);
2776 if (offset + length > o->onode.size) {
2777 o->onode.size = offset + length;
2778 dout(20) << __func__ << " extending size to " << offset + length
2781 txc->write_onode(o);
2783 dout(10) << __func__ << " " << c->cid << " " << o->oid
2784 << " " << offset << "~" << length
2785 << " = " << r << dendl;
// _do_truncate(): set o's size to offset, trimming stored data as needed.
//
// Stripes between offset and the old size are trimmed:
//  - the stripe containing offset is cut down to its first
//    (offset % stripe_size) bytes;
//  - every later stripe is deleted.
// A cached tail buffer (tail_bl) is dropped when the new end falls in a
// different stripe than the old end. The new size is recorded and the
// onode queued for writing. If offset >= the old size, no stripes are
// touched and only the size changes.
2789 int KStore::_do_truncate(TransContext *txc, OnodeRef o, uint64_t offset)
2791 uint64_t stripe_size = o->onode.stripe_size;
2795 // trim down stripes
2797 uint64_t pos = offset;
2798 uint64_t stripe_off = pos % stripe_size;
2799 while (pos < o->onode.size) {
2802 _do_read_stripe(o, pos - stripe_off, &stripe);
2803 dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
2804 << stripe.length() << dendl;
2806 t.substr_of(stripe, 0, MIN(stripe_off, stripe.length()));
2807 _do_write_stripe(txc, o, pos - stripe_off, t);
2808 dout(20) << __func__ << " truncated stripe " << pos - stripe_off
2809 << " to " << t.length() << dendl;
2810 pos += stripe_size - stripe_off;
2813 dout(20) << __func__ << " rm stripe " << pos << dendl;
2814 _do_remove_stripe(txc, o, pos - stripe_off);
2819 // trim down cached tail
2820 if (o->tail_bl.length()) {
2821 if (offset / stripe_size != o->onode.size / stripe_size) {
2822 dout(20) << __func__ << " clear cached tail" << dendl;
2828 o->onode.size = offset;
2829 dout(10) << __func__ << " truncate size to " << offset << dendl;
2831 txc->write_onode(o);
// _truncate(): OP_TRUNCATE handler; logs and delegates to _do_truncate().
2835 int KStore::_truncate(TransContext *txc,
2840 dout(15) << __func__ << " " << c->cid << " " << o->oid
2843 int r = _do_truncate(txc, o, offset);
2844 dout(10) << __func__ << " " << c->cid << " " << o->oid
2846 << " = " << r << dendl;
// _do_remove(): delete object o. Steps:
//  - drop all of its data stripes (truncate to 0);
//  - clear its omap if it has one;
//  - reset the in-memory onode;
//  - take o out of txc->onodes, so _txc_finalize() will not write it back;
//  - delete its PREFIX_OBJ key.
2850 int KStore::_do_remove(TransContext *txc,
2855 _do_truncate(txc, o, 0);
2858 if (o->onode.omap_head) {
2859 _do_omap_clear(txc, o->onode.omap_head);
2862 o->onode = kstore_onode_t();
2863 txc->onodes.erase(o);
2864 get_object_key(cct, o->oid, &key);
2865 txc->t->rmkey(PREFIX_OBJ, key);
// _remove(): OP_REMOVE handler; logs and delegates to _do_remove().
2869 int KStore::_remove(TransContext *txc,
2873 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2874 int r = _do_remove(txc, o);
2875 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
// _setattr(): set a single xattr on o and queue the onode for writing.
// NOTE(review): _txc_add_transaction routes OP_SETATTR through _setattrs(),
// so this function is not reached from there. Unlike _setattrs(), it also
// stores val without copying partial buffers.
2879 int KStore::_setattr(TransContext *txc,
2885 dout(15) << __func__ << " " << c->cid << " " << o->oid
2886 << " " << name << " (" << val.length() << " bytes)"
2889 o->onode.attrs[name] = val;
2890 txc->write_onode(o);
2891 dout(10) << __func__ << " " << c->cid << " " << o->oid
2892 << " " << name << " (" << val.length() << " bytes)"
2893 << " = " << r << dendl;
// _setattrs(): set every xattr in aset on o and queue the onode for
// writing. Existing attrs with the same name are overwritten.
2897 int KStore::_setattrs(TransContext *txc,
2900 const map<string,bufferptr>& aset)
2902 dout(15) << __func__ << " " << c->cid << " " << o->oid
2903 << " " << aset.size() << " keys"
// A partial bufferptr (a view into a larger buffer) is copied into its own
// buffer -- presumably so the stored attr does not keep the larger buffer
// alive. Full buffers are shared as-is.
2906 for (map<string,bufferptr>::const_iterator p = aset.begin();
2907 p != aset.end(); ++p) {
2908 if (p->second.is_partial())
2909 o->onode.attrs[p->first] = bufferptr(p->second.c_str(), p->second.length());
2911 o->onode.attrs[p->first] = p->second;
2913 txc->write_onode(o);
2914 dout(10) << __func__ << " " << c->cid << " " << o->oid
2915 << " " << aset.size() << " keys"
2916 << " = " << r << dendl;
// _rmattr(): remove xattr `name` from o and queue the onode for writing.
// Removing a name that is not present is not treated as an error here.
2921 int KStore::_rmattr(TransContext *txc,
2926 dout(15) << __func__ << " " << c->cid << " " << o->oid
2927 << " " << name << dendl;
2929 o->onode.attrs.erase(name);
2930 txc->write_onode(o);
2931 dout(10) << __func__ << " " << c->cid << " " << o->oid
2932 << " " << name << " = " << r << dendl;
// _rmattrs(): remove all xattrs from o and queue the onode for writing.
2936 int KStore::_rmattrs(TransContext *txc,
2940 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2942 o->onode.attrs.clear();
2943 txc->write_onode(o);
2944 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
// _do_omap_clear(): delete every PREFIX_OMAP key belonging to omap `id`, the
// header included. It walks the committed DB from the header key up to
// (but excluding) the tail key and queues an rmkey for each entry in txc.
// NOTE(review): the "stop at" log prints the raw tail key, whereas the
// other logs here use pretty_binary_string().
2948 void KStore::_do_omap_clear(TransContext *txc, uint64_t id)
2950 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
2951 string prefix, tail;
2952 get_omap_header(id, &prefix);
2953 get_omap_tail(id, &tail);
2954 it->lower_bound(prefix);
2955 while (it->valid()) {
2956 if (it->key() >= tail) {
2957 dout(30) << __func__ << " stop at " << tail << dendl;
2960 txc->t->rmkey(PREFIX_OMAP, it->key());
2961 dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl;
// _omap_clear(): OP_OMAP_CLEAR handler. If o has an omap, all of its entries
// (header included) are removed. omap_head itself is left unchanged here.
2966 int KStore::_omap_clear(TransContext *txc,
2970 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2972 if (o->onode.omap_head != 0) {
2973 _do_omap_clear(txc, o->onode.omap_head);
2975 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
// _omap_setkeys(): OP_OMAP_SETKEYS handler; sets the key/value pairs
// encoded in bl on o's omap.
// On first use the omap is created lazily: omap_head is set to the
// object's nid and the onode is queued for writing. Each decoded user key
// is then mapped to its PREFIX_OMAP key and set in txc.
2979 int KStore::_omap_setkeys(TransContext *txc,
2984 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2986 bufferlist::iterator p = bl.begin();
2988 if (!o->onode.omap_head) {
2989 o->onode.omap_head = o->onode.nid;
2990 txc->write_onode(o);
2999 get_omap_key(o->onode.omap_head, key, &final_key);
3000 dout(30) << __func__ << " " << pretty_binary_string(final_key)
3001 << " <- " << key << dendl;
3002 txc->t->set(PREFIX_OMAP, final_key, value);
3005 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
// _omap_setheader(): OP_OMAP_SETHEADER handler; stores bl as o's omap
// header. Like _omap_setkeys(), it lazily sets omap_head to the object's
// nid (and queues the onode) the first time the omap is used.
3009 int KStore::_omap_setheader(TransContext *txc,
3014 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3017 if (!o->onode.omap_head) {
3018 o->onode.omap_head = o->onode.nid;
3019 txc->write_onode(o);
3021 get_omap_header(o->onode.omap_head, &key);
3022 txc->t->set(PREFIX_OMAP, key, bl);
3024 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
// _omap_rmkeys(): OP_OMAP_RMKEYS handler; removes the user keys encoded in
// bl from o's omap. An object without an omap_head has nothing to remove
// and is handled separately. Each key is mapped to its PREFIX_OMAP key
// and an rmkey is queued in txc.
3028 int KStore::_omap_rmkeys(TransContext *txc,
3033 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3035 bufferlist::iterator p = bl.begin();
3038 if (!o->onode.omap_head) {
3047 get_omap_key(o->onode.omap_head, key, &final_key);
3048 dout(30) << __func__ << " rm " << pretty_binary_string(final_key)
3049 << " <- " << key << dendl;
3050 txc->t->rmkey(PREFIX_OMAP, final_key);
3055 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
// _omap_rmkey_range(): OP_OMAP_RMKEYRANGE handler; removes every omap key k
// of o with first <= k < last. `last` is exclusive: the scan stops at the
// first stored key >= the encoding of last. An object without an
// omap_head is handled separately (nothing to remove).
3059 int KStore::_omap_rmkey_range(TransContext *txc,
3062 const string& first, const string& last)
3064 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3065 KeyValueDB::Iterator it;
3066 string key_first, key_last;
3069 if (!o->onode.omap_head) {
3072 it = db->get_iterator(PREFIX_OMAP);
3073 get_omap_key(o->onode.omap_head, first, &key_first);
3074 get_omap_key(o->onode.omap_head, last, &key_last);
3075 it->lower_bound(key_first);
3076 while (it->valid()) {
3077 if (it->key() >= key_last) {
3078 dout(30) << __func__ << " stop at " << pretty_binary_string(key_last)
3082 txc->t->rmkey(PREFIX_OMAP, it->key());
3083 dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl;
// _setallochint(): OP_SETALLOCHINT handler. It records the expected object
// size, expected write size and hint flags in o's onode and queues the
// onode for writing. Within this block the hints are only stored, not
// otherwise acted on.
3093 int KStore::_setallochint(TransContext *txc,
3096 uint64_t expected_object_size,
3097 uint64_t expected_write_size,
3100 dout(15) << __func__ << " " << c->cid << " " << o->oid
3101 << " object_size " << expected_object_size
3102 << " write_size " << expected_write_size
3103 << " flags " << flags
3106 o->onode.expected_object_size = expected_object_size;
3107 o->onode.expected_write_size = expected_write_size;
3108 o->onode.alloc_hint_flags = flags;
3110 txc->write_onode(o);
3111 dout(10) << __func__ << " " << c->cid << " " << o->oid
3112 << " object_size " << expected_object_size
3113 << " write_size " << expected_write_size
3114 << " = " << r << dendl;
// _clone(): OP_CLONE handler; makes newo a full copy of oldo.
//
// Both objects must hash identically; a mismatch is logged as an error.
// newo is marked as existing and given a nid. Then:
//  - data: oldo's whole contents are read and written over newo, after
//    newo's old data is truncated away;
//  - xattrs: copied wholesale;
//  - omap: newo's existing omap is cleared, then every entry of oldo's omap
//    (header included) is re-keyed under newo's omap_head and copied.
// newo's onode is then queued for writing.
// NOTE(review): when newo had an omap and oldo has none, newo's omap data is
// cleared but newo->onode.omap_head keeps its old value.
3118 int KStore::_clone(TransContext *txc,
3123 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3124 << newo->oid << dendl;
3126 if (oldo->oid.hobj.get_hash() != newo->oid.hobj.get_hash()) {
3127 derr << __func__ << " mismatched hash on " << oldo->oid
3128 << " and " << newo->oid << dendl;
3133 newo->exists = true;
3134 _assign_nid(txc, newo);
3139 r = _do_read(oldo, 0, oldo->onode.size, bl, 0);
3143 // truncate any old data
3144 r = _do_truncate(txc, newo, 0);
3148 r = _do_write(txc, newo, 0, oldo->onode.size, bl, 0);
3152 newo->onode.attrs = oldo->onode.attrs;
3155 if (newo->onode.omap_head) {
3156 dout(20) << __func__ << " clearing old omap data" << dendl;
3157 _do_omap_clear(txc, newo->onode.omap_head);
3159 if (oldo->onode.omap_head) {
3160 dout(20) << __func__ << " copying omap data" << dendl;
3161 if (!newo->onode.omap_head) {
3162 newo->onode.omap_head = newo->onode.nid;
3164 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
3166 get_omap_header(oldo->onode.omap_head, &head);
3167 get_omap_tail(oldo->onode.omap_head, &tail);
3168 it->lower_bound(head);
3169 while (it->valid()) {
3171 if (it->key() >= tail) {
3172 dout(30) << __func__ << " reached tail" << dendl;
3175 dout(30) << __func__ << " got header/data "
3176 << pretty_binary_string(it->key()) << dendl;
3177 assert(it->key() < tail);
3178 rewrite_omap_key(newo->onode.omap_head, it->key(), &key);
3179 txc->t->set(PREFIX_OMAP, key, it->value());
3185 txc->write_onode(newo);
3189 dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3190 << newo->oid << " = " << r << dendl;
3194 int KStore::_clone_range(TransContext *txc,
3198 uint64_t srcoff, uint64_t length, uint64_t dstoff)
3200 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3201 << newo->oid << " from " << srcoff << "~" << length
3202 << " to offset " << dstoff << dendl;
3206 newo->exists = true;
3207 _assign_nid(txc, newo);
3209 r = _do_read(oldo, srcoff, length, bl, 0);
3213 r = _do_write(txc, newo, dstoff, bl.length(), bl, 0);
3217 txc->write_onode(newo);
3222 dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3223 << newo->oid << " from " << srcoff << "~" << length
3224 << " to offset " << dstoff
3225 << " = " << r << dendl;
3229 int KStore::_rename(TransContext *txc,
3233 const ghobject_t& new_oid)
3235 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3236 << new_oid << dendl;
3238 ghobject_t old_oid = oldo->oid;
3240 string old_key, new_key;
3242 if (newo && newo->exists) {
3243 // destination object already exists, remove it first
3244 r = _do_remove(txc, newo);
3249 txc->t->rmkey(PREFIX_OBJ, oldo->key);
3250 txc->write_onode(oldo);
3251 c->onode_map.rename(old_oid, new_oid); // this adjusts oldo->{oid,key}
3255 dout(10) << __func__ << " " << c->cid << " " << old_oid << " -> "
3256 << new_oid << " = " << r << dendl;
3262 int KStore::_create_collection(
3268 dout(15) << __func__ << " " << cid << " bits " << bits << dendl;
3273 RWLock::WLocker l(coll_lock);
3278 c->reset(new Collection(this, cid));
3279 (*c)->cnode.bits = bits;
3282 ::encode((*c)->cnode, bl);
3283 txc->t->set(PREFIX_COLL, stringify(cid), bl);
3287 dout(10) << __func__ << " " << cid << " bits " << bits << " = " << r << dendl;
3291 int KStore::_remove_collection(TransContext *txc, coll_t cid,
3294 dout(15) << __func__ << " " << cid << dendl;
3298 RWLock::WLocker l(coll_lock);
3303 size_t nonexistent_count = 0;
3304 pair<ghobject_t,OnodeRef> next_onode;
3305 while ((*c)->onode_map.get_next(next_onode.first, &next_onode)) {
3306 if (next_onode.second->exists) {
3310 ++nonexistent_count;
3312 vector<ghobject_t> ls;
3314 // Enumerate onodes in db, up to nonexistent_count + 1
3315 // then check if all of them are marked as non-existent.
3316 // Bypass the check if returned number is greater than nonexistent_count
3317 r = _collection_list(c->get(), ghobject_t(), ghobject_t::get_max(),
3318 nonexistent_count + 1, &ls, &next);
3320 bool exists = false; //ls.size() > nonexistent_count;
3321 for (auto it = ls.begin(); !exists && it < ls.end(); ++it) {
3322 dout(10) << __func__ << " oid " << *it << dendl;
3323 auto onode = (*c)->onode_map.lookup(*it);
3324 exists = !onode || onode->exists;
3326 dout(10) << __func__ << " " << *it
3327 << " exists in db" << dendl;
3331 coll_map.erase(cid);
3332 txc->removed_collections.push_back(*c);
3334 txc->t->rmkey(PREFIX_COLL, stringify(cid));
3337 dout(10) << __func__ << " " << cid
3338 << " is non-empty" << dendl;
3345 dout(10) << __func__ << " " << cid << " = " << r << dendl;
3349 int KStore::_split_collection(TransContext *txc,
3352 unsigned bits, int rem)
3354 dout(15) << __func__ << " " << c->cid << " to " << d->cid << " "
3355 << " bits " << bits << dendl;
3357 RWLock::WLocker l(c->lock);
3358 RWLock::WLocker l2(d->lock);
3359 c->onode_map.clear();
3360 d->onode_map.clear();
3361 c->cnode.bits = bits;
3362 assert(d->cnode.bits == bits);
3366 ::encode(c->cnode, bl);
3367 txc->t->set(PREFIX_COLL, stringify(c->cid), bl);
3369 dout(10) << __func__ << " " << c->cid << " to " << d->cid << " "
3370 << " bits " << bits << " = " << r << dendl;
3374 // ===========================================