1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "KineticStore.h"
4 #include "common/ceph_crypto.h"
9 #include "include/memory.h"
12 #include "common/perf_counters.h"
14 #define dout_subsys ceph_subsys_kinetic
// Seed this store's connection settings (host, port, user id, HMAC key and
// SSL flag) from the kinetic_* options found in cct->_conf.
16 int KineticStore::init()
18 // init defaults. caller can override these if they want
19 // prior to calling open.
20 host = cct->_conf->kinetic_host;
21 port = cct->_conf->kinetic_port;
22 user_id = cct->_conf->kinetic_user_id;
23 hmac_key = cct->_conf->kinetic_hmac_key;
24 use_ssl = cct->_conf->kinetic_use_ssl;
// Connectivity probe: fills a ConnectionOptions directly from the kinetic_*
// config options and asks the connection factory for a thread-safe blocking
// connection.
// Returns 0 when the connection status is ok, -EIO otherwise.
// NOTE(review): the options are read through cct, not through the parameter
// c -- confirm that is intended.
28 int KineticStore::_test_init(CephContext *c)
30 kinetic::KineticConnectionFactory conn_factory =
31 kinetic::NewKineticConnectionFactory();
33 kinetic::ConnectionOptions options;
34 options.host = cct->_conf->kinetic_host;
35 options.port = cct->_conf->kinetic_port;
36 options.user_id = cct->_conf->kinetic_user_id;
37 options.hmac_key = cct->_conf->kinetic_hmac_key;
38 options.use_ssl = cct->_conf->kinetic_use_ssl;
40 kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10);
// NOTE(review): nothing is streamed between __func__ and the message text,
// so the function name and "Unable ..." run together in the log output.
43 derr << __func__ << "Unable to connect to kinetic store " << options.host
44 << ":" << options.port << " : " << status.ToString() << dendl;
45 return status.ok() ? 0 : -EIO;
// Open the store: request a thread-safe blocking connection (stored in
// kinetic_conn) using this object's connection settings, log a connection
// failure via derr, then build the "kinetic" perf counters and register them
// with cct's perf-counter collection.
48 int KineticStore::do_open(ostream &out, bool create_if_missing)
50 kinetic::KineticConnectionFactory conn_factory =
51 kinetic::NewKineticConnectionFactory();
52 kinetic::ConnectionOptions options;
55 options.user_id = user_id;
56 options.hmac_key = hmac_key;
57 options.use_ssl = use_ssl;
58 kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10);
60 derr << "Unable to connect to kinetic store " << host << ":" << port
61 << " : " << status.ToString() << dendl;
// Two u64 counters: one for gets, one for submitted transactions.
// NOTE(review): the builder is given g_ceph_context while the registration
// below goes through cct -- confirm both name the same context.
65 PerfCountersBuilder plb(g_ceph_context, "kinetic", l_kinetic_first, l_kinetic_last);
66 plb.add_u64_counter(l_kinetic_gets, "kinetic_get", "Gets");
67 plb.add_u64_counter(l_kinetic_txns, "kinetic_transaction", "Transactions");
68 logger = plb.create_perf_counters();
69 cct->get_perfcounters_collection()->add(logger);
// Construct a store, taking the initial connection settings from the
// kinetic_* options in c->_conf.
73 KineticStore::KineticStore(CephContext *c) :
77 host = c->_conf->kinetic_host;
78 port = c->_conf->kinetic_port;
79 user_id = c->_conf->kinetic_user_id;
80 hmac_key = c->_conf->kinetic_hmac_key;
81 use_ssl = c->_conf->kinetic_use_ssl;
// Destructor for KineticStore.
84 KineticStore::~KineticStore()
// Shut the store down; removes this store's perf counters (logger) from
// cct's perf-counter collection.
90 void KineticStore::close()
94 cct->get_perfcounters_collection()->remove(logger);
// Apply a queued transaction to the kinetic device, one operation at a time
// and in queue order. Write ops become Put calls and every other op must be
// a delete (asserted) and becomes a Delete call; both use IGNORE_VERSION.
// A failure is reported through derr, and the l_kinetic_txns counter is
// incremented for the transaction.
97 int KineticStore::submit_transaction(KeyValueDB::Transaction t)
// The downcast assumes t was created by this store and therefore wraps a
// KineticTransactionImpl.
99 KineticTransactionImpl * _t =
100 static_cast<KineticTransactionImpl *>(t.get());
102 dout(20) << "kinetic submit_transaction" << dendl;
104 for (vector<KineticOp>::iterator it = _t->ops.begin();
105 it != _t->ops.end(); ++it) {
106 kinetic::KineticStatus status(kinetic::StatusCode::OK, "");
107 if (it->type == KINETIC_OP_WRITE) {
// Copy the op's payload into a std::string to build the record.
108 string data(it->data.c_str(), it->data.length());
109 kinetic::KineticRecord record(data, "", "",
110 com::seagate::kinetic::client::proto::Message_Algorithm_SHA1);
111 dout(30) << "kinetic before put of " << it->key << " (" << data.length() << " bytes)" << dendl;
112 status = kinetic_conn->Put(it->key, "", kinetic::WriteMode::IGNORE_VERSION,
114 dout(30) << "kinetic after put of " << it->key << dendl;
116 assert(it->type == KINETIC_OP_DELETE);
117 dout(30) << "kinetic before delete" << dendl;
118 status = kinetic_conn->Delete(it->key, "",
119 kinetic::WriteMode::IGNORE_VERSION);
120 dout(30) << "kinetic after delete" << dendl;
123 derr << "kinetic error submitting transaction: "
124 << status.message() << dendl;
129 logger->inc(l_kinetic_txns);
// Synchronous submit: forwards to submit_transaction() and returns its
// result unchanged.
133 int KineticStore::submit_transaction_sync(KeyValueDB::Transaction t)
135 return submit_transaction(t);
// Queue a write: combine prefix and k into a single store key and append a
// KINETIC_OP_WRITE op carrying to_set_bl to this transaction's op list.
// Nothing is sent to the device here.
138 void KineticStore::KineticTransactionImpl::set(
139 const string &prefix,
141 const bufferlist &to_set_bl)
143 string key = combine_strings(prefix, k);
144 dout(30) << "kinetic set key " << key << dendl;
145 ops.push_back(KineticOp(KINETIC_OP_WRITE, key, to_set_bl));
// Queue a delete: combine prefix and k into a single store key and append a
// KINETIC_OP_DELETE op for it to this transaction's op list.
148 void KineticStore::KineticTransactionImpl::rmkey(const string &prefix,
151 string key = combine_strings(prefix, k);
152 dout(30) << "kinetic rm key " << key << dendl;
153 ops.push_back(KineticOp(KINETIC_OP_DELETE, key));
// Queue deletes for the keys under prefix: obtain an iterator for the prefix
// from db, start at its first entry, and append one KINETIC_OP_DELETE op per
// key visited (each key is re-combined with the prefix first).
156 void KineticStore::KineticTransactionImpl::rmkeys_by_prefix(const string &prefix)
158 dout(20) << "kinetic rmkeys_by_prefix " << prefix << dendl;
159 KeyValueDB::Iterator it = db->get_iterator(prefix);
160 for (it->seek_to_first();
163 string key = combine_strings(prefix, it->key());
164 ops.push_back(KineticOp(KINETIC_OP_DELETE, key));
165 dout(30) << "kinetic rm key by prefix: " << key << dendl;
// Queue deletes for a key range under prefix: position an iterator at the
// first key not less than start and append KINETIC_OP_DELETE ops while the
// iterator is valid. A key that compares >= end is treated specially --
// presumably it ends the walk, making end exclusive (TODO confirm).
169 void KineticStore::KineticTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end)
171 KeyValueDB::Iterator it = db->get_iterator(prefix);
172 it->lower_bound(start);
173 while (it->valid()) {
174 if (it->key() >= end) {
178 KineticOp(KINETIC_OP_DELETE, combine_strings(prefix, it->key())));
// Fetch several keys that share one prefix.
//   prefix - prefix combined with every requested key
//   keys   - the keys to look up
//   out    - receives one entry per fetched record
// Each lookup is a blocking Get on the combined key. The l_kinetic_gets
// counter is incremented for the call.
// NOTE(review): entries in *out are keyed by the combined (prefix + key)
// string rather than by the caller's original key -- confirm callers expect
// that.
183 int KineticStore::get(
184 const string &prefix,
185 const std::set<string> &keys,
186 std::map<string, bufferlist> *out)
188 dout(30) << "kinetic get prefix: " << prefix << " keys: " << keys << dendl;
189 for (std::set<string>::const_iterator i = keys.begin();
192 unique_ptr<kinetic::KineticRecord> record;
193 string key = combine_strings(prefix, *i);
194 dout(30) << "before get key " << key << dendl;
195 kinetic::KineticStatus status = kinetic_conn->Get(key, record);
198 dout(30) << "kinetic get got key: " << key << dendl;
199 out->insert(make_pair(key, to_bufferlist(*record.get())));
201 logger->inc(l_kinetic_gets);
// Join prefix and value into the single key string used on the device.
// The iterator helpers in this file look for a 0x01 byte between the two
// parts -- presumably the form produced here (TODO confirm).
205 string KineticStore::combine_strings(const string &prefix, const string &value)
// Copy the value held by a kinetic record into a bufferlist.
213 bufferlist KineticStore::to_bufferlist(const kinetic::KineticRecord &record)
216 bl.append(*(record.value()));
// Split a combined store key into its prefix and key parts.
//   in     - combined key; the first byte with value 1 is the separator
//   prefix - receives the bytes before the separator
//   key    - receives the bytes after the separator
// A missing separator, or a prefix length that is not smaller than the
// input size, is handled as a separate (presumably error) case.
220 int KineticStore::split_key(string &in, string *prefix, string *key)
222 size_t prefix_len = 0;
// NOTE(review): std::string::c_str() returns const char*, so initializing a
// plain char* from it drops const; in_data is only read below and could be
// declared const char*.
223 char* in_data = in.c_str();
225 // Find separator inside Slice
226 char* separator = (char*) memchr((void*)in_data, 1, in.size());
227 if (separator == NULL)
229 prefix_len = size_t(separator - in_data);
230 if (prefix_len >= in.size())
233 // Fetch prefix and/or key directly from Slice
235 *prefix = string(in_data, prefix_len);
// Skip the one-byte separator; the rest of the input is the key.
237 *key = string(separator+1, in.size()-prefix_len-1);
// Build a whole-key-space iterator over the given connection. The
// constructor walks the device's key range from the empty key up to a key
// made of 0xFF bytes (both ends inclusive) and then positions keys_iter at
// the first entry of keys. A std::runtime_error raised during the walk is
// recorded in kinetic_status as CLIENT_INTERNAL_ERROR.
241 KineticStore::KineticWholeSpaceIteratorImpl::KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn) : kinetic_conn(conn),
242 kinetic_status(kinetic::StatusCode::OK, "")
244 dout(30) << "kinetic iterator constructor()" << dendl;
// Upper end of the range requested from the device.
245 const static string last_key = "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF";
// NOTE(review): 1024 is presumably the number of keys fetched per request --
// confirm against the kinetic client API.
246 kinetic::KeyRangeIterator it =
247 kinetic_conn->IterateKeyRange("", true, last_key, true, 1024);
248 while (it != kinetic::KeyRangeEnd()) {
251 dout(30) << "kinetic iterator added " << *it << dendl;
252 } catch (std::runtime_error &e) {
253 kinetic_status = kinetic::KineticStatus(kinetic::StatusCode::CLIENT_INTERNAL_ERROR, e.what());
258 keys_iter = keys.begin();
// Position the iterator at the first cached key that is not less than
// prefix.
261 int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
263 dout(30) << "kinetic iterator seek_to_first(prefix): " << prefix << dendl;
264 keys_iter = keys.lower_bound(prefix);
// Position the iterator relative to the end of the cached key set: start at
// keys.end() and, when the set is not empty, adjust from there (presumably
// stepping back onto the last key -- TODO confirm).
268 int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last()
270 dout(30) << "kinetic iterator seek_to_last()" << dendl;
271 keys_iter = keys.end();
272 if (keys.begin() != keys_iter)
// Seek towards the last key of a prefix: look up the first cached key
// greater than prefix + "\2". If that is the very first key, nothing can
// precede it, so the iterator is parked at keys.end().
277 int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
279 dout(30) << "kinetic iterator seek_to_last(prefix): " << prefix << dendl;
280 keys_iter = keys.upper_bound(prefix + "\2");
281 if (keys.begin() == keys_iter) {
282 keys_iter = keys.end();
// Position the iterator at the first cached key strictly greater than the
// combined (prefix, after) key.
289 int KineticStore::KineticWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) {
290 dout(30) << "kinetic iterator upper_bound()" << dendl;
291 string bound = combine_strings(prefix, after);
292 keys_iter = keys.upper_bound(bound);
// Position the iterator at the first cached key not less than the combined
// (prefix, to) key.
296 int KineticStore::KineticWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) {
297 dout(30) << "kinetic iterator lower_bound()" << dendl;
298 string bound = combine_strings(prefix, to);
299 keys_iter = keys.lower_bound(bound);
303 bool KineticStore::KineticWholeSpaceIteratorImpl::valid() {
304 dout(30) << "kinetic iterator valid()" << dendl;
305 return keys_iter != keys.end();
// Move to the following key; the iterator is only touched when it is not
// already at keys.end().
308 int KineticStore::KineticWholeSpaceIteratorImpl::next() {
309 dout(30) << "kinetic iterator next()" << dendl;
310 if (keys_iter != keys.end()) {
// Move to the preceding key. When the iterator is not at keys.begin() it is
// adjusted; the assignment to keys.end() presumably covers the case where no
// earlier key exists (TODO confirm).
317 int KineticStore::KineticWholeSpaceIteratorImpl::prev() {
318 dout(30) << "kinetic iterator prev()" << dendl;
319 if (keys_iter != keys.begin()) {
323 keys_iter = keys.end();
// Key part of the current entry: the stored key is split and only the part
// after the separator is requested (NULL is passed for the prefix).
// NOTE(review): split_key() takes a non-const string&; if keys is a
// std::set<string>, *keys_iter is const and will not bind -- confirm.
327 string KineticStore::KineticWholeSpaceIteratorImpl::key() {
328 dout(30) << "kinetic iterator key()" << dendl;
330 split_key(*keys_iter, NULL, &out_key);
// Split the current stored key and return it as a (prefix, key) pair.
// The return value of split_key() is not inspected here.
334 pair<string,string> KineticStore::KineticWholeSpaceIteratorImpl::raw_key() {
335 dout(30) << "kinetic iterator raw_key()" << dendl;
337 split_key(*keys_iter, &prefix, &key);
338 return make_pair(prefix, key);
341 bool KineticStore::KineticWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
342 // Look for "prefix\1" right in *keys_iter without making a copy
343 string key = *keys_iter;
344 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\1')) {
345 return memcmp(key.c_str(), prefix.c_str(), prefix.length()) == 0;
352 bufferlist KineticStore::KineticWholeSpaceIteratorImpl::value() {
353 dout(30) << "kinetic iterator value()" << dendl;
354 unique_ptr<kinetic::KineticRecord> record;
355 kinetic_status = kinetic_conn->Get(*keys_iter, record);
356 return to_bufferlist(*record.get());
359 int KineticStore::KineticWholeSpaceIteratorImpl::status() {
360 dout(30) << "kinetic iterator status()" << dendl;
361 return kinetic_status.ok() ? 0 : -1;