Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / object_map / RefreshRequest.cc
1 // -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/object_map/RefreshRequest.h"
5 #include "cls/lock/cls_lock_client.h"
6 #include "common/dout.h"
7 #include "common/errno.h"
8 #include "librbd/ImageCtx.h"
9 #include "librbd/ObjectMap.h"
10 #include "librbd/object_map/InvalidateRequest.h"
11 #include "librbd/object_map/LockRequest.h"
12 #include "librbd/object_map/ResizeRequest.h"
13 #include "librbd/Utils.h"
14 #include "osdc/Striper.h"
15
16 #define dout_subsys ceph_subsys_rbd
17 #undef dout_prefix
18 #define dout_prefix *_dout << "librbd::object_map::RefreshRequest: "
19
20 namespace librbd {
21
22 using util::create_context_callback;
23 using util::create_rados_callback;
24
25 namespace object_map {
26
27 template <typename I>
28 RefreshRequest<I>::RefreshRequest(I &image_ctx, ceph::BitVector<2> *object_map,
29                                   uint64_t snap_id, Context *on_finish)
30   : m_image_ctx(image_ctx), m_object_map(object_map), m_snap_id(snap_id),
31     m_on_finish(on_finish), m_object_count(0),
32     m_truncate_on_disk_object_map(false) {
33 }
34
35 template <typename I>
36 void RefreshRequest<I>::send() {
37   {
38     RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
39     m_object_count = Striper::get_num_objects(
40       m_image_ctx.layout, m_image_ctx.get_image_size(m_snap_id));
41   }
42
43
44   CephContext *cct = m_image_ctx.cct;
45   ldout(cct, 20) << this << " " << __func__ << ": "
46                  << "object_count=" << m_object_count << dendl;
47   send_lock();
48 }
49
50 template <typename I>
51 void RefreshRequest<I>::apply() {
52   uint64_t num_objs;
53   {
54     RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
55     num_objs = Striper::get_num_objects(
56       m_image_ctx.layout, m_image_ctx.get_image_size(m_snap_id));
57   }
58   assert(m_on_disk_object_map.size() >= num_objs);
59
60   *m_object_map = m_on_disk_object_map;
61 }
62
63 template <typename I>
64 void RefreshRequest<I>::send_lock() {
65   CephContext *cct = m_image_ctx.cct;
66   if (m_object_count > cls::rbd::MAX_OBJECT_MAP_OBJECT_COUNT) {
67     send_invalidate_and_close();
68     return;
69   } else if (m_snap_id != CEPH_NOSNAP) {
70     send_load();
71     return;
72   }
73
74   std::string oid(ObjectMap<>::object_map_name(m_image_ctx.id, m_snap_id));
75   ldout(cct, 10) << this << " " << __func__ << ": oid=" << oid << dendl;
76
77   using klass = RefreshRequest<I>;
78   Context *ctx = create_context_callback<
79     klass, &klass::handle_lock>(this);
80
81   LockRequest<I> *req = LockRequest<I>::create(m_image_ctx, ctx);
82   req->send();
83 }
84
85 template <typename I>
86 Context *RefreshRequest<I>::handle_lock(int *ret_val) {
87   CephContext *cct = m_image_ctx.cct;
88   ldout(cct, 10) << this << " " << __func__ << dendl;
89
90   assert(*ret_val == 0);
91   send_load();
92   return nullptr;
93 }
94
95 template <typename I>
96 void RefreshRequest<I>::send_load() {
97   CephContext *cct = m_image_ctx.cct;
98   std::string oid(ObjectMap<>::object_map_name(m_image_ctx.id, m_snap_id));
99   ldout(cct, 10) << this << " " << __func__ << ": oid=" << oid << dendl;
100
101   librados::ObjectReadOperation op;
102   cls_client::object_map_load_start(&op);
103
104   using klass = RefreshRequest<I>;
105   m_out_bl.clear();
106   librados::AioCompletion *rados_completion =
107     create_rados_callback<klass, &klass::handle_load>(this);
108   int r = m_image_ctx.md_ctx.aio_operate(oid, rados_completion, &op, &m_out_bl);
109   assert(r == 0);
110   rados_completion->release();
111 }
112
113 template <typename I>
114 Context *RefreshRequest<I>::handle_load(int *ret_val) {
115   CephContext *cct = m_image_ctx.cct;
116   ldout(cct, 10) << this << " " << __func__ << ": r=" << *ret_val << dendl;
117
118   if (*ret_val == 0) {
119     bufferlist::iterator bl_it = m_out_bl.begin();
120     *ret_val = cls_client::object_map_load_finish(&bl_it,
121                                                   &m_on_disk_object_map);
122   }
123
124   std::string oid(ObjectMap<>::object_map_name(m_image_ctx.id, m_snap_id));
125   if (*ret_val == -EINVAL) {
126      // object map is corrupt on-disk -- clear it and properly size it
127      // so future IO can keep the object map in sync
128     lderr(cct) << "object map corrupt on-disk: " << oid << dendl;
129     m_truncate_on_disk_object_map = true;
130     send_resize_invalidate();
131     return nullptr;
132   } else if (*ret_val < 0) {
133     lderr(cct) << "failed to load object map: " << oid << dendl;
134     send_invalidate();
135     return nullptr;
136   }
137
138   if (m_on_disk_object_map.size() < m_object_count) {
139     lderr(cct) << "object map smaller than current object count: "
140                << m_on_disk_object_map.size() << " != "
141                << m_object_count << dendl;
142     send_resize_invalidate();
143     return nullptr;
144   }
145
146   ldout(cct, 20) << "refreshed object map: num_objs="
147                  << m_on_disk_object_map.size() << dendl;
148   if (m_on_disk_object_map.size() > m_object_count) {
149     // resize op might have been interrupted
150     ldout(cct, 1) << "object map larger than current object count: "
151                   << m_on_disk_object_map.size() << " != "
152                   << m_object_count << dendl;
153   }
154
155   apply();
156   return m_on_finish;
157 }
158
159 template <typename I>
160 void RefreshRequest<I>::send_invalidate() {
161   CephContext *cct = m_image_ctx.cct;
162   ldout(cct, 10) << this << " " << __func__ << dendl;
163
164   m_on_disk_object_map.clear();
165   object_map::ResizeRequest::resize(&m_on_disk_object_map, m_object_count,
166                                     OBJECT_EXISTS);
167
168   using klass = RefreshRequest<I>;
169   Context *ctx = create_context_callback<
170     klass, &klass::handle_invalidate>(this);
171   InvalidateRequest<I> *req = InvalidateRequest<I>::create(
172     m_image_ctx, m_snap_id, false, ctx);
173
174   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
175   RWLock::WLocker snap_locker(m_image_ctx.snap_lock);
176   req->send();
177 }
178
179 template <typename I>
180 Context *RefreshRequest<I>::handle_invalidate(int *ret_val) {
181   CephContext *cct = m_image_ctx.cct;
182   ldout(cct, 10) << this << " " << __func__ << ": r=" << *ret_val << dendl;
183
184   assert(*ret_val == 0);
185   apply();
186   return m_on_finish;
187 }
188
189 template <typename I>
190 void RefreshRequest<I>::send_resize_invalidate() {
191   CephContext *cct = m_image_ctx.cct;
192   ldout(cct, 10) << this << " " << __func__ << dendl;
193
194   m_on_disk_object_map.clear();
195   object_map::ResizeRequest::resize(&m_on_disk_object_map, m_object_count,
196                                     OBJECT_EXISTS);
197
198   using klass = RefreshRequest<I>;
199   Context *ctx = create_context_callback<
200     klass, &klass::handle_resize_invalidate>(this);
201   InvalidateRequest<I> *req = InvalidateRequest<I>::create(
202     m_image_ctx, m_snap_id, false, ctx);
203
204   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
205   RWLock::WLocker snap_locker(m_image_ctx.snap_lock);
206   req->send();
207 }
208
209 template <typename I>
210 Context *RefreshRequest<I>::handle_resize_invalidate(int *ret_val) {
211   CephContext *cct = m_image_ctx.cct;
212   ldout(cct, 10) << this << " " << __func__ << ": r=" << *ret_val << dendl;
213
214   assert(*ret_val == 0);
215   send_resize();
216   return nullptr;
217 }
218
219 template <typename I>
220 void RefreshRequest<I>::send_resize() {
221   CephContext *cct = m_image_ctx.cct;
222   std::string oid(ObjectMap<>::object_map_name(m_image_ctx.id, m_snap_id));
223   ldout(cct, 10) << this << " " << __func__ << ": oid=" << oid << dendl;
224
225   librados::ObjectWriteOperation op;
226   if (m_snap_id == CEPH_NOSNAP) {
227     rados::cls::lock::assert_locked(&op, RBD_LOCK_NAME, LOCK_EXCLUSIVE, "", "");
228   }
229   if (m_truncate_on_disk_object_map) {
230     op.truncate(0);
231   }
232   cls_client::object_map_resize(&op, m_object_count, OBJECT_NONEXISTENT);
233
234   using klass = RefreshRequest<I>;
235   librados::AioCompletion *rados_completion =
236     create_rados_callback<klass, &klass::handle_resize>(this);
237   int r = m_image_ctx.md_ctx.aio_operate(oid, rados_completion, &op);
238   assert(r == 0);
239   rados_completion->release();
240 }
241
242 template <typename I>
243 Context *RefreshRequest<I>::handle_resize(int *ret_val) {
244   CephContext *cct = m_image_ctx.cct;
245   ldout(cct, 10) << this << " " << __func__ << ": r=" << *ret_val << dendl;
246
247   if (*ret_val < 0) {
248     lderr(cct) << "failed to adjust object map size: " << cpp_strerror(*ret_val)
249                << dendl;
250     *ret_val = 0;
251   }
252   apply();
253   return m_on_finish;
254 }
255
256 template <typename I>
257 void RefreshRequest<I>::send_invalidate_and_close() {
258   CephContext *cct = m_image_ctx.cct;
259   ldout(cct, 10) << this << " " << __func__ << dendl;
260
261   using klass = RefreshRequest<I>;
262   Context *ctx = create_context_callback<
263     klass, &klass::handle_invalidate_and_close>(this);
264   InvalidateRequest<I> *req = InvalidateRequest<I>::create(
265     m_image_ctx, m_snap_id, false, ctx);
266
267   lderr(cct) << "object map too large: " << m_object_count << dendl;
268   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
269   RWLock::WLocker snap_locker(m_image_ctx.snap_lock);
270   req->send();
271 }
272
273 template <typename I>
274 Context *RefreshRequest<I>::handle_invalidate_and_close(int *ret_val) {
275   CephContext *cct = m_image_ctx.cct;
276   ldout(cct, 10) << this << " " << __func__ << ": r=" << *ret_val << dendl;
277
278   assert(*ret_val == 0);
279
280   *ret_val = -EFBIG;
281   m_object_map->clear();
282   return m_on_finish;
283 }
284
285 } // namespace object_map
286 } // namespace librbd
287
288 template class librbd::object_map::RefreshRequest<librbd::ImageCtx>;