Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / memstore / MemStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2013 Inktank
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14 #include "acconfig.h"
15
16 #ifdef HAVE_SYS_MOUNT_H
17 #include <sys/mount.h>
18 #endif
19
20 #ifdef HAVE_SYS_PARAM_H
21 #include <sys/param.h>
22 #endif
23
24 #include "include/types.h"
25 #include "include/stringify.h"
26 #include "include/unordered_map.h"
27 #include "include/memory.h"
28 #include "common/errno.h"
29 #include "MemStore.h"
30 #include "include/compat.h"
31
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_filestore
34 #undef dout_prefix
35 #define dout_prefix *_dout << "memstore(" << path << ") "
36
37 // for comparing collections for lock ordering
38 bool operator>(const MemStore::CollectionRef& l,
39                const MemStore::CollectionRef& r)
40 {
41   return (unsigned long)l.get() > (unsigned long)r.get();
42 }
43
44
45 int MemStore::mount()
46 {
47   int r = _load();
48   if (r < 0)
49     return r;
50   finisher.start();
51   return 0;
52 }
53
54 int MemStore::umount()
55 {
56   finisher.wait_for_empty();
57   finisher.stop();
58   return _save();
59 }
60
61 int MemStore::_save()
62 {
63   dout(10) << __func__ << dendl;
64   dump_all();
65   set<coll_t> collections;
66   for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
67        p != coll_map.end();
68        ++p) {
69     dout(20) << __func__ << " coll " << p->first << " " << p->second << dendl;
70     collections.insert(p->first);
71     bufferlist bl;
72     assert(p->second);
73     p->second->encode(bl);
74     string fn = path + "/" + stringify(p->first);
75     int r = bl.write_file(fn.c_str());
76     if (r < 0)
77       return r;
78   }
79
80   string fn = path + "/collections";
81   bufferlist bl;
82   ::encode(collections, bl);
83   int r = bl.write_file(fn.c_str());
84   if (r < 0)
85     return r;
86
87   return 0;
88 }
89
90 void MemStore::dump_all()
91 {
92   Formatter *f = Formatter::create("json-pretty");
93   f->open_object_section("store");
94   dump(f);
95   f->close_section();
96   dout(0) << "dump:";
97   f->flush(*_dout);
98   *_dout << dendl;
99   delete f;
100 }
101
102 void MemStore::dump(Formatter *f)
103 {
104   f->open_array_section("collections");
105   for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
106        p != coll_map.end();
107        ++p) {
108     f->open_object_section("collection");
109     f->dump_string("name", stringify(p->first));
110
111     f->open_array_section("xattrs");
112     for (map<string,bufferptr>::iterator q = p->second->xattr.begin();
113          q != p->second->xattr.end();
114          ++q) {
115       f->open_object_section("xattr");
116       f->dump_string("name", q->first);
117       f->dump_int("length", q->second.length());
118       f->close_section();
119     }
120     f->close_section();
121
122     f->open_array_section("objects");
123     for (map<ghobject_t,ObjectRef>::iterator q = p->second->object_map.begin();
124          q != p->second->object_map.end();
125          ++q) {
126       f->open_object_section("object");
127       f->dump_string("name", stringify(q->first));
128       if (q->second)
129         q->second->dump(f);
130       f->close_section();
131     }
132     f->close_section();
133
134     f->close_section();
135   }
136   f->close_section();
137 }
138
139 int MemStore::_load()
140 {
141   dout(10) << __func__ << dendl;
142   bufferlist bl;
143   string fn = path + "/collections";
144   string err;
145   int r = bl.read_file(fn.c_str(), &err);
146   if (r < 0)
147     return r;
148
149   set<coll_t> collections;
150   bufferlist::iterator p = bl.begin();
151   ::decode(collections, p);
152
153   for (set<coll_t>::iterator q = collections.begin();
154        q != collections.end();
155        ++q) {
156     string fn = path + "/" + stringify(*q);
157     bufferlist cbl;
158     int r = cbl.read_file(fn.c_str(), &err);
159     if (r < 0)
160       return r;
161     CollectionRef c(new Collection(cct, *q));
162     bufferlist::iterator p = cbl.begin();
163     c->decode(p);
164     coll_map[*q] = c;
165     used_bytes += c->used_bytes();
166   }
167
168   dump_all();
169
170   return 0;
171 }
172
173 void MemStore::set_fsid(uuid_d u)
174 {
175   int r = write_meta("fs_fsid", stringify(u));
176   assert(r >= 0);
177 }
178
179 uuid_d MemStore::get_fsid()
180 {
181   string fsid_str;
182   int r = read_meta("fs_fsid", &fsid_str);
183   assert(r >= 0);
184   uuid_d uuid;
185   bool b = uuid.parse(fsid_str.c_str());
186   assert(b);
187   return uuid;
188 }
189
190 int MemStore::mkfs()
191 {
192   string fsid_str;
193   int r = read_meta("fs_fsid", &fsid_str);
194   if (r == -ENOENT) {
195     uuid_d fsid;
196     fsid.generate_random();
197     fsid_str = stringify(fsid);
198     r = write_meta("fs_fsid", fsid_str);
199     if (r < 0)
200       return r;
201     dout(1) << __func__ << " new fsid " << fsid_str << dendl;
202   } else if (r < 0) {
203     return r;
204   } else {  
205     dout(1) << __func__ << " had fsid " << fsid_str << dendl;
206   }
207
208   string fn = path + "/collections";
209   derr << path << dendl;
210   bufferlist bl;
211   set<coll_t> collections;
212   ::encode(collections, bl);
213   r = bl.write_file(fn.c_str());
214   if (r < 0)
215     return r;
216
217   r = write_meta("type", "memstore");
218   if (r < 0)
219     return r;
220
221   return 0;
222 }
223
224 int MemStore::statfs(struct store_statfs_t *st)
225 {
226    dout(10) << __func__ << dendl;
227   st->reset();
228   st->total = cct->_conf->memstore_device_bytes;
229   st->available = MAX(int64_t(st->total) - int64_t(used_bytes), 0ll);
230   dout(10) << __func__ << ": used_bytes: " << used_bytes
231            << "/" << cct->_conf->memstore_device_bytes << dendl;
232   return 0;
233 }
234
235 objectstore_perf_stat_t MemStore::get_cur_stats()
236 {
237   // fixme
238   return objectstore_perf_stat_t();
239 }
240
241 MemStore::CollectionRef MemStore::get_collection(const coll_t& cid)
242 {
243   RWLock::RLocker l(coll_lock);
244   ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
245   if (cp == coll_map.end())
246     return CollectionRef();
247   return cp->second;
248 }
249
250
251 // ---------------
252 // read operations
253
254 bool MemStore::exists(const coll_t& cid, const ghobject_t& oid)
255 {
256   CollectionHandle c = get_collection(cid);
257   if (!c)
258     return false;
259   return exists(c, oid);
260 }
261
262 bool MemStore::exists(CollectionHandle &c_, const ghobject_t& oid)
263 {
264   Collection *c = static_cast<Collection*>(c_.get());
265   dout(10) << __func__ << " " << c->get_cid() << " " << oid << dendl;
266   if (!c->exists)
267     return false;
268
269   // Perform equivalent of c->get_object_(oid) != NULL. In C++11 the
270   // shared_ptr needs to be compared to nullptr.
271   return (bool)c->get_object(oid);
272 }
273
274 int MemStore::stat(
275     const coll_t& cid,
276     const ghobject_t& oid,
277     struct stat *st,
278     bool allow_eio)
279 {
280   CollectionHandle c = get_collection(cid);
281   if (!c)
282     return -ENOENT;
283   return stat(c, oid, st, allow_eio);
284 }
285
286 int MemStore::stat(
287   CollectionHandle &c_,
288   const ghobject_t& oid,
289   struct stat *st,
290   bool allow_eio)
291 {
292   Collection *c = static_cast<Collection*>(c_.get());
293   dout(10) << __func__ << " " << c->cid << " " << oid << dendl;
294   if (!c->exists)
295     return -ENOENT;
296   ObjectRef o = c->get_object(oid);
297   if (!o)
298     return -ENOENT;
299   st->st_size = o->get_size();
300   st->st_blksize = 4096;
301   st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
302   st->st_nlink = 1;
303   return 0;
304 }
305
306 int MemStore::set_collection_opts(
307   const coll_t& cid,
308   const pool_opts_t& opts)
309 {
310   return -EOPNOTSUPP;
311 }
312
313 int MemStore::read(
314     const coll_t& cid,
315     const ghobject_t& oid,
316     uint64_t offset,
317     size_t len,
318     bufferlist& bl,
319     uint32_t op_flags)
320 {
321   CollectionHandle c = get_collection(cid);
322   if (!c)
323     return -ENOENT;
324   return read(c, oid, offset, len, bl, op_flags);
325 }
326
327 int MemStore::read(
328   CollectionHandle &c_,
329   const ghobject_t& oid,
330   uint64_t offset,
331   size_t len,
332   bufferlist& bl,
333   uint32_t op_flags)
334 {
335   Collection *c = static_cast<Collection*>(c_.get());
336   dout(10) << __func__ << " " << c->cid << " " << oid << " "
337            << offset << "~" << len << dendl;
338   if (!c->exists)
339     return -ENOENT;
340   ObjectRef o = c->get_object(oid);
341   if (!o)
342     return -ENOENT;
343   if (offset >= o->get_size())
344     return 0;
345   size_t l = len;
346   if (l == 0 && offset == 0)  // note: len == 0 means read the entire object
347     l = o->get_size();
348   else if (offset + l > o->get_size())
349     l = o->get_size() - offset;
350   bl.clear();
351   return o->read(offset, l, bl);
352 }
353
354 int MemStore::fiemap(const coll_t& cid, const ghobject_t& oid,
355                      uint64_t offset, size_t len, bufferlist& bl)
356 {
357   map<uint64_t, uint64_t> destmap;
358   int r = fiemap(cid, oid, offset, len, destmap);
359   if (r >= 0)
360     ::encode(destmap, bl);
361   return r;
362 }
363
364 int MemStore::fiemap(const coll_t& cid, const ghobject_t& oid,
365                      uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap)
366 {
367   dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
368            << len << dendl;
369   CollectionRef c = get_collection(cid);
370   if (!c)
371     return -ENOENT;
372
373   ObjectRef o = c->get_object(oid);
374   if (!o)
375     return -ENOENT;
376   size_t l = len;
377   if (offset + l > o->get_size())
378     l = o->get_size() - offset;
379   if (offset >= o->get_size())
380     goto out;
381   destmap[offset] = l;
382  out:
383   return 0;
384 }
385
386 int MemStore::getattr(const coll_t& cid, const ghobject_t& oid,
387                       const char *name, bufferptr& value)
388 {
389   CollectionHandle c = get_collection(cid);
390   if (!c)
391     return -ENOENT;
392   return getattr(c, oid, name, value);
393 }
394
395 int MemStore::getattr(CollectionHandle &c_, const ghobject_t& oid,
396                       const char *name, bufferptr& value)
397 {
398   Collection *c = static_cast<Collection*>(c_.get());
399   dout(10) << __func__ << " " << c->cid << " " << oid << " " << name << dendl;
400   if (!c->exists)
401     return -ENOENT;
402   ObjectRef o = c->get_object(oid);
403   if (!o)
404     return -ENOENT;
405   string k(name);
406   std::lock_guard<std::mutex> lock(o->xattr_mutex);
407   if (!o->xattr.count(k)) {
408     return -ENODATA;
409   }
410   value = o->xattr[k];
411   return 0;
412 }
413
414 int MemStore::getattrs(const coll_t& cid, const ghobject_t& oid,
415                        map<string,bufferptr>& aset)
416 {
417   CollectionHandle c = get_collection(cid);
418   if (!c)
419     return -ENOENT;
420   return getattrs(c, oid, aset);
421 }
422
423 int MemStore::getattrs(CollectionHandle &c_, const ghobject_t& oid,
424                        map<string,bufferptr>& aset)
425 {
426   Collection *c = static_cast<Collection*>(c_.get());
427   dout(10) << __func__ << " " << c->cid << " " << oid << dendl;
428   if (!c->exists)
429     return -ENOENT;
430
431   ObjectRef o = c->get_object(oid);
432   if (!o)
433     return -ENOENT;
434   std::lock_guard<std::mutex> lock(o->xattr_mutex);
435   aset = o->xattr;
436   return 0;
437 }
438
439 int MemStore::list_collections(vector<coll_t>& ls)
440 {
441   dout(10) << __func__ << dendl;
442   RWLock::RLocker l(coll_lock);
443   for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
444        p != coll_map.end();
445        ++p) {
446     ls.push_back(p->first);
447   }
448   return 0;
449 }
450
451 bool MemStore::collection_exists(const coll_t& cid)
452 {
453   dout(10) << __func__ << " " << cid << dendl;
454   RWLock::RLocker l(coll_lock);
455   return coll_map.count(cid);
456 }
457
458 int MemStore::collection_empty(const coll_t& cid, bool *empty)
459 {
460   dout(10) << __func__ << " " << cid << dendl;
461   CollectionRef c = get_collection(cid);
462   if (!c)
463     return -ENOENT;
464   RWLock::RLocker l(c->lock);
465   *empty = c->object_map.empty();
466   return 0;
467 }
468
469 int MemStore::collection_bits(const coll_t& cid)
470 {
471   dout(10) << __func__ << " " << cid << dendl;
472   CollectionRef c = get_collection(cid);
473   if (!c)
474     return -ENOENT;
475   RWLock::RLocker l(c->lock);
476   return c->bits;
477 }
478
479 int MemStore::collection_list(const coll_t& cid,
480                               const ghobject_t& start,
481                               const ghobject_t& end,
482                               int max,
483                               vector<ghobject_t> *ls, ghobject_t *next)
484 {
485   CollectionRef c = get_collection(cid);
486   if (!c)
487     return -ENOENT;
488   RWLock::RLocker l(c->lock);
489
490   dout(10) << __func__ << " cid " << cid << " start " << start
491            << " end " << end << dendl;
492   map<ghobject_t,ObjectRef>::iterator p = c->object_map.lower_bound(start);
493   while (p != c->object_map.end() &&
494          ls->size() < (unsigned)max &&
495          p->first < end) {
496     ls->push_back(p->first);
497     ++p;
498   }
499   if (next != NULL) {
500     if (p == c->object_map.end())
501       *next = ghobject_t::get_max();
502     else
503       *next = p->first;
504   }
505   dout(10) << __func__ << " cid " << cid << " got " << ls->size() << dendl;
506   return 0;
507 }
508
509 int MemStore::omap_get(
510     const coll_t& cid,                ///< [in] Collection containing oid
511     const ghobject_t &oid,   ///< [in] Object containing omap
512     bufferlist *header,      ///< [out] omap header
513     map<string, bufferlist> *out /// < [out] Key to value map
514     )
515 {
516   dout(10) << __func__ << " " << cid << " " << oid << dendl;
517   CollectionRef c = get_collection(cid);
518   if (!c)
519     return -ENOENT;
520
521   ObjectRef o = c->get_object(oid);
522   if (!o)
523     return -ENOENT;
524   std::lock_guard<std::mutex> lock(o->omap_mutex);
525   *header = o->omap_header;
526   *out = o->omap;
527   return 0;
528 }
529
530 int MemStore::omap_get_header(
531     const coll_t& cid,                ///< [in] Collection containing oid
532     const ghobject_t &oid,   ///< [in] Object containing omap
533     bufferlist *header,      ///< [out] omap header
534     bool allow_eio ///< [in] don't assert on eio
535     )
536 {
537   dout(10) << __func__ << " " << cid << " " << oid << dendl;
538   CollectionRef c = get_collection(cid);
539   if (!c)
540     return -ENOENT;
541
542   ObjectRef o = c->get_object(oid);
543   if (!o)
544     return -ENOENT;
545   std::lock_guard<std::mutex> lock(o->omap_mutex);
546   *header = o->omap_header;
547   return 0;
548 }
549
550 int MemStore::omap_get_keys(
551     const coll_t& cid,              ///< [in] Collection containing oid
552     const ghobject_t &oid, ///< [in] Object containing omap
553     set<string> *keys      ///< [out] Keys defined on oid
554     )
555 {
556   dout(10) << __func__ << " " << cid << " " << oid << dendl;
557   CollectionRef c = get_collection(cid);
558   if (!c)
559     return -ENOENT;
560
561   ObjectRef o = c->get_object(oid);
562   if (!o)
563     return -ENOENT;
564   std::lock_guard<std::mutex> lock(o->omap_mutex);
565   for (map<string,bufferlist>::iterator p = o->omap.begin();
566        p != o->omap.end();
567        ++p)
568     keys->insert(p->first);
569   return 0;
570 }
571
572 int MemStore::omap_get_values(
573     const coll_t& cid,                    ///< [in] Collection containing oid
574     const ghobject_t &oid,       ///< [in] Object containing omap
575     const set<string> &keys,     ///< [in] Keys to get
576     map<string, bufferlist> *out ///< [out] Returned keys and values
577     )
578 {
579   dout(10) << __func__ << " " << cid << " " << oid << dendl;
580   CollectionRef c = get_collection(cid);
581   if (!c)
582     return -ENOENT;
583
584   ObjectRef o = c->get_object(oid);
585   if (!o)
586     return -ENOENT;
587   std::lock_guard<std::mutex> lock(o->omap_mutex);
588   for (set<string>::const_iterator p = keys.begin();
589        p != keys.end();
590        ++p) {
591     map<string,bufferlist>::iterator q = o->omap.find(*p);
592     if (q != o->omap.end())
593       out->insert(*q);
594   }
595   return 0;
596 }
597
598 int MemStore::omap_check_keys(
599     const coll_t& cid,                ///< [in] Collection containing oid
600     const ghobject_t &oid,   ///< [in] Object containing omap
601     const set<string> &keys, ///< [in] Keys to check
602     set<string> *out         ///< [out] Subset of keys defined on oid
603     )
604 {
605   dout(10) << __func__ << " " << cid << " " << oid << dendl;
606   CollectionRef c = get_collection(cid);
607   if (!c)
608     return -ENOENT;
609
610   ObjectRef o = c->get_object(oid);
611   if (!o)
612     return -ENOENT;
613   std::lock_guard<std::mutex> lock(o->omap_mutex);
614   for (set<string>::const_iterator p = keys.begin();
615        p != keys.end();
616        ++p) {
617     map<string,bufferlist>::iterator q = o->omap.find(*p);
618     if (q != o->omap.end())
619       out->insert(*p);
620   }
621   return 0;
622 }
623
624 class MemStore::OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
625   CollectionRef c;
626   ObjectRef o;
627   map<string,bufferlist>::iterator it;
628 public:
629   OmapIteratorImpl(CollectionRef c, ObjectRef o)
630     : c(c), o(o), it(o->omap.begin()) {}
631
632   int seek_to_first() override {
633     std::lock_guard<std::mutex>(o->omap_mutex);
634     it = o->omap.begin();
635     return 0;
636   }
637   int upper_bound(const string &after) override {
638     std::lock_guard<std::mutex>(o->omap_mutex);
639     it = o->omap.upper_bound(after);
640     return 0;
641   }
642   int lower_bound(const string &to) override {
643     std::lock_guard<std::mutex>(o->omap_mutex);
644     it = o->omap.lower_bound(to);
645     return 0;
646   }
647   bool valid() override {
648     std::lock_guard<std::mutex>(o->omap_mutex);
649     return it != o->omap.end();
650   }
651   int next(bool validate=true) override {
652     std::lock_guard<std::mutex>(o->omap_mutex);
653     ++it;
654     return 0;
655   }
656   string key() override {
657     std::lock_guard<std::mutex>(o->omap_mutex);
658     return it->first;
659   }
660   bufferlist value() override {
661     std::lock_guard<std::mutex>(o->omap_mutex);
662     return it->second;
663   }
664   int status() override {
665     return 0;
666   }
667 };
668
669 ObjectMap::ObjectMapIterator MemStore::get_omap_iterator(const coll_t& cid,
670                                                          const ghobject_t& oid)
671 {
672   dout(10) << __func__ << " " << cid << " " << oid << dendl;
673   CollectionRef c = get_collection(cid);
674   if (!c)
675     return ObjectMap::ObjectMapIterator();
676
677   ObjectRef o = c->get_object(oid);
678   if (!o)
679     return ObjectMap::ObjectMapIterator();
680   return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o));
681 }
682
683
684 // ---------------
685 // write operations
686
687 int MemStore::queue_transactions(Sequencer *osr,
688                                  vector<Transaction>& tls,
689                                  TrackedOpRef op,
690                                  ThreadPool::TPHandle *handle)
691 {
692   // because memstore operations are synchronous, we can implement the
693   // Sequencer with a mutex. this guarantees ordering on a given sequencer,
694   // while allowing operations on different sequencers to happen in parallel
695   struct OpSequencer : public Sequencer_impl {
696     OpSequencer(CephContext* cct) :
697       Sequencer_impl(cct) {}
698     std::mutex mutex;
699     void flush() override {}
700     bool flush_commit(Context*) override { return true; }
701   };
702
703   std::unique_lock<std::mutex> lock;
704   if (osr) {
705     if (!osr->p) {
706       osr->p = new OpSequencer(cct);
707     }
708     auto seq = static_cast<OpSequencer*>(osr->p.get());
709     lock = std::unique_lock<std::mutex>(seq->mutex);
710   }
711
712   for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
713     // poke the TPHandle heartbeat just to exercise that code path
714     if (handle)
715       handle->reset_tp_timeout();
716
717     _do_transaction(*p);
718   }
719
720   Context *on_apply = NULL, *on_apply_sync = NULL, *on_commit = NULL;
721   ObjectStore::Transaction::collect_contexts(tls, &on_apply, &on_commit,
722                                              &on_apply_sync);
723   if (on_apply_sync)
724     on_apply_sync->complete(0);
725   if (on_apply)
726     finisher.queue(on_apply);
727   if (on_commit)
728     finisher.queue(on_commit);
729   return 0;
730 }
731
732 void MemStore::_do_transaction(Transaction& t)
733 {
734   Transaction::iterator i = t.begin();
735   int pos = 0;
736
737   while (i.have_op()) {
738     Transaction::Op *op = i.decode_op();
739     int r = 0;
740
741     switch (op->op) {
742     case Transaction::OP_NOP:
743       break;
744     case Transaction::OP_TOUCH:
745       {
746         coll_t cid = i.get_cid(op->cid);
747         ghobject_t oid = i.get_oid(op->oid);
748         r = _touch(cid, oid);
749       }
750       break;
751
752     case Transaction::OP_WRITE:
753       {
754         coll_t cid = i.get_cid(op->cid);
755         ghobject_t oid = i.get_oid(op->oid);
756         uint64_t off = op->off;
757         uint64_t len = op->len;
758         uint32_t fadvise_flags = i.get_fadvise_flags();
759         bufferlist bl;
760         i.decode_bl(bl);
761         r = _write(cid, oid, off, len, bl, fadvise_flags);
762       }
763       break;
764
765     case Transaction::OP_ZERO:
766       {
767         coll_t cid = i.get_cid(op->cid);
768         ghobject_t oid = i.get_oid(op->oid);
769         uint64_t off = op->off;
770         uint64_t len = op->len;
771         r = _zero(cid, oid, off, len);
772       }
773       break;
774
775     case Transaction::OP_TRIMCACHE:
776       {
777         // deprecated, no-op
778       }
779       break;
780
781     case Transaction::OP_TRUNCATE:
782       {
783         coll_t cid = i.get_cid(op->cid);
784         ghobject_t oid = i.get_oid(op->oid);
785         uint64_t off = op->off;
786         r = _truncate(cid, oid, off);
787       }
788       break;
789
790     case Transaction::OP_REMOVE:
791       {
792         coll_t cid = i.get_cid(op->cid);
793         ghobject_t oid = i.get_oid(op->oid);
794         r = _remove(cid, oid);
795       }
796       break;
797
798     case Transaction::OP_SETATTR:
799       {
800         coll_t cid = i.get_cid(op->cid);
801         ghobject_t oid = i.get_oid(op->oid);
802         string name = i.decode_string();
803         bufferlist bl;
804         i.decode_bl(bl);
805         map<string, bufferptr> to_set;
806         to_set[name] = bufferptr(bl.c_str(), bl.length());
807         r = _setattrs(cid, oid, to_set);
808       }
809       break;
810
811     case Transaction::OP_SETATTRS:
812       {
813         coll_t cid = i.get_cid(op->cid);
814         ghobject_t oid = i.get_oid(op->oid);
815         map<string, bufferptr> aset;
816         i.decode_attrset(aset);
817         r = _setattrs(cid, oid, aset);
818       }
819       break;
820
821     case Transaction::OP_RMATTR:
822       {
823         coll_t cid = i.get_cid(op->cid);
824         ghobject_t oid = i.get_oid(op->oid);
825         string name = i.decode_string();
826         r = _rmattr(cid, oid, name.c_str());
827       }
828       break;
829
830     case Transaction::OP_RMATTRS:
831       {
832         coll_t cid = i.get_cid(op->cid);
833         ghobject_t oid = i.get_oid(op->oid);
834         r = _rmattrs(cid, oid);
835       }
836       break;
837
838     case Transaction::OP_CLONE:
839       {
840         coll_t cid = i.get_cid(op->cid);
841         ghobject_t oid = i.get_oid(op->oid);
842         ghobject_t noid = i.get_oid(op->dest_oid);
843         r = _clone(cid, oid, noid);
844       }
845       break;
846
847     case Transaction::OP_CLONERANGE:
848       {
849         coll_t cid = i.get_cid(op->cid);
850         ghobject_t oid = i.get_oid(op->oid);
851         ghobject_t noid = i.get_oid(op->dest_oid);
852         uint64_t off = op->off;
853         uint64_t len = op->len;
854         r = _clone_range(cid, oid, noid, off, len, off);
855       }
856       break;
857
858     case Transaction::OP_CLONERANGE2:
859       {
860         coll_t cid = i.get_cid(op->cid);
861         ghobject_t oid = i.get_oid(op->oid);
862         ghobject_t noid = i.get_oid(op->dest_oid);
863         uint64_t srcoff = op->off;
864         uint64_t len = op->len;
865         uint64_t dstoff = op->dest_off;
866         r = _clone_range(cid, oid, noid, srcoff, len, dstoff);
867       }
868       break;
869
870     case Transaction::OP_MKCOLL:
871       {
872         coll_t cid = i.get_cid(op->cid);
873         r = _create_collection(cid, op->split_bits);
874       }
875       break;
876
877     case Transaction::OP_COLL_HINT:
878       {
879         coll_t cid = i.get_cid(op->cid);
880         uint32_t type = op->hint_type;
881         bufferlist hint;
882         i.decode_bl(hint);
883         bufferlist::iterator hiter = hint.begin();
884         if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
885           uint32_t pg_num;
886           uint64_t num_objs;
887           ::decode(pg_num, hiter);
888           ::decode(num_objs, hiter);
889           r = _collection_hint_expected_num_objs(cid, pg_num, num_objs);
890         } else {
891           // Ignore the hint
892           dout(10) << "Unrecognized collection hint type: " << type << dendl;
893         }
894       }
895       break;
896
897     case Transaction::OP_RMCOLL:
898       {
899         coll_t cid = i.get_cid(op->cid);
900         r = _destroy_collection(cid);
901       }
902       break;
903
904     case Transaction::OP_COLL_ADD:
905       {
906         coll_t ocid = i.get_cid(op->cid);
907         coll_t ncid = i.get_cid(op->dest_cid);
908         ghobject_t oid = i.get_oid(op->oid);
909         r = _collection_add(ncid, ocid, oid);
910       }
911       break;
912
913     case Transaction::OP_COLL_REMOVE:
914        {
915         coll_t cid = i.get_cid(op->cid);
916         ghobject_t oid = i.get_oid(op->oid);
917         r = _remove(cid, oid);
918        }
919       break;
920
921     case Transaction::OP_COLL_MOVE:
922       assert(0 == "deprecated");
923       break;
924
925     case Transaction::OP_COLL_MOVE_RENAME:
926       {
927         coll_t oldcid = i.get_cid(op->cid);
928         ghobject_t oldoid = i.get_oid(op->oid);
929         coll_t newcid = i.get_cid(op->dest_cid);
930         ghobject_t newoid = i.get_oid(op->dest_oid);
931         r = _collection_move_rename(oldcid, oldoid, newcid, newoid);
932         if (r == -ENOENT)
933           r = 0;
934       }
935       break;
936
937     case Transaction::OP_TRY_RENAME:
938       {
939         coll_t cid = i.get_cid(op->cid);
940         ghobject_t oldoid = i.get_oid(op->oid);
941         ghobject_t newoid = i.get_oid(op->dest_oid);
942         r = _collection_move_rename(cid, oldoid, cid, newoid);
943         if (r == -ENOENT)
944           r = 0;
945       }
946       break;
947
948     case Transaction::OP_COLL_SETATTR:
949       {
950         assert(0 == "not implemented");
951       }
952       break;
953
954     case Transaction::OP_COLL_RMATTR:
955       {
956         assert(0 == "not implemented");
957       }
958       break;
959
960     case Transaction::OP_COLL_RENAME:
961       {
962         assert(0 == "not implemented");
963       }
964       break;
965
966     case Transaction::OP_OMAP_CLEAR:
967       {
968         coll_t cid = i.get_cid(op->cid);
969         ghobject_t oid = i.get_oid(op->oid);
970         r = _omap_clear(cid, oid);
971       }
972       break;
973     case Transaction::OP_OMAP_SETKEYS:
974       {
975         coll_t cid = i.get_cid(op->cid);
976         ghobject_t oid = i.get_oid(op->oid);
977         bufferlist aset_bl;
978         i.decode_attrset_bl(&aset_bl);
979         r = _omap_setkeys(cid, oid, aset_bl);
980       }
981       break;
982     case Transaction::OP_OMAP_RMKEYS:
983       {
984         coll_t cid = i.get_cid(op->cid);
985         ghobject_t oid = i.get_oid(op->oid);
986         bufferlist keys_bl;
987         i.decode_keyset_bl(&keys_bl);
988         r = _omap_rmkeys(cid, oid, keys_bl);
989       }
990       break;
991     case Transaction::OP_OMAP_RMKEYRANGE:
992       {
993         coll_t cid = i.get_cid(op->cid);
994         ghobject_t oid = i.get_oid(op->oid);
995         string first, last;
996         first = i.decode_string();
997         last = i.decode_string();
998         r = _omap_rmkeyrange(cid, oid, first, last);
999       }
1000       break;
1001     case Transaction::OP_OMAP_SETHEADER:
1002       {
1003         coll_t cid = i.get_cid(op->cid);
1004         ghobject_t oid = i.get_oid(op->oid);
1005         bufferlist bl;
1006         i.decode_bl(bl);
1007         r = _omap_setheader(cid, oid, bl);
1008       }
1009       break;
1010     case Transaction::OP_SPLIT_COLLECTION:
1011       assert(0 == "deprecated");
1012       break;
1013     case Transaction::OP_SPLIT_COLLECTION2:
1014       {
1015         coll_t cid = i.get_cid(op->cid);
1016         uint32_t bits = op->split_bits;
1017         uint32_t rem = op->split_rem;
1018         coll_t dest = i.get_cid(op->dest_cid);
1019         r = _split_collection(cid, bits, rem, dest);
1020       }
1021       break;
1022
1023     case Transaction::OP_SETALLOCHINT:
1024       {
1025         r = 0;
1026       }
1027       break;
1028
1029     default:
1030       derr << "bad op " << op->op << dendl;
1031       ceph_abort();
1032     }
1033
1034     if (r < 0) {
1035       bool ok = false;
1036
1037       if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
1038                             op->op == Transaction::OP_CLONE ||
1039                             op->op == Transaction::OP_CLONERANGE2 ||
1040                             op->op == Transaction::OP_COLL_ADD))
1041         // -ENOENT is usually okay
1042         ok = true;
1043       if (r == -ENODATA)
1044         ok = true;
1045
1046       if (!ok) {
1047         const char *msg = "unexpected error code";
1048
1049         if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
1050                              op->op == Transaction::OP_CLONE ||
1051                              op->op == Transaction::OP_CLONERANGE2))
1052           msg = "ENOENT on clone suggests osd bug";
1053
1054         if (r == -ENOSPC)
1055           // For now, if we hit _any_ ENOSPC, crash, before we do any damage
1056           // by partially applying transactions.
1057           msg = "ENOSPC from MemStore, misconfigured cluster or insufficient memory";
1058
1059         if (r == -ENOTEMPTY) {
1060           msg = "ENOTEMPTY suggests garbage data in osd data dir";
1061           dump_all();
1062         }
1063
1064         derr    << " error " << cpp_strerror(r) << " not handled on operation " << op->op
1065                 << " (op " << pos << ", counting from 0)" << dendl;
1066         dout(0) << msg << dendl;
1067         dout(0) << " transaction dump:\n";
1068         JSONFormatter f(true);
1069         f.open_object_section("transaction");
1070         t.dump(&f);
1071         f.close_section();
1072         f.flush(*_dout);
1073         *_dout << dendl;
1074         assert(0 == "unexpected error");
1075       }
1076     }
1077
1078     ++pos;
1079   }
1080 }
1081
1082 int MemStore::_touch(const coll_t& cid, const ghobject_t& oid)
1083 {
1084   dout(10) << __func__ << " " << cid << " " << oid << dendl;
1085   CollectionRef c = get_collection(cid);
1086   if (!c)
1087     return -ENOENT;
1088
1089   c->get_or_create_object(oid);
1090   return 0;
1091 }
1092
1093 int MemStore::_write(const coll_t& cid, const ghobject_t& oid,
1094                      uint64_t offset, size_t len, const bufferlist& bl,
1095                      uint32_t fadvise_flags)
1096 {
1097   dout(10) << __func__ << " " << cid << " " << oid << " "
1098            << offset << "~" << len << dendl;
1099   assert(len == bl.length());
1100
1101   CollectionRef c = get_collection(cid);
1102   if (!c)
1103     return -ENOENT;
1104
1105   ObjectRef o = c->get_or_create_object(oid);
1106   if (len > 0) {
1107     const ssize_t old_size = o->get_size();
1108     o->write(offset, bl);
1109     used_bytes += (o->get_size() - old_size);
1110   }
1111
1112   return 0;
1113 }
1114
1115 int MemStore::_zero(const coll_t& cid, const ghobject_t& oid,
1116                     uint64_t offset, size_t len)
1117 {
1118   dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
1119            << len << dendl;
1120   bufferlist bl;
1121   bl.append_zero(len);
1122   return _write(cid, oid, offset, len, bl);
1123 }
1124
1125 int MemStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
1126 {
1127   dout(10) << __func__ << " " << cid << " " << oid << " " << size << dendl;
1128   CollectionRef c = get_collection(cid);
1129   if (!c)
1130     return -ENOENT;
1131
1132   ObjectRef o = c->get_object(oid);
1133   if (!o)
1134     return -ENOENT;
1135   const ssize_t old_size = o->get_size();
1136   int r = o->truncate(size);
1137   used_bytes += (o->get_size() - old_size);
1138   return r;
1139 }
1140
1141 int MemStore::_remove(const coll_t& cid, const ghobject_t& oid)
1142 {
1143   dout(10) << __func__ << " " << cid << " " << oid << dendl;
1144   CollectionRef c = get_collection(cid);
1145   if (!c)
1146     return -ENOENT;
1147   RWLock::WLocker l(c->lock);
1148
1149   auto i = c->object_hash.find(oid);
1150   if (i == c->object_hash.end())
1151     return -ENOENT;
1152   used_bytes -= i->second->get_size();
1153   c->object_hash.erase(i);
1154   c->object_map.erase(oid);
1155
1156   return 0;
1157 }
1158
1159 int MemStore::_setattrs(const coll_t& cid, const ghobject_t& oid,
1160                         map<string,bufferptr>& aset)
1161 {
1162   dout(10) << __func__ << " " << cid << " " << oid << dendl;
1163   CollectionRef c = get_collection(cid);
1164   if (!c)
1165     return -ENOENT;
1166
1167   ObjectRef o = c->get_object(oid);
1168   if (!o)
1169     return -ENOENT;
1170   std::lock_guard<std::mutex> lock(o->xattr_mutex);
1171   for (map<string,bufferptr>::const_iterator p = aset.begin(); p != aset.end(); ++p)
1172     o->xattr[p->first] = p->second;
1173   return 0;
1174 }
1175
1176 int MemStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name)
1177 {
1178   dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl;
1179   CollectionRef c = get_collection(cid);
1180   if (!c)
1181     return -ENOENT;
1182
1183   ObjectRef o = c->get_object(oid);
1184   if (!o)
1185     return -ENOENT;
1186   std::lock_guard<std::mutex> lock(o->xattr_mutex);
1187   auto i = o->xattr.find(name);
1188   if (i == o->xattr.end())
1189     return -ENODATA;
1190   o->xattr.erase(i);
1191   return 0;
1192 }
1193
1194 int MemStore::_rmattrs(const coll_t& cid, const ghobject_t& oid)
1195 {
1196   dout(10) << __func__ << " " << cid << " " << oid << dendl;
1197   CollectionRef c = get_collection(cid);
1198   if (!c)
1199     return -ENOENT;
1200
1201   ObjectRef o = c->get_object(oid);
1202   if (!o)
1203     return -ENOENT;
1204   std::lock_guard<std::mutex> lock(o->xattr_mutex);
1205   o->xattr.clear();
1206   return 0;
1207 }
1208
1209 int MemStore::_clone(const coll_t& cid, const ghobject_t& oldoid,
1210                      const ghobject_t& newoid)
1211 {
1212   dout(10) << __func__ << " " << cid << " " << oldoid
1213            << " -> " << newoid << dendl;
1214   CollectionRef c = get_collection(cid);
1215   if (!c)
1216     return -ENOENT;
1217
1218   ObjectRef oo = c->get_object(oldoid);
1219   if (!oo)
1220     return -ENOENT;
1221   ObjectRef no = c->get_or_create_object(newoid);
1222   used_bytes += oo->get_size() - no->get_size();
1223   no->clone(oo.get(), 0, oo->get_size(), 0);
1224
1225   // take xattr and omap locks with std::lock()
1226   std::unique_lock<std::mutex>
1227       ox_lock(oo->xattr_mutex, std::defer_lock),
1228       nx_lock(no->xattr_mutex, std::defer_lock),
1229       oo_lock(oo->omap_mutex, std::defer_lock),
1230       no_lock(no->omap_mutex, std::defer_lock);
1231   std::lock(ox_lock, nx_lock, oo_lock, no_lock);
1232
1233   no->omap_header = oo->omap_header;
1234   no->omap = oo->omap;
1235   no->xattr = oo->xattr;
1236   return 0;
1237 }
1238
1239 int MemStore::_clone_range(const coll_t& cid, const ghobject_t& oldoid,
1240                            const ghobject_t& newoid,
1241                            uint64_t srcoff, uint64_t len, uint64_t dstoff)
1242 {
1243   dout(10) << __func__ << " " << cid << " "
1244            << oldoid << " " << srcoff << "~" << len << " -> "
1245            << newoid << " " << dstoff << "~" << len
1246            << dendl;
1247   CollectionRef c = get_collection(cid);
1248   if (!c)
1249     return -ENOENT;
1250
1251   ObjectRef oo = c->get_object(oldoid);
1252   if (!oo)
1253     return -ENOENT;
1254   ObjectRef no = c->get_or_create_object(newoid);
1255   if (srcoff >= oo->get_size())
1256     return 0;
1257   if (srcoff + len >= oo->get_size())
1258     len = oo->get_size() - srcoff;
1259
1260   const ssize_t old_size = no->get_size();
1261   no->clone(oo.get(), srcoff, len, dstoff);
1262   used_bytes += (no->get_size() - old_size);
1263
1264   return len;
1265 }
1266
1267 int MemStore::_omap_clear(const coll_t& cid, const ghobject_t &oid)
1268 {
1269   dout(10) << __func__ << " " << cid << " " << oid << dendl;
1270   CollectionRef c = get_collection(cid);
1271   if (!c)
1272     return -ENOENT;
1273
1274   ObjectRef o = c->get_object(oid);
1275   if (!o)
1276     return -ENOENT;
1277   std::lock_guard<std::mutex> lock(o->omap_mutex);
1278   o->omap.clear();
1279   o->omap_header.clear();
1280   return 0;
1281 }
1282
1283 int MemStore::_omap_setkeys(const coll_t& cid, const ghobject_t &oid,
1284                             bufferlist& aset_bl)
1285 {
1286   dout(10) << __func__ << " " << cid << " " << oid << dendl;
1287   CollectionRef c = get_collection(cid);
1288   if (!c)
1289     return -ENOENT;
1290
1291   ObjectRef o = c->get_object(oid);
1292   if (!o)
1293     return -ENOENT;
1294   std::lock_guard<std::mutex> lock(o->omap_mutex);
1295   bufferlist::iterator p = aset_bl.begin();
1296   __u32 num;
1297   ::decode(num, p);
1298   while (num--) {
1299     string key;
1300     ::decode(key, p);
1301     ::decode(o->omap[key], p);
1302   }
1303   return 0;
1304 }
1305
1306 int MemStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &oid,
1307                            bufferlist& keys_bl)
1308 {
1309   dout(10) << __func__ << " " << cid << " " << oid << dendl;
1310   CollectionRef c = get_collection(cid);
1311   if (!c)
1312     return -ENOENT;
1313
1314   ObjectRef o = c->get_object(oid);
1315   if (!o)
1316     return -ENOENT;
1317   std::lock_guard<std::mutex> lock(o->omap_mutex);
1318   bufferlist::iterator p = keys_bl.begin();
1319   __u32 num;
1320   ::decode(num, p);
1321   while (num--) {
1322     string key;
1323     ::decode(key, p);
1324     o->omap.erase(key);
1325   }
1326   return 0;
1327 }
1328
1329 int MemStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &oid,
1330                                const string& first, const string& last)
1331 {
1332   dout(10) << __func__ << " " << cid << " " << oid << " " << first
1333            << " " << last << dendl;
1334   CollectionRef c = get_collection(cid);
1335   if (!c)
1336     return -ENOENT;
1337
1338   ObjectRef o = c->get_object(oid);
1339   if (!o)
1340     return -ENOENT;
1341   std::lock_guard<std::mutex> lock(o->omap_mutex);
1342   map<string,bufferlist>::iterator p = o->omap.lower_bound(first);
1343   map<string,bufferlist>::iterator e = o->omap.lower_bound(last);
1344   o->omap.erase(p, e);
1345   return 0;
1346 }
1347
1348 int MemStore::_omap_setheader(const coll_t& cid, const ghobject_t &oid,
1349                               const bufferlist &bl)
1350 {
1351   dout(10) << __func__ << " " << cid << " " << oid << dendl;
1352   CollectionRef c = get_collection(cid);
1353   if (!c)
1354     return -ENOENT;
1355
1356   ObjectRef o = c->get_object(oid);
1357   if (!o)
1358     return -ENOENT;
1359   std::lock_guard<std::mutex> lock(o->omap_mutex);
1360   o->omap_header = bl;
1361   return 0;
1362 }
1363
1364 int MemStore::_create_collection(const coll_t& cid, int bits)
1365 {
1366   dout(10) << __func__ << " " << cid << dendl;
1367   RWLock::WLocker l(coll_lock);
1368   auto result = coll_map.insert(std::make_pair(cid, CollectionRef()));
1369   if (!result.second)
1370     return -EEXIST;
1371   result.first->second.reset(new Collection(cct, cid));
1372   result.first->second->bits = bits;
1373   return 0;
1374 }
1375
1376 int MemStore::_destroy_collection(const coll_t& cid)
1377 {
1378   dout(10) << __func__ << " " << cid << dendl;
1379   RWLock::WLocker l(coll_lock);
1380   ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1381   if (cp == coll_map.end())
1382     return -ENOENT;
1383   {
1384     RWLock::RLocker l2(cp->second->lock);
1385     if (!cp->second->object_map.empty())
1386       return -ENOTEMPTY;
1387     cp->second->exists = false;
1388   }
1389   used_bytes -= cp->second->used_bytes();
1390   coll_map.erase(cp);
1391   return 0;
1392 }
1393
1394 int MemStore::_collection_add(const coll_t& cid, const coll_t& ocid, const ghobject_t& oid)
1395 {
1396   dout(10) << __func__ << " " << cid << " " << ocid << " " << oid << dendl;
1397   CollectionRef c = get_collection(cid);
1398   if (!c)
1399     return -ENOENT;
1400   CollectionRef oc = get_collection(ocid);
1401   if (!oc)
1402     return -ENOENT;
1403   RWLock::WLocker l1(MIN(&(*c), &(*oc))->lock);
1404   RWLock::WLocker l2(MAX(&(*c), &(*oc))->lock);
1405
1406   if (c->object_hash.count(oid))
1407     return -EEXIST;
1408   if (oc->object_hash.count(oid) == 0)
1409     return -ENOENT;
1410   ObjectRef o = oc->object_hash[oid];
1411   c->object_map[oid] = o;
1412   c->object_hash[oid] = o;
1413   return 0;
1414 }
1415
1416 int MemStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
1417                                       coll_t cid, const ghobject_t& oid)
1418 {
1419   dout(10) << __func__ << " " << oldcid << " " << oldoid << " -> "
1420            << cid << " " << oid << dendl;
1421   CollectionRef c = get_collection(cid);
1422   if (!c)
1423     return -ENOENT;
1424   CollectionRef oc = get_collection(oldcid);
1425   if (!oc)
1426     return -ENOENT;
1427
1428   // note: c and oc may be the same
1429   assert(&(*c) == &(*oc));
1430   c->lock.get_write();
1431
1432   int r = -EEXIST;
1433   if (c->object_hash.count(oid))
1434     goto out;
1435   r = -ENOENT;
1436   if (oc->object_hash.count(oldoid) == 0)
1437     goto out;
1438   {
1439     ObjectRef o = oc->object_hash[oldoid];
1440     c->object_map[oid] = o;
1441     c->object_hash[oid] = o;
1442     oc->object_map.erase(oldoid);
1443     oc->object_hash.erase(oldoid);
1444   }
1445   r = 0;
1446  out:
1447   c->lock.put_write();
1448   return r;
1449 }
1450
1451 int MemStore::_split_collection(const coll_t& cid, uint32_t bits, uint32_t match,
1452                                 coll_t dest)
1453 {
1454   dout(10) << __func__ << " " << cid << " " << bits << " " << match << " "
1455            << dest << dendl;
1456   CollectionRef sc = get_collection(cid);
1457   if (!sc)
1458     return -ENOENT;
1459   CollectionRef dc = get_collection(dest);
1460   if (!dc)
1461     return -ENOENT;
1462   RWLock::WLocker l1(MIN(&(*sc), &(*dc))->lock);
1463   RWLock::WLocker l2(MAX(&(*sc), &(*dc))->lock);
1464
1465   map<ghobject_t,ObjectRef>::iterator p = sc->object_map.begin();
1466   while (p != sc->object_map.end()) {
1467     if (p->first.match(bits, match)) {
1468       dout(20) << " moving " << p->first << dendl;
1469       dc->object_map.insert(make_pair(p->first, p->second));
1470       dc->object_hash.insert(make_pair(p->first, p->second));
1471       sc->object_hash.erase(p->first);
1472       sc->object_map.erase(p++);
1473     } else {
1474       ++p;
1475     }
1476   }
1477
1478   sc->bits = bits;
1479   assert(dc->bits == (int)bits);
1480
1481   return 0;
1482 }
1483 namespace {
1484 struct BufferlistObject : public MemStore::Object {
1485   Spinlock mutex;
1486   bufferlist data;
1487
1488   size_t get_size() const override { return data.length(); }
1489
1490   int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
1491   int write(uint64_t offset, const bufferlist &bl) override;
1492   int clone(Object *src, uint64_t srcoff, uint64_t len,
1493             uint64_t dstoff) override;
1494   int truncate(uint64_t offset) override;
1495
1496   void encode(bufferlist& bl) const override {
1497     ENCODE_START(1, 1, bl);
1498     ::encode(data, bl);
1499     encode_base(bl);
1500     ENCODE_FINISH(bl);
1501   }
1502   void decode(bufferlist::iterator& p) override {
1503     DECODE_START(1, p);
1504     ::decode(data, p);
1505     decode_base(p);
1506     DECODE_FINISH(p);
1507   }
1508 };
1509 }
1510 // BufferlistObject
1511 int BufferlistObject::read(uint64_t offset, uint64_t len,
1512                                      bufferlist &bl)
1513 {
1514   std::lock_guard<Spinlock> lock(mutex);
1515   bl.substr_of(data, offset, len);
1516   return bl.length();
1517 }
1518
1519 int BufferlistObject::write(uint64_t offset, const bufferlist &src)
1520 {
1521   unsigned len = src.length();
1522
1523   std::lock_guard<Spinlock> lock(mutex);
1524
1525   // before
1526   bufferlist newdata;
1527   if (get_size() >= offset) {
1528     newdata.substr_of(data, 0, offset);
1529   } else {
1530     if (get_size()) {
1531       newdata.substr_of(data, 0, get_size());
1532     }
1533     newdata.append_zero(offset - get_size());
1534   }
1535
1536   newdata.append(src);
1537
1538   // after
1539   if (get_size() > offset + len) {
1540     bufferlist tail;
1541     tail.substr_of(data, offset + len, get_size() - (offset + len));
1542     newdata.append(tail);
1543   }
1544
1545   data.claim(newdata);
1546   return 0;
1547 }
1548
1549 int BufferlistObject::clone(Object *src, uint64_t srcoff,
1550                                       uint64_t len, uint64_t dstoff)
1551 {
1552   auto srcbl = dynamic_cast<BufferlistObject*>(src);
1553   if (srcbl == nullptr)
1554     return -ENOTSUP;
1555
1556   bufferlist bl;
1557   {
1558     std::lock_guard<Spinlock> lock(srcbl->mutex);
1559     if (srcoff == dstoff && len == src->get_size()) {
1560       data = srcbl->data;
1561       return 0;
1562     }
1563     bl.substr_of(srcbl->data, srcoff, len);
1564   }
1565   return write(dstoff, bl);
1566 }
1567
1568 int BufferlistObject::truncate(uint64_t size)
1569 {
1570   std::lock_guard<Spinlock> lock(mutex);
1571   if (get_size() > size) {
1572     bufferlist bl;
1573     bl.substr_of(data, 0, size);
1574     data.claim(bl);
1575   } else if (get_size() == size) {
1576     // do nothing
1577   } else {
1578     data.append_zero(size - get_size());
1579   }
1580   return 0;
1581 }
1582
1583 // PageSetObject
1584
1585 struct MemStore::PageSetObject : public Object {
1586   PageSet data;
1587   uint64_t data_len;
1588 #if defined(__GLIBCXX__)
1589   // use a thread-local vector for the pages returned by PageSet, so we
1590   // can avoid allocations in read/write()
1591   static thread_local PageSet::page_vector tls_pages;
1592 #endif
1593
1594   explicit PageSetObject(size_t page_size) : data(page_size), data_len(0) {}
1595
1596   size_t get_size() const override { return data_len; }
1597
1598   int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
1599   int write(uint64_t offset, const bufferlist &bl) override;
1600   int clone(Object *src, uint64_t srcoff, uint64_t len,
1601             uint64_t dstoff) override;
1602   int truncate(uint64_t offset) override;
1603
1604   void encode(bufferlist& bl) const override {
1605     ENCODE_START(1, 1, bl);
1606     ::encode(data_len, bl);
1607     data.encode(bl);
1608     encode_base(bl);
1609     ENCODE_FINISH(bl);
1610   }
1611   void decode(bufferlist::iterator& p) override {
1612     DECODE_START(1, p);
1613     ::decode(data_len, p);
1614     data.decode(p);
1615     decode_base(p);
1616     DECODE_FINISH(p);
1617   }
1618 };
1619
1620 #if defined(__GLIBCXX__)
1621 // use a thread-local vector for the pages returned by PageSet, so we
1622 // can avoid allocations in read/write()
1623 thread_local PageSet::page_vector MemStore::PageSetObject::tls_pages;
1624 #define DEFINE_PAGE_VECTOR(name)
1625 #else
1626 #define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name;
1627 #endif
1628
1629 int MemStore::PageSetObject::read(uint64_t offset, uint64_t len, bufferlist& bl)
1630 {
1631   const auto start = offset;
1632   const auto end = offset + len;
1633   auto remaining = len;
1634
1635   DEFINE_PAGE_VECTOR(tls_pages);
1636   data.get_range(offset, len, tls_pages);
1637
1638   // allocate a buffer for the data
1639   buffer::ptr buf(len);
1640
1641   auto p = tls_pages.begin();
1642   while (remaining) {
1643     // no more pages in range
1644     if (p == tls_pages.end() || (*p)->offset >= end) {
1645       buf.zero(offset - start, remaining);
1646       break;
1647     }
1648     auto page = *p;
1649
1650     // fill any holes between pages with zeroes
1651     if (page->offset > offset) {
1652       const auto count = std::min(remaining, page->offset - offset);
1653       buf.zero(offset - start, count);
1654       remaining -= count;
1655       offset = page->offset;
1656       if (!remaining)
1657         break;
1658     }
1659
1660     // read from page
1661     const auto page_offset = offset - page->offset;
1662     const auto count = min(remaining, data.get_page_size() - page_offset);
1663
1664     buf.copy_in(offset - start, count, page->data + page_offset);
1665
1666     remaining -= count;
1667     offset += count;
1668
1669     ++p;
1670   }
1671
1672   tls_pages.clear(); // drop page refs
1673
1674   bl.append(std::move(buf));
1675   return len;
1676 }
1677
1678 int MemStore::PageSetObject::write(uint64_t offset, const bufferlist &src)
1679 {
1680   unsigned len = src.length();
1681
1682   DEFINE_PAGE_VECTOR(tls_pages);
1683   // make sure the page range is allocated
1684   data.alloc_range(offset, src.length(), tls_pages);
1685
1686   auto page = tls_pages.begin();
1687
1688   auto p = src.begin();
1689   while (len > 0) {
1690     unsigned page_offset = offset - (*page)->offset;
1691     unsigned pageoff = data.get_page_size() - page_offset;
1692     unsigned count = min(len, pageoff);
1693     p.copy(count, (*page)->data + page_offset);
1694     offset += count;
1695     len -= count;
1696     if (count == pageoff)
1697       ++page;
1698   }
1699   if (data_len < offset)
1700     data_len = offset;
1701   tls_pages.clear(); // drop page refs
1702   return 0;
1703 }
1704
1705 int MemStore::PageSetObject::clone(Object *src, uint64_t srcoff,
1706                                    uint64_t len, uint64_t dstoff)
1707 {
1708   const int64_t delta = dstoff - srcoff;
1709
1710   auto &src_data = static_cast<PageSetObject*>(src)->data;
1711   const uint64_t src_page_size = src_data.get_page_size();
1712
1713   auto &dst_data = data;
1714   const auto dst_page_size = dst_data.get_page_size();
1715
1716   DEFINE_PAGE_VECTOR(tls_pages);
1717   PageSet::page_vector dst_pages;
1718
1719   while (len) {
1720     // limit to 16 pages at a time so tls_pages doesn't balloon in size
1721     auto count = std::min(len, (uint64_t)src_page_size * 16);
1722     src_data.get_range(srcoff, count, tls_pages);
1723
1724     // allocate the destination range
1725     // TODO: avoid allocating pages for holes in the source range
1726     dst_data.alloc_range(srcoff + delta, count, dst_pages);
1727     auto dst_iter = dst_pages.begin();
1728
1729     for (auto &src_page : tls_pages) {
1730       auto sbegin = std::max(srcoff, src_page->offset);
1731       auto send = std::min(srcoff + count, src_page->offset + src_page_size);
1732
1733       // zero-fill holes before src_page
1734       if (srcoff < sbegin) {
1735         while (dst_iter != dst_pages.end()) {
1736           auto &dst_page = *dst_iter;
1737           auto dbegin = std::max(srcoff + delta, dst_page->offset);
1738           auto dend = std::min(sbegin + delta, dst_page->offset + dst_page_size);
1739           std::fill(dst_page->data + dbegin - dst_page->offset,
1740                     dst_page->data + dend - dst_page->offset, 0);
1741           if (dend < dst_page->offset + dst_page_size)
1742             break;
1743           ++dst_iter;
1744         }
1745         const auto c = sbegin - srcoff;
1746         count -= c;
1747         len -= c;
1748       }
1749
1750       // copy data from src page to dst pages
1751       while (dst_iter != dst_pages.end()) {
1752         auto &dst_page = *dst_iter;
1753         auto dbegin = std::max(sbegin + delta, dst_page->offset);
1754         auto dend = std::min(send + delta, dst_page->offset + dst_page_size);
1755
1756         std::copy(src_page->data + (dbegin - delta) - src_page->offset,
1757                   src_page->data + (dend - delta) - src_page->offset,
1758                   dst_page->data + dbegin - dst_page->offset);
1759         if (dend < dst_page->offset + dst_page_size)
1760           break;
1761         ++dst_iter;
1762       }
1763
1764       const auto c = send - sbegin;
1765       count -= c;
1766       len -= c;
1767       srcoff = send;
1768       dstoff = send + delta;
1769     }
1770     tls_pages.clear(); // drop page refs
1771
1772     // zero-fill holes after the last src_page
1773     if (count > 0) {
1774       while (dst_iter != dst_pages.end()) {
1775         auto &dst_page = *dst_iter;
1776         auto dbegin = std::max(dstoff, dst_page->offset);
1777         auto dend = std::min(dstoff + count, dst_page->offset + dst_page_size);
1778         std::fill(dst_page->data + dbegin - dst_page->offset,
1779                   dst_page->data + dend - dst_page->offset, 0);
1780         ++dst_iter;
1781       }
1782       srcoff += count;
1783       dstoff += count;
1784       len -= count;
1785     }
1786     dst_pages.clear(); // drop page refs
1787   }
1788
1789   // update object size
1790   if (data_len < dstoff)
1791     data_len = dstoff;
1792   return 0;
1793 }
1794
1795 int MemStore::PageSetObject::truncate(uint64_t size)
1796 {
1797   data.free_pages_after(size);
1798   data_len = size;
1799
1800   const auto page_size = data.get_page_size();
1801   const auto page_offset = size & ~(page_size-1);
1802   if (page_offset == size)
1803     return 0;
1804
1805   DEFINE_PAGE_VECTOR(tls_pages);
1806   // write zeroes to the rest of the last page
1807   data.get_range(page_offset, page_size, tls_pages);
1808   if (tls_pages.empty())
1809     return 0;
1810
1811   auto page = tls_pages.begin();
1812   auto data = (*page)->data;
1813   std::fill(data + (size - page_offset), data + page_size, 0);
1814   tls_pages.clear(); // drop page ref
1815   return 0;
1816 }
1817
1818
1819 MemStore::ObjectRef MemStore::Collection::create_object() const {
1820   if (use_page_set)
1821     return new PageSetObject(cct->_conf->memstore_page_size);
1822   return new BufferlistObject();
1823 }