Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / internal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "include/int_types.h"
4
5 #include <errno.h>
6 #include <limits.h>
7
8 #include "include/types.h"
9 #include "include/uuid.h"
10 #include "common/ceph_context.h"
11 #include "common/dout.h"
12 #include "common/errno.h"
13 #include "common/Throttle.h"
14 #include "common/event_socket.h"
15 #include "cls/lock/cls_lock_client.h"
16 #include "include/stringify.h"
17
18 #include "cls/rbd/cls_rbd.h"
19 #include "cls/rbd/cls_rbd_types.h"
20 #include "cls/rbd/cls_rbd_client.h"
21 #include "cls/journal/cls_journal_types.h"
22 #include "cls/journal/cls_journal_client.h"
23
24 #include "librbd/ExclusiveLock.h"
25 #include "librbd/ImageCtx.h"
26 #include "librbd/ImageState.h"
27 #include "librbd/internal.h"
28 #include "librbd/Journal.h"
29 #include "librbd/ObjectMap.h"
30 #include "librbd/Operations.h"
31 #include "librbd/Types.h"
32 #include "librbd/Utils.h"
33 #include "librbd/api/Image.h"
34 #include "librbd/exclusive_lock/AutomaticPolicy.h"
35 #include "librbd/exclusive_lock/StandardPolicy.h"
36 #include "librbd/image/CloneRequest.h"
37 #include "librbd/image/CreateRequest.h"
38 #include "librbd/image/RemoveRequest.h"
39 #include "librbd/io/AioCompletion.h"
40 #include "librbd/io/ImageRequest.h"
41 #include "librbd/io/ImageRequestWQ.h"
42 #include "librbd/io/ObjectRequest.h"
43 #include "librbd/io/ReadResult.h"
44 #include "librbd/journal/Types.h"
45 #include "librbd/managed_lock/Types.h"
46 #include "librbd/mirror/EnableRequest.h"
47 #include "librbd/operation/TrimRequest.h"
48
49 #include "journal/Journaler.h"
50
51 #include <boost/scope_exit.hpp>
52 #include <boost/variant.hpp>
53 #include "include/assert.h"
54
55 #define dout_subsys ceph_subsys_rbd
56 #undef dout_prefix
57 #define dout_prefix *_dout << "librbd: "
58
59 #define rbd_howmany(x, y)  (((x) + (y) - 1) / (y))
60
61 using std::map;
62 using std::pair;
63 using std::set;
64 using std::string;
65 using std::vector;
66 // list binds to list() here, so std::list is explicitly used below
67
68 using ceph::bufferlist;
69 using librados::snap_t;
70 using librados::IoCtx;
71 using librados::Rados;
72
73 namespace librbd {
74
75 namespace {
76
77 int validate_pool(IoCtx &io_ctx, CephContext *cct) {
78   if (!cct->_conf->get_val<bool>("rbd_validate_pool")) {
79     return 0;
80   }
81
82   int r = io_ctx.stat(RBD_DIRECTORY, NULL, NULL);
83   if (r == 0) {
84     return 0;
85   } else if (r < 0 && r != -ENOENT) {
86     lderr(cct) << "failed to stat RBD directory: " << cpp_strerror(r) << dendl;
87     return r;
88   }
89
90   // allocate a self-managed snapshot id if this a new pool to force
91   // self-managed snapshot mode
92   uint64_t snap_id;
93   r = io_ctx.selfmanaged_snap_create(&snap_id);
94   if (r == -EINVAL) {
95     lderr(cct) << "pool not configured for self-managed RBD snapshot support"
96                << dendl;
97     return r;
98   } else if (r < 0) {
99     lderr(cct) << "failed to allocate self-managed snapshot: "
100                << cpp_strerror(r) << dendl;
101     return r;
102   }
103
104   r = io_ctx.selfmanaged_snap_remove(snap_id);
105   if (r < 0) {
106     lderr(cct) << "failed to release self-managed snapshot " << snap_id
107                << ": " << cpp_strerror(r) << dendl;
108   }
109   return 0;
110 }
111
112
113 } // anonymous namespace
114
115   int detect_format(IoCtx &io_ctx, const string &name,
116                     bool *old_format, uint64_t *size)
117   {
118     CephContext *cct = (CephContext *)io_ctx.cct();
119     if (old_format)
120       *old_format = true;
121     int r = io_ctx.stat(util::old_header_name(name), size, NULL);
122     if (r == -ENOENT) {
123       if (old_format)
124         *old_format = false;
125       r = io_ctx.stat(util::id_obj_name(name), size, NULL);
126       if (r < 0)
127         return r;
128     } else if (r < 0) {
129       return r;
130     }
131
132     ldout(cct, 20) << "detect format of " << name << " : "
133                    << (old_format ? (*old_format ? "old" : "new") :
134                        "don't care")  << dendl;
135     return 0;
136   }
137
138   bool has_parent(int64_t parent_pool_id, uint64_t off, uint64_t overlap)
139   {
140     return (parent_pool_id != -1 && off <= overlap);
141   }
142
143   void init_rbd_header(struct rbd_obj_header_ondisk& ondisk,
144                        uint64_t size, int order, uint64_t bid)
145   {
146     uint32_t hi = bid >> 32;
147     uint32_t lo = bid & 0xFFFFFFFF;
148     uint32_t extra = rand() % 0xFFFFFFFF;
149     memset(&ondisk, 0, sizeof(ondisk));
150
151     memcpy(&ondisk.text, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT));
152     memcpy(&ondisk.signature, RBD_HEADER_SIGNATURE,
153            sizeof(RBD_HEADER_SIGNATURE));
154     memcpy(&ondisk.version, RBD_HEADER_VERSION, sizeof(RBD_HEADER_VERSION));
155
156     snprintf(ondisk.block_name, sizeof(ondisk.block_name), "rb.%x.%x.%x",
157              hi, lo, extra);
158
159     ondisk.image_size = size;
160     ondisk.options.order = order;
161     ondisk.options.crypt_type = RBD_CRYPT_NONE;
162     ondisk.options.comp_type = RBD_COMP_NONE;
163     ondisk.snap_seq = 0;
164     ondisk.snap_count = 0;
165     ondisk.reserved = 0;
166     ondisk.snap_names_len = 0;
167   }
168
169   void image_info(ImageCtx *ictx, image_info_t& info, size_t infosize)
170   {
171     int obj_order = ictx->order;
172     ictx->snap_lock.get_read();
173     info.size = ictx->get_image_size(ictx->snap_id);
174     ictx->snap_lock.put_read();
175     info.obj_size = 1ULL << obj_order;
176     info.num_objs = Striper::get_num_objects(ictx->layout, info.size);
177     info.order = obj_order;
178     strncpy(info.block_name_prefix, ictx->object_prefix.c_str(),
179             RBD_MAX_BLOCK_NAME_SIZE);
180     info.block_name_prefix[RBD_MAX_BLOCK_NAME_SIZE - 1] = '\0';
181
182     // clear deprecated fields
183     info.parent_pool = -1L;
184     info.parent_name[0] = '\0';
185   }
186
187   uint64_t oid_to_object_no(const string& oid, const string& object_prefix)
188   {
189     istringstream iss(oid);
190     // skip object prefix and separator
191     iss.ignore(object_prefix.length() + 1);
192     uint64_t num;
193     iss >> std::hex >> num;
194     return num;
195   }
196
197   void trim_image(ImageCtx *ictx, uint64_t newsize, ProgressContext& prog_ctx)
198   {
199     assert(ictx->owner_lock.is_locked());
200     assert(ictx->exclusive_lock == nullptr ||
201            ictx->exclusive_lock->is_lock_owner());
202
203     C_SaferCond ctx;
204     ictx->snap_lock.get_read();
205     operation::TrimRequest<> *req = operation::TrimRequest<>::create(
206       *ictx, &ctx, ictx->size, newsize, prog_ctx);
207     ictx->snap_lock.put_read();
208     req->send();
209
210     int r = ctx.wait();
211     if (r < 0) {
212       lderr(ictx->cct) << "warning: failed to remove some object(s): "
213                        << cpp_strerror(r) << dendl;
214     }
215   }
216
217   int read_header_bl(IoCtx& io_ctx, const string& header_oid,
218                      bufferlist& header, uint64_t *ver)
219   {
220     int r;
221     uint64_t off = 0;
222 #define READ_SIZE 4096
223     do {
224       bufferlist bl;
225       r = io_ctx.read(header_oid, bl, READ_SIZE, off);
226       if (r < 0)
227         return r;
228       header.claim_append(bl);
229       off += r;
230     } while (r == READ_SIZE);
231
232     if (header.length() < sizeof(RBD_HEADER_TEXT) ||
233         memcmp(RBD_HEADER_TEXT, header.c_str(), sizeof(RBD_HEADER_TEXT))) {
234       CephContext *cct = (CephContext *)io_ctx.cct();
235       lderr(cct) << "unrecognized header format" << dendl;
236       return -ENXIO;
237     }
238
239     if (ver)
240       *ver = io_ctx.get_last_version();
241
242     return 0;
243   }
244
245   int read_header(IoCtx& io_ctx, const string& header_oid,
246                   struct rbd_obj_header_ondisk *header, uint64_t *ver)
247   {
248     bufferlist header_bl;
249     int r = read_header_bl(io_ctx, header_oid, header_bl, ver);
250     if (r < 0)
251       return r;
252     if (header_bl.length() < (int)sizeof(*header))
253       return -EIO;
254     memcpy(header, header_bl.c_str(), sizeof(*header));
255
256     return 0;
257   }
258
259   int tmap_set(IoCtx& io_ctx, const string& imgname)
260   {
261     bufferlist cmdbl, emptybl;
262     __u8 c = CEPH_OSD_TMAP_SET;
263     ::encode(c, cmdbl);
264     ::encode(imgname, cmdbl);
265     ::encode(emptybl, cmdbl);
266     return io_ctx.tmap_update(RBD_DIRECTORY, cmdbl);
267   }
268
269   int tmap_rm(IoCtx& io_ctx, const string& imgname)
270   {
271     bufferlist cmdbl;
272     __u8 c = CEPH_OSD_TMAP_RM;
273     ::encode(c, cmdbl);
274     ::encode(imgname, cmdbl);
275     return io_ctx.tmap_update(RBD_DIRECTORY, cmdbl);
276   }
277
278   typedef boost::variant<std::string,uint64_t> image_option_value_t;
279   typedef std::map<int,image_option_value_t> image_options_t;
280   typedef std::shared_ptr<image_options_t> image_options_ref;
281
282   enum image_option_type_t {
283     STR,
284     UINT64,
285   };
286
287   const std::map<int, image_option_type_t> IMAGE_OPTIONS_TYPE_MAPPING = {
288     {RBD_IMAGE_OPTION_FORMAT, UINT64},
289     {RBD_IMAGE_OPTION_FEATURES, UINT64},
290     {RBD_IMAGE_OPTION_ORDER, UINT64},
291     {RBD_IMAGE_OPTION_STRIPE_UNIT, UINT64},
292     {RBD_IMAGE_OPTION_STRIPE_COUNT, UINT64},
293     {RBD_IMAGE_OPTION_JOURNAL_ORDER, UINT64},
294     {RBD_IMAGE_OPTION_JOURNAL_SPLAY_WIDTH, UINT64},
295     {RBD_IMAGE_OPTION_JOURNAL_POOL, STR},
296     {RBD_IMAGE_OPTION_FEATURES_SET, UINT64},
297     {RBD_IMAGE_OPTION_FEATURES_CLEAR, UINT64},
298     {RBD_IMAGE_OPTION_DATA_POOL, STR},
299   };
300
301   std::string image_option_name(int optname) {
302     switch (optname) {
303     case RBD_IMAGE_OPTION_FORMAT:
304       return "format";
305     case RBD_IMAGE_OPTION_FEATURES:
306       return "features";
307     case RBD_IMAGE_OPTION_ORDER:
308       return "order";
309     case RBD_IMAGE_OPTION_STRIPE_UNIT:
310       return "stripe_unit";
311     case RBD_IMAGE_OPTION_STRIPE_COUNT:
312       return "stripe_count";
313     case RBD_IMAGE_OPTION_JOURNAL_ORDER:
314       return "journal_order";
315     case RBD_IMAGE_OPTION_JOURNAL_SPLAY_WIDTH:
316       return "journal_splay_width";
317     case RBD_IMAGE_OPTION_JOURNAL_POOL:
318       return "journal_pool";
319     case RBD_IMAGE_OPTION_FEATURES_SET:
320       return "features_set";
321     case RBD_IMAGE_OPTION_FEATURES_CLEAR:
322       return "features_clear";
323     case RBD_IMAGE_OPTION_DATA_POOL:
324       return "data_pool";
325     default:
326       return "unknown (" + stringify(optname) + ")";
327     }
328   }
329
330   std::ostream &operator<<(std::ostream &os, const ImageOptions &opts) {
331     os << "[";
332
333     const char *delimiter = "";
334     for (auto &i : IMAGE_OPTIONS_TYPE_MAPPING) {
335       if (i.second == STR) {
336         std::string val;
337         if (opts.get(i.first, &val) == 0) {
338           os << delimiter << image_option_name(i.first) << "=" << val;
339           delimiter = ", ";
340         }
341       } else if (i.second == UINT64) {
342         uint64_t val;
343         if (opts.get(i.first, &val) == 0) {
344           os << delimiter << image_option_name(i.first) << "=" << val;
345           delimiter = ", ";
346         }
347       }
348     }
349
350     os << "]";
351
352     return os;
353   }
354
355   void image_options_create(rbd_image_options_t* opts)
356   {
357     image_options_ref* opts_ = new image_options_ref(new image_options_t());
358
359     *opts = static_cast<rbd_image_options_t>(opts_);
360   }
361
362   void image_options_create_ref(rbd_image_options_t* opts,
363                                 rbd_image_options_t orig)
364   {
365     image_options_ref* orig_ = static_cast<image_options_ref*>(orig);
366     image_options_ref* opts_ = new image_options_ref(*orig_);
367
368     *opts = static_cast<rbd_image_options_t>(opts_);
369   }
370
371   void image_options_copy(rbd_image_options_t* opts,
372                           const ImageOptions &orig)
373   {
374     image_options_ref* opts_ = new image_options_ref(new image_options_t());
375
376     *opts = static_cast<rbd_image_options_t>(opts_);
377
378     std::string str_val;
379     uint64_t uint64_val;
380     for (auto &i : IMAGE_OPTIONS_TYPE_MAPPING) {
381       switch (i.second) {
382       case STR:
383         if (orig.get(i.first, &str_val) == 0) {
384           image_options_set(*opts, i.first, str_val);
385         }
386         continue;
387       case UINT64:
388         if (orig.get(i.first, &uint64_val) == 0) {
389           image_options_set(*opts, i.first, uint64_val);
390         }
391         continue;
392       }
393     }
394   }
395
396   void image_options_destroy(rbd_image_options_t opts)
397   {
398     image_options_ref* opts_ = static_cast<image_options_ref*>(opts);
399
400     delete opts_;
401   }
402
403   int image_options_set(rbd_image_options_t opts, int optname,
404                         const std::string& optval)
405   {
406     image_options_ref* opts_ = static_cast<image_options_ref*>(opts);
407
408     std::map<int, image_option_type_t>::const_iterator i =
409       IMAGE_OPTIONS_TYPE_MAPPING.find(optname);
410
411     if (i == IMAGE_OPTIONS_TYPE_MAPPING.end() || i->second != STR) {
412       return -EINVAL;
413     }
414
415     (*opts_->get())[optname] = optval;
416     return 0;
417   }
418
419   int image_options_set(rbd_image_options_t opts, int optname, uint64_t optval)
420   {
421     image_options_ref* opts_ = static_cast<image_options_ref*>(opts);
422
423     std::map<int, image_option_type_t>::const_iterator i =
424       IMAGE_OPTIONS_TYPE_MAPPING.find(optname);
425
426     if (i == IMAGE_OPTIONS_TYPE_MAPPING.end() || i->second != UINT64) {
427       return -EINVAL;
428     }
429
430     (*opts_->get())[optname] = optval;
431     return 0;
432   }
433
434   int image_options_get(rbd_image_options_t opts, int optname,
435                         std::string* optval)
436   {
437     image_options_ref* opts_ = static_cast<image_options_ref*>(opts);
438
439     std::map<int, image_option_type_t>::const_iterator i =
440       IMAGE_OPTIONS_TYPE_MAPPING.find(optname);
441
442     if (i == IMAGE_OPTIONS_TYPE_MAPPING.end() || i->second != STR) {
443       return -EINVAL;
444     }
445
446     image_options_t::const_iterator j = (*opts_)->find(optname);
447
448     if (j == (*opts_)->end()) {
449       return -ENOENT;
450     }
451
452     *optval = boost::get<std::string>(j->second);
453     return 0;
454   }
455
456   int image_options_get(rbd_image_options_t opts, int optname, uint64_t* optval)
457   {
458     image_options_ref* opts_ = static_cast<image_options_ref*>(opts);
459
460     std::map<int, image_option_type_t>::const_iterator i =
461       IMAGE_OPTIONS_TYPE_MAPPING.find(optname);
462
463     if (i == IMAGE_OPTIONS_TYPE_MAPPING.end() || i->second != UINT64) {
464       return -EINVAL;
465     }
466
467     image_options_t::const_iterator j = (*opts_)->find(optname);
468
469     if (j == (*opts_)->end()) {
470       return -ENOENT;
471     }
472
473     *optval = boost::get<uint64_t>(j->second);
474     return 0;
475   }
476
477   int image_options_is_set(rbd_image_options_t opts, int optname,
478                            bool* is_set)
479   {
480     if (IMAGE_OPTIONS_TYPE_MAPPING.find(optname) ==
481           IMAGE_OPTIONS_TYPE_MAPPING.end()) {
482       return -EINVAL;
483     }
484
485     image_options_ref* opts_ = static_cast<image_options_ref*>(opts);
486     *is_set = ((*opts_)->find(optname) != (*opts_)->end());
487     return 0;
488   }
489
490   int image_options_unset(rbd_image_options_t opts, int optname)
491   {
492     image_options_ref* opts_ = static_cast<image_options_ref*>(opts);
493
494     std::map<int, image_option_type_t>::const_iterator i =
495       IMAGE_OPTIONS_TYPE_MAPPING.find(optname);
496
497     if (i == IMAGE_OPTIONS_TYPE_MAPPING.end()) {
498       assert((*opts_)->find(optname) == (*opts_)->end());
499       return -EINVAL;
500     }
501
502     image_options_t::const_iterator j = (*opts_)->find(optname);
503
504     if (j == (*opts_)->end()) {
505       return -ENOENT;
506     }
507
508     (*opts_)->erase(j);
509     return 0;
510   }
511
512   void image_options_clear(rbd_image_options_t opts)
513   {
514     image_options_ref* opts_ = static_cast<image_options_ref*>(opts);
515
516     (*opts_)->clear();
517   }
518
519   bool image_options_is_empty(rbd_image_options_t opts)
520   {
521     image_options_ref* opts_ = static_cast<image_options_ref*>(opts);
522
523     return (*opts_)->empty();
524   }
525
526   int list(IoCtx& io_ctx, vector<string>& names)
527   {
528     CephContext *cct = (CephContext *)io_ctx.cct();
529     ldout(cct, 20) << "list " << &io_ctx << dendl;
530
531     bufferlist bl;
532     int r = io_ctx.read(RBD_DIRECTORY, bl, 0, 0);
533     if (r < 0) {
534       if (r == -ENOENT) {
535         r = 0;
536       }
537       return r;
538     }
539
540     // old format images are in a tmap
541     if (bl.length()) {
542       bufferlist::iterator p = bl.begin();
543       bufferlist header;
544       map<string,bufferlist> m;
545       ::decode(header, p);
546       ::decode(m, p);
547       for (map<string,bufferlist>::iterator q = m.begin(); q != m.end(); ++q) {
548         names.push_back(q->first);
549       }
550     }
551
552     map<string, string> images;
553     r = api::Image<>::list_images(io_ctx, &images);
554     if (r < 0) {
555       lderr(cct) << "error listing v2 images: " << cpp_strerror(r) << dendl;
556       return r;
557     }
558     for (const auto& img_pair : images) {
559       names.push_back(img_pair.first);
560     }
561
562     return 0;
563   }
564
565   int flatten_children(ImageCtx *ictx, const char* snap_name,
566                        ProgressContext& pctx)
567   {
568     CephContext *cct = ictx->cct;
569     ldout(cct, 20) << "children flatten " << ictx->name << dendl;
570
571     RWLock::RLocker l(ictx->snap_lock);
572     snap_t snap_id = ictx->get_snap_id(cls::rbd::UserSnapshotNamespace(), snap_name);
573     ParentSpec parent_spec(ictx->md_ctx.get_id(), ictx->id, snap_id);
574     map< pair<int64_t, string>, set<string> > image_info;
575
576     int r = api::Image<>::list_children(ictx, parent_spec, &image_info);
577     if (r < 0) {
578       return r;
579     }
580
581     size_t size = image_info.size();
582     if (size == 0)
583       return 0;
584
585     size_t i = 0;
586     Rados rados(ictx->md_ctx);
587     for ( auto &info : image_info){
588       string pool = info.first.second;
589       IoCtx ioctx;
590       r = rados.ioctx_create2(info.first.first, ioctx);
591       if (r < 0) {
592         lderr(cct) << "Error accessing child image pool " << pool
593                    << dendl;
594         return r;
595       }
596
597       for (auto &id_it : info.second) {
598         ImageCtx *imctx = new ImageCtx("", id_it, NULL, ioctx, false);
599         int r = imctx->state->open(false);
600         if (r < 0) {
601           lderr(cct) << "error opening image: "
602                      << cpp_strerror(r) << dendl;
603           return r;
604         }
605
606         librbd::NoOpProgressContext prog_ctx;
607         r = imctx->operations->flatten(prog_ctx);
608         if (r < 0) {
609           lderr(cct) << "error flattening image: " << pool << "/" << id_it
610                      << cpp_strerror(r) << dendl;
611           imctx->state->close();
612           return r;
613         }
614
615         if ((imctx->features & RBD_FEATURE_DEEP_FLATTEN) == 0 &&
616             !imctx->snaps.empty()) {
617           imctx->parent_lock.get_read();
618           ParentInfo parent_info = imctx->parent_md;
619           imctx->parent_lock.put_read();
620
621           r = cls_client::remove_child(&imctx->md_ctx, RBD_CHILDREN,
622                                        parent_info.spec, imctx->id);
623           if (r < 0 && r != -ENOENT) {
624             lderr(cct) << "error removing child from children list" << dendl;
625             imctx->state->close();
626             return r;
627           }
628         }
629
630         r = imctx->state->close();
631         if (r < 0) {
632           lderr(cct) << "failed to close image: " << cpp_strerror(r) << dendl;
633           return r;
634         }
635       }
636       pctx.update_progress(++i, size);
637       assert(i <= size);
638     }
639
640     return 0;
641   }
642
643   int list_children(ImageCtx *ictx, set<pair<string, string> >& names)
644   {
645     CephContext *cct = ictx->cct;
646     ldout(cct, 20) << "children list " << ictx->name << dendl;
647
648     RWLock::RLocker l(ictx->snap_lock);
649     ParentSpec parent_spec(ictx->md_ctx.get_id(), ictx->id, ictx->snap_id);
650     map< pair<int64_t, string>, set<string> > image_info;
651
652     int r = api::Image<>::list_children(ictx, parent_spec, &image_info);
653     if (r < 0) {
654       return r;
655     }
656
657     Rados rados(ictx->md_ctx);
658     for ( auto &info : image_info){
659       IoCtx ioctx;
660       r = rados.ioctx_create2(info.first.first, ioctx);
661       if (r < 0) {
662         lderr(cct) << "Error accessing child image pool " << info.first.second
663                    << dendl;
664         return r;
665       }
666
667       for (auto &id_it : info.second) {
668         string name;
669         r = cls_client::dir_get_name(&ioctx, RBD_DIRECTORY, id_it, &name);
670         if (r < 0) {
671           lderr(cct) << "Error looking up name for image id " << id_it
672                      << " in pool " << info.first.second << dendl;
673           return r;
674         }
675         names.insert(make_pair(info.first.second, name));
676       }
677     }
678     
679     return 0;
680   }
681
682   int get_snap_namespace(ImageCtx *ictx,
683                          const char *snap_name,
684                          cls::rbd::SnapshotNamespace *snap_namespace) {
685     ldout(ictx->cct, 20) << "get_snap_namespace " << ictx << " " << snap_name
686                          << dendl;
687
688     int r = ictx->state->refresh_if_required();
689     if (r < 0)
690       return r;
691     RWLock::RLocker l(ictx->snap_lock);
692     snap_t snap_id = ictx->get_snap_id(*snap_namespace, snap_name);
693     if (snap_id == CEPH_NOSNAP)
694       return -ENOENT;
695     r = ictx->get_snap_namespace(snap_id, snap_namespace);
696     return r;
697   }
698
699   int snap_is_protected(ImageCtx *ictx, const char *snap_name, bool *is_protected)
700   {
701     ldout(ictx->cct, 20) << "snap_is_protected " << ictx << " " << snap_name
702                          << dendl;
703
704     int r = ictx->state->refresh_if_required();
705     if (r < 0)
706       return r;
707
708     RWLock::RLocker l(ictx->snap_lock);
709     snap_t snap_id = ictx->get_snap_id(cls::rbd::UserSnapshotNamespace(), snap_name);
710     if (snap_id == CEPH_NOSNAP)
711       return -ENOENT;
712     bool is_unprotected;
713     r = ictx->is_snap_unprotected(snap_id, &is_unprotected);
714     // consider both PROTECTED or UNPROTECTING to be 'protected',
715     // since in either state they can't be deleted
716     *is_protected = !is_unprotected;
717     return r;
718   }
719
720   int create_v1(IoCtx& io_ctx, const char *imgname, uint64_t size, int order)
721   {
722     CephContext *cct = (CephContext *)io_ctx.cct();
723
724     ldout(cct, 20) << __func__ << " "  << &io_ctx << " name = " << imgname
725                    << " size = " << size << " order = " << order << dendl;
726     int r = validate_pool(io_ctx, cct);
727     if (r < 0) {
728       return r;
729     }
730
731     ldout(cct, 2) << "adding rbd image to directory..." << dendl;
732     r = tmap_set(io_ctx, imgname);
733     if (r < 0) {
734       lderr(cct) << "error adding image to directory: " << cpp_strerror(r)
735                  << dendl;
736       return r;
737     }
738
739     Rados rados(io_ctx);
740     uint64_t bid = rados.get_instance_id();
741
742     ldout(cct, 2) << "creating rbd image..." << dendl;
743     struct rbd_obj_header_ondisk header;
744     init_rbd_header(header, size, order, bid);
745
746     bufferlist bl;
747     bl.append((const char *)&header, sizeof(header));
748
749     string header_oid = util::old_header_name(imgname);
750     r = io_ctx.write(header_oid, bl, bl.length(), 0);
751     if (r < 0) {
752       lderr(cct) << "Error writing image header: " << cpp_strerror(r)
753                  << dendl;
754       int remove_r = tmap_rm(io_ctx, imgname);
755       if (remove_r < 0) {
756         lderr(cct) << "Could not remove image from directory after "
757                    << "header creation failed: "
758                    << cpp_strerror(remove_r) << dendl;
759       }
760       return r;
761     }
762
763     ldout(cct, 2) << "done." << dendl;
764     return 0;
765   }
766
767   int create(librados::IoCtx& io_ctx, const char *imgname, uint64_t size,
768              int *order)
769   {
770     uint64_t order_ = *order;
771     ImageOptions opts;
772
773     int r = opts.set(RBD_IMAGE_OPTION_ORDER, order_);
774     assert(r == 0);
775
776     r = create(io_ctx, imgname, "", size, opts, "", "", false);
777
778     int r1 = opts.get(RBD_IMAGE_OPTION_ORDER, &order_);
779     assert(r1 == 0);
780     *order = order_;
781
782     return r;
783   }
784
785   int create(IoCtx& io_ctx, const char *imgname, uint64_t size,
786              bool old_format, uint64_t features, int *order,
787              uint64_t stripe_unit, uint64_t stripe_count)
788   {
789     if (!order)
790       return -EINVAL;
791
792     uint64_t order_ = *order;
793     uint64_t format = old_format ? 1 : 2;
794     ImageOptions opts;
795     int r;
796
797     r = opts.set(RBD_IMAGE_OPTION_FORMAT, format);
798     assert(r == 0);
799     r = opts.set(RBD_IMAGE_OPTION_FEATURES, features);
800     assert(r == 0);
801     r = opts.set(RBD_IMAGE_OPTION_ORDER, order_);
802     assert(r == 0);
803     r = opts.set(RBD_IMAGE_OPTION_STRIPE_UNIT, stripe_unit);
804     assert(r == 0);
805     r = opts.set(RBD_IMAGE_OPTION_STRIPE_COUNT, stripe_count);
806     assert(r == 0);
807
808     r = create(io_ctx, imgname, "", size, opts, "", "", false);
809
810     int r1 = opts.get(RBD_IMAGE_OPTION_ORDER, &order_);
811     assert(r1 == 0);
812     *order = order_;
813
814     return r;
815   }
816
817   int create(IoCtx& io_ctx, const std::string &image_name,
818              const std::string &image_id, uint64_t size,
819              ImageOptions& opts,
820              const std::string &non_primary_global_image_id,
821              const std::string &primary_mirror_uuid,
822              bool skip_mirror_enable)
823   {
824     std::string id(image_id);
825     if (id.empty()) {
826       id = util::generate_image_id(io_ctx);
827     }
828
829     CephContext *cct = (CephContext *)io_ctx.cct();
830     ldout(cct, 10) << __func__ << " name=" << image_name << ", "
831                    << "id= " << id << ", "
832                    << "size=" << size << ", opts=" << opts << dendl;
833
834     uint64_t format;
835     if (opts.get(RBD_IMAGE_OPTION_FORMAT, &format) != 0)
836       format = cct->_conf->get_val<int64_t>("rbd_default_format");
837     bool old_format = format == 1;
838
839     // make sure it doesn't already exist, in either format
840     int r = detect_format(io_ctx, image_name, NULL, NULL);
841     if (r != -ENOENT) {
842       if (r) {
843         lderr(cct) << "Could not tell if " << image_name << " already exists"
844                    << dendl;
845         return r;
846       }
847       lderr(cct) << "rbd image " << image_name << " already exists" << dendl;
848       return -EEXIST;
849     }
850
851     uint64_t order = 0;
852     if (opts.get(RBD_IMAGE_OPTION_ORDER, &order) != 0 || order == 0) {
853       order = cct->_conf->get_val<int64_t>("rbd_default_order");
854     }
855     r = image::CreateRequest<>::validate_order(cct, order);
856     if (r < 0) {
857       return r;
858     }
859
860     if (old_format) {
861       r = create_v1(io_ctx, image_name.c_str(), size, order);
862     } else {
863       ThreadPool *thread_pool;
864       ContextWQ *op_work_queue;
865       ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
866
867       C_SaferCond cond;
868       image::CreateRequest<> *req = image::CreateRequest<>::create(
869         io_ctx, image_name, id, size, opts, non_primary_global_image_id,
870         primary_mirror_uuid, skip_mirror_enable, op_work_queue, &cond);
871       req->send();
872
873       r = cond.wait();
874     }
875
876     int r1 = opts.set(RBD_IMAGE_OPTION_ORDER, order);
877     assert(r1 == 0);
878
879     return r;
880   }
881
882   /*
883    * Parent may be in different pool, hence different IoCtx
884    */
885   int clone(IoCtx& p_ioctx, const char *p_name, const char *p_snap_name,
886             IoCtx& c_ioctx, const char *c_name,
887             uint64_t features, int *c_order,
888             uint64_t stripe_unit, int stripe_count)
889   {
890     uint64_t order = *c_order;
891
892     ImageOptions opts;
893     opts.set(RBD_IMAGE_OPTION_FEATURES, features);
894     opts.set(RBD_IMAGE_OPTION_ORDER, order);
895     opts.set(RBD_IMAGE_OPTION_STRIPE_UNIT, stripe_unit);
896     opts.set(RBD_IMAGE_OPTION_STRIPE_COUNT, stripe_count);
897
898     int r = clone(p_ioctx, p_name, p_snap_name, c_ioctx, c_name, opts);
899     opts.get(RBD_IMAGE_OPTION_ORDER, &order);
900     *c_order = order;
901     return r;
902   }
903
904   int clone(IoCtx& p_ioctx, const char *p_name, const char *p_snap_name,
905             IoCtx& c_ioctx, const char *c_name, ImageOptions& c_opts)
906   {
907     CephContext *cct = (CephContext *)p_ioctx.cct();
908     if (p_snap_name == NULL) {
909       lderr(cct) << "image to be cloned must be a snapshot" << dendl;
910       return -EINVAL;
911     }
912
913     // make sure parent snapshot exists
914     ImageCtx *p_imctx = new ImageCtx(p_name, "", p_snap_name, p_ioctx, true);
915     int r = p_imctx->state->open(false);
916     if (r < 0) {
917       lderr(cct) << "error opening parent image: "
918                  << cpp_strerror(r) << dendl;
919       return r;
920     }
921
922     r = clone(p_imctx, c_ioctx, c_name, "", c_opts, "", "");
923
924     int close_r = p_imctx->state->close();
925     if (r == 0 && close_r < 0) {
926       r = close_r;
927     }
928
929     if (r < 0) {
930       return r;
931     }
932     return 0;
933   }
934
935   int clone(ImageCtx *p_imctx, IoCtx& c_ioctx, const std::string &c_name,
936             const std::string &c_id, ImageOptions& c_opts,
937             const std::string &non_primary_global_image_id,
938             const std::string &primary_mirror_uuid)
939   {
940     std::string id(c_id);
941     if (id.empty()) {
942       id = util::generate_image_id(c_ioctx);
943     }
944
945     CephContext *cct = (CephContext *)c_ioctx.cct();
946     ldout(cct, 10) << __func__ << " "
947                    << "c_name=" << c_name << ", "
948                    << "c_id= " << c_id << ", "
949                    << "c_opts=" << c_opts << dendl;
950
951     ThreadPool *thread_pool;
952     ContextWQ *op_work_queue;
953     ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
954
955     C_SaferCond cond;
956     auto *req = image::CloneRequest<>::create(
957       p_imctx, c_ioctx, c_name, id, c_opts,
958       non_primary_global_image_id, primary_mirror_uuid, op_work_queue, &cond);
959     req->send();
960
961     return cond.wait();
962   }
963
964   int rename(IoCtx& io_ctx, const char *srcname, const char *dstname)
965   {
966     CephContext *cct = (CephContext *)io_ctx.cct();
967     ldout(cct, 20) << "rename " << &io_ctx << " " << srcname << " -> "
968                    << dstname << dendl;
969
970     ImageCtx *ictx = new ImageCtx(srcname, "", "", io_ctx, false);
971     int r = ictx->state->open(false);
972     if (r < 0) {
973       lderr(cct) << "error opening source image: " << cpp_strerror(r) << dendl;
974       return r;
975     }
976     BOOST_SCOPE_EXIT((ictx)) {
977       ictx->state->close();
978     } BOOST_SCOPE_EXIT_END
979
980     return ictx->operations->rename(dstname);
981   }
982
983   int info(ImageCtx *ictx, image_info_t& info, size_t infosize)
984   {
985     ldout(ictx->cct, 20) << "info " << ictx << dendl;
986
987     int r = ictx->state->refresh_if_required();
988     if (r < 0)
989       return r;
990
991     image_info(ictx, info, infosize);
992     return 0;
993   }
994
995   int get_old_format(ImageCtx *ictx, uint8_t *old)
996   {
997     int r = ictx->state->refresh_if_required();
998     if (r < 0)
999       return r;
1000     *old = ictx->old_format;
1001     return 0;
1002   }
1003
1004   int get_size(ImageCtx *ictx, uint64_t *size)
1005   {
1006     int r = ictx->state->refresh_if_required();
1007     if (r < 0)
1008       return r;
1009     RWLock::RLocker l2(ictx->snap_lock);
1010     *size = ictx->get_image_size(ictx->snap_id);
1011     return 0;
1012   }
1013
1014   int get_features(ImageCtx *ictx, uint64_t *features)
1015   {
1016     int r = ictx->state->refresh_if_required();
1017     if (r < 0)
1018       return r;
1019     RWLock::RLocker l(ictx->snap_lock);
1020     *features = ictx->features;
1021     return 0;
1022   }
1023
1024   int get_overlap(ImageCtx *ictx, uint64_t *overlap)
1025   {
1026     int r = ictx->state->refresh_if_required();
1027     if (r < 0)
1028       return r;
1029     RWLock::RLocker l(ictx->snap_lock);
1030     RWLock::RLocker l2(ictx->parent_lock);
1031     return ictx->get_parent_overlap(ictx->snap_id, overlap);
1032   }
1033
1034   int get_parent_info(ImageCtx *ictx, string *parent_pool_name,
1035                       string *parent_name, string *parent_id,
1036                       string *parent_snap_name)
1037   {
1038     int r = ictx->state->refresh_if_required();
1039     if (r < 0)
1040       return r;
1041
1042     RWLock::RLocker l(ictx->snap_lock);
1043     RWLock::RLocker l2(ictx->parent_lock);
1044     if (ictx->parent == NULL) {
1045       return -ENOENT;
1046     }
1047
1048     ParentSpec parent_spec;
1049
1050     if (ictx->snap_id == CEPH_NOSNAP) {
1051       parent_spec = ictx->parent_md.spec;
1052     } else {
1053       r = ictx->get_parent_spec(ictx->snap_id, &parent_spec);
1054       if (r < 0) {
1055         lderr(ictx->cct) << "Can't find snapshot id = " << ictx->snap_id
1056                          << dendl;
1057         return r;
1058       }
1059       if (parent_spec.pool_id == -1)
1060         return -ENOENT;
1061     }
1062     if (parent_pool_name) {
1063       Rados rados(ictx->md_ctx);
1064       r = rados.pool_reverse_lookup(parent_spec.pool_id,
1065                                     parent_pool_name);
1066       if (r < 0) {
1067         lderr(ictx->cct) << "error looking up pool name: " << cpp_strerror(r)
1068                          << dendl;
1069         return r;
1070       }
1071     }
1072
1073     if (parent_snap_name) {
1074       RWLock::RLocker l(ictx->parent->snap_lock);
1075       r = ictx->parent->get_snap_name(parent_spec.snap_id,
1076                                       parent_snap_name);
1077       if (r < 0) {
1078         lderr(ictx->cct) << "error finding parent snap name: "
1079                          << cpp_strerror(r) << dendl;
1080         return r;
1081       }
1082     }
1083
1084     if (parent_name) {
1085       RWLock::RLocker snap_locker(ictx->parent->snap_lock);
1086       *parent_name = ictx->parent->name;
1087     }
1088     if (parent_id) {
1089       *parent_id = ictx->parent->id;
1090     }
1091
1092     return 0;
1093   }
1094
1095   int get_flags(ImageCtx *ictx, uint64_t *flags)
1096   {
1097     int r = ictx->state->refresh_if_required();
1098     if (r < 0) {
1099       return r;
1100     }
1101
1102     RWLock::RLocker l2(ictx->snap_lock);
1103     return ictx->get_flags(ictx->snap_id, flags);
1104   }
1105
1106   int set_image_notification(ImageCtx *ictx, int fd, int type)
1107   {
1108     CephContext *cct = ictx->cct;
1109     ldout(cct, 20) << __func__ << " " << ictx << " fd " << fd << " type" << type << dendl;
1110
1111     int r = ictx->state->refresh_if_required();
1112     if (r < 0) {
1113       return r;
1114     }
1115
1116     if (ictx->event_socket.is_valid())
1117       return -EINVAL;
1118     return ictx->event_socket.init(fd, type);
1119   }
1120
1121   int is_exclusive_lock_owner(ImageCtx *ictx, bool *is_owner)
1122   {
1123     *is_owner = false;
1124
1125     RWLock::RLocker owner_locker(ictx->owner_lock);
1126     if (ictx->exclusive_lock == nullptr ||
1127         !ictx->exclusive_lock->is_lock_owner()) {
1128       return 0;
1129     }
1130
1131     // might have been blacklisted by peer -- ensure we still own
1132     // the lock by pinging the OSD
1133     int r = ictx->exclusive_lock->assert_header_locked();
1134     if (r == -EBUSY || r == -ENOENT) {
1135       return 0;
1136     } else if (r < 0) {
1137       return r;
1138     }
1139
1140     *is_owner = true;
1141     return 0;
1142   }
1143
1144   int lock_acquire(ImageCtx *ictx, rbd_lock_mode_t lock_mode)
1145   {
1146     CephContext *cct = ictx->cct;
1147     ldout(cct, 20) << __func__ << ": ictx=" << ictx << ", "
1148                    << "lock_mode=" << lock_mode << dendl;
1149
1150     if (lock_mode != RBD_LOCK_MODE_EXCLUSIVE) {
1151       return -EOPNOTSUPP;
1152     }
1153
1154     C_SaferCond lock_ctx;
1155     {
1156       RWLock::WLocker l(ictx->owner_lock);
1157
1158       if (ictx->exclusive_lock == nullptr) {
1159         lderr(cct) << "exclusive-lock feature is not enabled" << dendl;
1160         return -EINVAL;
1161       }
1162
1163       if (ictx->get_exclusive_lock_policy()->may_auto_request_lock()) {
1164         ictx->set_exclusive_lock_policy(
1165           new exclusive_lock::StandardPolicy(ictx));
1166       }
1167
1168       if (ictx->exclusive_lock->is_lock_owner()) {
1169         return 0;
1170       }
1171
1172       ictx->exclusive_lock->acquire_lock(&lock_ctx);
1173     }
1174
1175     int r = lock_ctx.wait();
1176     if (r < 0) {
1177       lderr(cct) << "failed to request exclusive lock: " << cpp_strerror(r)
1178                  << dendl;
1179       return r;
1180     }
1181
1182     RWLock::RLocker l(ictx->owner_lock);
1183
1184     if (ictx->exclusive_lock == nullptr ||
1185         !ictx->exclusive_lock->is_lock_owner()) {
1186       lderr(cct) << "failed to acquire exclusive lock" << dendl;
1187       return -EROFS;
1188     }
1189
1190     return 0;
1191   }
1192
1193   int lock_release(ImageCtx *ictx)
1194   {
1195     CephContext *cct = ictx->cct;
1196     ldout(cct, 20) << __func__ << ": ictx=" << ictx << dendl;
1197
1198     C_SaferCond lock_ctx;
1199     {
1200       RWLock::WLocker l(ictx->owner_lock);
1201
1202       if (ictx->exclusive_lock == nullptr ||
1203           !ictx->exclusive_lock->is_lock_owner()) {
1204         lderr(cct) << "not exclusive lock owner" << dendl;
1205         return -EINVAL;
1206       }
1207
1208       ictx->exclusive_lock->release_lock(&lock_ctx);
1209     }
1210
1211     int r = lock_ctx.wait();
1212     if (r < 0) {
1213       lderr(cct) << "failed to release exclusive lock: " << cpp_strerror(r)
1214                  << dendl;
1215       return r;
1216     }
1217     return 0;
1218   }
1219
1220   int lock_get_owners(ImageCtx *ictx, rbd_lock_mode_t *lock_mode,
1221                       std::list<std::string> *lock_owners)
1222   {
1223     CephContext *cct = ictx->cct;
1224     ldout(cct, 20) << __func__ << ": ictx=" << ictx << dendl;
1225
1226     if (!ictx->test_features(RBD_FEATURE_EXCLUSIVE_LOCK)) {
1227       lderr(cct) << "exclusive-lock feature is not enabled" << dendl;
1228       return -EINVAL;
1229     }
1230
1231     managed_lock::Locker locker;
1232     C_SaferCond get_owner_ctx;
1233     ExclusiveLock<>(*ictx).get_locker(&locker, &get_owner_ctx);
1234     int r = get_owner_ctx.wait();
1235     if (r == -ENOENT) {
1236       return r;
1237     } else if (r < 0) {
1238       lderr(cct) << "failed to determine current lock owner: "
1239                  << cpp_strerror(r) << dendl;
1240       return r;
1241     }
1242
1243     *lock_mode = RBD_LOCK_MODE_EXCLUSIVE;
1244     lock_owners->clear();
1245     lock_owners->emplace_back(locker.address);
1246     return 0;
1247   }
1248
1249   int lock_break(ImageCtx *ictx, rbd_lock_mode_t lock_mode,
1250                  const std::string &lock_owner)
1251   {
1252     CephContext *cct = ictx->cct;
1253     ldout(cct, 20) << __func__ << ": ictx=" << ictx << ", "
1254                    << "lock_mode=" << lock_mode << ", "
1255                    << "lock_owner=" << lock_owner << dendl;
1256
1257     if (lock_mode != RBD_LOCK_MODE_EXCLUSIVE) {
1258       return -EOPNOTSUPP;
1259     }
1260
1261     if (ictx->read_only) {
1262       return -EROFS;
1263     }
1264
1265     managed_lock::Locker locker;
1266     C_SaferCond get_owner_ctx;
1267     {
1268       RWLock::RLocker l(ictx->owner_lock);
1269
1270       if (ictx->exclusive_lock == nullptr) {
1271         lderr(cct) << "exclusive-lock feature is not enabled" << dendl;
1272         return -EINVAL;
1273       }
1274
1275       ictx->exclusive_lock->get_locker(&locker, &get_owner_ctx);
1276     }
1277     int r = get_owner_ctx.wait();
1278     if (r == -ENOENT) {
1279       return r;
1280     } else if (r < 0) {
1281       lderr(cct) << "failed to determine current lock owner: "
1282                  << cpp_strerror(r) << dendl;
1283       return r;
1284     }
1285
1286     if (locker.address != lock_owner) {
1287       return -EBUSY;
1288     }
1289
1290     C_SaferCond break_ctx;
1291     {
1292       RWLock::RLocker l(ictx->owner_lock);
1293
1294       if (ictx->exclusive_lock == nullptr) {
1295         lderr(cct) << "exclusive-lock feature is not enabled" << dendl;
1296         return -EINVAL;
1297       }
1298
1299       ictx->exclusive_lock->break_lock(locker, true, &break_ctx);
1300     }
1301     r = break_ctx.wait();
1302     if (r == -ENOENT) {
1303       return r;
1304     } else if (r < 0) {
1305       lderr(cct) << "failed to break lock: " << cpp_strerror(r) << dendl;
1306       return r;
1307     }
1308     return 0;
1309   }
1310
1311   int remove(IoCtx& io_ctx, const std::string &image_name,
1312              const std::string &image_id, ProgressContext& prog_ctx,
1313              bool force, bool from_trash_remove)
1314   {
1315     CephContext *cct((CephContext *)io_ctx.cct());
1316     ldout(cct, 20) << "remove " << &io_ctx << " "
1317                    << (image_id.empty() ? image_name : image_id) << dendl;
1318
1319     ThreadPool *thread_pool;
1320     ContextWQ *op_work_queue;
1321     ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
1322
1323     C_SaferCond cond;
1324     auto req = librbd::image::RemoveRequest<>::create(
1325       io_ctx, image_name, image_id, force, from_trash_remove, prog_ctx,
1326       op_work_queue, &cond);
1327     req->send();
1328
1329     return cond.wait();
1330   }
1331
1332   int trash_move(librados::IoCtx &io_ctx, rbd_trash_image_source_t source,
1333                  const std::string &image_name, uint64_t delay) {
1334     CephContext *cct((CephContext *)io_ctx.cct());
1335     ldout(cct, 20) << "trash_move " << &io_ctx << " " << image_name
1336                    << dendl;
1337
1338     std::string image_id;
1339     ImageCtx *ictx = new ImageCtx(image_name, "", nullptr, io_ctx, false);
1340     int r = ictx->state->open(true);
1341     if (r < 0) {
1342       ictx = nullptr;
1343
1344       if (r != -ENOENT) {
1345         ldout(cct, 2) << "error opening image: " << cpp_strerror(-r) << dendl;
1346         return r;
1347       }
1348
1349       // try to get image id from the directory
1350       r = cls_client::dir_get_id(&io_ctx, RBD_DIRECTORY, image_name, &image_id);
1351       if (r < 0) {
1352         if (r != -ENOENT) {
1353           ldout(cct, 2) << "error reading image id from dirctory: "
1354                         << cpp_strerror(-r) << dendl;
1355         }
1356         return r;
1357       }
1358     } else {
1359       if (ictx->old_format) {
1360         ictx->state->close();
1361         return -EOPNOTSUPP;
1362       }
1363
1364       image_id = ictx->id;
1365       ictx->owner_lock.get_read();
1366       if (ictx->exclusive_lock != nullptr) {
1367         r = ictx->operations->prepare_image_update();
1368         if (r < 0 || (ictx->exclusive_lock != nullptr &&
1369                       !ictx->exclusive_lock->is_lock_owner())) {
1370           lderr(cct) << "cannot obtain exclusive lock - not removing" << dendl;
1371           ictx->owner_lock.put_read();
1372           ictx->state->close();
1373           return -EBUSY;
1374         }
1375       }
1376     }
1377
1378     BOOST_SCOPE_EXIT_ALL(ictx, cct) {
1379       if (ictx == nullptr)
1380         return;
1381
1382       bool is_locked = ictx->exclusive_lock != nullptr &&
1383                        ictx->exclusive_lock->is_lock_owner();
1384       if (is_locked) {
1385         C_SaferCond ctx;
1386         auto exclusive_lock = ictx->exclusive_lock;
1387         exclusive_lock->shut_down(&ctx);
1388         ictx->owner_lock.put_read();
1389         int r = ctx.wait();
1390         if (r < 0) {
1391           lderr(cct) << "error shutting down exclusive lock" << dendl;
1392         }
1393         delete exclusive_lock;
1394       } else {
1395         ictx->owner_lock.put_read();
1396       }
1397       ictx->state->close();
1398     };
1399
1400     ldout(cct, 2) << "adding image entry to rbd_trash" << dendl;
1401     utime_t ts = ceph_clock_now();
1402     utime_t deferment_end_time = ts;
1403     deferment_end_time += (double)delay;
1404     cls::rbd::TrashImageSource trash_source =
1405         static_cast<cls::rbd::TrashImageSource>(source);
1406     cls::rbd::TrashImageSpec trash_spec(trash_source, image_name, ts,
1407                                         deferment_end_time);
1408     r = cls_client::trash_add(&io_ctx, image_id, trash_spec);
1409     if (r < 0 && r != -EEXIST) {
1410       lderr(cct) << "error adding image " << image_name << " to rbd_trash"
1411                  << dendl;
1412       return r;
1413     } else if (r == -EEXIST) {
1414       ldout(cct, 10) << "found previous unfinished deferred remove for image:"
1415                      << image_id << dendl;
1416       // continue with removing image from directory
1417     }
1418
1419     ldout(cct, 2) << "removing id object..." << dendl;
1420     r = io_ctx.remove(util::id_obj_name(image_name));
1421     if (r < 0 && r != -ENOENT) {
1422       lderr(cct) << "error removing id object: " << cpp_strerror(r)
1423                  << dendl;
1424       return r;
1425     }
1426
1427     ldout(cct, 2) << "removing rbd image from v2 directory..." << dendl;
1428     r = cls_client::dir_remove_image(&io_ctx, RBD_DIRECTORY, image_name,
1429                                      image_id);
1430     if (r < 0) {
1431       if (r != -ENOENT) {
1432         lderr(cct) << "error removing image from v2 directory: "
1433                    << cpp_strerror(-r) << dendl;
1434       }
1435       return r;
1436     }
1437
1438     return 0;
1439   }
1440
1441   int trash_get(IoCtx &io_ctx, const std::string &id,
1442                 trash_image_info_t *info) {
1443     CephContext *cct((CephContext *)io_ctx.cct());
1444     ldout(cct, 20) << __func__ << " " << &io_ctx << dendl;
1445
1446     cls::rbd::TrashImageSpec spec;
1447     int r = cls_client::trash_get(&io_ctx, id, &spec);
1448     if (r == -ENOENT) {
1449       return r;
1450     } else if (r < 0) {
1451       lderr(cct) << "error retrieving trash entry: " << cpp_strerror(r)
1452                  << dendl;
1453       return r;
1454     }
1455
1456     rbd_trash_image_source_t source = static_cast<rbd_trash_image_source_t>(
1457       spec.source);
1458     *info = trash_image_info_t{id, spec.name, source, spec.deletion_time.sec(),
1459                                spec.deferment_end_time.sec()};
1460     return 0;
1461   }
1462
1463   int trash_list(IoCtx &io_ctx, vector<trash_image_info_t> &entries) {
1464     CephContext *cct((CephContext *)io_ctx.cct());
1465     ldout(cct, 20) << "trash_list " << &io_ctx << dendl;
1466
1467     bool more_entries;
1468     uint32_t max_read = 1024;
1469     std::string last_read = "";
1470     do {
1471       map<string, cls::rbd::TrashImageSpec> trash_entries;
1472       int r = cls_client::trash_list(&io_ctx, last_read, max_read,
1473                                      &trash_entries);
1474       if (r < 0 && r != -ENOENT) {
1475         lderr(cct) << "error listing rbd trash entries: " << cpp_strerror(r)
1476                    << dendl;
1477         return r;
1478       } else if (r == -ENOENT) {
1479         break;
1480       }
1481
1482       if (trash_entries.empty()) {
1483         break;
1484       }
1485
1486       for (const auto &entry : trash_entries) {
1487         rbd_trash_image_source_t source =
1488             static_cast<rbd_trash_image_source_t>(entry.second.source);
1489         entries.push_back({entry.first, entry.second.name, source,
1490                            entry.second.deletion_time.sec(),
1491                            entry.second.deferment_end_time.sec()});
1492       }
1493       last_read = trash_entries.rbegin()->first;
1494       more_entries = (trash_entries.size() >= max_read);
1495     } while (more_entries);
1496
1497     return 0;
1498   }
1499
1500   int trash_remove(IoCtx &io_ctx, const std::string &image_id, bool force,
1501                    ProgressContext& prog_ctx) {
1502     CephContext *cct((CephContext *)io_ctx.cct());
1503     ldout(cct, 20) << "trash_remove " << &io_ctx << " " << image_id
1504                    << " " << force << dendl;
1505
1506     cls::rbd::TrashImageSpec trash_spec;
1507     int r = cls_client::trash_get(&io_ctx, image_id, &trash_spec);
1508     if (r < 0) {
1509       lderr(cct) << "error getting image id " << image_id
1510                  << " info from trash: " << cpp_strerror(r) << dendl;
1511       return r;
1512     }
1513
1514     utime_t now = ceph_clock_now();
1515     if (now < trash_spec.deferment_end_time && !force) {
1516       lderr(cct) << "error: deferment time has not expired." << dendl;
1517       return -EPERM;
1518     }
1519
1520     r = remove(io_ctx, "", image_id, prog_ctx, false, true);
1521     if (r < 0) {
1522       lderr(cct) << "error removing image " << image_id
1523                  << ", which is pending deletion" << dendl;
1524       return r;
1525     }
1526     r = cls_client::trash_remove(&io_ctx, image_id);
1527     if (r < 0 && r != -ENOENT) {
1528       lderr(cct) << "error removing image " << image_id
1529                  << " from rbd_trash object" << dendl;
1530       return r;
1531     }
1532     return 0;
1533   }
1534
1535   int trash_restore(librados::IoCtx &io_ctx, const std::string &image_id,
1536                     const std::string &image_new_name) {
1537     CephContext *cct((CephContext *)io_ctx.cct());
1538     ldout(cct, 20) << "trash_restore " << &io_ctx << " " << image_id << " "
1539                    << image_new_name << dendl;
1540
1541     cls::rbd::TrashImageSpec trash_spec;
1542     int r = cls_client::trash_get(&io_ctx, image_id, &trash_spec);
1543     if (r < 0) {
1544       lderr(cct) << "error getting image id " << image_id
1545                  << " info from trash: " << cpp_strerror(r) << dendl;
1546       return r;
1547     }
1548
1549     std::string image_name = image_new_name;
1550     if (image_name.empty()) {
1551       // if user didn't specify a new name, let's try using the old name
1552       image_name = trash_spec.name;
1553       ldout(cct, 20) << "restoring image id " << image_id << " with name "
1554                      << image_name << dendl;
1555     }
1556
1557     // check if no image exists with the same name
1558     bool create_id_obj = true;
1559     std::string existing_id;
1560     r = cls_client::get_id(&io_ctx, util::id_obj_name(image_name), &existing_id);
1561     if (r < 0 && r != -ENOENT) {
1562       lderr(cct) << "error checking if image " << image_name << " exists: "
1563                  << cpp_strerror(r) << dendl;
1564       return r;
1565     } else if (r != -ENOENT){
1566       // checking if we are recovering from an incomplete restore
1567       if (existing_id != image_id) {
1568         ldout(cct, 2) << "an image with the same name already exists" << dendl;
1569         return -EEXIST;
1570       }
1571       create_id_obj = false;
1572     }
1573
1574     if (create_id_obj) {
1575       ldout(cct, 2) << "adding id object" << dendl;
1576       librados::ObjectWriteOperation op;
1577       op.create(true);
1578       cls_client::set_id(&op, image_id);
1579       r = io_ctx.operate(util::id_obj_name(image_name), &op);
1580       if (r < 0) {
1581         lderr(cct) << "error adding id object for image " << image_name
1582                    << ": " << cpp_strerror(r) << dendl;
1583         return r;
1584       }
1585     }
1586
1587     ldout(cct, 2) << "adding rbd image from v2 directory..." << dendl;
1588     r = cls_client::dir_add_image(&io_ctx, RBD_DIRECTORY, image_name,
1589                                   image_id);
1590     if (r < 0 && r != -EEXIST) {
1591       lderr(cct) << "error adding image to v2 directory: "
1592                  << cpp_strerror(r) << dendl;
1593       return r;
1594     }
1595
1596     ldout(cct, 2) << "removing image from trash..." << dendl;
1597     r = cls_client::trash_remove(&io_ctx, image_id);
1598     if (r < 0 && r != -ENOENT) {
1599       lderr(cct) << "error removing image id " << image_id << " from trash: "
1600                  << cpp_strerror(r) << dendl;
1601       return r;
1602     }
1603
1604     return 0;
1605   }
1606
1607   int snap_list(ImageCtx *ictx, vector<snap_info_t>& snaps)
1608   {
1609     ldout(ictx->cct, 20) << "snap_list " << ictx << dendl;
1610
1611     int r = ictx->state->refresh_if_required();
1612     if (r < 0)
1613       return r;
1614
1615     RWLock::RLocker l(ictx->snap_lock);
1616     for (map<snap_t, SnapInfo>::iterator it = ictx->snap_info.begin();
1617          it != ictx->snap_info.end(); ++it) {
1618       snap_info_t info;
1619       info.name = it->second.name;
1620       info.id = it->first;
1621       info.size = it->second.size;
1622       snaps.push_back(info);
1623     }
1624
1625     return 0;
1626   }
1627
1628   int snap_exists(ImageCtx *ictx, const cls::rbd::SnapshotNamespace& snap_namespace,
1629                   const char *snap_name, bool *exists)
1630   {
1631     ldout(ictx->cct, 20) << "snap_exists " << ictx << " " << snap_name << dendl;
1632
1633     int r = ictx->state->refresh_if_required();
1634     if (r < 0)
1635       return r;
1636
1637     RWLock::RLocker l(ictx->snap_lock);
1638     *exists = ictx->get_snap_id(snap_namespace, snap_name) != CEPH_NOSNAP;
1639     return 0;
1640   }
1641
1642   int snap_remove(ImageCtx *ictx, const char *snap_name, uint32_t flags,
1643                   ProgressContext& pctx)
1644   {
1645     ldout(ictx->cct, 20) << "snap_remove " << ictx << " " << snap_name << " flags: " << flags << dendl;
1646
1647     int r = 0;
1648
1649     r = ictx->state->refresh_if_required();
1650     if (r < 0)
1651       return r;
1652
1653     if (flags & RBD_SNAP_REMOVE_FLATTEN) {
1654         r = flatten_children(ictx, snap_name, pctx);
1655         if (r < 0) {
1656           return r;
1657         }
1658     }
1659
1660     bool is_protected;
1661     r = snap_is_protected(ictx, snap_name, &is_protected);
1662     if (r < 0) {
1663       return r;
1664     }
1665
1666     if (is_protected && flags & RBD_SNAP_REMOVE_UNPROTECT) {
1667       r = ictx->operations->snap_unprotect(cls::rbd::UserSnapshotNamespace(), snap_name);
1668       if (r < 0) {
1669         lderr(ictx->cct) << "failed to unprotect snapshot: " << snap_name << dendl;
1670         return r;
1671       }
1672
1673       r = snap_is_protected(ictx, snap_name, &is_protected);
1674       if (r < 0) {
1675         return r;
1676       }
1677       if (is_protected) {
1678         lderr(ictx->cct) << "snapshot is still protected after unprotection" << dendl;
1679         ceph_abort();
1680       }
1681     }
1682
1683     C_SaferCond ctx;
1684     ictx->operations->snap_remove(cls::rbd::UserSnapshotNamespace(), snap_name, &ctx);
1685
1686     r = ctx.wait();
1687     return r;
1688   }
1689
1690   int snap_get_timestamp(ImageCtx *ictx, uint64_t snap_id, struct timespec *timestamp)
1691   {
1692     std::map<librados::snap_t, SnapInfo>::iterator snap_it = ictx->snap_info.find(snap_id);
1693     assert(snap_it != ictx->snap_info.end());
1694     utime_t time = snap_it->second.timestamp;
1695     time.to_timespec(timestamp);
1696     return 0;
1697   }
1698
1699   int snap_get_limit(ImageCtx *ictx, uint64_t *limit)
1700   {
1701     int r = cls_client::snapshot_get_limit(&ictx->md_ctx, ictx->header_oid,
1702                                            limit);
1703     if (r == -EOPNOTSUPP) {
1704       *limit = UINT64_MAX;
1705       r = 0;
1706     }
1707     return r;
1708   }
1709
1710   int snap_set_limit(ImageCtx *ictx, uint64_t limit)
1711   {
1712     return ictx->operations->snap_set_limit(limit);
1713   }
1714
1715   struct CopyProgressCtx {
1716     explicit CopyProgressCtx(ProgressContext &p)
1717       : destictx(NULL), src_size(0), prog_ctx(p)
1718     { }
1719
1720     ImageCtx *destictx;
1721     uint64_t src_size;
1722     ProgressContext &prog_ctx;
1723   };
1724
1725   int copy(ImageCtx *src, IoCtx& dest_md_ctx, const char *destname,
1726            ImageOptions& opts, ProgressContext &prog_ctx, size_t sparse_size)
1727   {
1728     CephContext *cct = (CephContext *)dest_md_ctx.cct();
1729     ldout(cct, 20) << "copy " << src->name
1730                    << (src->snap_name.length() ? "@" + src->snap_name : "")
1731                    << " -> " << destname << " opts = " << opts << dendl;
1732
1733     src->snap_lock.get_read();
1734     uint64_t features = src->features;
1735     uint64_t src_size = src->get_image_size(src->snap_id);
1736     src->snap_lock.put_read();
1737     uint64_t format = src->old_format ? 1 : 2;
1738     if (opts.get(RBD_IMAGE_OPTION_FORMAT, &format) != 0) {
1739       opts.set(RBD_IMAGE_OPTION_FORMAT, format);
1740     }
1741     uint64_t stripe_unit = src->stripe_unit;
1742     if (opts.get(RBD_IMAGE_OPTION_STRIPE_UNIT, &stripe_unit) != 0) {
1743       opts.set(RBD_IMAGE_OPTION_STRIPE_UNIT, stripe_unit);
1744     }
1745     uint64_t stripe_count = src->stripe_count;
1746     if (opts.get(RBD_IMAGE_OPTION_STRIPE_COUNT, &stripe_count) != 0) {
1747       opts.set(RBD_IMAGE_OPTION_STRIPE_COUNT, stripe_count);
1748     }
1749     uint64_t order = src->order;
1750     if (opts.get(RBD_IMAGE_OPTION_ORDER, &order) != 0) {
1751       opts.set(RBD_IMAGE_OPTION_ORDER, order);
1752     }
1753     if (opts.get(RBD_IMAGE_OPTION_FEATURES, &features) != 0) {
1754       opts.set(RBD_IMAGE_OPTION_FEATURES, features);
1755     }
1756     if (features & ~RBD_FEATURES_ALL) {
1757       lderr(cct) << "librbd does not support requested features" << dendl;
1758       return -ENOSYS;
1759     }
1760
1761     int r = create(dest_md_ctx, destname, "", src_size, opts, "", "", false);
1762     if (r < 0) {
1763       lderr(cct) << "header creation failed" << dendl;
1764       return r;
1765     }
1766     opts.set(RBD_IMAGE_OPTION_ORDER, static_cast<uint64_t>(order));
1767
1768     ImageCtx *dest = new librbd::ImageCtx(destname, "", NULL,
1769                                           dest_md_ctx, false);
1770     r = dest->state->open(false);
1771     if (r < 0) {
1772       lderr(cct) << "failed to read newly created header" << dendl;
1773       return r;
1774     }
1775
1776     r = copy(src, dest, prog_ctx, sparse_size);
1777
1778     int close_r = dest->state->close();
1779     if (r == 0 && close_r < 0) {
1780       r = close_r;
1781     }
1782     return r;
1783   }
1784
1785   class C_CopyWrite : public Context {
1786   public:
1787     C_CopyWrite(bufferlist *bl, Context* ctx)
1788       : m_bl(bl), m_ctx(ctx) {}
1789     void finish(int r) override {
1790       delete m_bl;
1791       m_ctx->complete(r);
1792     }
1793   private:
1794     bufferlist *m_bl;
1795     Context *m_ctx;
1796   };
1797
1798   class C_CopyRead : public Context {
1799   public:
1800     C_CopyRead(SimpleThrottle *throttle, ImageCtx *dest, uint64_t offset,
1801                bufferlist *bl, size_t sparse_size)
1802       : m_throttle(throttle), m_dest(dest), m_offset(offset), m_bl(bl),
1803       m_sparse_size(sparse_size) {
1804       m_throttle->start_op();
1805     }
1806     void finish(int r) override {
1807       if (r < 0) {
1808         lderr(m_dest->cct) << "error reading from source image at offset "
1809                            << m_offset << ": " << cpp_strerror(r) << dendl;
1810         delete m_bl;
1811         m_throttle->end_op(r);
1812         return;
1813       }
1814       assert(m_bl->length() == (size_t)r);
1815
1816       if (m_bl->is_zero()) {
1817         delete m_bl;
1818         m_throttle->end_op(r);
1819         return;
1820       }
1821
1822       if (!m_sparse_size) {
1823         m_sparse_size = (1 << m_dest->order);
1824       }
1825
1826       auto *throttle = m_throttle;
1827       auto *end_op_ctx = new FunctionContext([throttle](int r) {
1828         throttle->end_op(r);
1829       });
1830       auto gather_ctx = new C_Gather(m_dest->cct, end_op_ctx);
1831
1832       bufferptr m_ptr(m_bl->length());
1833       m_bl->rebuild(m_ptr);
1834       size_t write_offset = 0;
1835       size_t write_length = 0;
1836       size_t offset = 0;
1837       size_t length = m_bl->length();
1838       while (offset < length) {
1839         if (util::calc_sparse_extent(m_ptr,
1840                                      m_sparse_size,
1841                                      length,
1842                                      &write_offset,
1843                                      &write_length,
1844                                      &offset)) {
1845           bufferptr write_ptr(m_ptr, write_offset, write_length);
1846           bufferlist *write_bl = new bufferlist();
1847           write_bl->push_back(write_ptr);
1848           Context *ctx = new C_CopyWrite(write_bl, gather_ctx->new_sub());
1849           auto comp = io::AioCompletion::create(ctx);
1850
1851           // coordinate through AIO WQ to ensure lock is acquired if needed
1852           m_dest->io_work_queue->aio_write(comp, m_offset + write_offset,
1853                                            write_length,
1854                                            std::move(*write_bl),
1855                                            LIBRADOS_OP_FLAG_FADVISE_DONTNEED,
1856                                            std::move(read_trace));
1857           write_offset = offset;
1858           write_length = 0;
1859         }
1860       }
1861       delete m_bl;
1862       assert(gather_ctx->get_sub_created_count() > 0);
1863       gather_ctx->activate();
1864     }
1865
1866     ZTracer::Trace read_trace;
1867
1868   private:
1869     SimpleThrottle *m_throttle;
1870     ImageCtx *m_dest;
1871     uint64_t m_offset;
1872     bufferlist *m_bl;
1873     size_t m_sparse_size;
1874   };
1875
1876   int copy(ImageCtx *src, ImageCtx *dest, ProgressContext &prog_ctx, size_t sparse_size)
1877   {
1878     src->snap_lock.get_read();
1879     uint64_t src_size = src->get_image_size(src->snap_id);
1880     src->snap_lock.put_read();
1881
1882     dest->snap_lock.get_read();
1883     uint64_t dest_size = dest->get_image_size(dest->snap_id);
1884     dest->snap_lock.put_read();
1885
1886     CephContext *cct = src->cct;
1887     if (dest_size < src_size) {
1888       lderr(cct) << " src size " << src_size << " > dest size "
1889                  << dest_size << dendl;
1890       return -EINVAL;
1891     }
1892     int r;
1893     map<string, bufferlist> pairs;
1894
1895     r = cls_client::metadata_list(&src->md_ctx, src->header_oid, "", 0, &pairs);
1896     if (r < 0 && r != -EOPNOTSUPP && r != -EIO) {
1897       lderr(cct) << "couldn't list metadata: " << cpp_strerror(r) << dendl;
1898       return r;
1899     } else if (r == 0 && !pairs.empty()) {
1900       r = cls_client::metadata_set(&dest->md_ctx, dest->header_oid, pairs);
1901       if (r < 0) {
1902         lderr(cct) << "couldn't set metadata: " << cpp_strerror(r) << dendl;
1903         return r;
1904       }
1905     }
1906
1907     ZTracer::Trace trace;
1908     if (src->blkin_trace_all) {
1909       trace.init("copy", &src->trace_endpoint);
1910     }
1911
1912     RWLock::RLocker owner_lock(src->owner_lock);
1913     SimpleThrottle throttle(src->concurrent_management_ops, false);
1914     uint64_t period = src->get_stripe_period();
1915     unsigned fadvise_flags = LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL |
1916                              LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1917     for (uint64_t offset = 0; offset < src_size; offset += period) {
1918       if (throttle.pending_error()) {
1919         return throttle.wait_for_ret();
1920       }
1921
1922       uint64_t len = min(period, src_size - offset);
1923       bufferlist *bl = new bufferlist();
1924       auto ctx = new C_CopyRead(&throttle, dest, offset, bl, sparse_size);
1925       auto comp = io::AioCompletion::create_and_start<Context>(
1926         ctx, src, io::AIO_TYPE_READ);
1927
1928       io::ImageReadRequest<> req(*src, comp, {{offset, len}},
1929                                  io::ReadResult{bl}, fadvise_flags,
1930                                  std::move(trace));
1931       ctx->read_trace = req.get_trace();
1932
1933       req.send();
1934       prog_ctx.update_progress(offset, src_size);
1935     }
1936
1937     r = throttle.wait_for_ret();
1938     if (r >= 0)
1939       prog_ctx.update_progress(src_size, src_size);
1940     return r;
1941   }
1942
1943   int snap_set(ImageCtx *ictx, const cls::rbd::SnapshotNamespace &snap_namespace,
1944                const char *snap_name)
1945   {
1946     ldout(ictx->cct, 20) << "snap_set " << ictx << " snap = "
1947                          << (snap_name ? snap_name : "NULL") << dendl;
1948
1949     // ignore return value, since we may be set to a non-existent
1950     // snapshot and the user is trying to fix that
1951     ictx->state->refresh_if_required();
1952
1953     C_SaferCond ctx;
1954     std::string name(snap_name == nullptr ? "" : snap_name);
1955     ictx->state->snap_set(snap_namespace, name, &ctx);
1956
1957     int r = ctx.wait();
1958     if (r < 0) {
1959       if (r != -ENOENT) {
1960         lderr(ictx->cct) << "failed to " << (name.empty() ? "un" : "") << "set "
1961                          << "snapshot: " << cpp_strerror(r) << dendl;
1962       }
1963       return r;
1964     }
1965
1966     return 0;
1967   }
1968
1969   int list_lockers(ImageCtx *ictx,
1970                    std::list<locker_t> *lockers,
1971                    bool *exclusive,
1972                    string *tag)
1973   {
1974     ldout(ictx->cct, 20) << "list_locks on image " << ictx << dendl;
1975
1976     int r = ictx->state->refresh_if_required();
1977     if (r < 0)
1978       return r;
1979
1980     RWLock::RLocker locker(ictx->md_lock);
1981     if (exclusive)
1982       *exclusive = ictx->exclusive_locked;
1983     if (tag)
1984       *tag = ictx->lock_tag;
1985     if (lockers) {
1986       lockers->clear();
1987       map<rados::cls::lock::locker_id_t,
1988           rados::cls::lock::locker_info_t>::const_iterator it;
1989       for (it = ictx->lockers.begin(); it != ictx->lockers.end(); ++it) {
1990         locker_t locker;
1991         locker.client = stringify(it->first.locker);
1992         locker.cookie = it->first.cookie;
1993         locker.address = stringify(it->second.addr);
1994         lockers->push_back(locker);
1995       }
1996     }
1997
1998     return 0;
1999   }
2000
2001   int lock(ImageCtx *ictx, bool exclusive, const string& cookie,
2002            const string& tag)
2003   {
2004     ldout(ictx->cct, 20) << "lock image " << ictx << " exclusive=" << exclusive
2005                          << " cookie='" << cookie << "' tag='" << tag << "'"
2006                          << dendl;
2007
2008     int r = ictx->state->refresh_if_required();
2009     if (r < 0)
2010       return r;
2011
2012     /**
2013      * If we wanted we could do something more intelligent, like local
2014      * checks that we think we will succeed. But for now, let's not
2015      * duplicate that code.
2016      */
2017     {
2018       RWLock::RLocker locker(ictx->md_lock);
2019       r = rados::cls::lock::lock(&ictx->md_ctx, ictx->header_oid, RBD_LOCK_NAME,
2020                                  exclusive ? LOCK_EXCLUSIVE : LOCK_SHARED,
2021                                  cookie, tag, "", utime_t(), 0);
2022       if (r < 0) {
2023         return r;
2024       }
2025     }
2026
2027     ictx->notify_update();
2028     return 0;
2029   }
2030
2031   int unlock(ImageCtx *ictx, const string& cookie)
2032   {
2033     ldout(ictx->cct, 20) << "unlock image " << ictx
2034                          << " cookie='" << cookie << "'" << dendl;
2035
2036     int r = ictx->state->refresh_if_required();
2037     if (r < 0)
2038       return r;
2039
2040     {
2041       RWLock::RLocker locker(ictx->md_lock);
2042       r = rados::cls::lock::unlock(&ictx->md_ctx, ictx->header_oid,
2043                                    RBD_LOCK_NAME, cookie);
2044       if (r < 0) {
2045         return r;
2046       }
2047     }
2048
2049     ictx->notify_update();
2050     return 0;
2051   }
2052
2053   int break_lock(ImageCtx *ictx, const string& client,
2054                  const string& cookie)
2055   {
2056     ldout(ictx->cct, 20) << "break_lock image " << ictx << " client='" << client
2057                          << "' cookie='" << cookie << "'" << dendl;
2058
2059     int r = ictx->state->refresh_if_required();
2060     if (r < 0)
2061       return r;
2062
2063     entity_name_t lock_client;
2064     if (!lock_client.parse(client)) {
2065       lderr(ictx->cct) << "Unable to parse client '" << client
2066                        << "'" << dendl;
2067       return -EINVAL;
2068     }
2069
2070     if (ictx->blacklist_on_break_lock) {
2071       typedef std::map<rados::cls::lock::locker_id_t,
2072                        rados::cls::lock::locker_info_t> Lockers;
2073       Lockers lockers;
2074       ClsLockType lock_type;
2075       std::string lock_tag;
2076       r = rados::cls::lock::get_lock_info(&ictx->md_ctx, ictx->header_oid,
2077                                           RBD_LOCK_NAME, &lockers, &lock_type,
2078                                           &lock_tag);
2079       if (r < 0) {
2080         lderr(ictx->cct) << "unable to retrieve lock info: " << cpp_strerror(r)
2081                        << dendl;
2082         return r;
2083       }
2084
2085       std::string client_address;
2086       for (Lockers::iterator it = lockers.begin();
2087            it != lockers.end(); ++it) {
2088         if (it->first.locker == lock_client) {
2089           client_address = stringify(it->second.addr);
2090           break;
2091         }
2092       }
2093       if (client_address.empty()) {
2094         return -ENOENT;
2095       }
2096
2097       RWLock::RLocker locker(ictx->md_lock);
2098       librados::Rados rados(ictx->md_ctx);
2099       r = rados.blacklist_add(client_address,
2100                               ictx->blacklist_expire_seconds);
2101       if (r < 0) {
2102         lderr(ictx->cct) << "unable to blacklist client: " << cpp_strerror(r)
2103                        << dendl;
2104         return r;
2105       }
2106     }
2107
2108     r = rados::cls::lock::break_lock(&ictx->md_ctx, ictx->header_oid,
2109                                      RBD_LOCK_NAME, cookie, lock_client);
2110     if (r < 0)
2111       return r;
2112     ictx->notify_update();
2113     return 0;
2114   }
2115
2116   void rbd_ctx_cb(completion_t cb, void *arg)
2117   {
2118     Context *ctx = reinterpret_cast<Context *>(arg);
2119     auto comp = reinterpret_cast<io::AioCompletion *>(cb);
2120     ctx->complete(comp->get_return_value());
2121     comp->release();
2122   }
2123
2124   int64_t read_iterate(ImageCtx *ictx, uint64_t off, uint64_t len,
2125                        int (*cb)(uint64_t, size_t, const char *, void *),
2126                        void *arg)
2127   {
2128     utime_t start_time, elapsed;
2129
2130     ldout(ictx->cct, 20) << "read_iterate " << ictx << " off = " << off
2131                          << " len = " << len << dendl;
2132
2133     int r = ictx->state->refresh_if_required();
2134     if (r < 0)
2135       return r;
2136
2137     uint64_t mylen = len;
2138     ictx->snap_lock.get_read();
2139     r = clip_io(ictx, off, &mylen);
2140     ictx->snap_lock.put_read();
2141     if (r < 0)
2142       return r;
2143
2144     int64_t total_read = 0;
2145     uint64_t period = ictx->get_stripe_period();
2146     uint64_t left = mylen;
2147
2148     ZTracer::Trace trace;
2149     if (ictx->blkin_trace_all) {
2150       trace.init("read_iterate", &ictx->trace_endpoint);
2151     }
2152
2153     RWLock::RLocker owner_locker(ictx->owner_lock);
2154     start_time = ceph_clock_now();
2155     while (left > 0) {
2156       uint64_t period_off = off - (off % period);
2157       uint64_t read_len = min(period_off + period - off, left);
2158
2159       bufferlist bl;
2160
2161       C_SaferCond ctx;
2162       auto c = io::AioCompletion::create_and_start(&ctx, ictx,
2163                                                    io::AIO_TYPE_READ);
2164       io::ImageRequest<>::aio_read(ictx, c, {{off, read_len}},
2165                                    io::ReadResult{&bl}, 0, std::move(trace));
2166
2167       int ret = ctx.wait();
2168       if (ret < 0) {
2169         return ret;
2170       }
2171
2172       r = cb(total_read, ret, bl.c_str(), arg);
2173       if (r < 0) {
2174         return r;
2175       }
2176
2177       total_read += ret;
2178       left -= ret;
2179       off += ret;
2180     }
2181
2182     elapsed = ceph_clock_now() - start_time;
2183     ictx->perfcounter->tinc(l_librbd_rd_latency, elapsed);
2184     ictx->perfcounter->inc(l_librbd_rd);
2185     ictx->perfcounter->inc(l_librbd_rd_bytes, mylen);
2186     return total_read;
2187   }
2188
2189   // validate extent against image size; clip to image size if necessary
2190   int clip_io(ImageCtx *ictx, uint64_t off, uint64_t *len)
2191   {
2192     assert(ictx->snap_lock.is_locked());
2193     uint64_t image_size = ictx->get_image_size(ictx->snap_id);
2194     bool snap_exists = ictx->snap_exists;
2195
2196     if (!snap_exists)
2197       return -ENOENT;
2198
2199     // special-case "len == 0" requests: always valid
2200     if (*len == 0)
2201       return 0;
2202
2203     // can't start past end
2204     if (off >= image_size)
2205       return -EINVAL;
2206
2207     // clip requests that extend past end to just end
2208     if ((off + *len) > image_size)
2209       *len = (size_t)(image_size - off);
2210
2211     return 0;
2212   }
2213
2214   int flush(ImageCtx *ictx)
2215   {
2216     CephContext *cct = ictx->cct;
2217     ldout(cct, 20) << "flush " << ictx << dendl;
2218
2219     int r = ictx->state->refresh_if_required();
2220     if (r < 0) {
2221       return r;
2222     }
2223
2224     ictx->user_flushed();
2225     C_SaferCond ctx;
2226     {
2227       RWLock::RLocker owner_locker(ictx->owner_lock);
2228       ictx->flush(&ctx);
2229     }
2230     r = ctx.wait();
2231
2232     ictx->perfcounter->inc(l_librbd_flush);
2233     return r;
2234   }
2235
2236   int invalidate_cache(ImageCtx *ictx)
2237   {
2238     CephContext *cct = ictx->cct;
2239     ldout(cct, 20) << "invalidate_cache " << ictx << dendl;
2240
2241     int r = ictx->state->refresh_if_required();
2242     if (r < 0) {
2243       return r;
2244     }
2245
2246     RWLock::RLocker owner_locker(ictx->owner_lock);
2247     RWLock::WLocker md_locker(ictx->md_lock);
2248     r = ictx->invalidate_cache(false);
2249     ictx->perfcounter->inc(l_librbd_invalidate_cache);
2250     return r;
2251   }
2252
2253   int poll_io_events(ImageCtx *ictx, io::AioCompletion **comps, int numcomp)
2254   {
2255     if (numcomp <= 0)
2256       return -EINVAL;
2257     CephContext *cct = ictx->cct;
2258     ldout(cct, 20) << __func__ << " " << ictx << " numcomp = " << numcomp
2259                    << dendl;
2260     int i = 0;
2261     Mutex::Locker l(ictx->completed_reqs_lock);
2262     while (i < numcomp) {
2263       if (ictx->completed_reqs.empty())
2264         break;
2265       comps[i++] = ictx->completed_reqs.front();
2266       ictx->completed_reqs.pop_front();
2267     }
2268     return i;
2269   }
2270
2271   int metadata_get(ImageCtx *ictx, const string &key, string *value)
2272   {
2273     CephContext *cct = ictx->cct;
2274     ldout(cct, 20) << "metadata_get " << ictx << " key=" << key << dendl;
2275
2276     int r = ictx->state->refresh_if_required();
2277     if (r < 0) {
2278       return r;
2279     }
2280
2281     return cls_client::metadata_get(&ictx->md_ctx, ictx->header_oid, key, value);
2282   }
2283
2284   int metadata_list(ImageCtx *ictx, const string &start, uint64_t max, map<string, bufferlist> *pairs)
2285   {
2286     CephContext *cct = ictx->cct;
2287     ldout(cct, 20) << "metadata_list " << ictx << dendl;
2288
2289     int r = ictx->state->refresh_if_required();
2290     if (r < 0) {
2291       return r;
2292     }
2293
2294     return cls_client::metadata_list(&ictx->md_ctx, ictx->header_oid, start, max, pairs);
2295   }
2296
2297   struct C_RBD_Readahead : public Context {
2298     ImageCtx *ictx;
2299     object_t oid;
2300     uint64_t offset;
2301     uint64_t length;
2302     C_RBD_Readahead(ImageCtx *ictx, object_t oid, uint64_t offset, uint64_t length)
2303       : ictx(ictx), oid(oid), offset(offset), length(length) { }
2304     void finish(int r) override {
2305       ldout(ictx->cct, 20) << "C_RBD_Readahead on " << oid << ": " << offset << "+" << length << dendl;
2306       ictx->readahead.dec_pending();
2307     }
2308   };
2309
2310   void readahead(ImageCtx *ictx,
2311                  const vector<pair<uint64_t,uint64_t> >& image_extents)
2312   {
2313     uint64_t total_bytes = 0;
2314     for (vector<pair<uint64_t,uint64_t> >::const_iterator p = image_extents.begin();
2315          p != image_extents.end();
2316          ++p) {
2317       total_bytes += p->second;
2318     }
2319     
2320     ictx->md_lock.get_write();
2321     bool abort = ictx->readahead_disable_after_bytes != 0 &&
2322       ictx->total_bytes_read > ictx->readahead_disable_after_bytes;
2323     if (abort) {
2324       ictx->md_lock.put_write();
2325       return;
2326     }
2327     ictx->total_bytes_read += total_bytes;
2328     ictx->snap_lock.get_read();
2329     uint64_t image_size = ictx->get_image_size(ictx->snap_id);
2330     ictx->snap_lock.put_read();
2331     ictx->md_lock.put_write();
2332  
2333     pair<uint64_t, uint64_t> readahead_extent = ictx->readahead.update(image_extents, image_size);
2334     uint64_t readahead_offset = readahead_extent.first;
2335     uint64_t readahead_length = readahead_extent.second;
2336
2337     if (readahead_length > 0) {
2338       ldout(ictx->cct, 20) << "(readahead logical) " << readahead_offset << "~" << readahead_length << dendl;
2339       map<object_t,vector<ObjectExtent> > readahead_object_extents;
2340       Striper::file_to_extents(ictx->cct, ictx->format_string, &ictx->layout,
2341                                readahead_offset, readahead_length, 0, readahead_object_extents);
2342       for (map<object_t,vector<ObjectExtent> >::iterator p = readahead_object_extents.begin(); p != readahead_object_extents.end(); ++p) {
2343         for (vector<ObjectExtent>::iterator q = p->second.begin(); q != p->second.end(); ++q) {
2344           ldout(ictx->cct, 20) << "(readahead) oid " << q->oid << " " << q->offset << "~" << q->length << dendl;
2345
2346           Context *req_comp = new C_RBD_Readahead(ictx, q->oid, q->offset, q->length);
2347           ictx->readahead.inc_pending();
2348           ictx->aio_read_from_cache(q->oid, q->objectno, NULL,
2349                                     q->length, q->offset,
2350                                     req_comp, 0, nullptr);
2351         }
2352       }
2353       ictx->perfcounter->inc(l_librbd_readahead);
2354       ictx->perfcounter->inc(l_librbd_readahead_bytes, readahead_length);
2355     }
2356   }
2357
2358
2359
2360 }