+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- * Copyright 2013 Inktank
- */
-
-#include "common/ceph_json.h"
-
-#include "rgw_replica_log.h"
-#include "cls/replica_log/cls_replica_log_client.h"
-#include "cls/rgw/cls_rgw_client.h"
-#include "rgw_rados.h"
-
-#define dout_subsys ceph_subsys_rgw
-
-void RGWReplicaBounds::dump(Formatter *f) const
-{
- encode_json("marker", marker, f);
- encode_json("oldest_time", oldest_time, f);
- encode_json("markers", markers, f);
-}
-
-void RGWReplicaBounds::decode_json(JSONObj *obj) {
- JSONDecoder::decode_json("marker", marker, obj);
- JSONDecoder::decode_json("oldest_time", oldest_time, obj);
- JSONDecoder::decode_json("markers", markers, obj);
-}
-
-RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) :
- cct(_store->cct), store(_store) {}
-
-int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const rgw_pool& pool)
-{
- int r = rgw_init_ioctx(store->get_rados_handle(), pool, ctx, true);
- if (r < 0) {
- lderr(cct) << "ERROR: could not open rados pool " << pool << dendl;
- }
- return r;
-}
-
-int RGWReplicaLogger::update_bound(const string& oid, const rgw_pool& pool,
- const string& daemon_id,
- const string& marker, const utime_t& time,
- const list<RGWReplicaItemMarker> *entries,
- bool need_to_exist)
-{
- cls_replica_log_progress_marker progress;
- progress.entity_id = daemon_id;
- progress.position_marker = marker;
- progress.position_time = time;
- progress.items = *entries;
-
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx, pool);
- if (r < 0) {
- return r;
- }
-
- librados::ObjectWriteOperation opw;
- if (need_to_exist) {
- opw.assert_exists();
- }
- cls_replica_log_update_bound(opw, progress);
- return ioctx.operate(oid, &opw);
-}
-
-int RGWReplicaLogger::write_bounds(const string& oid, const rgw_pool& pool,
- RGWReplicaBounds& bounds)
-{
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx, pool);
- if (r < 0) {
- return r;
- }
-
- librados::ObjectWriteOperation opw;
- list<RGWReplicaProgressMarker>::iterator iter = bounds.markers.begin();
- for (; iter != bounds.markers.end(); ++iter) {
- RGWReplicaProgressMarker& progress = *iter;
- cls_replica_log_update_bound(opw, progress);
- }
-
- r = ioctx.operate(oid, &opw);
- if (r < 0) {
- return r;
- }
-
- return 0;
-}
-
-int RGWReplicaLogger::delete_bound(const string& oid, const rgw_pool& pool,
- const string& daemon_id, bool purge_all,
- bool need_to_exist)
-{
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx, pool);
- if (r < 0) {
- return r;
- }
-
- librados::ObjectWriteOperation opw;
- if (need_to_exist) {
- opw.assert_exists();
- }
- if (purge_all) {
- opw.remove();
- } else {
- cls_replica_log_delete_bound(opw, daemon_id);
- }
- return ioctx.operate(oid, &opw);
-}
-
-int RGWReplicaLogger::get_bounds(const string& oid, const rgw_pool& pool,
- RGWReplicaBounds& bounds)
-{
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx, pool);
- if (r < 0) {
- return r;
- }
-
- return cls_replica_log_get_bounds(ioctx, oid, bounds.marker, bounds.oldest_time, bounds.markers);
-}
-
-RGWReplicaObjectLogger::
-RGWReplicaObjectLogger(RGWRados *_store,
- const rgw_pool& _pool,
- const string& _prefix) : RGWReplicaLogger(_store),
- pool(_pool), prefix(_prefix) {
- if (pool.empty())
- store->get_log_pool(pool);
-}
-
-int RGWReplicaObjectLogger::create_log_objects(int shards)
-{
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx, pool);
- if (r < 0) {
- return r;
- }
- for (int i = 0; i < shards; ++i) {
- string oid;
- get_shard_oid(i, oid);
- r = ioctx.create(oid, false);
- if (r < 0)
- return r;
- }
- return r;
-}
-
-RGWReplicaBucketLogger::RGWReplicaBucketLogger(RGWRados *_store) :
- RGWReplicaLogger(_store)
-{
- store->get_log_pool(pool);
- prefix = _store->ctx()->_conf->rgw_replica_log_obj_prefix;
- prefix.append(".");
-}
-
-string RGWReplicaBucketLogger::obj_name(const rgw_bucket& bucket, int shard_id, bool index_by_instance)
-{
- string s;
-
- if (index_by_instance) {
- s = prefix + bucket.name + ":" + bucket.bucket_id;
- } else {
- s = prefix + bucket.name;
- }
-
- if (shard_id >= 0) {
- char buf[16];
- snprintf(buf, sizeof(buf), ".%d", shard_id);
- s += buf;
- }
- return s;
-}
-
-int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
- const string& marker, const utime_t& time,
- const list<RGWReplicaItemMarker> *entries)
-{
- if (shard_id >= 0 ||
- !BucketIndexShardsManager::is_shards_marker(marker)) {
- return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id, true), pool,
- daemon_id, marker, time, entries,
- false);
- }
-
- BucketIndexShardsManager sm;
- int ret = sm.from_string(marker, shard_id);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: could not parse shards marker: " << marker << dendl;
- return ret;
- }
-
- map<int, string>& vals = sm.get();
-
- ret = 0;
-
- map<int, string>::iterator iter;
- for (iter = vals.begin(); iter != vals.end(); ++iter) {
- ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
- int r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool,
- daemon_id, iter->second, time, entries,
- true /* need to exist */);
-
- if (r == -ENOENT) {
- RGWReplicaBounds bounds;
- r = convert_old_bounds(bucket, -1, bounds);
- if (r < 0 && r != -ENOENT) {
- return r;
- }
- r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool,
- daemon_id, marker, time, entries, false);
- }
- if (r < 0) {
- ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
- ret = r;
- }
- }
-
- return ret;
-}
-
-int RGWReplicaBucketLogger::delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool purge_all)
-{
- int r = RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, true), pool, daemon_id, purge_all, true /* need to exist */);
- if (r != -ENOENT) {
- return r;
- }
- /*
- * can only get here if need_to_exist == true,
- * entry is not found, let's convert old entry if exists
- */
- RGWReplicaBounds bounds;
- r = convert_old_bounds(bucket, shard_id, bounds);
- if (r < 0 && r != -ENOENT) {
- return r;
- }
- return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, true), pool, daemon_id, purge_all, false);
-}
-
-int RGWReplicaBucketLogger::get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) {
- int r = RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, true), pool, bounds);
- if (r != -ENOENT) {
- return r;
- }
-
- r = convert_old_bounds(bucket, shard_id, bounds);
- if (r < 0) {
- return r;
- }
-
- return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, true), pool, bounds);
-}
-
-int RGWReplicaBucketLogger::convert_old_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) {
- string old_key = obj_name(bucket, shard_id, false);
- string new_key = obj_name(bucket, shard_id, true);
-
- /* couldn't find when indexed by instance, retry with old key by bucket name only */
- int r = RGWReplicaLogger::get_bounds(old_key, pool, bounds);
- if (r < 0) {
- return r;
- }
- /* convert to new keys */
- r = RGWReplicaLogger::write_bounds(new_key, pool, bounds);
- if (r < 0) {
- return r;
- }
-
- string daemon_id;
- r = RGWReplicaLogger::delete_bound(old_key, pool, daemon_id, true, false); /* purge all */
- if (r < 0) {
- return r;
- }
- return 0;
-}