1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * This is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License version 2.1, as published by the Free Software
9 * Foundation. See file COPYING.
10 * Copyright 2013 Inktank
13 #include "common/ceph_json.h"
15 #include "rgw_replica_log.h"
16 #include "cls/replica_log/cls_replica_log_client.h"
17 #include "cls/rgw/cls_rgw_client.h"
18 #include "rgw_rados.h"
20 #define dout_subsys ceph_subsys_rgw
// Writes this bounds record to the given Formatter as three fields, emitted
// in this order: "marker", "oldest_time", "markers".
//   f - formatter handed unchanged to each encode_json() call.
// The field names here are the same keys decode_json() reads back.
22 void RGWReplicaBounds::dump(Formatter *f) const
24 encode_json("marker", marker, f);
25 encode_json("oldest_time", oldest_time, f);
26 encode_json("markers", markers, f);
// Populates marker, oldest_time and markers from the JSON object `obj`,
// looking each one up under the key of the same name ("marker",
// "oldest_time", "markers") -- the same keys dump() emits.
29 void RGWReplicaBounds::decode_json(JSONObj *obj) {
30 JSONDecoder::decode_json("marker", marker, obj);
31 JSONDecoder::decode_json("oldest_time", oldest_time, obj);
32 JSONDecoder::decode_json("markers", markers, obj);
// Keeps the RGWRados handle in `store` and caches its CephContext in `cct`.
// `_store` is dereferenced in the initializer list, so it must be non-null.
35 RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) :
36 cct(_store->cct), store(_store) {}
// Initializes `ctx` for `pool` through rgw_init_ioctx(), using the rados
// handle owned by `store`, and logs an error naming the pool.
//   ctx  - IoCtx to initialize (output).
//   pool - pool to open.
// NOTE(review): the meaning of the trailing `true` argument is defined by
// rgw_init_ioctx(), which is not visible here -- confirm before relying on it.
// NOTE(review): this excerpt omits lines of the function (the embedded source
// line numbers skip 41 and 43-46). Presumably the log statement is guarded by
// `r < 0` and `r` is returned to the caller -- confirm against the full file.
38 int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const rgw_pool& pool)
40 int r = rgw_init_ioctx(store->get_rados_handle(), pool, ctx, true);
42 lderr(cct) << "ERROR: could not open rados pool " << pool << dendl;
// Records a replication progress marker for `daemon_id` on log object `oid`.
//   oid       - name of the replica-log object to update.
//   pool      - pool that holds `oid`.
//   daemon_id - stored as the marker's entity_id.
//   marker    - stored as the marker's position_marker.
//   time      - stored as the marker's position_time.
//   entries   - copied into the marker's items; dereferenced unconditionally
//               below, so callers must pass a non-null pointer.
// Returns the result of ioctx.operate() for the queued write operation.
// NOTE(review): the parameter list continues on a line missing from this
// excerpt (the last visible parameter line ends with a comma). Callers in this
// file pass one extra trailing bool, commented "need to exist" at one call
// site -- confirm its name and handling against the full file.
47 int RGWReplicaLogger::update_bound(const string& oid, const rgw_pool& pool,
48 const string& daemon_id,
49 const string& marker, const utime_t& time,
50 const list<RGWReplicaItemMarker> *entries,
// Assemble the progress record that is sent to the object class.
53 cls_replica_log_progress_marker progress;
54 progress.entity_id = daemon_id;
55 progress.position_marker = marker;
56 progress.position_time = time;
57 progress.items = *entries;
// NOTE(review): handling of `r` after open_ioctx() is on lines not shown here
// (source lines 61-64); presumably a negative value is returned early.
59 librados::IoCtx ioctx;
60 int r = open_ioctx(ioctx, pool);
// NOTE(review): source lines 66-68 are missing between the declaration of
// `opw` and the call below; presumably they apply the existence requirement
// to the operation -- confirm.
65 librados::ObjectWriteOperation opw;
69 cls_replica_log_update_bound(opw, progress);
70 return ioctx.operate(oid, &opw);
// Writes every progress marker in `bounds.markers` to log object `oid`.
//   oid    - name of the replica-log object to write.
//   pool   - pool that holds `oid`.
//   bounds - source of the markers; only bounds.markers is read in the
//            visible code.
// All markers are queued on one ObjectWriteOperation and submitted with a
// single ioctx.operate() call.
// NOTE(review): handling of `r` after open_ioctx() and after operate(), and
// the function's return statement, are on lines missing from this excerpt.
73 int RGWReplicaLogger::write_bounds(const string& oid, const rgw_pool& pool,
74 RGWReplicaBounds& bounds)
76 librados::IoCtx ioctx;
77 int r = open_ioctx(ioctx, pool);
82 librados::ObjectWriteOperation opw;
// Queue one update per marker; nothing is sent until operate() below.
83 list<RGWReplicaProgressMarker>::iterator iter = bounds.markers.begin();
84 for (; iter != bounds.markers.end(); ++iter) {
85 RGWReplicaProgressMarker& progress = *iter;
86 cls_replica_log_update_bound(opw, progress);
89 r = ioctx.operate(oid, &opw);
// Removes the progress marker of `daemon_id` from log object `oid`.
//   oid       - name of the replica-log object to modify.
//   pool      - pool that holds `oid`.
//   daemon_id - entity whose bound is deleted.
//   purge_all - NOTE(review): its use is on lines missing from this excerpt
//               (source lines 108-113); one caller in this file annotates a
//               `true` value as "purge all". Confirm the exact behaviour.
// Returns the result of ioctx.operate() for the queued write operation.
// NOTE(review): the parameter list continues on a line not shown here;
// callers in this file pass one further trailing bool (commented
// "need to exist" at one call site).
97 int RGWReplicaLogger::delete_bound(const string& oid, const rgw_pool& pool,
98 const string& daemon_id, bool purge_all,
101 librados::IoCtx ioctx;
102 int r = open_ioctx(ioctx, pool);
107 librados::ObjectWriteOperation opw;
114 cls_replica_log_delete_bound(opw, daemon_id);
116 return ioctx.operate(oid, &opw);
// Reads the replication bounds stored on log object `oid` into `bounds`.
//   oid    - name of the replica-log object to read.
//   pool   - pool that holds `oid`.
//   bounds - output; its marker, oldest_time and markers fields are passed to
//            cls_replica_log_get_bounds() to be filled in.
// Returns the result of cls_replica_log_get_bounds().
// NOTE(review): handling of `r` after open_ioctx() is on lines missing from
// this excerpt (source lines 124-127); presumably a negative value is
// returned before the read is attempted.
119 int RGWReplicaLogger::get_bounds(const string& oid, const rgw_pool& pool,
120 RGWReplicaBounds& bounds)
122 librados::IoCtx ioctx;
123 int r = open_ioctx(ioctx, pool);
128 return cls_replica_log_get_bounds(ioctx, oid, bounds.marker, bounds.oldest_time, bounds.markers);
// Builds a logger over objects in `_pool` whose names use `_prefix`.
//   _store  - forwarded to the RGWReplicaLogger base constructor.
//   _pool   - copied into `pool`.
//   _prefix - copied into `prefix`.
// NOTE(review): the line before the get_log_pool() call (source line 136) is
// missing from this excerpt. If the call were unconditional it would overwrite
// the `_pool` value copied just above, so presumably it is guarded (e.g. only
// when no pool was supplied) -- confirm against the full file.
131 RGWReplicaObjectLogger::
132 RGWReplicaObjectLogger(RGWRados *_store,
133 const rgw_pool& _pool,
134 const string& _prefix) : RGWReplicaLogger(_store),
135 pool(_pool), prefix(_prefix) {
137 store->get_log_pool(pool);
// Creates the log object for each shard index in [0, shards).
//   shards - number of shard objects to create; a value <= 0 creates none.
// For every index the object name is obtained from get_shard_oid() and the
// object is created in this logger's pool.
// NOTE(review): the declaration of `oid`, the handling of `r`, and the return
// statement are on lines missing from this excerpt.
140 int RGWReplicaObjectLogger::create_log_objects(int shards)
142 librados::IoCtx ioctx;
143 int r = open_ioctx(ioctx, pool);
147 for (int i = 0; i < shards; ++i) {
149 get_shard_oid(i, oid);
// NOTE(review): the second argument is presumably librados' `exclusive` flag,
// in which case `false` means an already-existing object is not an error --
// confirm against the librados IoCtx::create documentation.
150 r = ioctx.create(oid, false);
// Builds a bucket replica logger on top of `_store`.
// `pool` is filled in by store->get_log_pool(), and the object-name prefix is
// read from the rgw_replica_log_obj_prefix configuration value.
157 RGWReplicaBucketLogger::RGWReplicaBucketLogger(RGWRados *_store) :
158 RGWReplicaLogger(_store)
160 store->get_log_pool(pool);
161 prefix = _store->ctx()->_conf->rgw_replica_log_obj_prefix;
// Builds the replica-log object name for a bucket (and optionally a shard).
//   bucket            - supplies the name and, when requested, bucket_id.
//   shard_id          - shard index formatted as a ".<shard_id>" suffix.
//   index_by_instance - true:  prefix + bucket.name + ":" + bucket.bucket_id
//                       false: prefix + bucket.name (the older, name-only key)
// NOTE(review): the declarations of `s` and `buf`, the condition under which
// the shard suffix is appended, and the return statement are on lines missing
// from this excerpt. Callers in this file pass shard_id == -1 in places, so
// the suffix is presumably added only for non-negative ids -- confirm.
165 string RGWReplicaBucketLogger::obj_name(const rgw_bucket& bucket, int shard_id, bool index_by_instance)
169 if (index_by_instance) {
170 s = prefix + bucket.name + ":" + bucket.bucket_id;
172 s = prefix + bucket.name;
177 snprintf(buf, sizeof(buf), ".%d", shard_id);
// Records replication progress for `daemon_id` on a bucket's replica-log
// object(s), which are keyed by bucket instance.
//   bucket    - bucket whose log object(s) are updated.
//   shard_id  - shard selector; also passed to the shards-marker parser.
//   daemon_id - entity whose progress is recorded.
//   marker    - either a plain marker or a composite per-shard marker.
//   time      - position time stored with the marker.
//   entries   - item markers forwarded to RGWReplicaLogger::update_bound().
// NOTE(review): this excerpt omits several lines of the function (the
// embedded source line numbers skip), including guards and return statements.
// Comments below that depend on those lines are marked as presumptions.
183 int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
184 const string& marker, const utime_t& time,
185 const list<RGWReplicaItemMarker> *entries)
// Single-object path. The first half of this condition is on a line missing
// from the excerpt; the visible half tests that `marker` is not a composite
// shards marker. The update goes to the object named for `shard_id`.
188 !BucketIndexShardsManager::is_shards_marker(marker)) {
189 return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id, true), pool,
190 daemon_id, marker, time, entries,
// Otherwise parse the composite marker into a map of shard id -> marker.
194 BucketIndexShardsManager sm;
195 int ret = sm.from_string(marker, shard_id);
197 ldout(cct, 0) << "ERROR: could not parse shards marker: " << marker << dendl;
201 map<int, string>& vals = sm.get();
// Update each shard's log object with that shard's own marker (iter->second).
// The first attempt requires the object to exist already.
// Note that the debug line logs the composite `marker`, not iter->second.
205 map<int, string>::iterator iter;
206 for (iter = vals.begin(); iter != vals.end(); ++iter) {
207 ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
208 int r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool,
209 daemon_id, iter->second, time, entries,
210 true /* need to exist */);
// Fallback: migrate bounds stored under the old name-only key (looked up with
// shard id -1), tolerating -ENOENT from the conversion, then retry without
// the existence requirement. Presumably this runs only when the attempt above
// reported that the object does not exist -- the guard is not visible here.
213 RGWReplicaBounds bounds;
214 r = convert_old_bounds(bucket, -1, bounds);
215 if (r < 0 && r != -ENOENT) {
// NOTE(review): this retry passes the full composite `marker`, whereas the
// first attempt above passes the per-shard `iter->second`. That looks
// unintentional -- confirm whether the retry should also use iter->second.
218 r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool,
219 daemon_id, marker, time, entries, false);
222 ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
// Deletes the bound of `daemon_id` from the bucket's instance-keyed log
// object, migrating bounds from the old name-only key if needed.
//   bucket    - bucket whose log object is modified.
//   shard_id  - shard selector used to build the object name.
//   daemon_id - entity whose bound is deleted.
//   purge_all - forwarded unchanged to RGWReplicaLogger::delete_bound().
// The first attempt requires the object to exist. Per the in-code comment
// below, the fallback is reached when the entry is not found: old bounds are
// converted (tolerating -ENOENT) and the delete is retried without the
// existence requirement.
// NOTE(review): the guard after the first attempt and the early-return body
// after convert_old_bounds() are on lines missing from this excerpt.
230 int RGWReplicaBucketLogger::delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool purge_all)
232 int r = RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, true), pool, daemon_id, purge_all, true /* need to exist */);
237 * can only get here if need_to_exist == true,
238 * entry is not found, let's convert old entry if exists
240 RGWReplicaBounds bounds;
241 r = convert_old_bounds(bucket, shard_id, bounds);
242 if (r < 0 && r != -ENOENT) {
245 return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, true), pool, daemon_id, purge_all, false);
// Reads the replication bounds for a bucket shard into `bounds`.
//   bucket   - bucket whose log object is read.
//   shard_id - shard selector used to build the object name.
//   bounds   - output, filled by RGWReplicaLogger::get_bounds().
// The instance-keyed object is read first. The visible code then calls
// convert_old_bounds() and reads the instance-keyed object a second time.
// NOTE(review): the conditions between these steps are on lines missing from
// this excerpt; presumably the conversion and re-read happen only when the
// first read fails to find the object -- confirm against the full file.
248 int RGWReplicaBucketLogger::get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) {
249 int r = RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, true), pool, bounds);
254 r = convert_old_bounds(bucket, shard_id, bounds);
259 return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, true), pool, bounds);
// Migrates bounds from the old object key (bucket name only) to the new key
// (bucket name plus bucket instance id).
//   bucket   - bucket whose bounds are migrated.
//   shard_id - shard selector used when building both object names.
//   bounds   - filled with the bounds read from the old key.
// Visible steps: read bounds from the old key, write them under the new key,
// then delete a bound from the old key with purge_all = true and the
// existence requirement turned off.
// NOTE(review): several lines are missing from this excerpt, including the
// error checks between the steps, the declaration of `daemon_id` used by the
// final delete, and the end of the function. Confirm against the full file.
262 int RGWReplicaBucketLogger::convert_old_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) {
263 string old_key = obj_name(bucket, shard_id, false);
264 string new_key = obj_name(bucket, shard_id, true);
266 /* couldn't find when indexed by instance, retry with old key by bucket name only */
267 int r = RGWReplicaLogger::get_bounds(old_key, pool, bounds);
271 /* convert to new keys */
272 r = RGWReplicaLogger::write_bounds(new_key, pool, bounds);
278 r = RGWReplicaLogger::delete_bound(old_key, pool, daemon_id, true, false); /* purge all */