Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / operation / ObjectMapIterate.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/operation/ObjectMapIterate.h"
5 #include "common/dout.h"
6 #include "common/errno.h"
7 #include "librbd/AsyncObjectThrottle.h"
8 #include "librbd/ExclusiveLock.h"
9 #include "librbd/ImageCtx.h"
10 #include "librbd/ImageWatcher.h"
11 #include "librbd/internal.h"
12 #include "librbd/ObjectMap.h"
13 #include "librbd/operation/ResizeRequest.h"
14 #include "librbd/object_map/InvalidateRequest.h"
15 #include "librbd/Utils.h"
16 #include <boost/lambda/bind.hpp>
17 #include <boost/lambda/construct.hpp>
18
19 #define dout_subsys ceph_subsys_rbd
20 #undef dout_prefix
21 #define dout_prefix *_dout << "librbd::ObjectMapIterateRequest: "
22
23 namespace librbd {
24 namespace operation {
25
26 namespace {
27
28 template <typename I>
29 class C_VerifyObjectCallback : public C_AsyncObjectThrottle<I> {
30 public:
31   C_VerifyObjectCallback(AsyncObjectThrottle<I> &throttle, I *image_ctx,
32                          uint64_t snap_id, uint64_t object_no,
33                          ObjectIterateWork<I> handle_mismatch,
34                          std::atomic_flag *invalidate)
35     : C_AsyncObjectThrottle<I>(throttle, *image_ctx),
36     m_snap_id(snap_id), m_object_no(object_no),
37     m_oid(image_ctx->get_object_name(m_object_no)),
38     m_handle_mismatch(handle_mismatch),
39     m_invalidate(invalidate)
40   {
41     m_io_ctx.dup(image_ctx->data_ctx);
42     m_io_ctx.snap_set_read(CEPH_SNAPDIR);
43   }
44
45   void complete(int r) override {
46     I &image_ctx = this->m_image_ctx;
47     if (should_complete(r)) {
48       ldout(image_ctx.cct, 20) << m_oid << " C_VerifyObjectCallback completed "
49                                << dendl;
50       m_io_ctx.close();
51
52       this->finish(r);
53       delete this;
54     }
55   }
56
57   int send() override {
58     send_list_snaps();
59     return 0;
60   }
61
62 private:
63   librados::IoCtx m_io_ctx;
64   uint64_t m_snap_id;
65   uint64_t m_object_no;
66   std::string m_oid;
67   ObjectIterateWork<I> m_handle_mismatch;
68   std::atomic_flag *m_invalidate;
69
70   librados::snap_set_t m_snap_set;
71   int m_snap_list_ret;
72
73   bool should_complete(int r) {
74     I &image_ctx = this->m_image_ctx;
75     CephContext *cct = image_ctx.cct;
76     if (r == 0) {
77       r = m_snap_list_ret;
78     }
79     if (r < 0 && r != -ENOENT) {
80       lderr(cct) << m_oid << " C_VerifyObjectCallback::should_complete: "
81                  << "encountered an error: " << cpp_strerror(r) << dendl;
82       return true;
83     }
84
85     ldout(cct, 20) << m_oid << " C_VerifyObjectCallback::should_complete: "
86                    << " r="
87                    << r << dendl;
88     return object_map_action(get_object_state());
89   }
90
91   void send_list_snaps() {
92     I &image_ctx = this->m_image_ctx;
93     assert(image_ctx.owner_lock.is_locked());
94     ldout(image_ctx.cct, 5) << m_oid
95                             << " C_VerifyObjectCallback::send_list_snaps"
96                             << dendl;
97
98     librados::ObjectReadOperation op;
99     op.list_snaps(&m_snap_set, &m_snap_list_ret);
100
101     librados::AioCompletion *comp = util::create_rados_callback(this);
102     int r = m_io_ctx.aio_operate(m_oid, comp, &op, NULL);
103     assert(r == 0);
104     comp->release();
105   }
106
107   uint8_t get_object_state() {
108     I &image_ctx = this->m_image_ctx;
109     RWLock::RLocker snap_locker(image_ctx.snap_lock);
110     for (std::vector<librados::clone_info_t>::const_iterator r =
111            m_snap_set.clones.begin(); r != m_snap_set.clones.end(); ++r) {
112       librados::snap_t from_snap_id;
113       librados::snap_t to_snap_id;
114       if (r->cloneid == librados::SNAP_HEAD) {
115         from_snap_id = next_valid_snap_id(m_snap_set.seq + 1);
116         to_snap_id = librados::SNAP_HEAD;
117       } else {
118         from_snap_id = next_valid_snap_id(r->snaps[0]);
119         to_snap_id = r->snaps[r->snaps.size()-1];
120       }
121
122       if (to_snap_id < m_snap_id) {
123         continue;
124       } else if (m_snap_id < from_snap_id) {
125         break;
126       }
127
128       if ((image_ctx.features & RBD_FEATURE_FAST_DIFF) != 0 &&
129           from_snap_id != m_snap_id) {
130         return OBJECT_EXISTS_CLEAN;
131       }
132       return OBJECT_EXISTS;
133     }
134     return OBJECT_NONEXISTENT;
135   }
136
137   uint64_t next_valid_snap_id(uint64_t snap_id) {
138     I &image_ctx = this->m_image_ctx;
139     assert(image_ctx.snap_lock.is_locked());
140
141     std::map<librados::snap_t, SnapInfo>::iterator it =
142       image_ctx.snap_info.lower_bound(snap_id);
143     if (it == image_ctx.snap_info.end()) {
144       return CEPH_NOSNAP;
145     }
146     return it->first;
147   }
148
149   bool object_map_action(uint8_t new_state) {
150     I &image_ctx = this->m_image_ctx;
151     CephContext *cct = image_ctx.cct;
152     RWLock::RLocker owner_locker(image_ctx.owner_lock);
153
154     // should have been canceled prior to releasing lock
155     assert(image_ctx.exclusive_lock == nullptr ||
156            image_ctx.exclusive_lock->is_lock_owner());
157
158     RWLock::RLocker snap_locker(image_ctx.snap_lock);
159     assert(image_ctx.object_map != nullptr);
160
161     RWLock::WLocker l(image_ctx.object_map_lock);
162     uint8_t state = (*image_ctx.object_map)[m_object_no];
163
164     ldout(cct, 10) << "C_VerifyObjectCallback::object_map_action"
165                    << " object " << image_ctx.get_object_name(m_object_no)
166                    << " state " << (int)state
167                    << " new_state " << (int)new_state << dendl;
168
169     if (state != new_state) {
170       int r = 0;
171
172       assert(m_handle_mismatch);
173       r = m_handle_mismatch(image_ctx, m_object_no, state, new_state);
174       if (r) {
175         lderr(cct) << "object map error: object "
176                    << image_ctx.get_object_name(m_object_no)
177                    << " marked as " << (int)state << ", but should be "
178                    << (int)new_state << dendl;
179         m_invalidate->test_and_set();
180       } else {
181         ldout(cct, 1) << "object map inconsistent: object "
182                    << image_ctx.get_object_name(m_object_no)
183                    << " marked as " << (int)state << ", but should be "
184                    << (int)new_state << dendl;
185       }
186     }
187
188     return true;
189   }
190 };
191
192 } // anonymous namespace
193
194 template <typename I>
195 void ObjectMapIterateRequest<I>::send() {
196   send_verify_objects();
197 }
198
199 template <typename I>
200 bool ObjectMapIterateRequest<I>::should_complete(int r) {
201   CephContext *cct = m_image_ctx.cct;
202   ldout(cct, 5) << this << " should_complete: " << " r=" << r << dendl;
203
204   RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
205   switch (m_state) {
206   case STATE_VERIFY_OBJECTS:
207     if (m_invalidate.test_and_set()) {
208       send_invalidate_object_map();
209     } else if (r == 0) {
210       return true;
211     }
212     break;
213
214   case STATE_INVALIDATE_OBJECT_MAP:
215     if (r == 0) {
216       return true;
217     }
218     break;
219
220   default:
221     assert(false);
222     break;
223   }
224
225   if (r < 0) {
226     lderr(cct) << "object map operation encountered an error: "
227                << cpp_strerror(r)
228                << dendl;
229     return true;
230   }
231
232   return false;
233 }
234
235 template <typename I>
236 void ObjectMapIterateRequest<I>::send_verify_objects() {
237   assert(m_image_ctx.owner_lock.is_locked());
238   CephContext *cct = m_image_ctx.cct;
239
240   uint64_t snap_id;
241   uint64_t num_objects;
242   {
243     RWLock::RLocker l(m_image_ctx.snap_lock);
244     snap_id = m_image_ctx.snap_id;
245     num_objects = Striper::get_num_objects(m_image_ctx.layout,
246                                            m_image_ctx.get_image_size(snap_id));
247   }
248   ldout(cct, 5) << this << " send_verify_objects" << dendl;
249
250   m_state = STATE_VERIFY_OBJECTS;
251
252   typename AsyncObjectThrottle<I>::ContextFactory context_factory(
253     boost::lambda::bind(boost::lambda::new_ptr<C_VerifyObjectCallback<I> >(),
254                         boost::lambda::_1, &m_image_ctx, snap_id,
255                         boost::lambda::_2, m_handle_mismatch, &m_invalidate));
256   AsyncObjectThrottle<I> *throttle = new AsyncObjectThrottle<I>(
257     this, m_image_ctx, context_factory, this->create_callback_context(),
258     &m_prog_ctx, 0, num_objects);
259   throttle->start_ops(m_image_ctx.concurrent_management_ops);
260 }
261
262 template <typename I>
263 uint64_t ObjectMapIterateRequest<I>::get_image_size() const {
264   assert(m_image_ctx.snap_lock.is_locked());
265   if (m_image_ctx.snap_id == CEPH_NOSNAP) {
266     if (!m_image_ctx.resize_reqs.empty()) {
267       return m_image_ctx.resize_reqs.front()->get_image_size();
268     } else {
269       return m_image_ctx.size;
270     }
271   }
272   return  m_image_ctx.get_image_size(m_image_ctx.snap_id);
273 }
274
275 template <typename I>
276 void ObjectMapIterateRequest<I>::send_invalidate_object_map() {
277   CephContext *cct = m_image_ctx.cct;
278
279   ldout(cct, 5) << this << " send_invalidate_object_map" << dendl;
280   m_state = STATE_INVALIDATE_OBJECT_MAP;
281
282   object_map::InvalidateRequest<I>*req =
283     object_map::InvalidateRequest<I>::create(m_image_ctx, m_image_ctx.snap_id,
284                                              true,
285                                              this->create_callback_context());
286
287   assert(m_image_ctx.owner_lock.is_locked());
288   RWLock::WLocker snap_locker(m_image_ctx.snap_lock);
289   req->send();
290 }
291
292 } // namespace operation
293 } // namespace librbd
294
295 template class librbd::operation::ObjectMapIterateRequest<librbd::ImageCtx>;