Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / Operations.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "cls/rbd/cls_rbd_types.h"
5 #include "librbd/Operations.h"
6 #include "common/dout.h"
7 #include "common/errno.h"
8 #include "common/WorkQueue.h"
9
10 #include "librbd/ExclusiveLock.h"
11 #include "librbd/ImageCtx.h"
12 #include "librbd/ImageState.h"
13 #include "librbd/ImageWatcher.h"
14 #include "librbd/ObjectMap.h"
15 #include "librbd/Utils.h"
16 #include "librbd/journal/DisabledPolicy.h"
17 #include "librbd/journal/StandardPolicy.h"
18 #include "librbd/operation/DisableFeaturesRequest.h"
19 #include "librbd/operation/EnableFeaturesRequest.h"
20 #include "librbd/operation/FlattenRequest.h"
21 #include "librbd/operation/MetadataRemoveRequest.h"
22 #include "librbd/operation/MetadataSetRequest.h"
23 #include "librbd/operation/ObjectMapIterate.h"
24 #include "librbd/operation/RebuildObjectMapRequest.h"
25 #include "librbd/operation/RenameRequest.h"
26 #include "librbd/operation/ResizeRequest.h"
27 #include "librbd/operation/SnapshotCreateRequest.h"
28 #include "librbd/operation/SnapshotProtectRequest.h"
29 #include "librbd/operation/SnapshotRemoveRequest.h"
30 #include "librbd/operation/SnapshotRenameRequest.h"
31 #include "librbd/operation/SnapshotRollbackRequest.h"
32 #include "librbd/operation/SnapshotUnprotectRequest.h"
33 #include "librbd/operation/SnapshotLimitRequest.h"
34 #include <set>
35 #include <boost/bind.hpp>
36 #include <boost/scope_exit.hpp>
37
38 #define dout_subsys ceph_subsys_rbd
39 #undef dout_prefix
40 #define dout_prefix *_dout << "librbd::Operations: "
41
42 namespace librbd {
43
44 namespace {
45
46 template <typename I>
47 struct C_NotifyUpdate : public Context {
48   I &image_ctx;
49   Context *on_finish;
50   bool notified = false;
51
52   C_NotifyUpdate(I &image_ctx, Context *on_finish)
53     : image_ctx(image_ctx), on_finish(on_finish) {
54   }
55
56   void complete(int r) override {
57     CephContext *cct = image_ctx.cct;
58     if (notified) {
59       if (r == -ETIMEDOUT) {
60         // don't fail the op if a peer fails to get the update notification
61         lderr(cct) << "update notification timed-out" << dendl;
62         r = 0;
63       } else if (r == -ENOENT) {
64         // don't fail if header is missing (e.g. v1 image rename)
65         ldout(cct, 5) << "update notification on missing header" << dendl;
66         r = 0;
67       } else if (r < 0) {
68         lderr(cct) << "update notification failed: " << cpp_strerror(r)
69                    << dendl;
70       }
71       Context::complete(r);
72       return;
73     }
74
75     if (r < 0) {
76       // op failed -- no need to send update notification
77       Context::complete(r);
78       return;
79     }
80
81     notified = true;
82     image_ctx.notify_update(this);
83   }
84   void finish(int r) override {
85     on_finish->complete(r);
86   }
87 };
88
89 template <typename I>
90 struct C_InvokeAsyncRequest : public Context {
91   /**
92    * @verbatim
93    *
94    *               <start>
95    *                  |
96    *    . . . . . .   |   . . . . . . . . . . . . . . . . . .
97    *    .         .   |   .                                 .
98    *    .         v   v   v                                 .
99    *    .       REFRESH_IMAGE (skip if not needed)          .
100    *    .             |                                     .
101    *    .             v                                     .
102    *    .       ACQUIRE_LOCK (skip if exclusive lock        .
103    *    .             |       disabled or has lock)         .
104    *    .             |                                     .
105    *    .   /--------/ \--------\   . . . . . . . . . . . . .
106    *    .   |                   |   .
107    *    .   v                   v   .
108    *  LOCAL_REQUEST       REMOTE_REQUEST
109    *        |                   |
110    *        |                   |
111    *        \--------\ /--------/
112    *                  |
113    *                  v
114    *              <finish>
115    *
116    * @endverbatim
117    */
118
119   I &image_ctx;
120   std::string request_type;
121   bool permit_snapshot;
122   boost::function<void(Context*)> local;
123   boost::function<void(Context*)> remote;
124   std::set<int> filter_error_codes;
125   Context *on_finish;
126   bool request_lock = false;
127
128   C_InvokeAsyncRequest(I &image_ctx, const std::string& request_type,
129                        bool permit_snapshot,
130                        const boost::function<void(Context*)>& local,
131                        const boost::function<void(Context*)>& remote,
132                        const std::set<int> &filter_error_codes,
133                        Context *on_finish)
134     : image_ctx(image_ctx), request_type(request_type),
135       permit_snapshot(permit_snapshot), local(local), remote(remote),
136       filter_error_codes(filter_error_codes), on_finish(on_finish) {
137   }
138
139   void send() {
140     send_refresh_image();
141   }
142
143   void send_refresh_image() {
144     if (!image_ctx.state->is_refresh_required()) {
145       send_acquire_exclusive_lock();
146       return;
147     }
148
149     CephContext *cct = image_ctx.cct;
150     ldout(cct, 20) << __func__ << dendl;
151
152     Context *ctx = util::create_context_callback<
153       C_InvokeAsyncRequest<I>,
154       &C_InvokeAsyncRequest<I>::handle_refresh_image>(this);
155     image_ctx.state->refresh(ctx);
156   }
157
158   void handle_refresh_image(int r) {
159     CephContext *cct = image_ctx.cct;
160     ldout(cct, 20) << __func__ << ": r=" << r << dendl;
161
162     if (r < 0) {
163       lderr(cct) << "failed to refresh image: " << cpp_strerror(r) << dendl;
164       complete(r);
165       return;
166     }
167
168     send_acquire_exclusive_lock();
169   }
170
171   void send_acquire_exclusive_lock() {
172     // context can complete before owner_lock is unlocked
173     RWLock &owner_lock(image_ctx.owner_lock);
174     owner_lock.get_read();
175     image_ctx.snap_lock.get_read();
176     if (image_ctx.read_only ||
177         (!permit_snapshot && image_ctx.snap_id != CEPH_NOSNAP)) {
178       image_ctx.snap_lock.put_read();
179       owner_lock.put_read();
180       complete(-EROFS);
181       return;
182     }
183     image_ctx.snap_lock.put_read();
184
185     if (image_ctx.exclusive_lock == nullptr) {
186       send_local_request();
187       owner_lock.put_read();
188       return;
189     } else if (image_ctx.image_watcher == nullptr) {
190       owner_lock.put_read();
191       complete(-EROFS);
192       return;
193     }
194
195     if (image_ctx.exclusive_lock->is_lock_owner() &&
196         image_ctx.exclusive_lock->accept_requests()) {
197       send_local_request();
198       owner_lock.put_read();
199       return;
200     }
201
202     CephContext *cct = image_ctx.cct;
203     ldout(cct, 20) << __func__ << dendl;
204
205     Context *ctx = util::create_async_context_callback(
206       image_ctx, util::create_context_callback<
207         C_InvokeAsyncRequest<I>,
208         &C_InvokeAsyncRequest<I>::handle_acquire_exclusive_lock>(this));
209
210     if (request_lock) {
211       // current lock owner doesn't support op -- try to perform
212       // the action locally
213       request_lock = false;
214       image_ctx.exclusive_lock->acquire_lock(ctx);
215     } else {
216       image_ctx.exclusive_lock->try_acquire_lock(ctx);
217     }
218     owner_lock.put_read();
219   }
220
221   void handle_acquire_exclusive_lock(int r) {
222     CephContext *cct = image_ctx.cct;
223     ldout(cct, 20) << __func__ << ": r=" << r << dendl;
224
225     if (r < 0) {
226       complete(-EROFS);
227       return;
228     }
229
230     // context can complete before owner_lock is unlocked
231     RWLock &owner_lock(image_ctx.owner_lock);
232     owner_lock.get_read();
233     if (image_ctx.exclusive_lock->is_lock_owner()) {
234       send_local_request();
235       owner_lock.put_read();
236       return;
237     }
238
239     send_remote_request();
240     owner_lock.put_read();
241   }
242
243   void send_remote_request() {
244     assert(image_ctx.owner_lock.is_locked());
245
246     CephContext *cct = image_ctx.cct;
247     ldout(cct, 20) << __func__ << dendl;
248
249     Context *ctx = util::create_context_callback<
250       C_InvokeAsyncRequest<I>, &C_InvokeAsyncRequest<I>::handle_remote_request>(
251         this);
252     remote(ctx);
253   }
254
255   void handle_remote_request(int r) {
256     CephContext *cct = image_ctx.cct;
257     ldout(cct, 20) << __func__ << ": r=" << r << dendl;
258
259     if (r == -EOPNOTSUPP) {
260       ldout(cct, 5) << request_type << " not supported by current lock owner"
261                     << dendl;
262       request_lock = true;
263       send_refresh_image();
264       return;
265     } else if (r != -ETIMEDOUT && r != -ERESTART) {
266       image_ctx.state->handle_update_notification();
267
268       complete(r);
269       return;
270     }
271
272     ldout(cct, 5) << request_type << " timed out notifying lock owner"
273                   << dendl;
274     send_refresh_image();
275   }
276
277   void send_local_request() {
278     assert(image_ctx.owner_lock.is_locked());
279
280     CephContext *cct = image_ctx.cct;
281     ldout(cct, 20) << __func__ << dendl;
282
283     Context *ctx = util::create_async_context_callback(
284       image_ctx, util::create_context_callback<
285         C_InvokeAsyncRequest<I>,
286         &C_InvokeAsyncRequest<I>::handle_local_request>(this));
287     local(ctx);
288   }
289
290   void handle_local_request(int r) {
291     CephContext *cct = image_ctx.cct;
292     ldout(cct, 20) << __func__ << ": r=" << r << dendl;
293
294     if (r == -ERESTART) {
295       send_refresh_image();
296       return;
297     }
298     complete(r);
299   }
300
301   void finish(int r) override {
302     if (filter_error_codes.count(r) != 0) {
303       r = 0;
304     }
305     on_finish->complete(r);
306   }
307 };
308
309 template <typename I>
310 bool needs_invalidate(I& image_ctx, uint64_t object_no,
311                      uint8_t current_state, uint8_t new_state) {
312   if ( (current_state == OBJECT_EXISTS ||
313         current_state == OBJECT_EXISTS_CLEAN) &&
314        (new_state == OBJECT_NONEXISTENT ||
315         new_state == OBJECT_PENDING)) {
316     return false;
317   }
318   return true;
319 }
320
321 } // anonymous namespace
322
// Bind the operations helper to its image context; the async request
// sequence counter starts at zero.
template <typename I>
Operations<I>::Operations(I &image_ctx)
  : m_image_ctx(image_ctx), m_async_request_seq(0) {
}
327
328 template <typename I>
329 int Operations<I>::flatten(ProgressContext &prog_ctx) {
330   CephContext *cct = m_image_ctx.cct;
331   ldout(cct, 20) << "flatten" << dendl;
332
333   int r = m_image_ctx.state->refresh_if_required();
334   if (r < 0) {
335     return r;
336   }
337
338   if (m_image_ctx.read_only) {
339     return -EROFS;
340   }
341
342   {
343     RWLock::RLocker parent_locker(m_image_ctx.parent_lock);
344     if (m_image_ctx.parent_md.spec.pool_id == -1) {
345       lderr(cct) << "image has no parent" << dendl;
346       return -EINVAL;
347     }
348   }
349
350   uint64_t request_id = ++m_async_request_seq;
351   r = invoke_async_request("flatten", false,
352                            boost::bind(&Operations<I>::execute_flatten, this,
353                                        boost::ref(prog_ctx), _1),
354                            boost::bind(&ImageWatcher<I>::notify_flatten,
355                                        m_image_ctx.image_watcher, request_id,
356                                        boost::ref(prog_ctx), _1));
357
358   if (r < 0 && r != -EINVAL) {
359     return r;
360   }
361   ldout(cct, 20) << "flatten finished" << dendl;
362   return 0;
363 }
364
365 template <typename I>
366 void Operations<I>::execute_flatten(ProgressContext &prog_ctx,
367                                     Context *on_finish) {
368   assert(m_image_ctx.owner_lock.is_locked());
369   assert(m_image_ctx.exclusive_lock == nullptr ||
370          m_image_ctx.exclusive_lock->is_lock_owner());
371
372   CephContext *cct = m_image_ctx.cct;
373   ldout(cct, 20) << "flatten" << dendl;
374
375   if (m_image_ctx.read_only) {
376     on_finish->complete(-EROFS);
377     return;
378   }
379
380   m_image_ctx.snap_lock.get_read();
381   m_image_ctx.parent_lock.get_read();
382
383   // can't flatten a non-clone
384   if (m_image_ctx.parent_md.spec.pool_id == -1) {
385     lderr(cct) << "image has no parent" << dendl;
386     m_image_ctx.parent_lock.put_read();
387     m_image_ctx.snap_lock.put_read();
388     on_finish->complete(-EINVAL);
389     return;
390   }
391   if (m_image_ctx.snap_id != CEPH_NOSNAP) {
392     lderr(cct) << "snapshots cannot be flattened" << dendl;
393     m_image_ctx.parent_lock.put_read();
394     m_image_ctx.snap_lock.put_read();
395     on_finish->complete(-EROFS);
396     return;
397   }
398
399   ::SnapContext snapc = m_image_ctx.snapc;
400   assert(m_image_ctx.parent != NULL);
401
402   uint64_t overlap;
403   int r = m_image_ctx.get_parent_overlap(CEPH_NOSNAP, &overlap);
404   assert(r == 0);
405   assert(overlap <= m_image_ctx.size);
406
407   uint64_t object_size = m_image_ctx.get_object_size();
408   uint64_t  overlap_objects = Striper::get_num_objects(m_image_ctx.layout,
409                                                        overlap);
410
411   m_image_ctx.parent_lock.put_read();
412   m_image_ctx.snap_lock.put_read();
413
414   operation::FlattenRequest<I> *req = new operation::FlattenRequest<I>(
415     m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), object_size,
416     overlap_objects, snapc, prog_ctx);
417   req->send();
418 }
419
420 template <typename I>
421 int Operations<I>::rebuild_object_map(ProgressContext &prog_ctx) {
422   CephContext *cct = m_image_ctx.cct;
423   ldout(cct, 10) << "rebuild_object_map" << dendl;
424
425   int r = m_image_ctx.state->refresh_if_required();
426   if (r < 0) {
427     return r;
428   }
429
430   uint64_t request_id = ++m_async_request_seq;
431   r = invoke_async_request("rebuild object map", true,
432                            boost::bind(&Operations<I>::execute_rebuild_object_map,
433                                        this, boost::ref(prog_ctx), _1),
434                            boost::bind(&ImageWatcher<I>::notify_rebuild_object_map,
435                                        m_image_ctx.image_watcher, request_id,
436                                        boost::ref(prog_ctx), _1));
437
438   ldout(cct, 10) << "rebuild object map finished" << dendl;
439   if (r < 0) {
440     return r;
441   }
442   return 0;
443 }
444
445 template <typename I>
446 void Operations<I>::execute_rebuild_object_map(ProgressContext &prog_ctx,
447                                                Context *on_finish) {
448   assert(m_image_ctx.owner_lock.is_locked());
449   assert(m_image_ctx.exclusive_lock == nullptr ||
450          m_image_ctx.exclusive_lock->is_lock_owner());
451
452   CephContext *cct = m_image_ctx.cct;
453   ldout(cct, 5) << this << " " << __func__ << dendl;
454
455   if (m_image_ctx.read_only) {
456     on_finish->complete(-EROFS);
457     return;
458   }
459   if (!m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP)) {
460     lderr(cct) << "image must support object-map feature" << dendl;
461     on_finish->complete(-EINVAL);
462     return;
463   }
464
465   operation::RebuildObjectMapRequest<I> *req =
466     new operation::RebuildObjectMapRequest<I>(
467       m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), prog_ctx);
468   req->send();
469 }
470
471 template <typename I>
472 int Operations<I>::check_object_map(ProgressContext &prog_ctx) {
473   CephContext *cct = m_image_ctx.cct;
474   ldout(cct, 5) << this << " " << __func__ << dendl;
475   int r = m_image_ctx.state->refresh_if_required();
476   if (r < 0) {
477     return r;
478   }
479
480   r = invoke_async_request("check object map", true,
481                            boost::bind(&Operations<I>::check_object_map, this,
482                                        boost::ref(prog_ctx), _1),
483                            [] (Context *c) { c->complete(-EOPNOTSUPP); });
484
485   return r;
486 }
487
488 template <typename I>
489 void Operations<I>::object_map_iterate(ProgressContext &prog_ctx,
490                                        operation::ObjectIterateWork<I> handle_mismatch,
491                                        Context *on_finish) {
492   assert(m_image_ctx.owner_lock.is_locked());
493   assert(m_image_ctx.exclusive_lock == nullptr ||
494          m_image_ctx.exclusive_lock->is_lock_owner());
495
496   if (!m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP)) {
497     on_finish->complete(-EINVAL);
498     return;
499   }
500
501   operation::ObjectMapIterateRequest<I> *req =
502     new operation::ObjectMapIterateRequest<I>(m_image_ctx, on_finish,
503                                               prog_ctx, handle_mismatch);
504   req->send();
505 }
506
// Asynchronous object map check: iterates the object map using
// needs_invalidate as the mismatch handler and completes on_finish with the
// result of the iteration.
template <typename I>
void Operations<I>::check_object_map(ProgressContext &prog_ctx,
                                     Context *on_finish) {
  object_map_iterate(prog_ctx, needs_invalidate, on_finish);
}
512
513 template <typename I>
514 int Operations<I>::rename(const char *dstname) {
515   CephContext *cct = m_image_ctx.cct;
516   ldout(cct, 5) << this << " " << __func__ << ": dest_name=" << dstname
517                 << dendl;
518
519   int r = librbd::detect_format(m_image_ctx.md_ctx, dstname, NULL, NULL);
520   if (r < 0 && r != -ENOENT) {
521     lderr(cct) << "error checking for existing image called "
522                << dstname << ":" << cpp_strerror(r) << dendl;
523     return r;
524   }
525   if (r == 0) {
526     lderr(cct) << "rbd image " << dstname << " already exists" << dendl;
527     return -EEXIST;
528   }
529
530   if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
531     r = invoke_async_request("rename", true,
532                              boost::bind(&Operations<I>::execute_rename, this,
533                                          dstname, _1),
534                              boost::bind(&ImageWatcher<I>::notify_rename,
535                                          m_image_ctx.image_watcher, dstname,
536                                          _1));
537     if (r < 0 && r != -EEXIST) {
538       return r;
539     }
540   } else {
541     RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
542     C_SaferCond cond_ctx;
543     execute_rename(dstname, &cond_ctx);
544
545     r = cond_ctx.wait();
546     if (r < 0) {
547       return r;
548     }
549   }
550
551   m_image_ctx.set_image_name(dstname);
552   return 0;
553 }
554
555 template <typename I>
556 void Operations<I>::execute_rename(const std::string &dest_name,
557                                    Context *on_finish) {
558   assert(m_image_ctx.owner_lock.is_locked());
559   if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
560     assert(m_image_ctx.exclusive_lock == nullptr ||
561            m_image_ctx.exclusive_lock->is_lock_owner());
562   }
563
564   m_image_ctx.snap_lock.get_read();
565   if (m_image_ctx.name == dest_name) {
566     m_image_ctx.snap_lock.put_read();
567     on_finish->complete(-EEXIST);
568     return;
569   }
570   m_image_ctx.snap_lock.put_read();
571
572   CephContext *cct = m_image_ctx.cct;
573   ldout(cct, 5) << this << " " << __func__ << ": dest_name=" << dest_name
574                 << dendl;
575
576   if (m_image_ctx.old_format) {
577     // unregister watch before and register back after rename
578     on_finish = new C_NotifyUpdate<I>(m_image_ctx, on_finish);
579     on_finish = new FunctionContext([this, on_finish](int r) {
580         if (m_image_ctx.old_format) {
581           m_image_ctx.image_watcher->set_oid(m_image_ctx.header_oid);
582         }
583         m_image_ctx.image_watcher->register_watch(on_finish);
584       });
585     on_finish = new FunctionContext([this, dest_name, on_finish](int r) {
586         operation::RenameRequest<I> *req = new operation::RenameRequest<I>(
587           m_image_ctx, on_finish, dest_name);
588         req->send();
589       });
590     m_image_ctx.image_watcher->unregister_watch(on_finish);
591     return;
592   }
593   operation::RenameRequest<I> *req = new operation::RenameRequest<I>(
594     m_image_ctx, on_finish, dest_name);
595   req->send();
596 }
597
598 template <typename I>
599 int Operations<I>::resize(uint64_t size, bool allow_shrink, ProgressContext& prog_ctx) {
600   CephContext *cct = m_image_ctx.cct;
601
602   m_image_ctx.snap_lock.get_read();
603   ldout(cct, 5) << this << " " << __func__ << ": "
604                 << "size=" << m_image_ctx.size << ", "
605                 << "new_size=" << size << dendl;
606   m_image_ctx.snap_lock.put_read();
607
608   int r = m_image_ctx.state->refresh_if_required();
609   if (r < 0) {
610     return r;
611   }
612
613   if (m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP) &&
614       !ObjectMap<>::is_compatible(m_image_ctx.layout, size)) {
615     lderr(cct) << "New size not compatible with object map" << dendl;
616     return -EINVAL;
617   }
618
619   uint64_t request_id = ++m_async_request_seq;
620   r = invoke_async_request("resize", false,
621                            boost::bind(&Operations<I>::execute_resize, this,
622                                        size, allow_shrink, boost::ref(prog_ctx), _1, 0),
623                            boost::bind(&ImageWatcher<I>::notify_resize,
624                                        m_image_ctx.image_watcher, request_id,
625                                        size, allow_shrink, boost::ref(prog_ctx), _1));
626
627   m_image_ctx.perfcounter->inc(l_librbd_resize);
628   ldout(cct, 2) << "resize finished" << dendl;
629   return r;
630 }
631
632 template <typename I>
633 void Operations<I>::execute_resize(uint64_t size, bool allow_shrink, ProgressContext &prog_ctx,
634                                    Context *on_finish,
635                                    uint64_t journal_op_tid) {
636   assert(m_image_ctx.owner_lock.is_locked());
637   assert(m_image_ctx.exclusive_lock == nullptr ||
638          m_image_ctx.exclusive_lock->is_lock_owner());
639
640   CephContext *cct = m_image_ctx.cct;
641   m_image_ctx.snap_lock.get_read();
642   ldout(cct, 5) << this << " " << __func__ << ": "
643                 << "size=" << m_image_ctx.size << ", "
644                 << "new_size=" << size << dendl;
645
646   if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
647     m_image_ctx.snap_lock.put_read();
648     on_finish->complete(-EROFS);
649     return;
650   } else if (m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP,
651                                        m_image_ctx.snap_lock) &&
652              !ObjectMap<>::is_compatible(m_image_ctx.layout, size)) {
653     m_image_ctx.snap_lock.put_read();
654     on_finish->complete(-EINVAL);
655     return;
656   }
657   m_image_ctx.snap_lock.put_read();
658
659   operation::ResizeRequest<I> *req = new operation::ResizeRequest<I>(
660     m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), size, allow_shrink,
661     prog_ctx, journal_op_tid, false);
662   req->send();
663 }
664
665 template <typename I>
666 int Operations<I>::snap_create(const cls::rbd::SnapshotNamespace &snap_namespace,
667                                const char *snap_name) {
668   if (m_image_ctx.read_only) {
669     return -EROFS;
670   }
671
672   int r = m_image_ctx.state->refresh_if_required();
673   if (r < 0) {
674     return r;
675   }
676
677   C_SaferCond ctx;
678   snap_create(snap_namespace, snap_name, &ctx);
679   r = ctx.wait();
680
681   if (r < 0) {
682     return r;
683   }
684
685   m_image_ctx.perfcounter->inc(l_librbd_snap_create);
686   return r;
687 }
688
689 template <typename I>
690 void Operations<I>::snap_create(const cls::rbd::SnapshotNamespace &snap_namespace,
691                                 const char *snap_name,
692                                 Context *on_finish) {
693   CephContext *cct = m_image_ctx.cct;
694   ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
695                 << dendl;
696
697   if (m_image_ctx.read_only) {
698     on_finish->complete(-EROFS);
699     return;
700   }
701
702   m_image_ctx.snap_lock.get_read();
703   if (m_image_ctx.get_snap_id(snap_namespace, snap_name) != CEPH_NOSNAP) {
704     m_image_ctx.snap_lock.put_read();
705     on_finish->complete(-EEXIST);
706     return;
707   }
708   m_image_ctx.snap_lock.put_read();
709
710   C_InvokeAsyncRequest<I> *req = new C_InvokeAsyncRequest<I>(
711     m_image_ctx, "snap_create", true,
712     boost::bind(&Operations<I>::execute_snap_create, this, snap_namespace, snap_name,
713                 _1, 0, false),
714     boost::bind(&ImageWatcher<I>::notify_snap_create, m_image_ctx.image_watcher,
715                 snap_namespace, snap_name, _1),
716     {-EEXIST}, on_finish);
717   req->send();
718 }
719
720 template <typename I>
721 void Operations<I>::execute_snap_create(const cls::rbd::SnapshotNamespace &snap_namespace,
722                                         const std::string &snap_name,
723                                         Context *on_finish,
724                                         uint64_t journal_op_tid,
725                                         bool skip_object_map) {
726   assert(m_image_ctx.owner_lock.is_locked());
727   assert(m_image_ctx.exclusive_lock == nullptr ||
728          m_image_ctx.exclusive_lock->is_lock_owner());
729
730   CephContext *cct = m_image_ctx.cct;
731   ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
732                 << dendl;
733
734   m_image_ctx.snap_lock.get_read();
735   if (m_image_ctx.get_snap_id(snap_namespace, snap_name) != CEPH_NOSNAP) {
736     m_image_ctx.snap_lock.put_read();
737     on_finish->complete(-EEXIST);
738     return;
739   }
740   m_image_ctx.snap_lock.put_read();
741
742   operation::SnapshotCreateRequest<I> *req =
743     new operation::SnapshotCreateRequest<I>(
744       m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish),
745       snap_namespace, snap_name, journal_op_tid, skip_object_map);
746   req->send();
747 }
748
749 template <typename I>
750 int Operations<I>::snap_rollback(const cls::rbd::SnapshotNamespace& snap_namespace,
751                                  const char *snap_name,
752                                  ProgressContext& prog_ctx) {
753   CephContext *cct = m_image_ctx.cct;
754   ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
755                 << dendl;
756
757   int r = m_image_ctx.state->refresh_if_required();
758   if (r < 0)
759     return r;
760
761   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
762   {
763     // need to drop snap_lock before invalidating cache
764     RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
765     if (!m_image_ctx.snap_exists) {
766       return -ENOENT;
767     }
768
769     if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
770       return -EROFS;
771     }
772
773     uint64_t snap_id = m_image_ctx.get_snap_id(snap_namespace, snap_name);
774     if (snap_id == CEPH_NOSNAP) {
775       lderr(cct) << "No such snapshot found." << dendl;
776       return -ENOENT;
777     }
778   }
779
780   r = prepare_image_update();
781   if (r < 0) {
782     return -EROFS;
783   }
784   if (m_image_ctx.exclusive_lock != nullptr &&
785       !m_image_ctx.exclusive_lock->is_lock_owner()) {
786     return -EROFS;
787   }
788
789   C_SaferCond cond_ctx;
790   execute_snap_rollback(snap_namespace, snap_name, prog_ctx, &cond_ctx);
791   r = cond_ctx.wait();
792   if (r < 0) {
793     return r;
794   }
795
796   m_image_ctx.perfcounter->inc(l_librbd_snap_rollback);
797   return r;
798 }
799
800 template <typename I>
801 void Operations<I>::execute_snap_rollback(const cls::rbd::SnapshotNamespace& snap_namespace,
802                                           const std::string &snap_name,
803                                           ProgressContext& prog_ctx,
804                                           Context *on_finish) {
805   assert(m_image_ctx.owner_lock.is_locked());
806   CephContext *cct = m_image_ctx.cct;
807   ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
808                 << dendl;
809
810   m_image_ctx.snap_lock.get_read();
811   uint64_t snap_id = m_image_ctx.get_snap_id(snap_namespace, snap_name);
812   if (snap_id == CEPH_NOSNAP) {
813     lderr(cct) << "No such snapshot found." << dendl;
814     m_image_ctx.snap_lock.put_read();
815     on_finish->complete(-ENOENT);
816     return;
817   }
818
819   uint64_t new_size = m_image_ctx.get_image_size(snap_id);
820   m_image_ctx.snap_lock.put_read();
821
822   // async mode used for journal replay
823   operation::SnapshotRollbackRequest<I> *request =
824     new operation::SnapshotRollbackRequest<I>(
825       m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), snap_namespace, snap_name,
826       snap_id, new_size, prog_ctx);
827   request->send();
828 }
829
830 template <typename I>
831 int Operations<I>::snap_remove(const cls::rbd::SnapshotNamespace& snap_namespace,
832                                const char *snap_name) {
833   if (m_image_ctx.read_only) {
834     return -EROFS;
835   }
836
837   int r = m_image_ctx.state->refresh_if_required();
838   if (r < 0) {
839     return r;
840   }
841
842   C_SaferCond ctx;
843   snap_remove(snap_namespace, snap_name, &ctx);
844   r = ctx.wait();
845
846   if (r < 0) {
847     return r;
848   }
849
850   m_image_ctx.perfcounter->inc(l_librbd_snap_remove);
851   return 0;
852 }
853
854 template <typename I>
855 void Operations<I>::snap_remove(const cls::rbd::SnapshotNamespace& snap_namespace,
856                                 const char *snap_name,
857                                 Context *on_finish) {
858   CephContext *cct = m_image_ctx.cct;
859   ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
860                 << dendl;
861
862   if (m_image_ctx.read_only) {
863     on_finish->complete(-EROFS);
864     return;
865   }
866
867   // quickly filter out duplicate ops
868   m_image_ctx.snap_lock.get_read();
869   if (m_image_ctx.get_snap_id(snap_namespace, snap_name) == CEPH_NOSNAP) {
870     m_image_ctx.snap_lock.put_read();
871     on_finish->complete(-ENOENT);
872     return;
873   }
874
875   bool proxy_op = ((m_image_ctx.features & RBD_FEATURE_FAST_DIFF) != 0 ||
876                    (m_image_ctx.features & RBD_FEATURE_JOURNALING) != 0);
877   m_image_ctx.snap_lock.put_read();
878
879   if (proxy_op) {
880     C_InvokeAsyncRequest<I> *req = new C_InvokeAsyncRequest<I>(
881       m_image_ctx, "snap_remove", true,
882       boost::bind(&Operations<I>::execute_snap_remove, this, snap_namespace, snap_name, _1),
883       boost::bind(&ImageWatcher<I>::notify_snap_remove, m_image_ctx.image_watcher,
884                   snap_namespace, snap_name, _1),
885       {-ENOENT}, on_finish);
886     req->send();
887   } else {
888     RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
889     execute_snap_remove(snap_namespace, snap_name, on_finish);
890   }
891 }
892
893 template <typename I>
894 void Operations<I>::execute_snap_remove(const cls::rbd::SnapshotNamespace& snap_namespace,
895                                         const std::string &snap_name,
896                                         Context *on_finish) {
897   assert(m_image_ctx.owner_lock.is_locked());
898   {
899     if ((m_image_ctx.features & RBD_FEATURE_FAST_DIFF) != 0) {
900       assert(m_image_ctx.exclusive_lock == nullptr ||
901              m_image_ctx.exclusive_lock->is_lock_owner());
902     }
903   }
904
905   CephContext *cct = m_image_ctx.cct;
906   ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
907                 << dendl;
908
909   m_image_ctx.snap_lock.get_read();
910   uint64_t snap_id = m_image_ctx.get_snap_id(snap_namespace, snap_name);
911   if (snap_id == CEPH_NOSNAP) {
912     lderr(m_image_ctx.cct) << "No such snapshot found." << dendl;
913     m_image_ctx.snap_lock.put_read();
914     on_finish->complete(-ENOENT);
915     return;
916   }
917
918   bool is_protected;
919   int r = m_image_ctx.is_snap_protected(snap_id, &is_protected);
920   if (r < 0) {
921     m_image_ctx.snap_lock.put_read();
922     on_finish->complete(r);
923     return;
924   } else if (is_protected) {
925     lderr(m_image_ctx.cct) << "snapshot is protected" << dendl;
926     m_image_ctx.snap_lock.put_read();
927     on_finish->complete(-EBUSY);
928     return;
929   }
930   m_image_ctx.snap_lock.put_read();
931
932   operation::SnapshotRemoveRequest<I> *req =
933     new operation::SnapshotRemoveRequest<I>(
934       m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish),
935       snap_namespace, snap_name, snap_id);
936   req->send();
937 }
938
939 template <typename I>
940 int Operations<I>::snap_rename(const char *srcname, const char *dstname) {
941   CephContext *cct = m_image_ctx.cct;
942   ldout(cct, 5) << this << " " << __func__ << ": "
943                 << "snap_name=" << srcname << ", "
944                 << "new_snap_name=" << dstname << dendl;
945
946   snapid_t snap_id;
947   if (m_image_ctx.read_only) {
948     return -EROFS;
949   }
950
951   int r = m_image_ctx.state->refresh_if_required();
952   if (r < 0)
953     return r;
954
955   {
956     RWLock::RLocker l(m_image_ctx.snap_lock);
957     snap_id = m_image_ctx.get_snap_id(cls::rbd::UserSnapshotNamespace(), srcname);
958     if (snap_id == CEPH_NOSNAP) {
959       return -ENOENT;
960     }
961     if (m_image_ctx.get_snap_id(cls::rbd::UserSnapshotNamespace(), dstname) != CEPH_NOSNAP) {
962       return -EEXIST;
963     }
964   }
965
966   if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
967     r = invoke_async_request("snap_rename", true,
968                              boost::bind(&Operations<I>::execute_snap_rename,
969                                          this, snap_id, dstname, _1),
970                              boost::bind(&ImageWatcher<I>::notify_snap_rename,
971                                          m_image_ctx.image_watcher, snap_id,
972                                          dstname, _1));
973     if (r < 0 && r != -EEXIST) {
974       return r;
975     }
976   } else {
977     RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
978     C_SaferCond cond_ctx;
979     execute_snap_rename(snap_id, dstname, &cond_ctx);
980
981     r = cond_ctx.wait();
982     if (r < 0) {
983       return r;
984     }
985   }
986
987   m_image_ctx.perfcounter->inc(l_librbd_snap_rename);
988   return 0;
989 }
990
991 template <typename I>
992 void Operations<I>::execute_snap_rename(const uint64_t src_snap_id,
993                                         const std::string &dest_snap_name,
994                                         Context *on_finish) {
995   assert(m_image_ctx.owner_lock.is_locked());
996   if ((m_image_ctx.features & RBD_FEATURE_JOURNALING) != 0) {
997     assert(m_image_ctx.exclusive_lock == nullptr ||
998            m_image_ctx.exclusive_lock->is_lock_owner());
999   }
1000
1001   m_image_ctx.snap_lock.get_read();
1002   if (m_image_ctx.get_snap_id(cls::rbd::UserSnapshotNamespace(),
1003                               dest_snap_name) != CEPH_NOSNAP) {
1004     // Renaming is supported for snapshots from user namespace only.
1005     m_image_ctx.snap_lock.put_read();
1006     on_finish->complete(-EEXIST);
1007     return;
1008   }
1009   m_image_ctx.snap_lock.put_read();
1010
1011   CephContext *cct = m_image_ctx.cct;
1012   ldout(cct, 5) << this << " " << __func__ << ": "
1013                 << "snap_id=" << src_snap_id << ", "
1014                 << "new_snap_name=" << dest_snap_name << dendl;
1015
1016   operation::SnapshotRenameRequest<I> *req =
1017     new operation::SnapshotRenameRequest<I>(
1018       m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), src_snap_id,
1019       dest_snap_name);
1020   req->send();
1021 }
1022
1023 template <typename I>
1024 int Operations<I>::snap_protect(const cls::rbd::SnapshotNamespace& snap_namespace,
1025                                 const char *snap_name) {
1026   CephContext *cct = m_image_ctx.cct;
1027   ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
1028                 << dendl;
1029
1030   if (m_image_ctx.read_only) {
1031     return -EROFS;
1032   }
1033
1034   if (!m_image_ctx.test_features(RBD_FEATURE_LAYERING)) {
1035     lderr(cct) << "image must support layering" << dendl;
1036     return -ENOSYS;
1037   }
1038
1039   int r = m_image_ctx.state->refresh_if_required();
1040   if (r < 0) {
1041     return r;
1042   }
1043
1044   {
1045     RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
1046     bool is_protected;
1047     r = m_image_ctx.is_snap_protected(m_image_ctx.get_snap_id(snap_namespace, snap_name),
1048                                       &is_protected);
1049     if (r < 0) {
1050       return r;
1051     }
1052
1053     if (is_protected) {
1054       return -EBUSY;
1055     }
1056   }
1057
1058   if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
1059     r = invoke_async_request("snap_protect", true,
1060                              boost::bind(&Operations<I>::execute_snap_protect,
1061                                          this, snap_namespace, snap_name, _1),
1062                              boost::bind(&ImageWatcher<I>::notify_snap_protect,
1063                                          m_image_ctx.image_watcher,
1064                                          snap_namespace, snap_name, _1));
1065     if (r < 0 && r != -EBUSY) {
1066       return r;
1067     }
1068   } else {
1069     RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
1070     C_SaferCond cond_ctx;
1071     execute_snap_protect(snap_namespace, snap_name, &cond_ctx);
1072
1073     r = cond_ctx.wait();
1074     if (r < 0) {
1075       return r;
1076     }
1077   }
1078   return 0;
1079 }
1080
1081 template <typename I>
1082 void Operations<I>::execute_snap_protect(const cls::rbd::SnapshotNamespace& snap_namespace,
1083                                          const std::string &snap_name,
1084                                          Context *on_finish) {
1085   assert(m_image_ctx.owner_lock.is_locked());
1086   if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
1087     assert(m_image_ctx.exclusive_lock == nullptr ||
1088            m_image_ctx.exclusive_lock->is_lock_owner());
1089   }
1090
1091   m_image_ctx.snap_lock.get_read();
1092   bool is_protected;
1093   int r = m_image_ctx.is_snap_protected(m_image_ctx.get_snap_id(snap_namespace, snap_name),
1094                                         &is_protected);
1095   if (r < 0) {
1096     m_image_ctx.snap_lock.put_read();
1097     on_finish->complete(r);
1098     return;
1099   } else if (is_protected) {
1100     m_image_ctx.snap_lock.put_read();
1101     on_finish->complete(-EBUSY);
1102     return;
1103   }
1104   m_image_ctx.snap_lock.put_read();
1105
1106   CephContext *cct = m_image_ctx.cct;
1107   ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
1108                 << dendl;
1109
1110   operation::SnapshotProtectRequest<I> *request =
1111     new operation::SnapshotProtectRequest<I>(
1112       m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), snap_namespace, snap_name);
1113   request->send();
1114 }
1115
1116 template <typename I>
1117 int Operations<I>::snap_unprotect(const cls::rbd::SnapshotNamespace& snap_namespace,
1118                                   const char *snap_name) {
1119   CephContext *cct = m_image_ctx.cct;
1120   ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
1121                 << dendl;
1122
1123   if (m_image_ctx.read_only) {
1124     return -EROFS;
1125   }
1126
1127   int r = m_image_ctx.state->refresh_if_required();
1128   if (r < 0) {
1129     return r;
1130   }
1131
1132   {
1133     RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
1134     bool is_unprotected;
1135     r = m_image_ctx.is_snap_unprotected(m_image_ctx.get_snap_id(snap_namespace, snap_name),
1136                                   &is_unprotected);
1137     if (r < 0) {
1138       return r;
1139     }
1140
1141     if (is_unprotected) {
1142       return -EINVAL;
1143     }
1144   }
1145
1146   if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
1147     r = invoke_async_request("snap_unprotect", true,
1148                              boost::bind(&Operations<I>::execute_snap_unprotect,
1149                                          this, snap_namespace, snap_name, _1),
1150                              boost::bind(&ImageWatcher<I>::notify_snap_unprotect,
1151                                          m_image_ctx.image_watcher,
1152                                          snap_namespace, snap_name, _1));
1153     if (r < 0 && r != -EINVAL) {
1154       return r;
1155     }
1156   } else {
1157     RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
1158     C_SaferCond cond_ctx;
1159     execute_snap_unprotect(snap_namespace, snap_name, &cond_ctx);
1160
1161     r = cond_ctx.wait();
1162     if (r < 0) {
1163       return r;
1164     }
1165   }
1166   return 0;
1167 }
1168
1169 template <typename I>
1170 void Operations<I>::execute_snap_unprotect(const cls::rbd::SnapshotNamespace& snap_namespace,
1171                                            const std::string &snap_name,
1172                                            Context *on_finish) {
1173   assert(m_image_ctx.owner_lock.is_locked());
1174   if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
1175     assert(m_image_ctx.exclusive_lock == nullptr ||
1176            m_image_ctx.exclusive_lock->is_lock_owner());
1177   }
1178
1179   m_image_ctx.snap_lock.get_read();
1180   bool is_unprotected;
1181   int r = m_image_ctx.is_snap_unprotected(m_image_ctx.get_snap_id(snap_namespace, snap_name),
1182                                           &is_unprotected);
1183   if (r < 0) {
1184     m_image_ctx.snap_lock.put_read();
1185     on_finish->complete(r);
1186     return;
1187   } else if (is_unprotected) {
1188     m_image_ctx.snap_lock.put_read();
1189     on_finish->complete(-EINVAL);
1190     return;
1191   }
1192   m_image_ctx.snap_lock.put_read();
1193
1194   CephContext *cct = m_image_ctx.cct;
1195   ldout(cct, 5) << this << " " << __func__ << ": snap_name=" << snap_name
1196                 << dendl;
1197
1198   operation::SnapshotUnprotectRequest<I> *request =
1199     new operation::SnapshotUnprotectRequest<I>(
1200       m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), snap_namespace, snap_name);
1201   request->send();
1202 }
1203
1204 template <typename I>
1205 int Operations<I>::snap_set_limit(uint64_t limit) {
1206   CephContext *cct = m_image_ctx.cct;
1207   ldout(cct, 5) << this << " " << __func__ << ": limit=" << limit << dendl;
1208
1209   if (m_image_ctx.read_only) {
1210     return -EROFS;
1211   }
1212
1213   int r = m_image_ctx.state->refresh_if_required();
1214   if (r < 0) {
1215     return r;
1216   }
1217
1218   {
1219     RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
1220     C_SaferCond limit_ctx;
1221
1222     if (m_image_ctx.exclusive_lock != nullptr &&
1223         !m_image_ctx.exclusive_lock->is_lock_owner()) {
1224       C_SaferCond lock_ctx;
1225
1226       m_image_ctx.exclusive_lock->acquire_lock(&lock_ctx);
1227       r = lock_ctx.wait();
1228       if (r < 0) {
1229         return r;
1230       }
1231     }
1232
1233     execute_snap_set_limit(limit, &limit_ctx);
1234     r = limit_ctx.wait();
1235   }
1236
1237   return r;
1238 }
1239
1240 template <typename I>
1241 void Operations<I>::execute_snap_set_limit(const uint64_t limit,
1242                                            Context *on_finish) {
1243   assert(m_image_ctx.owner_lock.is_locked());
1244
1245   CephContext *cct = m_image_ctx.cct;
1246   ldout(cct, 5) << this << " " << __func__ << ": limit=" << limit
1247                 << dendl;
1248
1249   operation::SnapshotLimitRequest<I> *request =
1250     new operation::SnapshotLimitRequest<I>(m_image_ctx, on_finish, limit);
1251   request->send();
1252 }
1253
1254 template <typename I>
1255 int Operations<I>::update_features(uint64_t features, bool enabled) {
1256   CephContext *cct = m_image_ctx.cct;
1257   ldout(cct, 5) << this << " " << __func__ << ": features=" << features
1258                 << ", enabled=" << enabled << dendl;
1259
1260   int r = m_image_ctx.state->refresh_if_required();
1261   if (r < 0) {
1262     return r;
1263   }
1264
1265   if (m_image_ctx.read_only) {
1266     return -EROFS;
1267   } else if (m_image_ctx.old_format) {
1268     lderr(cct) << "old-format images do not support features" << dendl;
1269     return -EINVAL;
1270   }
1271
1272   uint64_t disable_mask = (RBD_FEATURES_MUTABLE |
1273                            RBD_FEATURES_DISABLE_ONLY);
1274   if ((enabled && (features & RBD_FEATURES_MUTABLE) != features) ||
1275       (!enabled && (features & disable_mask) != features)) {
1276     lderr(cct) << "cannot update immutable features" << dendl;
1277     return -EINVAL;
1278   }
1279   if (features == 0) {
1280     lderr(cct) << "update requires at least one feature" << dendl;
1281     return -EINVAL;
1282   }
1283   {
1284     RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
1285     if (enabled && (features & m_image_ctx.features) != 0) {
1286       lderr(cct) << "one or more requested features are already enabled"
1287                  << dendl;
1288       return -EINVAL;
1289     }
1290     if (!enabled && (features & ~m_image_ctx.features) != 0) {
1291       lderr(cct) << "one or more requested features are already disabled"
1292                  << dendl;
1293       return -EINVAL;
1294     }
1295   }
1296
1297   // if disabling journaling, avoid attempting to open the journal
1298   // when acquiring the exclusive lock in case the journal is corrupt
1299   bool disabling_journal = false;
1300   if (!enabled && ((features & RBD_FEATURE_JOURNALING) != 0)) {
1301     RWLock::WLocker snap_locker(m_image_ctx.snap_lock);
1302     m_image_ctx.set_journal_policy(new journal::DisabledPolicy());
1303     disabling_journal = true;
1304   }
1305   BOOST_SCOPE_EXIT_ALL( (this)(disabling_journal) ) {
1306     if (disabling_journal) {
1307       RWLock::WLocker snap_locker(m_image_ctx.snap_lock);
1308       m_image_ctx.set_journal_policy(
1309         new journal::StandardPolicy<I>(&m_image_ctx));
1310     }
1311   };
1312
1313   r = invoke_async_request("update_features", false,
1314                            boost::bind(&Operations<I>::execute_update_features,
1315                                        this, features, enabled, _1, 0),
1316                            boost::bind(&ImageWatcher<I>::notify_update_features,
1317                                        m_image_ctx.image_watcher, features,
1318                                        enabled, _1));
1319   ldout(cct, 2) << "update_features finished" << dendl;
1320   return r;
1321 }
1322
1323 template <typename I>
1324 void Operations<I>::execute_update_features(uint64_t features, bool enabled,
1325                                             Context *on_finish,
1326                                             uint64_t journal_op_tid) {
1327   assert(m_image_ctx.owner_lock.is_locked());
1328   assert(m_image_ctx.exclusive_lock == nullptr ||
1329          m_image_ctx.exclusive_lock->is_lock_owner());
1330
1331   CephContext *cct = m_image_ctx.cct;
1332   ldout(cct, 5) << this << " " << __func__ << ": features=" << features
1333                 << ", enabled=" << enabled << dendl;
1334
1335   if (enabled) {
1336     operation::EnableFeaturesRequest<I> *req =
1337       new operation::EnableFeaturesRequest<I>(
1338         m_image_ctx, on_finish, journal_op_tid, features);
1339     req->send();
1340   } else {
1341     operation::DisableFeaturesRequest<I> *req =
1342       new operation::DisableFeaturesRequest<I>(
1343         m_image_ctx, on_finish, journal_op_tid, features, false);
1344     req->send();
1345   }
1346 }
1347
1348 template <typename I>
1349 int Operations<I>::metadata_set(const std::string &key,
1350                                 const std::string &value) {
1351   CephContext *cct = m_image_ctx.cct;
1352   ldout(cct, 5) << this << " " << __func__ << ": key=" << key << ", value="
1353                 << value << dendl;
1354
1355   string start = m_image_ctx.METADATA_CONF_PREFIX;
1356   size_t conf_prefix_len = start.size();
1357
1358   if (key.size() > conf_prefix_len && !key.compare(0, conf_prefix_len, start)) {
1359     // validate config setting
1360     string subkey = key.substr(conf_prefix_len, key.size() - conf_prefix_len);
1361     int r = md_config_t().set_val(subkey.c_str(), value);
1362     if (r < 0) {
1363       return r;
1364     }
1365   }
1366
1367   int r = m_image_ctx.state->refresh_if_required();
1368   if (r < 0) {
1369     return r;
1370   }
1371
1372   if (m_image_ctx.read_only) {
1373     return -EROFS;
1374   }
1375
1376   {
1377     RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
1378     C_SaferCond metadata_ctx;
1379
1380     if (m_image_ctx.exclusive_lock != nullptr &&
1381         !m_image_ctx.exclusive_lock->is_lock_owner()) {
1382       C_SaferCond lock_ctx;
1383
1384       m_image_ctx.exclusive_lock->acquire_lock(&lock_ctx);
1385       r = lock_ctx.wait();
1386       if (r < 0) {
1387         return r;
1388       }
1389     }
1390
1391     execute_metadata_set(key, value, &metadata_ctx);
1392     r = metadata_ctx.wait();
1393   }
1394
1395   return r;
1396 }
1397
1398 template <typename I>
1399 void Operations<I>::execute_metadata_set(const std::string &key,
1400                                         const std::string &value,
1401                                         Context *on_finish) {
1402   assert(m_image_ctx.owner_lock.is_locked());
1403
1404   CephContext *cct = m_image_ctx.cct;
1405   ldout(cct, 5) << this << " " << __func__ << ": key=" << key << ", value="
1406                 << value << dendl;
1407
1408   operation::MetadataSetRequest<I> *request =
1409     new operation::MetadataSetRequest<I>(m_image_ctx, on_finish, key, value);
1410   request->send();
1411 }
1412
1413 template <typename I>
1414 int Operations<I>::metadata_remove(const std::string &key) {
1415   CephContext *cct = m_image_ctx.cct;
1416   ldout(cct, 5) << this << " " << __func__ << ": key=" << key << dendl;
1417
1418   if (m_image_ctx.read_only) {
1419     return -EROFS;
1420   }
1421
1422   int r = m_image_ctx.state->refresh_if_required();
1423   if (r < 0) {
1424     return r;
1425   }
1426
1427   if (m_image_ctx.read_only) {
1428     return -EROFS;
1429   }
1430
1431   std::string value;
1432   r = cls_client::metadata_get(&m_image_ctx.md_ctx, m_image_ctx.header_oid, key, &value);
1433   if(r < 0)
1434     return r;
1435
1436   {
1437     RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
1438     C_SaferCond metadata_ctx;
1439
1440     if (m_image_ctx.exclusive_lock != nullptr &&
1441         !m_image_ctx.exclusive_lock->is_lock_owner()) {
1442       C_SaferCond lock_ctx;
1443
1444       m_image_ctx.exclusive_lock->acquire_lock(&lock_ctx);
1445       r = lock_ctx.wait();
1446       if (r < 0) {
1447         return r;
1448       }
1449     }
1450
1451     execute_metadata_remove(key, &metadata_ctx);
1452     r = metadata_ctx.wait();
1453   }
1454
1455   return r;
1456 }
1457
1458 template <typename I>
1459 void Operations<I>::execute_metadata_remove(const std::string &key,
1460                                            Context *on_finish) {
1461   assert(m_image_ctx.owner_lock.is_locked());
1462
1463   CephContext *cct = m_image_ctx.cct;
1464   ldout(cct, 5) << this << " " << __func__ << ": key=" << key << dendl;
1465
1466   operation::MetadataRemoveRequest<I> *request =
1467     new operation::MetadataRemoveRequest<I>(m_image_ctx, on_finish, key);
1468   request->send();
1469 }
1470
1471 template <typename I>
1472 int Operations<I>::prepare_image_update() {
1473   assert(m_image_ctx.owner_lock.is_locked() &&
1474          !m_image_ctx.owner_lock.is_wlocked());
1475   if (m_image_ctx.image_watcher == NULL) {
1476     return -EROFS;
1477   }
1478
1479   // need to upgrade to a write lock
1480   bool trying_lock = false;
1481   C_SaferCond ctx;
1482   m_image_ctx.owner_lock.put_read();
1483   {
1484     RWLock::WLocker owner_locker(m_image_ctx.owner_lock);
1485     if (m_image_ctx.exclusive_lock != nullptr &&
1486         (!m_image_ctx.exclusive_lock->is_lock_owner() ||
1487          !m_image_ctx.exclusive_lock->accept_requests())) {
1488       m_image_ctx.exclusive_lock->try_acquire_lock(&ctx);
1489       trying_lock = true;
1490     }
1491   }
1492
1493   int r = 0;
1494   if (trying_lock) {
1495     r = ctx.wait();
1496   }
1497   m_image_ctx.owner_lock.get_read();
1498
1499   return r;
1500 }
1501
1502 template <typename I>
1503 int Operations<I>::invoke_async_request(const std::string& request_type,
1504                                         bool permit_snapshot,
1505                                         const boost::function<void(Context*)>& local_request,
1506                                         const boost::function<void(Context*)>& remote_request) {
1507   C_SaferCond ctx;
1508   C_InvokeAsyncRequest<I> *req = new C_InvokeAsyncRequest<I>(m_image_ctx,
1509                                                              request_type,
1510                                                              permit_snapshot,
1511                                                              local_request,
1512                                                              remote_request,
1513                                                              {}, &ctx);
1514   req->send();
1515   return ctx.wait();
1516 }
1517
1518 } // namespace librbd
1519
1520 template class librbd::Operations<librbd::ImageCtx>;