Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / kv / KineticStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "KineticStore.h"
4 #include "common/ceph_crypto.h"
5
6 #include <set>
7 #include <map>
8 #include <string>
9 #include "include/memory.h"
10 #include <errno.h>
11 using std::string;
12 #include "common/perf_counters.h"
13
14 #define dout_subsys ceph_subsys_kinetic
15
16 int KineticStore::init()
17 {
18   // init defaults.  caller can override these if they want
19   // prior to calling open.
20   host = cct->_conf->kinetic_host;
21   port = cct->_conf->kinetic_port;
22   user_id = cct->_conf->kinetic_user_id;
23   hmac_key = cct->_conf->kinetic_hmac_key;
24   use_ssl = cct->_conf->kinetic_use_ssl;
25   return 0;
26 }
27
28 int KineticStore::_test_init(CephContext *c)
29 {
30   kinetic::KineticConnectionFactory conn_factory =
31     kinetic::NewKineticConnectionFactory();
32
33   kinetic::ConnectionOptions options;
34   options.host = cct->_conf->kinetic_host;
35   options.port = cct->_conf->kinetic_port;
36   options.user_id = cct->_conf->kinetic_user_id;
37   options.hmac_key = cct->_conf->kinetic_hmac_key;
38   options.use_ssl = cct->_conf->kinetic_use_ssl;
39
40   kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10);
41   kinetic_conn.reset();
42   if (!status.ok())
43     derr << __func__ << "Unable to connect to kinetic store " << options.host
44          << ":" << options.port << " : " << status.ToString() << dendl;
45   return status.ok() ? 0 : -EIO;
46 }
47
48 int KineticStore::do_open(ostream &out, bool create_if_missing)
49 {
50   kinetic::KineticConnectionFactory conn_factory =
51     kinetic::NewKineticConnectionFactory();
52   kinetic::ConnectionOptions options;
53   options.host = host;
54   options.port = port;
55   options.user_id = user_id;
56   options.hmac_key = hmac_key;
57   options.use_ssl = use_ssl;
58   kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10);
59   if (!status.ok()) {
60     derr << "Unable to connect to kinetic store " << host << ":" << port
61          << " : " << status.ToString() << dendl;
62     return -EINVAL;
63   }
64
65   PerfCountersBuilder plb(g_ceph_context, "kinetic", l_kinetic_first, l_kinetic_last);
66   plb.add_u64_counter(l_kinetic_gets, "kinetic_get", "Gets");
67   plb.add_u64_counter(l_kinetic_txns, "kinetic_transaction", "Transactions");
68   logger = plb.create_perf_counters();
69   cct->get_perfcounters_collection()->add(logger);
70   return 0;
71 }
72
73 KineticStore::KineticStore(CephContext *c) :
74   cct(c),
75   logger(NULL)
76 {
77   host = c->_conf->kinetic_host;
78   port = c->_conf->kinetic_port;
79   user_id = c->_conf->kinetic_user_id;
80   hmac_key = c->_conf->kinetic_hmac_key;
81   use_ssl = c->_conf->kinetic_use_ssl;
82 }
83
84 KineticStore::~KineticStore()
85 {
86   close();
87   delete logger;
88 }
89
90 void KineticStore::close()
91 {
92   kinetic_conn.reset();
93   if (logger)
94     cct->get_perfcounters_collection()->remove(logger);
95 }
96
97 int KineticStore::submit_transaction(KeyValueDB::Transaction t)
98 {
99   KineticTransactionImpl * _t =
100     static_cast<KineticTransactionImpl *>(t.get());
101
102   dout(20) << "kinetic submit_transaction" << dendl;
103
104   for (vector<KineticOp>::iterator it = _t->ops.begin();
105        it != _t->ops.end(); ++it) {
106     kinetic::KineticStatus status(kinetic::StatusCode::OK, "");
107     if (it->type == KINETIC_OP_WRITE) {
108       string data(it->data.c_str(), it->data.length());
109       kinetic::KineticRecord record(data, "", "",
110                                     com::seagate::kinetic::client::proto::Message_Algorithm_SHA1);
111       dout(30) << "kinetic before put of " << it->key << " (" << data.length() << " bytes)" << dendl;
112       status = kinetic_conn->Put(it->key, "", kinetic::WriteMode::IGNORE_VERSION,
113                                  record);
114       dout(30) << "kinetic after put of " << it->key << dendl;
115     } else {
116       assert(it->type == KINETIC_OP_DELETE);
117       dout(30) << "kinetic before delete" << dendl;
118       status = kinetic_conn->Delete(it->key, "",
119                                     kinetic::WriteMode::IGNORE_VERSION);
120       dout(30) << "kinetic after delete" << dendl;
121     }
122     if (!status.ok()) {
123       derr << "kinetic error submitting transaction: "
124            << status.message() << dendl;
125       return -1;
126     }
127   }
128
129   logger->inc(l_kinetic_txns);
130   return 0;
131 }
132
133 int KineticStore::submit_transaction_sync(KeyValueDB::Transaction t)
134 {
135   return submit_transaction(t);
136 }
137
138 void KineticStore::KineticTransactionImpl::set(
139   const string &prefix,
140   const string &k,
141   const bufferlist &to_set_bl)
142 {
143   string key = combine_strings(prefix, k);
144   dout(30) << "kinetic set key " << key << dendl;
145   ops.push_back(KineticOp(KINETIC_OP_WRITE, key, to_set_bl));
146 }
147
148 void KineticStore::KineticTransactionImpl::rmkey(const string &prefix,
149                                                  const string &k)
150 {
151   string key = combine_strings(prefix, k);
152   dout(30) << "kinetic rm key " << key << dendl;
153   ops.push_back(KineticOp(KINETIC_OP_DELETE, key));
154 }
155
156 void KineticStore::KineticTransactionImpl::rmkeys_by_prefix(const string &prefix)
157 {
158   dout(20) << "kinetic rmkeys_by_prefix " << prefix << dendl;
159   KeyValueDB::Iterator it = db->get_iterator(prefix);
160   for (it->seek_to_first();
161        it->valid();
162        it->next()) {
163     string key = combine_strings(prefix, it->key());
164     ops.push_back(KineticOp(KINETIC_OP_DELETE, key));
165     dout(30) << "kinetic rm key by prefix: " << key << dendl;
166   }
167 }
168
169 void KineticStore::KineticTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end)
170 {
171   KeyValueDB::Iterator it = db->get_iterator(prefix);
172   it->lower_bound(start);
173   while (it->valid()) {
174     if (it->key() >= end) {
175       break;
176     }
177     ops.push_back(
178         KineticOp(KINETIC_OP_DELETE, combine_strings(prefix, it->key())));
179     it->next();
180   }
181 }
182
183 int KineticStore::get(
184     const string &prefix,
185     const std::set<string> &keys,
186     std::map<string, bufferlist> *out)
187 {
188   dout(30) << "kinetic get prefix: " << prefix << " keys: " << keys << dendl;
189   for (std::set<string>::const_iterator i = keys.begin();
190        i != keys.end();
191        ++i) {
192     unique_ptr<kinetic::KineticRecord> record;
193     string key = combine_strings(prefix, *i);
194     dout(30) << "before get key " << key << dendl;
195     kinetic::KineticStatus status = kinetic_conn->Get(key, record);
196     if (!status.ok())
197       break;
198     dout(30) << "kinetic get got key: " << key << dendl;
199     out->insert(make_pair(key, to_bufferlist(*record.get())));
200   }
201   logger->inc(l_kinetic_gets);
202   return 0;
203 }
204
205 string KineticStore::combine_strings(const string &prefix, const string &value)
206 {
207   string out = prefix;
208   out.push_back(1);
209   out.append(value);
210   return out;
211 }
212
213 bufferlist KineticStore::to_bufferlist(const kinetic::KineticRecord &record)
214 {
215   bufferlist bl;
216   bl.append(*(record.value()));
217   return bl;
218 }
219
220 int KineticStore::split_key(string &in, string *prefix, string *key)
221 {
222   size_t prefix_len = 0;
223   char* in_data = in.c_str();
224   
225   // Find separator inside Slice
226   char* separator = (char*) memchr((void*)in_data, 1, in.size());
227   if (separator == NULL)
228      return -EINVAL;
229   prefix_len = size_t(separator - in_data);
230   if (prefix_len >= in.size())
231     return -EINVAL;
232
233   // Fetch prefix and/or key directly from Slice
234   if (prefix)
235     *prefix = string(in_data, prefix_len);
236   if (key)
237     *key = string(separator+1, in.size()-prefix_len-1);
238   return 0;
239 }
240
241 KineticStore::KineticWholeSpaceIteratorImpl::KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn) : kinetic_conn(conn),
242    kinetic_status(kinetic::StatusCode::OK, "")
243 {
244   dout(30) << "kinetic iterator constructor()" << dendl;
245   const static string last_key = "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF";
246   kinetic::KeyRangeIterator it =
247     kinetic_conn->IterateKeyRange("", true, last_key, true, 1024);
248   while (it != kinetic::KeyRangeEnd()) {
249     try {
250       keys.insert(*it);
251       dout(30) << "kinetic iterator added " << *it << dendl;
252     } catch (std::runtime_error &e) {
253       kinetic_status = kinetic::KineticStatus(kinetic::StatusCode::CLIENT_INTERNAL_ERROR, e.what());
254       return;
255     }
256     ++it;
257   }
258   keys_iter = keys.begin();
259 }
260
261 int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
262 {
263   dout(30) << "kinetic iterator seek_to_first(prefix): " << prefix << dendl;
264   keys_iter = keys.lower_bound(prefix);
265   return 0;
266 }
267
268 int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last()
269 {
270   dout(30) << "kinetic iterator seek_to_last()" << dendl;
271   keys_iter = keys.end();
272   if (keys.begin() != keys_iter)
273     --keys_iter;
274   return 0;
275 }
276
277 int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
278 {
279   dout(30) << "kinetic iterator seek_to_last(prefix): " << prefix << dendl;
280   keys_iter = keys.upper_bound(prefix + "\2");
281   if (keys.begin() == keys_iter) {
282     keys_iter = keys.end();
283   } else {
284     --keys_iter;
285   }
286   return 0;
287 }
288
289 int KineticStore::KineticWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) {
290   dout(30) << "kinetic iterator upper_bound()" << dendl;
291   string bound = combine_strings(prefix, after);
292   keys_iter = keys.upper_bound(bound);
293   return 0;
294 }
295
296 int KineticStore::KineticWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) {
297   dout(30) << "kinetic iterator lower_bound()" << dendl;
298   string bound = combine_strings(prefix, to);
299   keys_iter = keys.lower_bound(bound);
300   return 0;
301 }
302
303 bool KineticStore::KineticWholeSpaceIteratorImpl::valid() {
304   dout(30) << "kinetic iterator valid()" << dendl;
305   return keys_iter != keys.end();
306 }
307
308 int KineticStore::KineticWholeSpaceIteratorImpl::next() {
309   dout(30) << "kinetic iterator next()" << dendl;
310   if (keys_iter != keys.end()) {
311       ++keys_iter;
312       return 0;
313   }
314   return -1;
315 }
316
317 int KineticStore::KineticWholeSpaceIteratorImpl::prev() {
318   dout(30) << "kinetic iterator prev()" << dendl;
319   if (keys_iter != keys.begin()) {
320       --keys_iter;
321       return 0;
322   }
323   keys_iter = keys.end();
324   return -1;
325 }
326
327 string KineticStore::KineticWholeSpaceIteratorImpl::key() {
328   dout(30) << "kinetic iterator key()" << dendl;
329   string out_key;
330   split_key(*keys_iter, NULL, &out_key);
331   return out_key;
332 }
333
334 pair<string,string> KineticStore::KineticWholeSpaceIteratorImpl::raw_key() {
335   dout(30) << "kinetic iterator raw_key()" << dendl;
336   string prefix, key;
337   split_key(*keys_iter, &prefix, &key);
338   return make_pair(prefix, key);
339 }
340
341 bool KineticStore::KineticWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
342   // Look for "prefix\1" right in *keys_iter without making a copy
343   string key = *keys_iter;
344   if ((key.size() > prefix.length()) && (key[prefix.length()] == '\1')) {
345     return memcmp(key.c_str(), prefix.c_str(), prefix.length()) == 0;
346   } else {
347     return false;
348   }
349 }
350
351
352 bufferlist KineticStore::KineticWholeSpaceIteratorImpl::value() {
353   dout(30) << "kinetic iterator value()" << dendl;
354   unique_ptr<kinetic::KineticRecord> record;
355   kinetic_status = kinetic_conn->Get(*keys_iter, record);
356   return to_bufferlist(*record.get());
357 }
358
359 int KineticStore::KineticWholeSpaceIteratorImpl::status() {
360   dout(30) << "kinetic iterator status()" << dendl;
361   return kinetic_status.ok() ? 0 : -1;
362 }