Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / image / CreateRequest.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/image/CreateRequest.h"
5 #include "common/dout.h"
6 #include "common/errno.h"
7 #include "cls/rbd/cls_rbd_client.h"
8 #include "include/assert.h"
9 #include "librbd/Utils.h"
10 #include "common/ceph_context.h"
11 #include "librbd/Journal.h"
12 #include "librbd/MirroringWatcher.h"
13 #include "librbd/journal/CreateRequest.h"
14 #include "librbd/journal/RemoveRequest.h"
15 #include "librbd/mirror/EnableRequest.h"
16 #include "librbd/io/AioCompletion.h"
17 #include "journal/Journaler.h"
18
19 #define dout_subsys ceph_subsys_rbd
20 #undef dout_prefix
21 #define dout_prefix *_dout << "librbd::image::CreateRequest: " << __func__ \
22                            << ": "
23
24 namespace librbd {
25 namespace image {
26
27 using util::create_rados_callback;
28 using util::create_context_callback;
29
30 namespace {
31
32 int validate_features(CephContext *cct, uint64_t features,
33                        bool force_non_primary) {
34   if (features & ~RBD_FEATURES_ALL) {
35     lderr(cct) << "librbd does not support requested features." << dendl;
36     return -ENOSYS;
37   }
38   if ((features & RBD_FEATURE_FAST_DIFF) != 0 &&
39       (features & RBD_FEATURE_OBJECT_MAP) == 0) {
40     lderr(cct) << "cannot use fast diff without object map" << dendl;
41     return -EINVAL;
42   }
43   if ((features & RBD_FEATURE_OBJECT_MAP) != 0 &&
44       (features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) {
45     lderr(cct) << "cannot use object map without exclusive lock" << dendl;
46     return -EINVAL;
47   }
48   if ((features & RBD_FEATURE_JOURNALING) != 0) {
49     if ((features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) {
50       lderr(cct) << "cannot use journaling without exclusive lock" << dendl;
51       return -EINVAL;
52     }
53   } else if (force_non_primary) {
54     assert(false);
55   }
56
57   return 0;
58 }
59
60 int validate_striping(CephContext *cct, uint8_t order, uint64_t stripe_unit,
61                       uint64_t stripe_count) {
62   if ((stripe_unit && !stripe_count) ||
63       (!stripe_unit && stripe_count)) {
64     lderr(cct) << "must specify both (or neither) of stripe-unit and "
65                << "stripe-count" << dendl;
66     return -EINVAL;
67   } else if (stripe_unit || stripe_count) {
68     if ((1ull << order) % stripe_unit || stripe_unit > (1ull << order)) {
69       lderr(cct) << "stripe unit is not a factor of the object size" << dendl;
70       return -EINVAL;
71     }
72   }
73   return 0;
74 }
75
76 int validate_data_pool(CephContext *cct, IoCtx &io_ctx, uint64_t features,
77                        const std::string &data_pool, int64_t *data_pool_id) {
78   if ((features & RBD_FEATURE_DATA_POOL) == 0) {
79     return 0;
80   }
81
82   librados::Rados rados(io_ctx);
83   librados::IoCtx data_io_ctx;
84   int r = rados.ioctx_create(data_pool.c_str(), data_io_ctx);
85   if (r < 0) {
86     lderr(cct) << "data pool " << data_pool << " does not exist" << dendl;
87     return -ENOENT;
88   }
89
90   *data_pool_id = data_io_ctx.get_id();
91   return 0;
92 }
93
94
95 bool validate_layout(CephContext *cct, uint64_t size, file_layout_t &layout) {
96   if (!librbd::ObjectMap<>::is_compatible(layout, size)) {
97     lderr(cct) << "image size not compatible with object map" << dendl;
98     return false;
99   }
100
101   return true;
102 }
103
104 int get_image_option(const ImageOptions &image_options, int option,
105                      uint8_t *value) {
106   uint64_t large_value;
107   int r = image_options.get(option, &large_value);
108   if (r < 0) {
109     return r;
110   }
111   *value = static_cast<uint8_t>(large_value);
112   return 0;
113 }
114
115 } // anonymous namespace
116
117 template<typename I>
118 int CreateRequest<I>::validate_order(CephContext *cct, uint8_t order) {
119   if (order > 25 || order < 12) {
120     lderr(cct) << "order must be in the range [12, 25]" << dendl;
121     return -EDOM;
122   }
123   return 0;
124 }
125
126 #undef dout_prefix
127 #define dout_prefix *_dout << "librbd::image::CreateRequest: " << this << " " \
128                            << __func__ << ": "
129
130 template<typename I>
131 CreateRequest<I>::CreateRequest(IoCtx &ioctx, const std::string &image_name,
132                                 const std::string &image_id, uint64_t size,
133                                 const ImageOptions &image_options,
134                                 const std::string &non_primary_global_image_id,
135                                 const std::string &primary_mirror_uuid,
136                                 bool skip_mirror_enable,
137                                 ContextWQ *op_work_queue, Context *on_finish)
138   : m_ioctx(ioctx), m_image_name(image_name), m_image_id(image_id),
139     m_size(size), m_non_primary_global_image_id(non_primary_global_image_id),
140     m_primary_mirror_uuid(primary_mirror_uuid),
141     m_skip_mirror_enable(skip_mirror_enable),
142     m_op_work_queue(op_work_queue), m_on_finish(on_finish) {
143   m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
144
145   m_id_obj = util::id_obj_name(m_image_name);
146   m_header_obj = util::header_name(m_image_id);
147   m_objmap_name = ObjectMap<>::object_map_name(m_image_id, CEPH_NOSNAP);
148
149   if (image_options.get(RBD_IMAGE_OPTION_FEATURES, &m_features) != 0) {
150     m_features = util::get_rbd_default_features(m_cct);
151     m_negotiate_features = true;
152   }
153
154   uint64_t features_clear = 0;
155   uint64_t features_set = 0;
156   image_options.get(RBD_IMAGE_OPTION_FEATURES_CLEAR, &features_clear);
157   image_options.get(RBD_IMAGE_OPTION_FEATURES_SET, &features_set);
158
159   uint64_t features_conflict = features_clear & features_set;
160   features_clear &= ~features_conflict;
161   features_set &= ~features_conflict;
162   m_features |= features_set;
163   m_features &= ~features_clear;
164
165   if (image_options.get(RBD_IMAGE_OPTION_STRIPE_UNIT, &m_stripe_unit) != 0 ||
166       m_stripe_unit == 0) {
167     m_stripe_unit = m_cct->_conf->get_val<uint64_t>("rbd_default_stripe_unit");
168   }
169   if (image_options.get(RBD_IMAGE_OPTION_STRIPE_COUNT, &m_stripe_count) != 0 ||
170       m_stripe_count == 0) {
171     m_stripe_count = m_cct->_conf->get_val<uint64_t>("rbd_default_stripe_count");
172   }
173   if (get_image_option(image_options, RBD_IMAGE_OPTION_ORDER, &m_order) != 0 ||
174       m_order == 0) {
175     m_order = m_cct->_conf->get_val<int64_t>("rbd_default_order");
176   }
177   if (get_image_option(image_options, RBD_IMAGE_OPTION_JOURNAL_ORDER,
178       &m_journal_order) != 0) {
179     m_journal_order = m_cct->_conf->get_val<uint64_t>("rbd_journal_order");
180   }
181   if (get_image_option(image_options, RBD_IMAGE_OPTION_JOURNAL_SPLAY_WIDTH,
182                        &m_journal_splay_width) != 0) {
183     m_journal_splay_width = m_cct->_conf->get_val<uint64_t>("rbd_journal_splay_width");
184   }
185   if (image_options.get(RBD_IMAGE_OPTION_JOURNAL_POOL, &m_journal_pool) != 0) {
186     m_journal_pool = m_cct->_conf->get_val<std::string>("rbd_journal_pool");
187   }
188   if (image_options.get(RBD_IMAGE_OPTION_DATA_POOL, &m_data_pool) != 0) {
189     m_data_pool = m_cct->_conf->get_val<std::string>("rbd_default_data_pool");
190   }
191
192   m_layout.object_size = 1ull << m_order;
193   if (m_stripe_unit == 0 || m_stripe_count == 0) {
194     m_layout.stripe_unit = m_layout.object_size;
195     m_layout.stripe_count = 1;
196   } else {
197     m_layout.stripe_unit = m_stripe_unit;
198     m_layout.stripe_count = m_stripe_count;
199   }
200
201   m_force_non_primary = !non_primary_global_image_id.empty();
202
203   if (!m_data_pool.empty() && m_data_pool != ioctx.get_pool_name()) {
204     m_features |= RBD_FEATURE_DATA_POOL;
205   } else {
206     m_data_pool.clear();
207     m_features &= ~RBD_FEATURE_DATA_POOL;
208   }
209
210   if ((m_stripe_unit != 0 && m_stripe_unit != (1ULL << m_order)) ||
211       (m_stripe_count != 0 && m_stripe_count != 1)) {
212     m_features |= RBD_FEATURE_STRIPINGV2;
213   } else {
214     m_features &= ~RBD_FEATURE_STRIPINGV2;
215   }
216
217   ldout(m_cct, 20) << "name=" << m_image_name << ", "
218                    << "id=" << m_image_id << ", "
219                    << "size=" << m_size << ", "
220                    << "features=" << m_features << ", "
221                    << "order=" << (uint64_t)m_order << ", "
222                    << "stripe_unit=" << m_stripe_unit << ", "
223                    << "stripe_count=" << m_stripe_count << ", "
224                    << "journal_order=" << (uint64_t)m_journal_order << ", "
225                    << "journal_splay_width="
226                    << (uint64_t)m_journal_splay_width << ", "
227                    << "journal_pool=" << m_journal_pool << ", "
228                    << "data_pool=" << m_data_pool << dendl;
229 }
230
231 template<typename I>
232 void CreateRequest<I>::send() {
233   ldout(m_cct, 20) << dendl;
234
235   int r = validate_features(m_cct, m_features, m_force_non_primary);
236   if (r < 0) {
237     complete(r);
238     return;
239   }
240
241   r = validate_order(m_cct, m_order);
242   if (r < 0) {
243     complete(r);
244     return;
245   }
246
247   r = validate_striping(m_cct, m_order, m_stripe_unit, m_stripe_count);
248   if (r < 0) {
249     complete(r);
250     return;
251   }
252
253   r = validate_data_pool(m_cct, m_ioctx, m_features, m_data_pool,
254                          &m_data_pool_id);
255   if (r < 0) {
256     complete(r);
257     return;
258   }
259
260   if (((m_features & RBD_FEATURE_OBJECT_MAP) != 0) &&
261       (!validate_layout(m_cct, m_size, m_layout))) {
262     complete(-EINVAL);
263     return;
264   }
265
266   validate_pool();
267 }
268
269 template<typename I>
270 void CreateRequest<I>::validate_pool() {
271   if (!m_cct->_conf->get_val<bool>("rbd_validate_pool")) {
272     create_id_object();
273     return;
274   }
275
276   ldout(m_cct, 20) << dendl;
277
278   using klass = CreateRequest<I>;
279   librados::AioCompletion *comp =
280     create_rados_callback<klass, &klass::handle_validate_pool>(this);
281
282   librados::ObjectReadOperation op;
283   op.stat(NULL, NULL, NULL);
284
285   m_outbl.clear();
286   int r = m_ioctx.aio_operate(RBD_DIRECTORY, comp, &op, &m_outbl);
287   assert(r == 0);
288   comp->release();
289 }
290
291 template<typename I>
292 void CreateRequest<I>::handle_validate_pool(int r) {
293   ldout(m_cct, 20) << "r=" << r << dendl;
294
295   if (r == 0) {
296     validate_overwrite();
297     return;
298   } else if ((r < 0) && (r != -ENOENT)) {
299     lderr(m_cct) << "failed to stat RBD directory: " << cpp_strerror(r)
300                  << dendl;
301     complete(r);
302     return;
303   }
304
305   // allocate a self-managed snapshot id if this a new pool to force
306   // self-managed snapshot mode
307   // This call is executed just once per (fresh) pool, hence we do not
308   // try hard to make it asynchronous (and it's pretty safe not to cause
309   // deadlocks).
310
311   uint64_t snap_id;
312   r = m_ioctx.selfmanaged_snap_create(&snap_id);
313   if (r == -EINVAL) {
314     lderr(m_cct) << "pool not configured for self-managed RBD snapshot support"
315                  << dendl;
316     complete(r);
317     return;
318   } else if (r < 0) {
319     lderr(m_cct) << "failed to allocate self-managed snapshot: "
320                  << cpp_strerror(r) << dendl;
321     complete(r);
322     return;
323   }
324
325   r = m_ioctx.selfmanaged_snap_remove(snap_id);
326   if (r < 0) {
327     // we've already switched to self-managed snapshots -- no need to
328     // error out in case of failure here.
329     ldout(m_cct, 10) << "failed to release self-managed snapshot " << snap_id
330                      << ": " << cpp_strerror(r) << dendl;
331   }
332
333   validate_overwrite();
334 }
335
336 template <typename I>
337 void CreateRequest<I>::validate_overwrite() {
338   ldout(m_cct, 20) << dendl;
339
340   m_data_io_ctx = m_ioctx;
341   if (m_data_pool_id != -1) {
342     librados::Rados rados(m_ioctx);
343     int r = rados.ioctx_create2(m_data_pool_id, m_data_io_ctx);
344     if (r < 0) {
345       lderr(m_cct) << "data pool " << m_data_pool << " does not exist" << dendl;
346       complete(r);
347       return;
348     }
349   }
350
351   using klass = CreateRequest<I>;
352   librados::AioCompletion *comp =
353     create_rados_callback<klass, &klass::handle_validate_overwrite>(this);
354
355   librados::ObjectReadOperation op;
356   op.read(0, 0, nullptr, nullptr);
357
358   m_outbl.clear();
359   int r = m_data_io_ctx.aio_operate(RBD_INFO, comp, &op, &m_outbl);
360   assert(r == 0);
361   comp->release();
362 }
363
364 template <typename I>
365 void CreateRequest<I>::handle_validate_overwrite(int r) {
366   ldout(m_cct, 20) << "r=" << r << dendl;
367
368   bufferlist bl;
369   bl.append("overwrite validated");
370
371   if (r == 0 && m_outbl.contents_equal(bl)) {
372     create_id_object();
373     return;
374   } else if ((r < 0) && (r != -ENOENT)) {
375     lderr(m_cct) << "failed to read RBD info: " << cpp_strerror(r) << dendl;
376     complete(r);
377     return;
378   }
379
380   // validate the pool supports overwrites. We cannot use rbd_directory
381   // since the v1 images store the directory as tmap data within the object.
382   ldout(m_cct, 10) << "validating overwrite support" << dendl;
383   bufferlist initial_bl;
384   initial_bl.append("validate");
385   r = m_data_io_ctx.write(RBD_INFO, initial_bl, initial_bl.length(), 0);
386   if (r >= 0) {
387     r = m_data_io_ctx.write(RBD_INFO, bl, bl.length(), 0);
388   }
389   if (r == -EOPNOTSUPP) {
390     lderr(m_cct) << "pool missing required overwrite support" << dendl;
391     complete(-EINVAL);
392     return;
393   } else if (r < 0) {
394     lderr(m_cct) << "failed to validate overwrite support: " << cpp_strerror(r)
395                  << dendl;
396     complete(r);
397     return;
398   }
399
400   create_id_object();
401 }
402
403 template<typename I>
404 void CreateRequest<I>::create_id_object() {
405   ldout(m_cct, 20) << dendl;
406
407   librados::ObjectWriteOperation op;
408   op.create(true);
409   cls_client::set_id(&op, m_image_id);
410
411   using klass = CreateRequest<I>;
412   librados::AioCompletion *comp =
413     create_rados_callback<klass, &klass::handle_create_id_object>(this);
414   int r = m_ioctx.aio_operate(m_id_obj, comp, &op);
415   assert(r == 0);
416   comp->release();
417 }
418
419 template<typename I>
420 void CreateRequest<I>::handle_create_id_object(int r) {
421   ldout(m_cct, 20) << "r=" << r << dendl;
422
423   if (r < 0) {
424     lderr(m_cct) << "error creating RBD id object: " << cpp_strerror(r)
425                  << dendl;
426     complete(r);
427     return;
428   }
429
430   add_image_to_directory();
431 }
432
433 template<typename I>
434 void CreateRequest<I>::add_image_to_directory() {
435   ldout(m_cct, 20) << dendl;
436
437   librados::ObjectWriteOperation op;
438   cls_client::dir_add_image(&op, m_image_name, m_image_id);
439
440   using klass = CreateRequest<I>;
441   librados::AioCompletion *comp =
442     create_rados_callback<klass, &klass::handle_add_image_to_directory>(this);
443   int r = m_ioctx.aio_operate(RBD_DIRECTORY, comp, &op);
444   assert(r == 0);
445   comp->release();
446 }
447
448 template<typename I>
449 void CreateRequest<I>::handle_add_image_to_directory(int r) {
450   ldout(m_cct, 20) << "r=" << r << dendl;
451
452   if (r < 0) {
453     lderr(m_cct) << "error adding image to directory: " << cpp_strerror(r)
454                  << dendl;
455
456     m_r_saved = r;
457     remove_id_object();
458   }
459
460   negotiate_features();
461 }
462
463 template<typename I>
464 void CreateRequest<I>::negotiate_features() {
465   if (!m_negotiate_features) {
466     create_image();
467     return;
468   }
469
470   ldout(m_cct, 20) << dendl;
471
472   librados::ObjectReadOperation op;
473   cls_client::get_all_features_start(&op);
474
475   using klass = CreateRequest<I>;
476   librados::AioCompletion *comp =
477     create_rados_callback<klass, &klass::handle_negotiate_features>(this);
478
479   m_outbl.clear();
480   int r = m_ioctx.aio_operate(RBD_DIRECTORY, comp, &op, &m_outbl);
481   assert(r == 0);
482   comp->release();
483 }
484
485 template<typename I>
486 void CreateRequest<I>::handle_negotiate_features(int r) {
487   ldout(m_cct, 20) << "r=" << r << dendl;
488
489   uint64_t all_features;
490   if (r >= 0) {
491     bufferlist::iterator it = m_outbl.begin();
492     r = cls_client::get_all_features_finish(&it, &all_features);
493   }
494   if (r < 0) {
495     ldout(m_cct, 10) << "error retrieving server supported features set: "
496                      << cpp_strerror(r) << dendl;
497   } else if ((m_features & all_features) != m_features) {
498     m_features &= all_features;
499     ldout(m_cct, 10) << "limiting default features set to server supported: "
500                      << m_features << dendl;
501   }
502
503   create_image();
504 }
505
506 template<typename I>
507 void CreateRequest<I>::create_image() {
508   ldout(m_cct, 20) << dendl;
509   assert(m_data_pool.empty() || m_data_pool_id != -1);
510
511   ostringstream oss;
512   oss << RBD_DATA_PREFIX;
513   if (m_data_pool_id != -1) {
514     oss << stringify(m_ioctx.get_id()) << ".";
515   }
516   oss << m_image_id;
517   if (oss.str().length() > RBD_MAX_BLOCK_NAME_PREFIX_LENGTH) {
518     lderr(m_cct) << "object prefix '" << oss.str() << "' too large" << dendl;
519     complete(-EINVAL);
520     return;
521   }
522
523   librados::ObjectWriteOperation op;
524   op.create(true);
525   cls_client::create_image(&op, m_size, m_order, m_features, oss.str(),
526                            m_data_pool_id);
527
528   using klass = CreateRequest<I>;
529   librados::AioCompletion *comp =
530     create_rados_callback<klass, &klass::handle_create_image>(this);
531   int r = m_ioctx.aio_operate(m_header_obj, comp, &op);
532   assert(r == 0);
533   comp->release();
534 }
535
536 template<typename I>
537 void CreateRequest<I>::handle_create_image(int r) {
538   ldout(m_cct, 20) << "r=" << r << dendl;
539
540   if (r < 0) {
541     lderr(m_cct) << "error writing header: " << cpp_strerror(r) << dendl;
542     m_r_saved = r;
543     remove_from_dir();
544     return;
545   }
546
547   set_stripe_unit_count();
548 }
549
550 template<typename I>
551 void CreateRequest<I>::set_stripe_unit_count() {
552   if ((!m_stripe_unit && !m_stripe_count) ||
553       ((m_stripe_count == 1) && (m_stripe_unit == (1ull << m_order)))) {
554     object_map_resize();
555     return;
556   }
557
558   ldout(m_cct, 20) << dendl;
559
560   librados::ObjectWriteOperation op;
561   cls_client::set_stripe_unit_count(&op, m_stripe_unit, m_stripe_count);
562
563   using klass = CreateRequest<I>;
564   librados::AioCompletion *comp =
565     create_rados_callback<klass, &klass::handle_set_stripe_unit_count>(this);
566   int r = m_ioctx.aio_operate(m_header_obj, comp, &op);
567   assert(r == 0);
568   comp->release();
569 }
570
571 template<typename I>
572 void CreateRequest<I>::handle_set_stripe_unit_count(int r) {
573   ldout(m_cct, 20) << "r=" << r << dendl;
574
575   if (r < 0) {
576     lderr(m_cct) << "error setting stripe unit/count: "
577                  << cpp_strerror(r) << dendl;
578     m_r_saved = r;
579     remove_header_object();
580     return;
581   }
582
583   object_map_resize();
584 }
585
586 template<typename I>
587 void CreateRequest<I>::object_map_resize() {
588   if ((m_features & RBD_FEATURE_OBJECT_MAP) == 0) {
589     fetch_mirror_mode();
590     return;
591   }
592
593   ldout(m_cct, 20) << dendl;
594
595   librados::ObjectWriteOperation op;
596   cls_client::object_map_resize(&op, Striper::get_num_objects(m_layout, m_size),
597                                 OBJECT_NONEXISTENT);
598
599   using klass = CreateRequest<I>;
600   librados::AioCompletion *comp =
601     create_rados_callback<klass, &klass::handle_object_map_resize>(this);
602   int r = m_ioctx.aio_operate(m_objmap_name, comp, &op);
603   assert(r == 0);
604   comp->release();
605 }
606
607 template<typename I>
608 void CreateRequest<I>::handle_object_map_resize(int r) {
609   ldout(m_cct, 20) << "r=" << r << dendl;
610
611   if (r < 0) {
612     lderr(m_cct) << "error creating initial object map: "
613                  << cpp_strerror(r) << dendl;
614
615     m_r_saved = r;
616     remove_header_object();
617     return;
618   }
619
620   fetch_mirror_mode();
621 }
622
623 template<typename I>
624 void CreateRequest<I>::fetch_mirror_mode() {
625   if ((m_features & RBD_FEATURE_JOURNALING) == 0) {
626     complete(0);
627     return;
628   }
629
630   ldout(m_cct, 20) << dendl;
631
632   librados::ObjectReadOperation op;
633   cls_client::mirror_mode_get_start(&op);
634
635   using klass = CreateRequest<I>;
636   librados::AioCompletion *comp =
637     create_rados_callback<klass, &klass::handle_fetch_mirror_mode>(this);
638   m_outbl.clear();
639   int r = m_ioctx.aio_operate(RBD_MIRRORING, comp, &op, &m_outbl);
640   assert(r == 0);
641   comp->release();
642 }
643
644 template<typename I>
645 void CreateRequest<I>::handle_fetch_mirror_mode(int r) {
646   ldout(m_cct, 20) << "r=" << r << dendl;
647
648   if ((r < 0) && (r != -ENOENT)) {
649     lderr(m_cct) << "failed to retrieve mirror mode: " << cpp_strerror(r)
650                  << dendl;
651
652     m_r_saved = r;
653     remove_object_map();
654     return;
655   }
656
657   cls::rbd::MirrorMode mirror_mode_internal = cls::rbd::MIRROR_MODE_DISABLED;
658   if (r == 0) {
659     bufferlist::iterator it = m_outbl.begin();
660     r = cls_client::mirror_mode_get_finish(&it, &mirror_mode_internal);
661     if (r < 0) {
662       lderr(m_cct) << "Failed to retrieve mirror mode" << dendl;
663
664       m_r_saved = r;
665       remove_object_map();
666       return;
667     }
668   }
669
670   // TODO: remove redundant code...
671   switch (mirror_mode_internal) {
672   case cls::rbd::MIRROR_MODE_DISABLED:
673   case cls::rbd::MIRROR_MODE_IMAGE:
674   case cls::rbd::MIRROR_MODE_POOL:
675     m_mirror_mode = static_cast<rbd_mirror_mode_t>(mirror_mode_internal);
676     break;
677   default:
678     lderr(m_cct) << "Unknown mirror mode ("
679                  << static_cast<uint32_t>(mirror_mode_internal) << ")" << dendl;
680     r = -EINVAL;
681     remove_object_map();
682     return;
683   }
684
685   journal_create();
686 }
687
688 template<typename I>
689 void CreateRequest<I>::journal_create() {
690   ldout(m_cct, 20) << dendl;
691
692   using klass = CreateRequest<I>;
693   Context *ctx = create_context_callback<klass, &klass::handle_journal_create>(
694     this);
695
696   librbd::journal::TagData tag_data;
697   tag_data.mirror_uuid = (m_force_non_primary ? m_primary_mirror_uuid :
698                           librbd::Journal<I>::LOCAL_MIRROR_UUID);
699
700   librbd::journal::CreateRequest<I> *req =
701     librbd::journal::CreateRequest<I>::create(
702       m_ioctx, m_image_id, m_journal_order, m_journal_splay_width,
703       m_journal_pool, cls::journal::Tag::TAG_CLASS_NEW, tag_data,
704       librbd::Journal<I>::IMAGE_CLIENT_ID, m_op_work_queue, ctx);
705   req->send();
706 }
707
708 template<typename I>
709 void CreateRequest<I>::handle_journal_create(int r) {
710   ldout(m_cct, 20) << "r=" << r << dendl;
711
712   if (r < 0) {
713     lderr(m_cct) << "error creating journal: " << cpp_strerror(r)
714                  << dendl;
715
716     m_r_saved = r;
717     remove_object_map();
718     return;
719   }
720
721   mirror_image_enable();
722 }
723
724 template<typename I>
725 void CreateRequest<I>::mirror_image_enable() {
726   if (((m_mirror_mode != RBD_MIRROR_MODE_POOL) && !m_force_non_primary) ||
727       m_skip_mirror_enable) {
728     complete(0);
729     return;
730   }
731
732   ldout(m_cct, 20) << dendl;
733   auto ctx = create_context_callback<
734     CreateRequest<I>, &CreateRequest<I>::handle_mirror_image_enable>(this);
735   auto req = mirror::EnableRequest<I>::create(m_ioctx, m_image_id,
736                                               m_non_primary_global_image_id,
737                                               m_op_work_queue, ctx);
738   req->send();
739 }
740
741 template<typename I>
742 void CreateRequest<I>::handle_mirror_image_enable(int r) {
743   ldout(m_cct, 20) << "r=" << r << dendl;
744
745   if (r < 0) {
746     lderr(m_cct) << "cannot enable mirroring: " << cpp_strerror(r)
747                  << dendl;
748
749     m_r_saved = r;
750     journal_remove();
751     return;
752   }
753
754   complete(0);
755 }
756
757 template<typename I>
758 void CreateRequest<I>::complete(int r) {
759   ldout(m_cct, 20) << dendl;
760
761   if (r == 0) {
762     ldout(m_cct, 20) << "done." << dendl;
763   }
764
765   m_data_io_ctx.close();
766   m_on_finish->complete(r);
767   delete this;
768 }
769
770 // cleanup
771 template<typename I>
772 void CreateRequest<I>::journal_remove() {
773   if ((m_features & RBD_FEATURE_JOURNALING) == 0) {
774     remove_object_map();
775     return;
776   }
777
778   ldout(m_cct, 20) << dendl;
779
780   using klass = CreateRequest<I>;
781   Context *ctx = create_context_callback<klass, &klass::handle_journal_remove>(
782     this);
783
784   librbd::journal::RemoveRequest<I> *req =
785     librbd::journal::RemoveRequest<I>::create(
786       m_ioctx, m_image_id, librbd::Journal<I>::IMAGE_CLIENT_ID, m_op_work_queue,
787       ctx);
788   req->send();
789 }
790
791 template<typename I>
792 void CreateRequest<I>::handle_journal_remove(int r) {
793   ldout(m_cct, 20) << "r=" << r << dendl;
794
795   if (r < 0) {
796     lderr(m_cct) << "error cleaning up journal after creation failed: "
797                  << cpp_strerror(r) << dendl;
798   }
799
800   remove_object_map();
801 }
802
803 template<typename I>
804 void CreateRequest<I>::remove_object_map() {
805   if ((m_features & RBD_FEATURE_OBJECT_MAP) == 0) {
806     remove_header_object();
807     return;
808   }
809
810   ldout(m_cct, 20) << dendl;
811
812   using klass = CreateRequest<I>;
813   librados::AioCompletion *comp =
814     create_rados_callback<klass, &klass::handle_remove_object_map>(this);
815   int r = m_ioctx.aio_remove(m_objmap_name, comp);
816   assert(r == 0);
817   comp->release();
818 }
819
820 template<typename I>
821 void CreateRequest<I>::handle_remove_object_map(int r) {
822   ldout(m_cct, 20) << "r=" << r << dendl;
823
824   if (r < 0) {
825     lderr(m_cct) << "error cleaning up object map after creation failed: "
826                  << cpp_strerror(r) << dendl;
827   }
828
829   remove_header_object();
830 }
831
832 template<typename I>
833 void CreateRequest<I>::remove_header_object() {
834   ldout(m_cct, 20) << dendl;
835
836   using klass = CreateRequest<I>;
837   librados::AioCompletion *comp =
838     create_rados_callback<klass, &klass::handle_remove_header_object>(this);
839   int r = m_ioctx.aio_remove(m_header_obj, comp);
840   assert(r == 0);
841   comp->release();
842 }
843
844 template<typename I>
845 void CreateRequest<I>::handle_remove_header_object(int r) {
846   ldout(m_cct, 20) << "r=" << r << dendl;
847
848   if (r < 0) {
849     lderr(m_cct) << "error cleaning up image header after creation failed: "
850                  << cpp_strerror(r) << dendl;
851   }
852
853   remove_from_dir();
854 }
855
856 template<typename I>
857 void CreateRequest<I>::remove_from_dir() {
858   ldout(m_cct, 20) << dendl;
859
860   librados::ObjectWriteOperation op;
861   cls_client::dir_remove_image(&op, m_image_name, m_image_id);
862
863   using klass = CreateRequest<I>;
864   librados::AioCompletion *comp =
865     create_rados_callback<klass, &klass::handle_remove_from_dir>(this);
866   int r = m_ioctx.aio_operate(RBD_DIRECTORY, comp, &op);
867   assert(r == 0);
868   comp->release();
869 }
870
871 template<typename I>
872 void CreateRequest<I>::handle_remove_from_dir(int r) {
873   ldout(m_cct, 20) << "r=" << r << dendl;
874
875   if (r < 0) {
876     lderr(m_cct) << "error cleaning up image from rbd_directory object "
877                  << "after creation failed: " << cpp_strerror(r) << dendl;
878   }
879
880   remove_id_object();
881 }
882
883 template<typename I>
884 void CreateRequest<I>::remove_id_object() {
885   ldout(m_cct, 20) << dendl;
886
887   using klass = CreateRequest<I>;
888   librados::AioCompletion *comp =
889     create_rados_callback<klass, &klass::handle_remove_id_object>(this);
890   int r = m_ioctx.aio_remove(m_id_obj, comp);
891   assert(r == 0);
892   comp->release();
893 }
894
895 template<typename I>
896 void CreateRequest<I>::handle_remove_id_object(int r) {
897   ldout(m_cct, 20) << "r=" << r << dendl;
898
899   if (r < 0) {
900     lderr(m_cct) << "error cleaning up id object after creation failed: "
901                  << cpp_strerror(r) << dendl;
902   }
903
904   complete(m_r_saved);
905 }
906
907 } //namespace image
908 } //namespace librbd
909
910 template class librbd::image::CreateRequest<librbd::ImageCtx>;