Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / memstore / MemStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2013- Sage Weil <sage@inktank.com>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15
16 #ifndef CEPH_MEMSTORE_H
17 #define CEPH_MEMSTORE_H
18
19 #include <mutex>
20 #include <boost/intrusive_ptr.hpp>
21
22 #include "include/unordered_map.h"
23 #include "include/memory.h"
24 #include "include/Spinlock.h"
25 #include "common/Finisher.h"
26 #include "common/RefCountedObj.h"
27 #include "common/RWLock.h"
28 #include "os/ObjectStore.h"
29 #include "PageSet.h"
30 #include "include/assert.h"
31
32 class MemStore : public ObjectStore {
33 public:
34   struct Object : public RefCountedObject {
35     std::mutex xattr_mutex;
36     std::mutex omap_mutex;
37     map<string,bufferptr> xattr;
38     bufferlist omap_header;
39     map<string,bufferlist> omap;
40
41     typedef boost::intrusive_ptr<Object> Ref;
42     friend void intrusive_ptr_add_ref(Object *o) { o->get(); }
43     friend void intrusive_ptr_release(Object *o) { o->put(); }
44
45     Object() : RefCountedObject(nullptr, 0) {}
46     // interface for object data
47     virtual size_t get_size() const = 0;
48     virtual int read(uint64_t offset, uint64_t len, bufferlist &bl) = 0;
49     virtual int write(uint64_t offset, const bufferlist &bl) = 0;
50     virtual int clone(Object *src, uint64_t srcoff, uint64_t len,
51                       uint64_t dstoff) = 0;
52     virtual int truncate(uint64_t offset) = 0;
53     virtual void encode(bufferlist& bl) const = 0;
54     virtual void decode(bufferlist::iterator& p) = 0;
55
56     void encode_base(bufferlist& bl) const {
57       ::encode(xattr, bl);
58       ::encode(omap_header, bl);
59       ::encode(omap, bl);
60     }
61     void decode_base(bufferlist::iterator& p) {
62       ::decode(xattr, p);
63       ::decode(omap_header, p);
64       ::decode(omap, p);
65     }
66
67     void dump(Formatter *f) const {
68       f->dump_int("data_len", get_size());
69       f->dump_int("omap_header_len", omap_header.length());
70
71       f->open_array_section("xattrs");
72       for (map<string,bufferptr>::const_iterator p = xattr.begin();
73            p != xattr.end();
74            ++p) {
75         f->open_object_section("xattr");
76         f->dump_string("name", p->first);
77         f->dump_int("length", p->second.length());
78         f->close_section();
79       }
80       f->close_section();
81
82       f->open_array_section("omap");
83       for (map<string,bufferlist>::const_iterator p = omap.begin();
84            p != omap.end();
85            ++p) {
86         f->open_object_section("pair");
87         f->dump_string("key", p->first);
88         f->dump_int("length", p->second.length());
89         f->close_section();
90       }
91       f->close_section();
92     }
93   };
94   typedef Object::Ref ObjectRef;
95
96   struct PageSetObject;
97   struct Collection : public CollectionImpl {
98     coll_t cid;
99     int bits;
100     CephContext *cct;
101     bool use_page_set;
102     ceph::unordered_map<ghobject_t, ObjectRef> object_hash;  ///< for lookup
103     map<ghobject_t, ObjectRef> object_map;        ///< for iteration
104     map<string,bufferptr> xattr;
105     RWLock lock;   ///< for object_{map,hash}
106     bool exists;
107
108     typedef boost::intrusive_ptr<Collection> Ref;
109     friend void intrusive_ptr_add_ref(Collection *c) { c->get(); }
110     friend void intrusive_ptr_release(Collection *c) { c->put(); }
111
112     const coll_t &get_cid() override {
113       return cid;
114     }
115
116     ObjectRef create_object() const;
117
118     // NOTE: The lock only needs to protect the object_map/hash, not the
119     // contents of individual objects.  The osd is already sequencing
120     // reads and writes, so we will never see them concurrently at this
121     // level.
122
123     ObjectRef get_object(ghobject_t oid) {
124       RWLock::RLocker l(lock);
125       auto o = object_hash.find(oid);
126       if (o == object_hash.end())
127         return ObjectRef();
128       return o->second;
129     }
130
131     ObjectRef get_or_create_object(ghobject_t oid) {
132       RWLock::WLocker l(lock);
133       auto result = object_hash.emplace(oid, ObjectRef());
134       if (result.second)
135         object_map[oid] = result.first->second = create_object();
136       return result.first->second;
137     }
138
139     void encode(bufferlist& bl) const {
140       ENCODE_START(1, 1, bl);
141       ::encode(xattr, bl);
142       ::encode(use_page_set, bl);
143       uint32_t s = object_map.size();
144       ::encode(s, bl);
145       for (map<ghobject_t, ObjectRef>::const_iterator p = object_map.begin();
146            p != object_map.end();
147            ++p) {
148         ::encode(p->first, bl);
149         p->second->encode(bl);
150       }
151       ENCODE_FINISH(bl);
152     }
153     void decode(bufferlist::iterator& p) {
154       DECODE_START(1, p);
155       ::decode(xattr, p);
156       ::decode(use_page_set, p);
157       uint32_t s;
158       ::decode(s, p);
159       while (s--) {
160         ghobject_t k;
161         ::decode(k, p);
162         auto o = create_object();
163         o->decode(p);
164         object_map.insert(make_pair(k, o));
165         object_hash.insert(make_pair(k, o));
166       }
167       DECODE_FINISH(p);
168     }
169
170     uint64_t used_bytes() const {
171       uint64_t result = 0;
172       for (map<ghobject_t, ObjectRef>::const_iterator p = object_map.begin();
173            p != object_map.end();
174            ++p) {
175         result += p->second->get_size();
176       }
177
178       return result;
179     }
180
181     explicit Collection(CephContext *cct, coll_t c)
182       : cid(c),
183         cct(cct),
184         use_page_set(cct->_conf->memstore_page_set),
185         lock("MemStore::Collection::lock", true, false),
186         exists(true) {}
187   };
188   typedef Collection::Ref CollectionRef;
189
190 private:
191   class OmapIteratorImpl;
192
193
194   ceph::unordered_map<coll_t, CollectionRef> coll_map;
195   RWLock coll_lock;    ///< rwlock to protect coll_map
196
197   CollectionRef get_collection(const coll_t& cid);
198
199   Finisher finisher;
200
201   uint64_t used_bytes;
202
203   void _do_transaction(Transaction& t);
204
205   int _touch(const coll_t& cid, const ghobject_t& oid);
206   int _write(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len,
207               const bufferlist& bl, uint32_t fadvise_flags = 0);
208   int _zero(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len);
209   int _truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size);
210   int _remove(const coll_t& cid, const ghobject_t& oid);
211   int _setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset);
212   int _rmattr(const coll_t& cid, const ghobject_t& oid, const char *name);
213   int _rmattrs(const coll_t& cid, const ghobject_t& oid);
214   int _clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid);
215   int _clone_range(const coll_t& cid, const ghobject_t& oldoid,
216                    const ghobject_t& newoid,
217                    uint64_t srcoff, uint64_t len, uint64_t dstoff);
218   int _omap_clear(const coll_t& cid, const ghobject_t &oid);
219   int _omap_setkeys(const coll_t& cid, const ghobject_t &oid, bufferlist& aset_bl);
220   int _omap_rmkeys(const coll_t& cid, const ghobject_t &oid, bufferlist& keys_bl);
221   int _omap_rmkeyrange(const coll_t& cid, const ghobject_t &oid,
222                        const string& first, const string& last);
223   int _omap_setheader(const coll_t& cid, const ghobject_t &oid, const bufferlist &bl);
224
225   int _collection_hint_expected_num_objs(const coll_t& cid, uint32_t pg_num,
226       uint64_t num_objs) const { return 0; }
227   int _create_collection(const coll_t& c, int bits);
228   int _destroy_collection(const coll_t& c);
229   int _collection_add(const coll_t& cid, const coll_t& ocid, const ghobject_t& oid);
230   int _collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
231                               coll_t cid, const ghobject_t& o);
232   int _split_collection(const coll_t& cid, uint32_t bits, uint32_t rem, coll_t dest);
233
234   int _save();
235   int _load();
236
237   void dump(Formatter *f);
238   void dump_all();
239
240 public:
241   MemStore(CephContext *cct, const string& path)
242     : ObjectStore(cct, path),
243       coll_lock("MemStore::coll_lock"),
244       finisher(cct),
245       used_bytes(0) {}
246   ~MemStore() override { }
247
248   string get_type() override {
249     return "memstore";
250   }
251
252   bool test_mount_in_use() override {
253     return false;
254   }
255
256   int mount() override;
257   int umount() override;
258
259   int fsck(bool deep) override {
260     return 0;
261   }
262
263   int validate_hobject_key(const hobject_t &obj) const override {
264     return 0;
265   }
266   unsigned get_max_attr_name_length() override {
267     return 256;  // arbitrary; there is no real limit internally
268   }
269
270   int mkfs() override;
271   int mkjournal() override {
272     return 0;
273   }
274   bool wants_journal() override {
275     return false;
276   }
277   bool allows_journal() override {
278     return false;
279   }
280   bool needs_journal() override {
281     return false;
282   }
283
284   int statfs(struct store_statfs_t *buf) override;
285
286   bool exists(const coll_t& cid, const ghobject_t& oid) override;
287   bool exists(CollectionHandle &c, const ghobject_t& oid) override;
288   int stat(const coll_t& cid, const ghobject_t& oid,
289            struct stat *st, bool allow_eio = false) override;
290   int stat(CollectionHandle &c, const ghobject_t& oid,
291            struct stat *st, bool allow_eio = false) override;
292   int set_collection_opts(
293     const coll_t& cid,
294     const pool_opts_t& opts) override;
295   int read(
296     const coll_t& cid,
297     const ghobject_t& oid,
298     uint64_t offset,
299     size_t len,
300     bufferlist& bl,
301     uint32_t op_flags = 0) override;
302   int read(
303     CollectionHandle &c,
304     const ghobject_t& oid,
305     uint64_t offset,
306     size_t len,
307     bufferlist& bl,
308     uint32_t op_flags = 0) override;
309   using ObjectStore::fiemap;
310   int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl) override;
311   int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
312   int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
313               bufferptr& value) override;
314   int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
315               bufferptr& value) override;
316   int getattrs(const coll_t& cid, const ghobject_t& oid,
317                map<string,bufferptr>& aset) override;
318   int getattrs(CollectionHandle &c, const ghobject_t& oid,
319                map<string,bufferptr>& aset) override;
320
321   int list_collections(vector<coll_t>& ls) override;
322
323   CollectionHandle open_collection(const coll_t& c) override {
324     return get_collection(c);
325   }
326   bool collection_exists(const coll_t& c) override;
327   int collection_empty(const coll_t& c, bool *empty) override;
328   int collection_bits(const coll_t& c) override;
329   using ObjectStore::collection_list;
330   int collection_list(const coll_t& cid,
331                       const ghobject_t& start, const ghobject_t& end, int max,
332                       vector<ghobject_t> *ls, ghobject_t *next) override;
333
334   using ObjectStore::omap_get;
335   int omap_get(
336     const coll_t& cid,                ///< [in] Collection containing oid
337     const ghobject_t &oid,   ///< [in] Object containing omap
338     bufferlist *header,      ///< [out] omap header
339     map<string, bufferlist> *out /// < [out] Key to value map
340     ) override;
341
342   using ObjectStore::omap_get_header;
343   /// Get omap header
344   int omap_get_header(
345     const coll_t& cid,                ///< [in] Collection containing oid
346     const ghobject_t &oid,   ///< [in] Object containing omap
347     bufferlist *header,      ///< [out] omap header
348     bool allow_eio = false ///< [in] don't assert on eio
349     ) override;
350
351   using ObjectStore::omap_get_keys;
352   /// Get keys defined on oid
353   int omap_get_keys(
354     const coll_t& cid,              ///< [in] Collection containing oid
355     const ghobject_t &oid, ///< [in] Object containing omap
356     set<string> *keys      ///< [out] Keys defined on oid
357     ) override;
358
359   using ObjectStore::omap_get_values;
360   /// Get key values
361   int omap_get_values(
362     const coll_t& cid,                    ///< [in] Collection containing oid
363     const ghobject_t &oid,       ///< [in] Object containing omap
364     const set<string> &keys,     ///< [in] Keys to get
365     map<string, bufferlist> *out ///< [out] Returned keys and values
366     ) override;
367
368   using ObjectStore::omap_check_keys;
369   /// Filters keys into out which are defined on oid
370   int omap_check_keys(
371     const coll_t& cid,                ///< [in] Collection containing oid
372     const ghobject_t &oid,   ///< [in] Object containing omap
373     const set<string> &keys, ///< [in] Keys to check
374     set<string> *out         ///< [out] Subset of keys defined on oid
375     ) override;
376
377   using ObjectStore::get_omap_iterator;
378   ObjectMap::ObjectMapIterator get_omap_iterator(
379     const coll_t& cid,              ///< [in] collection
380     const ghobject_t &oid  ///< [in] object
381     ) override;
382
383   void set_fsid(uuid_d u) override;
384   uuid_d get_fsid() override;
385
386   uint64_t estimate_objects_overhead(uint64_t num_objects) override {
387     return 0; //do not care
388   }
389
390   objectstore_perf_stat_t get_cur_stats() override;
391
392   const PerfCounters* get_perf_counters() const override {
393     return nullptr;
394   }
395
396
397   int queue_transactions(
398     Sequencer *osr, vector<Transaction>& tls,
399     TrackedOpRef op = TrackedOpRef(),
400     ThreadPool::TPHandle *handle = NULL) override;
401 };
402
403
404
405
406 #endif