Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / ObjectStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14 #include <ctype.h>
15 #include <sstream>
16 #include "include/memory.h"
17 #include "ObjectStore.h"
18 #include "common/Formatter.h"
19 #include "common/safe_io.h"
20
21 #include "filestore/FileStore.h"
22 #include "memstore/MemStore.h"
23 #if defined(HAVE_LIBAIO)
24 #include "bluestore/BlueStore.h"
25 #endif
26 #include "kstore/KStore.h"
27
28 void decode_str_str_map_to_bl(bufferlist::iterator& p,
29                               bufferlist *out)
30 {
31   bufferlist::iterator start = p;
32   __u32 n;
33   ::decode(n, p);
34   unsigned len = 4;
35   while (n--) {
36     __u32 l;
37     ::decode(l, p);
38     p.advance(l);
39     len += 4 + l;
40     ::decode(l, p);
41     p.advance(l);
42     len += 4 + l;
43   }
44   start.copy(len, *out);
45 }
46
47 void decode_str_set_to_bl(bufferlist::iterator& p,
48                           bufferlist *out)
49 {
50   bufferlist::iterator start = p;
51   __u32 n;
52   ::decode(n, p);
53   unsigned len = 4;
54   while (n--) {
55     __u32 l;
56     ::decode(l, p);
57     p.advance(l);
58     len += 4 + l;
59   }
60   start.copy(len, *out);
61 }
62
63 ObjectStore *ObjectStore::create(CephContext *cct,
64                                  const string& type,
65                                  const string& data,
66                                  const string& journal,
67                                  osflagbits_t flags)
68 {
69   if (type == "filestore") {
70     return new FileStore(cct, data, journal, flags);
71   }
72   if (type == "memstore") {
73     return new MemStore(cct, data);
74   }
75 #if defined(HAVE_LIBAIO)
76   if (type == "bluestore") {
77     return new BlueStore(cct, data);
78   }
79   if (type == "random") {
80     if (rand() % 2) {
81       return new FileStore(cct, data, journal, flags);
82     } else {
83       return new BlueStore(cct, data);
84     }
85   }
86 #else
87   if (type == "random") {
88     return new FileStore(cct, data, journal, flags);
89   }
90 #endif
91   if (type == "kstore" &&
92       cct->check_experimental_feature_enabled("kstore")) {
93     return new KStore(cct, data);
94   }
95   return NULL;
96 }
97
98 int ObjectStore::probe_block_device_fsid(
99   CephContext *cct,
100   const string& path,
101   uuid_d *fsid)
102 {
103   int r;
104
105 #if defined(HAVE_LIBAIO)
106   // first try bluestore -- it has a crc on its header and will fail
107   // reliably.
108   r = BlueStore::get_block_device_fsid(cct, path, fsid);
109   if (r == 0) {
110     lgeneric_dout(cct, 0) << __func__ << " " << path << " is bluestore, "
111                           << *fsid << dendl;
112     return r;
113   }
114 #endif
115
116   // okay, try FileStore (journal).
117   r = FileStore::get_block_device_fsid(cct, path, fsid);
118   if (r == 0) {
119     lgeneric_dout(cct, 0) << __func__ << " " << path << " is filestore, "
120                           << *fsid << dendl;
121     return r;
122   }
123
124   return -EINVAL;
125 }
126
127 int ObjectStore::write_meta(const std::string& key,
128                             const std::string& value)
129 {
130   string v = value;
131   v += "\n";
132   int r = safe_write_file(path.c_str(), key.c_str(),
133                           v.c_str(), v.length());
134   if (r < 0)
135     return r;
136   return 0;
137 }
138
139 int ObjectStore::read_meta(const std::string& key,
140                            std::string *value)
141 {
142   char buf[4096];
143   int r = safe_read_file(path.c_str(), key.c_str(),
144                          buf, sizeof(buf));
145   if (r <= 0)
146     return r;
147   // drop trailing newlines
148   while (r && isspace(buf[r-1])) {
149     --r;
150   }
151   *value = string(buf, r);
152   return 0;
153 }
154
155
156
157
158 ostream& operator<<(ostream& out, const ObjectStore::Sequencer& s)
159 {
160   return out << "osr(" << s.get_name() << " " << &s << ")";
161 }
162
163 ostream& operator<<(ostream& out, const ObjectStore::Transaction& tx) {
164
165   return out << "Transaction(" << &tx << ")"; 
166 }
167
168 unsigned ObjectStore::apply_transactions(Sequencer *osr,
169                                          vector<Transaction>& tls,
170                                          Context *ondisk)
171 {
172   // use op pool
173   Cond my_cond;
174   Mutex my_lock("ObjectStore::apply_transaction::my_lock");
175   int r = 0;
176   bool done;
177   C_SafeCond *onreadable = new C_SafeCond(&my_lock, &my_cond, &done, &r);
178
179   queue_transactions(osr, tls, onreadable, ondisk);
180
181   my_lock.Lock();
182   while (!done)
183     my_cond.Wait(my_lock);
184   my_lock.Unlock();
185   return r;
186 }
187
188 int ObjectStore::queue_transactions(
189   Sequencer *osr,
190   vector<Transaction>& tls,
191   Context *onreadable,
192   Context *oncommit,
193   Context *onreadable_sync,
194   Context *oncomplete,
195   TrackedOpRef op = TrackedOpRef())
196 {
197   RunOnDeleteRef _complete (std::make_shared<RunOnDelete>(oncomplete));
198   Context *_onreadable = new Wrapper<RunOnDeleteRef>(
199     onreadable, _complete);
200   Context *_oncommit = new Wrapper<RunOnDeleteRef>(
201     oncommit, _complete);
202   return queue_transactions(osr, tls, _onreadable, _oncommit,
203                             onreadable_sync, op);
204 }