X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_replica_log.cc;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_replica_log.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=e5b365bcf8efa7c812c943b96c606ddfca4976bf;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_replica_log.cc b/src/ceph/src/rgw/rgw_replica_log.cc deleted file mode 100644 index e5b365b..0000000 --- a/src/ceph/src/rgw/rgw_replica_log.cc +++ /dev/null @@ -1,283 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * Copyright 2013 Inktank - */ - -#include "common/ceph_json.h" - -#include "rgw_replica_log.h" -#include "cls/replica_log/cls_replica_log_client.h" -#include "cls/rgw/cls_rgw_client.h" -#include "rgw_rados.h" - -#define dout_subsys ceph_subsys_rgw - -void RGWReplicaBounds::dump(Formatter *f) const -{ - encode_json("marker", marker, f); - encode_json("oldest_time", oldest_time, f); - encode_json("markers", markers, f); -} - -void RGWReplicaBounds::decode_json(JSONObj *obj) { - JSONDecoder::decode_json("marker", marker, obj); - JSONDecoder::decode_json("oldest_time", oldest_time, obj); - JSONDecoder::decode_json("markers", markers, obj); -} - -RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) : - cct(_store->cct), store(_store) {} - -int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const rgw_pool& pool) -{ - int r = rgw_init_ioctx(store->get_rados_handle(), pool, ctx, true); - if (r < 0) { - lderr(cct) << "ERROR: could not open rados pool " << pool << dendl; - } - return r; -} - -int RGWReplicaLogger::update_bound(const string& oid, const rgw_pool& pool, - const string& daemon_id, - const string& marker, const utime_t& time, - const list *entries, - bool need_to_exist) -{ - cls_replica_log_progress_marker progress; - progress.entity_id = daemon_id; - progress.position_marker = marker; - progress.position_time = time; - progress.items = *entries; - - librados::IoCtx ioctx; - int r = open_ioctx(ioctx, pool); - if (r < 0) { - return r; - } - - librados::ObjectWriteOperation opw; - if (need_to_exist) { - opw.assert_exists(); - } - cls_replica_log_update_bound(opw, progress); - return ioctx.operate(oid, &opw); -} - -int RGWReplicaLogger::write_bounds(const string& oid, const rgw_pool& pool, - RGWReplicaBounds& bounds) -{ - librados::IoCtx ioctx; - int r = open_ioctx(ioctx, pool); - if (r < 0) { - return r; - } - - librados::ObjectWriteOperation opw; - list::iterator iter = bounds.markers.begin(); - for (; iter != bounds.markers.end(); ++iter) { - RGWReplicaProgressMarker& progress = *iter; - cls_replica_log_update_bound(opw, progress); - } - - r = ioctx.operate(oid, &opw); - if (r < 0) { - return r; - } - - return 0; -} - -int RGWReplicaLogger::delete_bound(const string& oid, const rgw_pool& pool, - const string& daemon_id, bool purge_all, - bool need_to_exist) -{ - librados::IoCtx ioctx; - int r = open_ioctx(ioctx, pool); - if (r < 0) { - return r; - } - - librados::ObjectWriteOperation opw; - if (need_to_exist) { - opw.assert_exists(); - } - if (purge_all) { - opw.remove(); - } else { - cls_replica_log_delete_bound(opw, daemon_id); - } - return ioctx.operate(oid, &opw); -} - -int RGWReplicaLogger::get_bounds(const string& oid, const rgw_pool& pool, - RGWReplicaBounds& bounds) -{ - librados::IoCtx ioctx; - int r = open_ioctx(ioctx, pool); - if (r < 0) { - return r; - } - - return cls_replica_log_get_bounds(ioctx, oid, bounds.marker, bounds.oldest_time, bounds.markers); -} - -RGWReplicaObjectLogger:: -RGWReplicaObjectLogger(RGWRados *_store, - const rgw_pool& _pool, - const string& _prefix) : RGWReplicaLogger(_store), - pool(_pool), prefix(_prefix) { - if (pool.empty()) - store->get_log_pool(pool); -} - -int RGWReplicaObjectLogger::create_log_objects(int shards) -{ - librados::IoCtx ioctx; - int r = open_ioctx(ioctx, pool); - if (r < 0) { - return r; - } - for (int i = 0; i < shards; ++i) { - string oid; - get_shard_oid(i, oid); - r = ioctx.create(oid, false); - if (r < 0) - return r; - } - return r; -} - -RGWReplicaBucketLogger::RGWReplicaBucketLogger(RGWRados *_store) : - RGWReplicaLogger(_store) -{ - store->get_log_pool(pool); - prefix = _store->ctx()->_conf->rgw_replica_log_obj_prefix; - prefix.append("."); -} - -string RGWReplicaBucketLogger::obj_name(const rgw_bucket& bucket, int shard_id, bool index_by_instance) -{ - string s; - - if (index_by_instance) { - s = prefix + bucket.name + ":" + bucket.bucket_id; - } else { - s = prefix + bucket.name; - } - - if (shard_id >= 0) { - char buf[16]; - snprintf(buf, sizeof(buf), ".%d", shard_id); - s += buf; - } - return s; -} - -int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, - const string& marker, const utime_t& time, - const list *entries) -{ - if (shard_id >= 0 || - !BucketIndexShardsManager::is_shards_marker(marker)) { - return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id, true), pool, - daemon_id, marker, time, entries, - false); - } - - BucketIndexShardsManager sm; - int ret = sm.from_string(marker, shard_id); - if (ret < 0) { - ldout(cct, 0) << "ERROR: could not parse shards marker: " << marker << dendl; - return ret; - } - - map& vals = sm.get(); - - ret = 0; - - map::iterator iter; - for (iter = vals.begin(); iter != vals.end(); ++iter) { - ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl; - int r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool, - daemon_id, iter->second, time, entries, - true /* need to exist */); - - if (r == -ENOENT) { - RGWReplicaBounds bounds; - r = convert_old_bounds(bucket, -1, bounds); - if (r < 0 && r != -ENOENT) { - return r; - } - r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool, - daemon_id, marker, time, entries, false); - } - if (r < 0) { - ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl; - ret = r; - } - } - - return ret; -} - -int RGWReplicaBucketLogger::delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool purge_all) -{ - int r = RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, true), pool, daemon_id, purge_all, true /* need to exist */); - if (r != -ENOENT) { - return r; - } - /* - * can only get here if need_to_exist == true, - * entry is not found, let's convert old entry if exists - */ - RGWReplicaBounds bounds; - r = convert_old_bounds(bucket, shard_id, bounds); - if (r < 0 && r != -ENOENT) { - return r; - } - return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, true), pool, daemon_id, purge_all, false); -} - -int RGWReplicaBucketLogger::get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) { - int r = RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, true), pool, bounds); - if (r != -ENOENT) { - return r; - } - - r = convert_old_bounds(bucket, shard_id, bounds); - if (r < 0) { - return r; - } - - return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, true), pool, bounds); -} - -int RGWReplicaBucketLogger::convert_old_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) { - string old_key = obj_name(bucket, shard_id, false); - string new_key = obj_name(bucket, shard_id, true); - - /* couldn't find when indexed by instance, retry with old key by bucket name only */ - int r = RGWReplicaLogger::get_bounds(old_key, pool, bounds); - if (r < 0) { - return r; - } - /* convert to new keys */ - r = RGWReplicaLogger::write_bounds(new_key, pool, bounds); - if (r < 0) { - return r; - } - - string daemon_id; - r = RGWReplicaLogger::delete_bound(old_key, pool, daemon_id, true, false); /* purge all */ - if (r < 0) { - return r; - } - return 0; -}