X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_replica_log.cc;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_replica_log.cc;h=e5b365bcf8efa7c812c943b96c606ddfca4976bf;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_replica_log.cc b/src/ceph/src/rgw/rgw_replica_log.cc new file mode 100644 index 0000000..e5b365b --- /dev/null +++ b/src/ceph/src/rgw/rgw_replica_log.cc @@ -0,0 +1,283 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * Copyright 2013 Inktank + */ + +#include "common/ceph_json.h" + +#include "rgw_replica_log.h" +#include "cls/replica_log/cls_replica_log_client.h" +#include "cls/rgw/cls_rgw_client.h" +#include "rgw_rados.h" + +#define dout_subsys ceph_subsys_rgw + +void RGWReplicaBounds::dump(Formatter *f) const +{ + encode_json("marker", marker, f); + encode_json("oldest_time", oldest_time, f); + encode_json("markers", markers, f); +} + +void RGWReplicaBounds::decode_json(JSONObj *obj) { + JSONDecoder::decode_json("marker", marker, obj); + JSONDecoder::decode_json("oldest_time", oldest_time, obj); + JSONDecoder::decode_json("markers", markers, obj); +} + +RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) : + cct(_store->cct), store(_store) {} + +int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const rgw_pool& pool) +{ + int r = rgw_init_ioctx(store->get_rados_handle(), pool, ctx, true); + if (r < 0) { + lderr(cct) << "ERROR: could not open rados pool " << pool << dendl; + } + return r; +} + +int RGWReplicaLogger::update_bound(const string& oid, const rgw_pool& pool, + const string& daemon_id, + const string& marker, const utime_t& time, + const list *entries, + bool need_to_exist) +{ + cls_replica_log_progress_marker progress; + progress.entity_id = daemon_id; + progress.position_marker = marker; + progress.position_time = time; + progress.items = *entries; + + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + + librados::ObjectWriteOperation opw; + if (need_to_exist) { + opw.assert_exists(); + } + cls_replica_log_update_bound(opw, progress); + return ioctx.operate(oid, &opw); +} + +int RGWReplicaLogger::write_bounds(const string& oid, const rgw_pool& pool, + RGWReplicaBounds& bounds) +{ + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + + librados::ObjectWriteOperation opw; + list::iterator iter = bounds.markers.begin(); + for (; iter != bounds.markers.end(); ++iter) { + RGWReplicaProgressMarker& progress = *iter; + cls_replica_log_update_bound(opw, progress); + } + + r = ioctx.operate(oid, &opw); + if (r < 0) { + return r; + } + + return 0; +} + +int RGWReplicaLogger::delete_bound(const string& oid, const rgw_pool& pool, + const string& daemon_id, bool purge_all, + bool need_to_exist) +{ + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + + librados::ObjectWriteOperation opw; + if (need_to_exist) { + opw.assert_exists(); + } + if (purge_all) { + opw.remove(); + } else { + cls_replica_log_delete_bound(opw, daemon_id); + } + return ioctx.operate(oid, &opw); +} + +int RGWReplicaLogger::get_bounds(const string& oid, const rgw_pool& pool, + RGWReplicaBounds& bounds) +{ + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + + return cls_replica_log_get_bounds(ioctx, oid, bounds.marker, bounds.oldest_time, bounds.markers); +} + +RGWReplicaObjectLogger:: +RGWReplicaObjectLogger(RGWRados *_store, + const rgw_pool& _pool, + const string& _prefix) : RGWReplicaLogger(_store), + pool(_pool), prefix(_prefix) { + if (pool.empty()) + store->get_log_pool(pool); +} + +int RGWReplicaObjectLogger::create_log_objects(int shards) +{ + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + for (int i = 0; i < shards; ++i) { + string oid; + get_shard_oid(i, oid); + r = ioctx.create(oid, false); + if (r < 0) + return r; + } + return r; +} + +RGWReplicaBucketLogger::RGWReplicaBucketLogger(RGWRados *_store) : + RGWReplicaLogger(_store) +{ + store->get_log_pool(pool); + prefix = _store->ctx()->_conf->rgw_replica_log_obj_prefix; + prefix.append("."); +} + +string RGWReplicaBucketLogger::obj_name(const rgw_bucket& bucket, int shard_id, bool index_by_instance) +{ + string s; + + if (index_by_instance) { + s = prefix + bucket.name + ":" + bucket.bucket_id; + } else { + s = prefix + bucket.name; + } + + if (shard_id >= 0) { + char buf[16]; + snprintf(buf, sizeof(buf), ".%d", shard_id); + s += buf; + } + return s; +} + +int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, + const string& marker, const utime_t& time, + const list *entries) +{ + if (shard_id >= 0 || + !BucketIndexShardsManager::is_shards_marker(marker)) { + return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id, true), pool, + daemon_id, marker, time, entries, + false); + } + + BucketIndexShardsManager sm; + int ret = sm.from_string(marker, shard_id); + if (ret < 0) { + ldout(cct, 0) << "ERROR: could not parse shards marker: " << marker << dendl; + return ret; + } + + map& vals = sm.get(); + + ret = 0; + + map::iterator iter; + for (iter = vals.begin(); iter != vals.end(); ++iter) { + ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl; + int r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool, + daemon_id, iter->second, time, entries, + true /* need to exist */); + + if (r == -ENOENT) { + RGWReplicaBounds bounds; + r = convert_old_bounds(bucket, -1, bounds); + if (r < 0 && r != -ENOENT) { + return r; + } + r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool, + daemon_id, marker, time, entries, false); + } + if (r < 0) { + ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl; + ret = r; + } + } + + return ret; +} + +int RGWReplicaBucketLogger::delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool purge_all) +{ + int r = RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, true), pool, daemon_id, purge_all, true /* need to exist */); + if (r != -ENOENT) { + return r; + } + /* + * can only get here if need_to_exist == true, + * entry is not found, let's convert old entry if exists + */ + RGWReplicaBounds bounds; + r = convert_old_bounds(bucket, shard_id, bounds); + if (r < 0 && r != -ENOENT) { + return r; + } + return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, true), pool, daemon_id, purge_all, false); +} + +int RGWReplicaBucketLogger::get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) { + int r = RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, true), pool, bounds); + if (r != -ENOENT) { + return r; + } + + r = convert_old_bounds(bucket, shard_id, bounds); + if (r < 0) { + return r; + } + + return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, true), pool, bounds); +} + +int RGWReplicaBucketLogger::convert_old_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) { + string old_key = obj_name(bucket, shard_id, false); + string new_key = obj_name(bucket, shard_id, true); + + /* couldn't find when indexed by instance, retry with old key by bucket name only */ + int r = RGWReplicaLogger::get_bounds(old_key, pool, bounds); + if (r < 0) { + return r; + } + /* convert to new keys */ + r = RGWReplicaLogger::write_bounds(new_key, pool, bounds); + if (r < 0) { + return r; + } + + string daemon_id; + r = RGWReplicaLogger::delete_bound(old_key, pool, daemon_id, true, false); /* purge all */ + if (r < 0) { + return r; + } + return 0; +}