Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_replica_log.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * This is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License version 2.1, as published by the Free Software
9  * Foundation.  See file COPYING.
10  * Copyright 2013 Inktank
11  */
12
13 #include "common/ceph_json.h"
14
15 #include "rgw_replica_log.h"
16 #include "cls/replica_log/cls_replica_log_client.h"
17 #include "cls/rgw/cls_rgw_client.h"
18 #include "rgw_rados.h"
19
20 #define dout_subsys ceph_subsys_rgw
21
22 void RGWReplicaBounds::dump(Formatter *f) const
23 {
24   encode_json("marker", marker, f);
25   encode_json("oldest_time", oldest_time, f);
26   encode_json("markers", markers, f);
27 }
28
29 void RGWReplicaBounds::decode_json(JSONObj *obj) {
30   JSONDecoder::decode_json("marker", marker, obj);
31   JSONDecoder::decode_json("oldest_time", oldest_time, obj);
32   JSONDecoder::decode_json("markers", markers, obj);
33 }
34
35 RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) :
36     cct(_store->cct), store(_store) {}
37
38 int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const rgw_pool& pool)
39 {
40   int r = rgw_init_ioctx(store->get_rados_handle(), pool, ctx, true);
41   if (r < 0) {
42     lderr(cct) << "ERROR: could not open rados pool " << pool << dendl;
43   }
44   return r;
45 }
46
47 int RGWReplicaLogger::update_bound(const string& oid, const rgw_pool& pool,
48                                    const string& daemon_id,
49                                    const string& marker, const utime_t& time,
50                                    const list<RGWReplicaItemMarker> *entries,
51                                    bool need_to_exist)
52 {
53   cls_replica_log_progress_marker progress;
54   progress.entity_id = daemon_id;
55   progress.position_marker = marker;
56   progress.position_time = time;
57   progress.items = *entries;
58
59   librados::IoCtx ioctx;
60   int r = open_ioctx(ioctx, pool);
61   if (r < 0) {
62     return r;
63   }
64
65   librados::ObjectWriteOperation opw;
66   if (need_to_exist) {
67     opw.assert_exists();
68   }
69   cls_replica_log_update_bound(opw, progress);
70   return ioctx.operate(oid, &opw);
71 }
72
73 int RGWReplicaLogger::write_bounds(const string& oid, const rgw_pool& pool,
74                                    RGWReplicaBounds& bounds)
75 {
76   librados::IoCtx ioctx;
77   int r = open_ioctx(ioctx, pool);
78   if (r < 0) {
79     return r;
80   }
81
82   librados::ObjectWriteOperation opw;
83   list<RGWReplicaProgressMarker>::iterator iter = bounds.markers.begin();
84   for (; iter != bounds.markers.end(); ++iter) {
85     RGWReplicaProgressMarker& progress = *iter;
86     cls_replica_log_update_bound(opw, progress);
87   }
88
89   r = ioctx.operate(oid, &opw);
90   if (r < 0) {
91     return r;
92   }
93
94   return 0;
95 }
96
97 int RGWReplicaLogger::delete_bound(const string& oid, const rgw_pool& pool,
98                                    const string& daemon_id, bool purge_all,
99                                    bool need_to_exist)
100 {
101   librados::IoCtx ioctx;
102   int r = open_ioctx(ioctx, pool);
103   if (r < 0) {
104     return r;
105   }
106
107   librados::ObjectWriteOperation opw;
108   if (need_to_exist) {
109     opw.assert_exists();
110   }
111   if (purge_all) {
112     opw.remove();
113   } else {
114     cls_replica_log_delete_bound(opw, daemon_id);
115   }
116   return ioctx.operate(oid, &opw);
117 }
118
119 int RGWReplicaLogger::get_bounds(const string& oid, const rgw_pool& pool,
120                                  RGWReplicaBounds& bounds)
121 {
122   librados::IoCtx ioctx;
123   int r = open_ioctx(ioctx, pool);
124   if (r < 0) {
125     return r;
126   }
127
128   return cls_replica_log_get_bounds(ioctx, oid, bounds.marker, bounds.oldest_time, bounds.markers);
129 }
130
131 RGWReplicaObjectLogger::
132 RGWReplicaObjectLogger(RGWRados *_store,
133                        const rgw_pool& _pool,
134                        const string& _prefix) : RGWReplicaLogger(_store),
135                        pool(_pool), prefix(_prefix) {
136   if (pool.empty())
137     store->get_log_pool(pool);
138 }
139
140 int RGWReplicaObjectLogger::create_log_objects(int shards)
141 {
142   librados::IoCtx ioctx;
143   int r = open_ioctx(ioctx, pool);
144   if (r < 0) {
145     return r;
146   }
147   for (int i = 0; i < shards; ++i) {
148     string oid;
149     get_shard_oid(i, oid);
150     r = ioctx.create(oid, false);
151     if (r < 0)
152       return r;
153   }
154   return r;
155 }
156
157 RGWReplicaBucketLogger::RGWReplicaBucketLogger(RGWRados *_store) :
158   RGWReplicaLogger(_store)
159 {
160   store->get_log_pool(pool);
161   prefix = _store->ctx()->_conf->rgw_replica_log_obj_prefix;
162   prefix.append(".");
163 }
164
165 string RGWReplicaBucketLogger::obj_name(const rgw_bucket& bucket, int shard_id, bool index_by_instance)
166 {
167   string s;
168
169   if (index_by_instance) {
170     s = prefix + bucket.name + ":" + bucket.bucket_id;
171   } else {
172     s = prefix + bucket.name;
173   }
174
175   if (shard_id >= 0) {
176     char buf[16];
177     snprintf(buf, sizeof(buf), ".%d", shard_id);
178     s += buf;
179   }
180   return s;
181 }
182
183 int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
184                    const string& marker, const utime_t& time,
185                    const list<RGWReplicaItemMarker> *entries)
186 {
187   if (shard_id >= 0 ||
188       !BucketIndexShardsManager::is_shards_marker(marker)) {
189     return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id, true), pool,
190                                           daemon_id, marker, time, entries,
191                                           false);
192   }
193
194   BucketIndexShardsManager sm;
195   int ret = sm.from_string(marker, shard_id);
196   if (ret < 0) {
197     ldout(cct, 0) << "ERROR: could not parse shards marker: " << marker << dendl;
198     return ret;
199   }
200
201   map<int, string>& vals = sm.get();
202
203   ret = 0;
204
205   map<int, string>::iterator iter;
206   for (iter = vals.begin(); iter != vals.end(); ++iter) {
207     ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
208     int r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool,
209                                           daemon_id, iter->second, time, entries,
210                                           true /* need to exist */);
211
212     if (r == -ENOENT) {
213       RGWReplicaBounds bounds;
214       r = convert_old_bounds(bucket, -1, bounds);
215       if (r < 0 && r != -ENOENT) {
216         return r;
217       }
218       r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool,
219                                          daemon_id, marker, time, entries, false);
220     }
221     if (r < 0) {
222       ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
223       ret = r;
224     }
225   }
226
227   return ret;
228 }
229
230 int RGWReplicaBucketLogger::delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool purge_all)
231 {
232   int r = RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, true), pool, daemon_id, purge_all, true /* need to exist */);
233   if (r != -ENOENT) {
234     return r;
235   }
236   /*
237    * can only get here if need_to_exist == true,
238    * entry is not found, let's convert old entry if exists
239    */
240   RGWReplicaBounds bounds;
241   r = convert_old_bounds(bucket, shard_id, bounds);
242   if (r < 0 && r != -ENOENT) {
243     return r;
244   }
245   return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, true), pool, daemon_id, purge_all, false);
246 }
247
248 int RGWReplicaBucketLogger::get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) {
249   int r = RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, true), pool, bounds);
250   if (r != -ENOENT) {
251     return r;
252   }
253
254   r = convert_old_bounds(bucket, shard_id, bounds);
255   if (r < 0) {
256     return r;
257   }
258
259   return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, true), pool, bounds);
260 }
261
262 int RGWReplicaBucketLogger::convert_old_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) {
263   string old_key = obj_name(bucket, shard_id, false);
264   string new_key = obj_name(bucket, shard_id, true);
265
266   /* couldn't find when indexed by instance, retry with old key by bucket name only */
267   int r = RGWReplicaLogger::get_bounds(old_key, pool, bounds);
268   if (r < 0) {
269     return r;
270   }
271   /* convert to new keys */
272   r = RGWReplicaLogger::write_bounds(new_key, pool, bounds);
273   if (r < 0) {
274     return r;
275   }
276
277   string daemon_id;
278   r = RGWReplicaLogger::delete_bound(old_key, pool, daemon_id, true, false); /* purge all */
279   if (r < 0) {
280     return r;
281   }
282   return 0;
283 }