Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / ObjectMap.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/ObjectMap.h"
5 #include "librbd/BlockGuard.h"
6 #include "librbd/ExclusiveLock.h"
7 #include "librbd/ImageCtx.h"
8 #include "librbd/object_map/RefreshRequest.h"
9 #include "librbd/object_map/ResizeRequest.h"
10 #include "librbd/object_map/SnapshotCreateRequest.h"
11 #include "librbd/object_map/SnapshotRemoveRequest.h"
12 #include "librbd/object_map/SnapshotRollbackRequest.h"
13 #include "librbd/object_map/UnlockRequest.h"
14 #include "librbd/object_map/UpdateRequest.h"
15 #include "librbd/Utils.h"
16 #include "common/dout.h"
17 #include "common/errno.h"
18 #include "common/WorkQueue.h"
19
20 #include "include/rados/librados.hpp"
21
22 #include "cls/lock/cls_lock_client.h"
23 #include "cls/rbd/cls_rbd_types.h"
24 #include "include/stringify.h"
25 #include "osdc/Striper.h"
26 #include <sstream>
27
28 #define dout_subsys ceph_subsys_rbd
29 #undef dout_prefix
30 #define dout_prefix *_dout << "librbd::ObjectMap: " << this << " " << __func__ \
31                            << ": "
32
33 namespace librbd {
34
35 template <typename I>
36 ObjectMap<I>::ObjectMap(I &image_ctx, uint64_t snap_id)
37   : m_image_ctx(image_ctx), m_snap_id(snap_id),
38     m_update_guard(new UpdateGuard(m_image_ctx.cct)) {
39 }
40
41 template <typename I>
42 ObjectMap<I>::~ObjectMap() {
43   delete m_update_guard;
44 }
45
46 template <typename I>
47 int ObjectMap<I>::aio_remove(librados::IoCtx &io_ctx, const std::string &image_id,
48                              librados::AioCompletion *c) {
49   return io_ctx.aio_remove(object_map_name(image_id, CEPH_NOSNAP), c);
50 }
51
52 template <typename I>
53 std::string ObjectMap<I>::object_map_name(const std::string &image_id,
54                                           uint64_t snap_id) {
55   std::string oid(RBD_OBJECT_MAP_PREFIX + image_id);
56   if (snap_id != CEPH_NOSNAP) {
57     std::stringstream snap_suffix;
58     snap_suffix << "." << std::setfill('0') << std::setw(16) << std::hex
59                 << snap_id;
60     oid += snap_suffix.str();
61   }
62   return oid;
63 }
64
65 template <typename I>
66 bool ObjectMap<I>::is_compatible(const file_layout_t& layout, uint64_t size) {
67   uint64_t object_count = Striper::get_num_objects(layout, size);
68   return (object_count <= cls::rbd::MAX_OBJECT_MAP_OBJECT_COUNT);
69 }
70
71 template <typename I>
72 ceph::BitVector<2u>::Reference ObjectMap<I>::operator[](uint64_t object_no)
73 {
74   assert(m_image_ctx.object_map_lock.is_wlocked());
75   assert(object_no < m_object_map.size());
76   return m_object_map[object_no];
77 }
78
79 template <typename I>
80 uint8_t ObjectMap<I>::operator[](uint64_t object_no) const
81 {
82   assert(m_image_ctx.object_map_lock.is_locked());
83   assert(object_no < m_object_map.size());
84   return m_object_map[object_no];
85 }
86
87 template <typename I>
88 bool ObjectMap<I>::object_may_exist(uint64_t object_no) const
89 {
90   assert(m_image_ctx.snap_lock.is_locked());
91
92   // Fall back to default logic if object map is disabled or invalid
93   if (!m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP,
94                                  m_image_ctx.snap_lock)) {
95     return true;
96   }
97
98   bool flags_set;
99   int r = m_image_ctx.test_flags(RBD_FLAG_OBJECT_MAP_INVALID,
100                                  m_image_ctx.snap_lock, &flags_set);
101   if (r < 0 || flags_set) {
102     return true;
103   }
104
105   RWLock::RLocker l(m_image_ctx.object_map_lock);
106   uint8_t state = (*this)[object_no];
107   bool exists = (state != OBJECT_NONEXISTENT);
108   ldout(m_image_ctx.cct, 20) << "object_no=" << object_no << " r=" << exists
109                              << dendl;
110   return exists;
111 }
112
113 template <typename I>
114 bool ObjectMap<I>::update_required(const ceph::BitVector<2>::Iterator& it,
115                                    uint8_t new_state) {
116   assert(m_image_ctx.object_map_lock.is_wlocked());
117   uint8_t state = *it;
118   if ((state == new_state) ||
119       (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
120       (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING)) {
121     return false;
122   }
123   return true;
124 }
125
126 template <typename I>
127 void ObjectMap<I>::open(Context *on_finish) {
128   auto req = object_map::RefreshRequest<I>::create(
129     m_image_ctx, &m_object_map, m_snap_id, on_finish);
130   req->send();
131 }
132
133 template <typename I>
134 void ObjectMap<I>::close(Context *on_finish) {
135   if (m_snap_id != CEPH_NOSNAP) {
136     m_image_ctx.op_work_queue->queue(on_finish, 0);
137     return;
138   }
139
140   auto req = object_map::UnlockRequest<I>::create(m_image_ctx, on_finish);
141   req->send();
142 }
143
144 template <typename I>
145 void ObjectMap<I>::rollback(uint64_t snap_id, Context *on_finish) {
146   assert(m_image_ctx.snap_lock.is_locked());
147   assert(m_image_ctx.object_map_lock.is_wlocked());
148
149   object_map::SnapshotRollbackRequest *req =
150     new object_map::SnapshotRollbackRequest(m_image_ctx, snap_id, on_finish);
151   req->send();
152 }
153
154 template <typename I>
155 void ObjectMap<I>::snapshot_add(uint64_t snap_id, Context *on_finish) {
156   assert(m_image_ctx.snap_lock.is_locked());
157   assert((m_image_ctx.features & RBD_FEATURE_OBJECT_MAP) != 0);
158   assert(snap_id != CEPH_NOSNAP);
159
160   object_map::SnapshotCreateRequest *req =
161     new object_map::SnapshotCreateRequest(m_image_ctx, &m_object_map, snap_id,
162                                           on_finish);
163   req->send();
164 }
165
166 template <typename I>
167 void ObjectMap<I>::snapshot_remove(uint64_t snap_id, Context *on_finish) {
168   assert(m_image_ctx.snap_lock.is_wlocked());
169   assert((m_image_ctx.features & RBD_FEATURE_OBJECT_MAP) != 0);
170   assert(snap_id != CEPH_NOSNAP);
171
172   object_map::SnapshotRemoveRequest *req =
173     new object_map::SnapshotRemoveRequest(m_image_ctx, &m_object_map, snap_id,
174                                           on_finish);
175   req->send();
176 }
177
178 template <typename I>
179 void ObjectMap<I>::aio_save(Context *on_finish) {
180   assert(m_image_ctx.owner_lock.is_locked());
181   assert(m_image_ctx.snap_lock.is_locked());
182   assert(m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP,
183                                    m_image_ctx.snap_lock));
184   RWLock::RLocker object_map_locker(m_image_ctx.object_map_lock);
185
186   librados::ObjectWriteOperation op;
187   if (m_snap_id == CEPH_NOSNAP) {
188     rados::cls::lock::assert_locked(&op, RBD_LOCK_NAME, LOCK_EXCLUSIVE, "", "");
189   }
190   cls_client::object_map_save(&op, m_object_map);
191
192   std::string oid(object_map_name(m_image_ctx.id, m_snap_id));
193   librados::AioCompletion *comp = util::create_rados_callback(on_finish);
194
195   int r = m_image_ctx.md_ctx.aio_operate(oid, comp, &op);
196   assert(r == 0);
197   comp->release();
198 }
199
200 template <typename I>
201 void ObjectMap<I>::aio_resize(uint64_t new_size, uint8_t default_object_state,
202                               Context *on_finish) {
203   assert(m_image_ctx.owner_lock.is_locked());
204   assert(m_image_ctx.snap_lock.is_locked());
205   assert(m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP,
206                                    m_image_ctx.snap_lock));
207   assert(m_image_ctx.image_watcher != NULL);
208   assert(m_image_ctx.exclusive_lock == nullptr ||
209          m_image_ctx.exclusive_lock->is_lock_owner());
210
211   object_map::ResizeRequest *req = new object_map::ResizeRequest(
212     m_image_ctx, &m_object_map, m_snap_id, new_size, default_object_state,
213     on_finish);
214   req->send();
215 }
216
217 template <typename I>
218 void ObjectMap<I>::detained_aio_update(UpdateOperation &&op) {
219   CephContext *cct = m_image_ctx.cct;
220   ldout(cct, 20) << dendl;
221
222   assert(m_image_ctx.snap_lock.is_locked());
223   assert(m_image_ctx.object_map_lock.is_wlocked());
224
225   BlockGuardCell *cell;
226   int r = m_update_guard->detain({op.start_object_no, op.end_object_no},
227                                  &op, &cell);
228   if (r < 0) {
229     lderr(cct) << "failed to detain object map update: " << cpp_strerror(r)
230                << dendl;
231     m_image_ctx.op_work_queue->queue(op.on_finish, r);
232     return;
233   } else if (r > 0) {
234     ldout(cct, 20) << "detaining object map update due to in-flight update: "
235                    << "start=" << op.start_object_no << ", "
236                    << "end=" << op.end_object_no << ", "
237                    << (op.current_state ?
238                          stringify(static_cast<uint32_t>(*op.current_state)) :
239                          "")
240                    << "->" << static_cast<uint32_t>(op.new_state) << dendl;
241     return;
242   }
243
244   ldout(cct, 20) << "in-flight update cell: " << cell << dendl;
245   Context *on_finish = op.on_finish;
246   Context *ctx = new FunctionContext([this, cell, on_finish](int r) {
247       handle_detained_aio_update(cell, r, on_finish);
248     });
249   aio_update(CEPH_NOSNAP, op.start_object_no, op.end_object_no, op.new_state,
250              op.current_state, op.parent_trace, ctx);
251 }
252
253 template <typename I>
254 void ObjectMap<I>::handle_detained_aio_update(BlockGuardCell *cell, int r,
255                                               Context *on_finish) {
256   CephContext *cct = m_image_ctx.cct;
257   ldout(cct, 20) << "cell=" << cell << ", r=" << r << dendl;
258
259   typename UpdateGuard::BlockOperations block_ops;
260   m_update_guard->release(cell, &block_ops);
261
262   {
263     RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
264     RWLock::WLocker object_map_locker(m_image_ctx.object_map_lock);
265     for (auto &op : block_ops) {
266       detained_aio_update(std::move(op));
267     }
268   }
269
270   on_finish->complete(r);
271 }
272
273 template <typename I>
274 void ObjectMap<I>::aio_update(uint64_t snap_id, uint64_t start_object_no,
275                               uint64_t end_object_no, uint8_t new_state,
276                               const boost::optional<uint8_t> &current_state,
277                               const ZTracer::Trace &parent_trace,
278                               Context *on_finish) {
279   assert(m_image_ctx.snap_lock.is_locked());
280   assert((m_image_ctx.features & RBD_FEATURE_OBJECT_MAP) != 0);
281   assert(m_image_ctx.image_watcher != nullptr);
282   assert(m_image_ctx.exclusive_lock == nullptr ||
283          m_image_ctx.exclusive_lock->is_lock_owner());
284   assert(snap_id != CEPH_NOSNAP || m_image_ctx.object_map_lock.is_wlocked());
285   assert(start_object_no < end_object_no);
286
287   CephContext *cct = m_image_ctx.cct;
288   ldout(cct, 20) << "start=" << start_object_no << ", "
289                  << "end=" << end_object_no << ", "
290                  << (current_state ?
291                        stringify(static_cast<uint32_t>(*current_state)) : "")
292                  << "->" << static_cast<uint32_t>(new_state) << dendl;
293   if (snap_id == CEPH_NOSNAP) {
294     if (end_object_no > m_object_map.size()) {
295       ldout(cct, 20) << "skipping update of invalid object map" << dendl;
296       m_image_ctx.op_work_queue->queue(on_finish, 0);
297       return;
298     }
299
300     auto it = m_object_map.begin() + start_object_no;
301     auto end_it = m_object_map.begin() + end_object_no;
302     for (; it != end_it; ++it) {
303       if (update_required(it, new_state)) {
304         break;
305       }
306     }
307     if (it == end_it) {
308       ldout(cct, 20) << "object map update not required" << dendl;
309       m_image_ctx.op_work_queue->queue(on_finish, 0);
310       return;
311     }
312   }
313
314   auto req = object_map::UpdateRequest<I>::create(
315     m_image_ctx, &m_object_map, snap_id, start_object_no, end_object_no,
316     new_state, current_state, parent_trace, on_finish);
317   req->send();
318 }
319
320 } // namespace librbd
321
322 template class librbd::ObjectMap<librbd::ImageCtx>;
323