X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fkv%2FKineticStore.cc;fp=src%2Fceph%2Fsrc%2Fkv%2FKineticStore.cc;h=ddbd48720f8ae25f944ec61f7532e5d5ec090ee9;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/kv/KineticStore.cc b/src/ceph/src/kv/KineticStore.cc new file mode 100644 index 0000000..ddbd487 --- /dev/null +++ b/src/ceph/src/kv/KineticStore.cc @@ -0,0 +1,362 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include "KineticStore.h" +#include "common/ceph_crypto.h" + +#include +#include +#include +#include "include/memory.h" +#include +using std::string; +#include "common/perf_counters.h" + +#define dout_subsys ceph_subsys_kinetic + +int KineticStore::init() +{ + // init defaults. caller can override these if they want + // prior to calling open. + host = cct->_conf->kinetic_host; + port = cct->_conf->kinetic_port; + user_id = cct->_conf->kinetic_user_id; + hmac_key = cct->_conf->kinetic_hmac_key; + use_ssl = cct->_conf->kinetic_use_ssl; + return 0; +} + +int KineticStore::_test_init(CephContext *c) +{ + kinetic::KineticConnectionFactory conn_factory = + kinetic::NewKineticConnectionFactory(); + + kinetic::ConnectionOptions options; + options.host = cct->_conf->kinetic_host; + options.port = cct->_conf->kinetic_port; + options.user_id = cct->_conf->kinetic_user_id; + options.hmac_key = cct->_conf->kinetic_hmac_key; + options.use_ssl = cct->_conf->kinetic_use_ssl; + + kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10); + kinetic_conn.reset(); + if (!status.ok()) + derr << __func__ << "Unable to connect to kinetic store " << options.host + << ":" << options.port << " : " << status.ToString() << dendl; + return status.ok() ? 0 : -EIO; +} + +int KineticStore::do_open(ostream &out, bool create_if_missing) +{ + kinetic::KineticConnectionFactory conn_factory = + kinetic::NewKineticConnectionFactory(); + kinetic::ConnectionOptions options; + options.host = host; + options.port = port; + options.user_id = user_id; + options.hmac_key = hmac_key; + options.use_ssl = use_ssl; + kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10); + if (!status.ok()) { + derr << "Unable to connect to kinetic store " << host << ":" << port + << " : " << status.ToString() << dendl; + return -EINVAL; + } + + PerfCountersBuilder plb(g_ceph_context, "kinetic", l_kinetic_first, l_kinetic_last); + plb.add_u64_counter(l_kinetic_gets, "kinetic_get", "Gets"); + plb.add_u64_counter(l_kinetic_txns, "kinetic_transaction", "Transactions"); + logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + return 0; +} + +KineticStore::KineticStore(CephContext *c) : + cct(c), + logger(NULL) +{ + host = c->_conf->kinetic_host; + port = c->_conf->kinetic_port; + user_id = c->_conf->kinetic_user_id; + hmac_key = c->_conf->kinetic_hmac_key; + use_ssl = c->_conf->kinetic_use_ssl; +} + +KineticStore::~KineticStore() +{ + close(); + delete logger; +} + +void KineticStore::close() +{ + kinetic_conn.reset(); + if (logger) + cct->get_perfcounters_collection()->remove(logger); +} + +int KineticStore::submit_transaction(KeyValueDB::Transaction t) +{ + KineticTransactionImpl * _t = + static_cast(t.get()); + + dout(20) << "kinetic submit_transaction" << dendl; + + for (vector::iterator it = _t->ops.begin(); + it != _t->ops.end(); ++it) { + kinetic::KineticStatus status(kinetic::StatusCode::OK, ""); + if (it->type == KINETIC_OP_WRITE) { + string data(it->data.c_str(), it->data.length()); + kinetic::KineticRecord record(data, "", "", + com::seagate::kinetic::client::proto::Message_Algorithm_SHA1); + dout(30) << "kinetic before put of " << it->key << " (" << data.length() << " bytes)" << dendl; + status = kinetic_conn->Put(it->key, "", kinetic::WriteMode::IGNORE_VERSION, + record); + dout(30) << "kinetic after put of " << it->key << dendl; + } else { + assert(it->type == KINETIC_OP_DELETE); + dout(30) << "kinetic before delete" << dendl; + status = kinetic_conn->Delete(it->key, "", + kinetic::WriteMode::IGNORE_VERSION); + dout(30) << "kinetic after delete" << dendl; + } + if (!status.ok()) { + derr << "kinetic error submitting transaction: " + << status.message() << dendl; + return -1; + } + } + + logger->inc(l_kinetic_txns); + return 0; +} + +int KineticStore::submit_transaction_sync(KeyValueDB::Transaction t) +{ + return submit_transaction(t); +} + +void KineticStore::KineticTransactionImpl::set( + const string &prefix, + const string &k, + const bufferlist &to_set_bl) +{ + string key = combine_strings(prefix, k); + dout(30) << "kinetic set key " << key << dendl; + ops.push_back(KineticOp(KINETIC_OP_WRITE, key, to_set_bl)); +} + +void KineticStore::KineticTransactionImpl::rmkey(const string &prefix, + const string &k) +{ + string key = combine_strings(prefix, k); + dout(30) << "kinetic rm key " << key << dendl; + ops.push_back(KineticOp(KINETIC_OP_DELETE, key)); +} + +void KineticStore::KineticTransactionImpl::rmkeys_by_prefix(const string &prefix) +{ + dout(20) << "kinetic rmkeys_by_prefix " << prefix << dendl; + KeyValueDB::Iterator it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + string key = combine_strings(prefix, it->key()); + ops.push_back(KineticOp(KINETIC_OP_DELETE, key)); + dout(30) << "kinetic rm key by prefix: " << key << dendl; + } +} + +void KineticStore::KineticTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end) +{ + KeyValueDB::Iterator it = db->get_iterator(prefix); + it->lower_bound(start); + while (it->valid()) { + if (it->key() >= end) { + break; + } + ops.push_back( + KineticOp(KINETIC_OP_DELETE, combine_strings(prefix, it->key()))); + it->next(); + } +} + +int KineticStore::get( + const string &prefix, + const std::set &keys, + std::map *out) +{ + dout(30) << "kinetic get prefix: " << prefix << " keys: " << keys << dendl; + for (std::set::const_iterator i = keys.begin(); + i != keys.end(); + ++i) { + unique_ptr record; + string key = combine_strings(prefix, *i); + dout(30) << "before get key " << key << dendl; + kinetic::KineticStatus status = kinetic_conn->Get(key, record); + if (!status.ok()) + break; + dout(30) << "kinetic get got key: " << key << dendl; + out->insert(make_pair(key, to_bufferlist(*record.get()))); + } + logger->inc(l_kinetic_gets); + return 0; +} + +string KineticStore::combine_strings(const string &prefix, const string &value) +{ + string out = prefix; + out.push_back(1); + out.append(value); + return out; +} + +bufferlist KineticStore::to_bufferlist(const kinetic::KineticRecord &record) +{ + bufferlist bl; + bl.append(*(record.value())); + return bl; +} + +int KineticStore::split_key(string &in, string *prefix, string *key) +{ + size_t prefix_len = 0; + char* in_data = in.c_str(); + + // Find separator inside Slice + char* separator = (char*) memchr((void*)in_data, 1, in.size()); + if (separator == NULL) + return -EINVAL; + prefix_len = size_t(separator - in_data); + if (prefix_len >= in.size()) + return -EINVAL; + + // Fetch prefix and/or key directly from Slice + if (prefix) + *prefix = string(in_data, prefix_len); + if (key) + *key = string(separator+1, in.size()-prefix_len-1); + return 0; +} + +KineticStore::KineticWholeSpaceIteratorImpl::KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn) : kinetic_conn(conn), + kinetic_status(kinetic::StatusCode::OK, "") +{ + dout(30) << "kinetic iterator constructor()" << dendl; + const static string last_key = "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"; + kinetic::KeyRangeIterator it = + kinetic_conn->IterateKeyRange("", true, last_key, true, 1024); + while (it != kinetic::KeyRangeEnd()) { + try { + keys.insert(*it); + dout(30) << "kinetic iterator added " << *it << dendl; + } catch (std::runtime_error &e) { + kinetic_status = kinetic::KineticStatus(kinetic::StatusCode::CLIENT_INTERNAL_ERROR, e.what()); + return; + } + ++it; + } + keys_iter = keys.begin(); +} + +int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_first(const string &prefix) +{ + dout(30) << "kinetic iterator seek_to_first(prefix): " << prefix << dendl; + keys_iter = keys.lower_bound(prefix); + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last() +{ + dout(30) << "kinetic iterator seek_to_last()" << dendl; + keys_iter = keys.end(); + if (keys.begin() != keys_iter) + --keys_iter; + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last(const string &prefix) +{ + dout(30) << "kinetic iterator seek_to_last(prefix): " << prefix << dendl; + keys_iter = keys.upper_bound(prefix + "\2"); + if (keys.begin() == keys_iter) { + keys_iter = keys.end(); + } else { + --keys_iter; + } + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) { + dout(30) << "kinetic iterator upper_bound()" << dendl; + string bound = combine_strings(prefix, after); + keys_iter = keys.upper_bound(bound); + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) { + dout(30) << "kinetic iterator lower_bound()" << dendl; + string bound = combine_strings(prefix, to); + keys_iter = keys.lower_bound(bound); + return 0; +} + +bool KineticStore::KineticWholeSpaceIteratorImpl::valid() { + dout(30) << "kinetic iterator valid()" << dendl; + return keys_iter != keys.end(); +} + +int KineticStore::KineticWholeSpaceIteratorImpl::next() { + dout(30) << "kinetic iterator next()" << dendl; + if (keys_iter != keys.end()) { + ++keys_iter; + return 0; + } + return -1; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::prev() { + dout(30) << "kinetic iterator prev()" << dendl; + if (keys_iter != keys.begin()) { + --keys_iter; + return 0; + } + keys_iter = keys.end(); + return -1; +} + +string KineticStore::KineticWholeSpaceIteratorImpl::key() { + dout(30) << "kinetic iterator key()" << dendl; + string out_key; + split_key(*keys_iter, NULL, &out_key); + return out_key; +} + +pair KineticStore::KineticWholeSpaceIteratorImpl::raw_key() { + dout(30) << "kinetic iterator raw_key()" << dendl; + string prefix, key; + split_key(*keys_iter, &prefix, &key); + return make_pair(prefix, key); +} + +bool KineticStore::KineticWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) { + // Look for "prefix\1" right in *keys_iter without making a copy + string key = *keys_iter; + if ((key.size() > prefix.length()) && (key[prefix.length()] == '\1')) { + return memcmp(key.c_str(), prefix.c_str(), prefix.length()) == 0; + } else { + return false; + } +} + + +bufferlist KineticStore::KineticWholeSpaceIteratorImpl::value() { + dout(30) << "kinetic iterator value()" << dendl; + unique_ptr record; + kinetic_status = kinetic_conn->Get(*keys_iter, record); + return to_bufferlist(*record.get()); +} + +int KineticStore::KineticWholeSpaceIteratorImpl::status() { + dout(30) << "kinetic iterator status()" << dendl; + return kinetic_status.ok() ? 0 : -1; +}