Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / ImageCtx.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include <errno.h>
4 #include <boost/assign/list_of.hpp>
5 #include <stddef.h>
6
7 #include "common/ceph_context.h"
8 #include "common/dout.h"
9 #include "common/errno.h"
10 #include "common/perf_counters.h"
11 #include "common/WorkQueue.h"
12 #include "common/Timer.h"
13
14 #include "librbd/AsyncRequest.h"
15 #include "librbd/ExclusiveLock.h"
16 #include "librbd/internal.h"
17 #include "librbd/ImageCtx.h"
18 #include "librbd/ImageState.h"
19 #include "librbd/ImageWatcher.h"
20 #include "librbd/Journal.h"
21 #include "librbd/LibrbdAdminSocketHook.h"
22 #include "librbd/ObjectMap.h"
23 #include "librbd/Operations.h"
24 #include "librbd/operation/ResizeRequest.h"
25 #include "librbd/Utils.h"
26 #include "librbd/LibrbdWriteback.h"
27 #include "librbd/exclusive_lock/AutomaticPolicy.h"
28 #include "librbd/exclusive_lock/StandardPolicy.h"
29 #include "librbd/io/AioCompletion.h"
30 #include "librbd/io/AsyncOperation.h"
31 #include "librbd/io/ImageRequestWQ.h"
32 #include "librbd/journal/StandardPolicy.h"
33
34 #include "osdc/Striper.h"
35 #include <boost/bind.hpp>
36
37 #define dout_subsys ceph_subsys_rbd
38 #undef dout_prefix
39 #define dout_prefix *_dout << "librbd::ImageCtx: "
40
41 using std::map;
42 using std::pair;
43 using std::set;
44 using std::string;
45 using std::vector;
46
47 using ceph::bufferlist;
48 using librados::snap_t;
49 using librados::IoCtx;
50
51 namespace librbd {
52
53 namespace {
54
55 class ThreadPoolSingleton : public ThreadPool {
56 public:
57   ContextWQ *op_work_queue;
58
59   explicit ThreadPoolSingleton(CephContext *cct)
60     : ThreadPool(cct, "librbd::thread_pool", "tp_librbd", 1,
61                  "rbd_op_threads"),
62       op_work_queue(new ContextWQ("librbd::op_work_queue",
63                                   cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
64                                   this)) {
65     start();
66   }
67   ~ThreadPoolSingleton() override {
68     op_work_queue->drain();
69     delete op_work_queue;
70
71     stop();
72   }
73 };
74
75 class SafeTimerSingleton : public SafeTimer {
76 public:
77   Mutex lock;
78
79   explicit SafeTimerSingleton(CephContext *cct)
80       : SafeTimer(cct, lock, true),
81         lock("librbd::Journal::SafeTimerSingleton::lock") {
82     init();
83   }
84   ~SafeTimerSingleton() {
85     Mutex::Locker locker(lock);
86     shutdown();
87   }
88 };
89
90 struct C_FlushCache : public Context {
91   ImageCtx *image_ctx;
92   Context *on_safe;
93
94   C_FlushCache(ImageCtx *_image_ctx, Context *_on_safe)
95     : image_ctx(_image_ctx), on_safe(_on_safe) {
96   }
97   void finish(int r) override {
98     // successful cache flush indicates all IO is now safe
99     image_ctx->flush_cache(on_safe);
100   }
101 };
102
103 struct C_ShutDownCache : public Context {
104   ImageCtx *image_ctx;
105   Context *on_finish;
106
107   C_ShutDownCache(ImageCtx *_image_ctx, Context *_on_finish)
108     : image_ctx(_image_ctx), on_finish(_on_finish) {
109   }
110   void finish(int r) override {
111     image_ctx->object_cacher->stop();
112     on_finish->complete(r);
113   }
114 };
115
116 struct C_InvalidateCache : public Context {
117   ImageCtx *image_ctx;
118   bool purge_on_error;
119   bool reentrant_safe;
120   Context *on_finish;
121
122   C_InvalidateCache(ImageCtx *_image_ctx, bool _purge_on_error,
123                     bool _reentrant_safe, Context *_on_finish)
124     : image_ctx(_image_ctx), purge_on_error(_purge_on_error),
125       reentrant_safe(_reentrant_safe), on_finish(_on_finish) {
126   }
127   void finish(int r) override {
128     assert(image_ctx->cache_lock.is_locked());
129     CephContext *cct = image_ctx->cct;
130
131     if (r == -EBLACKLISTED) {
132       lderr(cct) << "Blacklisted during flush!  Purging cache..." << dendl;
133       image_ctx->object_cacher->purge_set(image_ctx->object_set);
134     } else if (r != 0 && purge_on_error) {
135       lderr(cct) << "invalidate cache encountered error "
136                  << cpp_strerror(r) << " !Purging cache..." << dendl;
137       image_ctx->object_cacher->purge_set(image_ctx->object_set);
138     } else if (r != 0) {
139       lderr(cct) << "flush_cache returned " << r << dendl;
140     }
141
142     loff_t unclean = image_ctx->object_cacher->release_set(
143       image_ctx->object_set);
144     if (unclean == 0) {
145       r = 0;
146     } else {
147       lderr(cct) << "could not release all objects from cache: "
148                  << unclean << " bytes remain" << dendl;
149       if (r == 0) {
150         r = -EBUSY;
151       }
152     }
153
154     if (reentrant_safe) {
155       on_finish->complete(r);
156     } else {
157       image_ctx->op_work_queue->queue(on_finish, r);
158     }
159   }
160
161 };
162
163 } // anonymous namespace
164
  // Key prefix constant for image metadata entries.
  // NOTE(review): presumably marks metadata keys that carry per-image config
  // overrides (cf. _filter_metadata_confs) -- confirm against the callers.
  const string ImageCtx::METADATA_CONF_PREFIX = "conf_";
166
167   ImageCtx::ImageCtx(const string &image_name, const string &image_id,
168                      const char *snap, IoCtx& p, bool ro)
169     : cct((CephContext*)p.cct()),
170       perfcounter(NULL),
171       snap_id(CEPH_NOSNAP),
172       snap_exists(true),
173       read_only(ro),
174       flush_encountered(false),
175       exclusive_locked(false),
176       name(image_name),
177       image_watcher(NULL),
178       journal(NULL),
179       owner_lock(util::unique_lock_name("librbd::ImageCtx::owner_lock", this)),
180       md_lock(util::unique_lock_name("librbd::ImageCtx::md_lock", this)),
181       cache_lock(util::unique_lock_name("librbd::ImageCtx::cache_lock", this)),
182       snap_lock(util::unique_lock_name("librbd::ImageCtx::snap_lock", this)),
183       parent_lock(util::unique_lock_name("librbd::ImageCtx::parent_lock", this)),
184       object_map_lock(util::unique_lock_name("librbd::ImageCtx::object_map_lock", this)),
185       async_ops_lock(util::unique_lock_name("librbd::ImageCtx::async_ops_lock", this)),
186       copyup_list_lock(util::unique_lock_name("librbd::ImageCtx::copyup_list_lock", this)),
187       completed_reqs_lock(util::unique_lock_name("librbd::ImageCtx::completed_reqs_lock", this)),
188       extra_read_flags(0),
189       old_format(true),
190       order(0), size(0), features(0),
191       format_string(NULL),
192       id(image_id), parent(NULL),
193       stripe_unit(0), stripe_count(0), flags(0),
194       object_cacher(NULL), writeback_handler(NULL), object_set(NULL),
195       readahead(),
196       total_bytes_read(0),
197       state(new ImageState<>(this)),
198       operations(new Operations<>(*this)),
199       exclusive_lock(nullptr), object_map(nullptr),
200       io_work_queue(nullptr), op_work_queue(nullptr),
201       asok_hook(nullptr),
202       trace_endpoint("librbd")
203   {
204     md_ctx.dup(p);
205     data_ctx.dup(p);
206     if (snap)
207       snap_name = snap;
208
209     memset(&header, 0, sizeof(header));
210
211     ThreadPool *thread_pool;
212     get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
213     io_work_queue = new io::ImageRequestWQ<>(
214       this, "librbd::io_work_queue",
215       cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
216       thread_pool);
217
218     if (cct->_conf->get_val<bool>("rbd_auto_exclusive_lock_until_manual_request")) {
219       exclusive_lock_policy = new exclusive_lock::AutomaticPolicy(this);
220     } else {
221       exclusive_lock_policy = new exclusive_lock::StandardPolicy(this);
222     }
223     journal_policy = new journal::StandardPolicy<ImageCtx>(this);
224   }
225
226   ImageCtx::~ImageCtx() {
227     assert(image_watcher == NULL);
228     assert(exclusive_lock == NULL);
229     assert(object_map == NULL);
230     assert(journal == NULL);
231     assert(asok_hook == NULL);
232
233     if (perfcounter) {
234       perf_stop();
235     }
236     if (object_cacher) {
237       delete object_cacher;
238       object_cacher = NULL;
239     }
240     if (writeback_handler) {
241       delete writeback_handler;
242       writeback_handler = NULL;
243     }
244     if (object_set) {
245       delete object_set;
246       object_set = NULL;
247     }
248     delete[] format_string;
249
250     md_ctx.aio_flush();
251     data_ctx.aio_flush();
252     io_work_queue->drain();
253
254     delete journal_policy;
255     delete exclusive_lock_policy;
256     delete io_work_queue;
257     delete operations;
258     delete state;
259   }
260
261   void ImageCtx::init() {
262     assert(!header_oid.empty());
263     assert(old_format || !id.empty());
264
265     asok_hook = new LibrbdAdminSocketHook(this);
266
267     string pname = string("librbd-") + id + string("-") +
268       data_ctx.get_pool_name() + string("-") + name;
269     if (!snap_name.empty()) {
270       pname += "-";
271       pname += snap_name;
272     }
273
274     trace_endpoint.copy_name(pname);
275     perf_start(pname);
276
277     if (cache) {
278       Mutex::Locker l(cache_lock);
279       ldout(cct, 20) << "enabling caching..." << dendl;
280       writeback_handler = new LibrbdWriteback(this, cache_lock);
281
282       uint64_t init_max_dirty = cache_max_dirty;
283       if (cache_writethrough_until_flush)
284         init_max_dirty = 0;
285       ldout(cct, 20) << "Initial cache settings:"
286                      << " size=" << cache_size
287                      << " num_objects=" << 10
288                      << " max_dirty=" << init_max_dirty
289                      << " target_dirty=" << cache_target_dirty
290                      << " max_dirty_age="
291                      << cache_max_dirty_age << dendl;
292
293       object_cacher = new ObjectCacher(cct, pname, *writeback_handler, cache_lock,
294                                        NULL, NULL,
295                                        cache_size,
296                                        10,  /* reset this in init */
297                                        init_max_dirty,
298                                        cache_target_dirty,
299                                        cache_max_dirty_age,
300                                        cache_block_writes_upfront);
301
302       // size object cache appropriately
303       uint64_t obj = cache_max_dirty_object;
304       if (!obj) {
305         obj = MIN(2000, MAX(10, cache_size / 100 / sizeof(ObjectCacher::Object)));
306       }
307       ldout(cct, 10) << " cache bytes " << cache_size
308         << " -> about " << obj << " objects" << dendl;
309       object_cacher->set_max_objects(obj);
310
311       object_set = new ObjectCacher::ObjectSet(NULL, data_ctx.get_id(), 0);
312       object_set->return_enoent = true;
313       object_cacher->start();
314     }
315
316     readahead.set_trigger_requests(readahead_trigger_requests);
317     readahead.set_max_readahead_size(readahead_max_bytes);
318   }
319
  // Destroys the image watcher and the admin-socket hook (in that order) and
  // resets both pointers, which the destructor asserts to be null.
  void ImageCtx::shutdown() {
    delete image_watcher;
    image_watcher = nullptr;

    delete asok_hook;
    asok_hook = nullptr;
  }
327
328   void ImageCtx::init_layout()
329   {
330     if (stripe_unit == 0 || stripe_count == 0) {
331       stripe_unit = 1ull << order;
332       stripe_count = 1;
333     }
334
335     vector<uint64_t> alignments;
336     alignments.push_back(stripe_count << order); // object set (in file striping terminology)
337     alignments.push_back(stripe_unit * stripe_count); // stripe
338     alignments.push_back(stripe_unit); // stripe unit
339     readahead.set_alignments(alignments);
340
341     layout = file_layout_t();
342     layout.stripe_unit = stripe_unit;
343     layout.stripe_count = stripe_count;
344     layout.object_size = 1ull << order;
345     layout.pool_id = data_ctx.get_id();  // FIXME: pool id overflow?
346
347     delete[] format_string;
348     size_t len = object_prefix.length() + 16;
349     format_string = new char[len];
350     if (old_format) {
351       snprintf(format_string, len, "%s.%%012llx", object_prefix.c_str());
352     } else {
353       snprintf(format_string, len, "%s.%%016llx", object_prefix.c_str());
354     }
355
356     ldout(cct, 10) << "init_layout stripe_unit " << stripe_unit
357                    << " stripe_count " << stripe_count
358                    << " object_size " << layout.object_size
359                    << " prefix " << object_prefix
360                    << " format " << format_string
361                    << dendl;
362   }
363
364   void ImageCtx::perf_start(string name) {
365     PerfCountersBuilder plb(cct, name, l_librbd_first, l_librbd_last);
366
367     plb.add_u64_counter(l_librbd_rd, "rd", "Reads");
368     plb.add_u64_counter(l_librbd_rd_bytes, "rd_bytes", "Data size in reads");
369     plb.add_time_avg(l_librbd_rd_latency, "rd_latency", "Latency of reads");
370     plb.add_u64_counter(l_librbd_wr, "wr", "Writes");
371     plb.add_u64_counter(l_librbd_wr_bytes, "wr_bytes", "Written data");
372     plb.add_time_avg(l_librbd_wr_latency, "wr_latency", "Write latency");
373     plb.add_u64_counter(l_librbd_discard, "discard", "Discards");
374     plb.add_u64_counter(l_librbd_discard_bytes, "discard_bytes", "Discarded data");
375     plb.add_time_avg(l_librbd_discard_latency, "discard_latency", "Discard latency");
376     plb.add_u64_counter(l_librbd_flush, "flush", "Flushes");
377     plb.add_u64_counter(l_librbd_aio_flush, "aio_flush", "Async flushes");
378     plb.add_time_avg(l_librbd_aio_flush_latency, "aio_flush_latency", "Latency of async flushes");
379     plb.add_u64_counter(l_librbd_ws, "ws", "WriteSames");
380     plb.add_u64_counter(l_librbd_ws_bytes, "ws_bytes", "WriteSame data");
381     plb.add_time_avg(l_librbd_ws_latency, "ws_latency", "WriteSame latency");
382     plb.add_u64_counter(l_librbd_cmp, "cmp", "CompareAndWrites");
383     plb.add_u64_counter(l_librbd_cmp_bytes, "cmp_bytes", "Data size in cmps");
384     plb.add_time_avg(l_librbd_cmp_latency, "cmp_latency", "Latency of cmps");
385     plb.add_u64_counter(l_librbd_snap_create, "snap_create", "Snap creations");
386     plb.add_u64_counter(l_librbd_snap_remove, "snap_remove", "Snap removals");
387     plb.add_u64_counter(l_librbd_snap_rollback, "snap_rollback", "Snap rollbacks");
388     plb.add_u64_counter(l_librbd_snap_rename, "snap_rename", "Snap rename");
389     plb.add_u64_counter(l_librbd_notify, "notify", "Updated header notifications");
390     plb.add_u64_counter(l_librbd_resize, "resize", "Resizes");
391     plb.add_u64_counter(l_librbd_readahead, "readahead", "Read ahead");
392     plb.add_u64_counter(l_librbd_readahead_bytes, "readahead_bytes", "Data size in read ahead");
393     plb.add_u64_counter(l_librbd_invalidate_cache, "invalidate_cache", "Cache invalidates");
394
395     perfcounter = plb.create_perf_counters();
396     cct->get_perfcounters_collection()->add(perfcounter);
397   }
398
399   void ImageCtx::perf_stop() {
400     assert(perfcounter);
401     cct->get_perfcounters_collection()->remove(perfcounter);
402     delete perfcounter;
403   }
404
  // ORs `flag` into extra_read_flags, which get_read_flags() merges into
  // every value it returns.
  void ImageCtx::set_read_flag(unsigned flag) {
    extra_read_flags |= flag;
  }
408
409   int ImageCtx::get_read_flags(snap_t snap_id) {
410     int flags = librados::OPERATION_NOFLAG | extra_read_flags;
411     if (snap_id == LIBRADOS_SNAP_HEAD)
412       return flags;
413
414     if (balance_snap_reads)
415       flags |= librados::OPERATION_BALANCE_READS;
416     else if (localize_snap_reads)
417       flags |= librados::OPERATION_LOCALIZE_READS;
418     return flags;
419   }
420
421   int ImageCtx::snap_set(cls::rbd::SnapshotNamespace in_snap_namespace,
422                          string in_snap_name)
423   {
424     assert(snap_lock.is_wlocked());
425     snap_t in_snap_id = get_snap_id(in_snap_namespace, in_snap_name);
426     if (in_snap_id != CEPH_NOSNAP) {
427       snap_id = in_snap_id;
428       snap_namespace = in_snap_namespace;
429       snap_name = in_snap_name;
430       snap_exists = true;
431       data_ctx.snap_set_read(snap_id);
432       return 0;
433     }
434     return -ENOENT;
435   }
436
437   void ImageCtx::snap_unset()
438   {
439     assert(snap_lock.is_wlocked());
440     snap_id = CEPH_NOSNAP;
441     snap_namespace = {};
442     snap_name = "";
443     snap_exists = true;
444     data_ctx.snap_set_read(snap_id);
445   }
446
447   snap_t ImageCtx::get_snap_id(cls::rbd::SnapshotNamespace in_snap_namespace,
448                                string in_snap_name) const
449   {
450     assert(snap_lock.is_locked());
451     auto it = snap_ids.find({in_snap_namespace, in_snap_name});
452     if (it != snap_ids.end())
453       return it->second;
454     return CEPH_NOSNAP;
455   }
456
457   const SnapInfo* ImageCtx::get_snap_info(snap_t in_snap_id) const
458   {
459     assert(snap_lock.is_locked());
460     map<snap_t, SnapInfo>::const_iterator it =
461       snap_info.find(in_snap_id);
462     if (it != snap_info.end())
463       return &it->second;
464     return NULL;
465   }
466
467   int ImageCtx::get_snap_name(snap_t in_snap_id,
468                               string *out_snap_name) const
469   {
470     assert(snap_lock.is_locked());
471     const SnapInfo *info = get_snap_info(in_snap_id);
472     if (info) {
473       *out_snap_name = info->name;
474       return 0;
475     }
476     return -ENOENT;
477   }
478
479   int ImageCtx::get_snap_namespace(snap_t in_snap_id,
480                                    cls::rbd::SnapshotNamespace *out_snap_namespace) const
481   {
482     assert(snap_lock.is_locked());
483     const SnapInfo *info = get_snap_info(in_snap_id);
484     if (info) {
485       *out_snap_namespace = info->snap_namespace;
486       return 0;
487     }
488     return -ENOENT;
489   }
490
491   int ImageCtx::get_parent_spec(snap_t in_snap_id,
492                                 ParentSpec *out_pspec) const
493   {
494     const SnapInfo *info = get_snap_info(in_snap_id);
495     if (info) {
496       *out_pspec = info->parent.spec;
497       return 0;
498     }
499     return -ENOENT;
500   }
501
  // Returns the `size` member as-is.  Requires snap_lock to be held.
  uint64_t ImageCtx::get_current_size() const
  {
    assert(snap_lock.is_locked());
    return size;
  }
507
  // Returns the object size, 2^order (init_layout() stores the same value as
  // the layout's object_size).
  uint64_t ImageCtx::get_object_size() const
  {
    return 1ull << order;
  }
512
513   string ImageCtx::get_object_name(uint64_t num) const {
514     char buf[object_prefix.length() + 32];
515     snprintf(buf, sizeof(buf), format_string, num);
516     return string(buf);
517   }
518
  // Returns the `stripe_unit` member (init_layout() replaces a zero value
  // with the object size).
  uint64_t ImageCtx::get_stripe_unit() const
  {
    return stripe_unit;
  }
523
  // Returns the `stripe_count` member (init_layout() replaces a zero value
  // with 1).
  uint64_t ImageCtx::get_stripe_count() const
  {
    return stripe_count;
  }
528
  // Returns stripe_count multiplied by the object size (2^order).
  uint64_t ImageCtx::get_stripe_period() const
  {
    return stripe_count * (1ull << order);
  }
533
  // Returns the `create_timestamp` member as-is.
  utime_t ImageCtx::get_create_timestamp() const
  {
    return create_timestamp;
  }
538
539   int ImageCtx::is_snap_protected(snap_t in_snap_id,
540                                   bool *is_protected) const
541   {
542     assert(snap_lock.is_locked());
543     const SnapInfo *info = get_snap_info(in_snap_id);
544     if (info) {
545       *is_protected =
546         (info->protection_status == RBD_PROTECTION_STATUS_PROTECTED);
547       return 0;
548     }
549     return -ENOENT;
550   }
551
552   int ImageCtx::is_snap_unprotected(snap_t in_snap_id,
553                                     bool *is_unprotected) const
554   {
555     assert(snap_lock.is_locked());
556     const SnapInfo *info = get_snap_info(in_snap_id);
557     if (info) {
558       *is_unprotected =
559         (info->protection_status == RBD_PROTECTION_STATUS_UNPROTECTED);
560       return 0;
561     }
562     return -ENOENT;
563   }
564
565   void ImageCtx::add_snap(cls::rbd::SnapshotNamespace in_snap_namespace,
566                           string in_snap_name,
567                           snap_t id, uint64_t in_size,
568                           const ParentInfo &parent, uint8_t protection_status,
569                           uint64_t flags, utime_t timestamp)
570   {
571     assert(snap_lock.is_wlocked());
572     snaps.push_back(id);
573     SnapInfo info(in_snap_name, in_snap_namespace,
574                   in_size, parent, protection_status, flags, timestamp);
575     snap_info.insert({id, info});
576     snap_ids.insert({{in_snap_namespace, in_snap_name}, id});
577   }
578
579   void ImageCtx::rm_snap(cls::rbd::SnapshotNamespace in_snap_namespace,
580                          string in_snap_name,
581                          snap_t id)
582   {
583     assert(snap_lock.is_wlocked());
584     snaps.erase(std::remove(snaps.begin(), snaps.end(), id), snaps.end());
585     snap_info.erase(id);
586     snap_ids.erase({in_snap_namespace, in_snap_name});
587   }
588
589   uint64_t ImageCtx::get_image_size(snap_t in_snap_id) const
590   {
591     assert(snap_lock.is_locked());
592     if (in_snap_id == CEPH_NOSNAP) {
593       if (!resize_reqs.empty() &&
594           resize_reqs.front()->shrinking()) {
595         return resize_reqs.front()->get_image_size();
596       }
597       return size;
598     }
599
600     const SnapInfo *info = get_snap_info(in_snap_id);
601     if (info) {
602       return info->size;
603     }
604     return 0;
605   }
606
607   uint64_t ImageCtx::get_object_count(snap_t in_snap_id) const {
608     assert(snap_lock.is_locked());
609     uint64_t image_size = get_image_size(in_snap_id);
610     return Striper::get_num_objects(layout, image_size);
611   }
612
613   bool ImageCtx::test_features(uint64_t features) const
614   {
615     RWLock::RLocker l(snap_lock);
616     return test_features(features, snap_lock);
617   }
618
619   bool ImageCtx::test_features(uint64_t in_features,
620                                const RWLock &in_snap_lock) const
621   {
622     assert(snap_lock.is_locked());
623     return ((features & in_features) == in_features);
624   }
625
626   int ImageCtx::get_flags(librados::snap_t _snap_id, uint64_t *_flags) const
627   {
628     assert(snap_lock.is_locked());
629     if (_snap_id == CEPH_NOSNAP) {
630       *_flags = flags;
631       return 0;
632     }
633     const SnapInfo *info = get_snap_info(_snap_id);
634     if (info) {
635       *_flags = info->flags;
636       return 0;
637     }
638     return -ENOENT;
639   }
640
641   int ImageCtx::test_flags(uint64_t flags, bool *flags_set) const
642   {
643     RWLock::RLocker l(snap_lock);
644     return test_flags(flags, snap_lock, flags_set);
645   }
646
647   int ImageCtx::test_flags(uint64_t flags, const RWLock &in_snap_lock,
648                            bool *flags_set) const
649   {
650     assert(snap_lock.is_locked());
651     uint64_t snap_flags;
652     int r = get_flags(snap_id, &snap_flags);
653     if (r < 0) {
654       return r;
655     }
656     *flags_set = ((snap_flags & flags) == flags);
657     return 0;
658   }
659
660   int ImageCtx::update_flags(snap_t in_snap_id, uint64_t flag, bool enabled)
661   {
662     assert(snap_lock.is_wlocked());
663     uint64_t *_flags;
664     if (in_snap_id == CEPH_NOSNAP) {
665       _flags = &flags;
666     } else {
667       map<snap_t, SnapInfo>::iterator it = snap_info.find(in_snap_id);
668       if (it == snap_info.end()) {
669         return -ENOENT;
670       }
671       _flags = &it->second.flags;
672     }
673
674     if (enabled) {
675       (*_flags) |= flag;
676     } else {
677       (*_flags) &= ~flag;
678     }
679     return 0;
680   }
681
682   const ParentInfo* ImageCtx::get_parent_info(snap_t in_snap_id) const
683   {
684     assert(snap_lock.is_locked());
685     assert(parent_lock.is_locked());
686     if (in_snap_id == CEPH_NOSNAP)
687       return &parent_md;
688     const SnapInfo *info = get_snap_info(in_snap_id);
689     if (info)
690       return &info->parent;
691     return NULL;
692   }
693
694   int64_t ImageCtx::get_parent_pool_id(snap_t in_snap_id) const
695   {
696     const ParentInfo *info = get_parent_info(in_snap_id);
697     if (info)
698       return info->spec.pool_id;
699     return -1;
700   }
701
702   string ImageCtx::get_parent_image_id(snap_t in_snap_id) const
703   {
704     const ParentInfo *info = get_parent_info(in_snap_id);
705     if (info)
706       return info->spec.image_id;
707     return "";
708   }
709
710   uint64_t ImageCtx::get_parent_snap_id(snap_t in_snap_id) const
711   {
712     const ParentInfo *info = get_parent_info(in_snap_id);
713     if (info)
714       return info->spec.snap_id;
715     return CEPH_NOSNAP;
716   }
717
718   int ImageCtx::get_parent_overlap(snap_t in_snap_id, uint64_t *overlap) const
719   {
720     assert(snap_lock.is_locked());
721     const ParentInfo *info = get_parent_info(in_snap_id);
722     if (info) {
723       *overlap = info->overlap;
724       return 0;
725     }
726     return -ENOENT;
727   }
728
729   void ImageCtx::aio_read_from_cache(object_t o, uint64_t object_no,
730                                      bufferlist *bl, size_t len,
731                                      uint64_t off, Context *onfinish,
732                                      int fadvise_flags, ZTracer::Trace *trace) {
733     snap_lock.get_read();
734     ObjectCacher::OSDRead *rd = object_cacher->prepare_read(snap_id, bl, fadvise_flags);
735     snap_lock.put_read();
736     ObjectExtent extent(o, object_no, off, len, 0);
737     extent.oloc.pool = data_ctx.get_id();
738     extent.buffer_extents.push_back(make_pair(0, len));
739     rd->extents.push_back(extent);
740     cache_lock.Lock();
741     int r = object_cacher->readx(rd, object_set, onfinish, trace);
742     cache_lock.Unlock();
743     if (r != 0)
744       onfinish->complete(r);
745   }
746
747   void ImageCtx::write_to_cache(object_t o, const bufferlist& bl, size_t len,
748                                 uint64_t off, Context *onfinish,
749                                 int fadvise_flags, uint64_t journal_tid,
750                                 ZTracer::Trace *trace) {
751     snap_lock.get_read();
752     ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(
753       snapc, bl, ceph::real_time::min(), fadvise_flags, journal_tid);
754     snap_lock.put_read();
755     ObjectExtent extent(o, 0, off, len, 0);
756     extent.oloc.pool = data_ctx.get_id();
757     // XXX: nspace is always default, io_ctx_impl field private
758     //extent.oloc.nspace = data_ctx.io_ctx_impl->oloc.nspace;
759     extent.buffer_extents.push_back(make_pair(0, len));
760     wr->extents.push_back(extent);
761     {
762       Mutex::Locker l(cache_lock);
763       object_cacher->writex(wr, object_set, onfinish, trace);
764     }
765   }
766
767   void ImageCtx::user_flushed() {
768     if (object_cacher && cache_writethrough_until_flush) {
769       md_lock.get_read();
770       bool flushed_before = flush_encountered;
771       md_lock.put_read();
772
773       uint64_t max_dirty = cache_max_dirty;
774       if (!flushed_before && max_dirty > 0) {
775         md_lock.get_write();
776         flush_encountered = true;
777         md_lock.put_write();
778
779         ldout(cct, 10) << "saw first user flush, enabling writeback" << dendl;
780         Mutex::Locker l(cache_lock);
781         object_cacher->set_max_dirty(max_dirty);
782       }
783     }
784   }
785
786   void ImageCtx::flush_cache(Context *onfinish) {
787     cache_lock.Lock();
788     object_cacher->flush_set(object_set, onfinish);
789     cache_lock.Unlock();
790   }
791
792   void ImageCtx::shut_down_cache(Context *on_finish) {
793     if (object_cacher == NULL) {
794       on_finish->complete(0);
795       return;
796     }
797
798     cache_lock.Lock();
799     object_cacher->release_set(object_set);
800     cache_lock.Unlock();
801
802     C_ShutDownCache *shut_down = new C_ShutDownCache(this, on_finish);
803     flush_cache(new C_InvalidateCache(this, true, false, shut_down));
804   }
805
806   int ImageCtx::invalidate_cache(bool purge_on_error) {
807     flush_async_operations();
808     if (object_cacher == NULL) {
809       return 0;
810     }
811
812     cache_lock.Lock();
813     object_cacher->release_set(object_set);
814     cache_lock.Unlock();
815
816     C_SaferCond ctx;
817     flush_cache(new C_InvalidateCache(this, purge_on_error, true, &ctx));
818
819     int result = ctx.wait();
820     return result;
821   }
822
823   void ImageCtx::invalidate_cache(bool purge_on_error, Context *on_finish) {
824     if (object_cacher == NULL) {
825       op_work_queue->queue(on_finish, 0);
826       return;
827     }
828
829     cache_lock.Lock();
830     object_cacher->release_set(object_set);
831     cache_lock.Unlock();
832
833     flush_cache(new C_InvalidateCache(this, purge_on_error, false, on_finish));
834   }
835
836   void ImageCtx::clear_nonexistence_cache() {
837     assert(cache_lock.is_locked());
838     if (!object_cacher)
839       return;
840     object_cacher->clear_nonexistence(object_set);
841   }
842
  // Asks the object cacher, under the cache lock, whether the image's object
  // set is empty.  object_cacher is dereferenced without a null check.
  bool ImageCtx::is_cache_empty() {
    Mutex::Locker locker(cache_lock);
    return object_cacher->set_is_empty(object_set);
  }
847
  // Creates the image watcher (none may exist yet) and starts its watch
  // registration; `on_finish` is passed through to the watcher.
  void ImageCtx::register_watch(Context *on_finish) {
    assert(image_watcher == NULL);
    image_watcher = new ImageWatcher<>(*this);
    image_watcher->register_watch(on_finish);
  }
853
854   uint64_t ImageCtx::prune_parent_extents(vector<pair<uint64_t,uint64_t> >& objectx,
855                                           uint64_t overlap)
856   {
857     // drop extents completely beyond the overlap
858     while (!objectx.empty() && objectx.back().first >= overlap)
859       objectx.pop_back();
860
861     // trim final overlapping extent
862     if (!objectx.empty() && objectx.back().first + objectx.back().second > overlap)
863       objectx.back().second = overlap - objectx.back().first;
864
865     uint64_t len = 0;
866     for (vector<pair<uint64_t,uint64_t> >::iterator p = objectx.begin();
867          p != objectx.end();
868          ++p)
869       len += p->second;
870     ldout(cct, 10) << "prune_parent_extents image overlap " << overlap
871                    << ", object overlap " << len
872                    << " from image extents " << objectx << dendl;
873     return len;
874   }
875
876   void ImageCtx::flush_async_operations() {
877     C_SaferCond ctx;
878     flush_async_operations(&ctx);
879     ctx.wait();
880   }
881
882   void ImageCtx::flush_async_operations(Context *on_finish) {
883     {
884       Mutex::Locker l(async_ops_lock);
885       if (!async_ops.empty()) {
886         ldout(cct, 20) << "flush async operations: " << on_finish << " "
887                        << "count=" << async_ops.size() << dendl;
888         async_ops.front()->add_flush_context(on_finish);
889         return;
890       }
891     }
892     on_finish->complete(0);
893   }
894
895   int ImageCtx::flush() {
896     C_SaferCond cond_ctx;
897     flush(&cond_ctx);
898     return cond_ctx.wait();
899   }
900
901   void ImageCtx::flush(Context *on_safe) {
902     // ensure no locks are held when flush is complete
903     on_safe = util::create_async_context_callback(*this, on_safe);
904
905     if (object_cacher != NULL) {
906       // flush cache after completing all in-flight AIO ops
907       on_safe = new C_FlushCache(this, on_safe);
908     }
909     flush_async_operations(on_safe);
910   }
911
912   void ImageCtx::cancel_async_requests() {
913     C_SaferCond ctx;
914     cancel_async_requests(&ctx);
915     ctx.wait();
916   }
917
918   void ImageCtx::cancel_async_requests(Context *on_finish) {
919     {
920       Mutex::Locker async_ops_locker(async_ops_lock);
921       if (!async_requests.empty()) {
922         ldout(cct, 10) << "canceling async requests: count="
923                        << async_requests.size() << dendl;
924         for (auto req : async_requests) {
925           ldout(cct, 10) << "canceling async request: " << req << dendl;
926           req->cancel();
927         }
928         async_requests_waiters.push_back(on_finish);
929         return;
930       }
931     }
932
933     on_finish->complete(0);
934   }
935
936   void ImageCtx::clear_pending_completions() {
937     Mutex::Locker l(completed_reqs_lock);
938     ldout(cct, 10) << "clear pending AioCompletion: count="
939                    << completed_reqs.size() << dendl;
940     completed_reqs.clear();
941   }
942
943   bool ImageCtx::_filter_metadata_confs(const string &prefix,
944                                         map<string, bool> &configs,
945                                         const map<string, bufferlist> &pairs,
946                                         map<string, bufferlist> *res) {
947     size_t conf_prefix_len = prefix.size();
948
949     for (auto it : pairs) {
950       if (it.first.compare(0, MIN(conf_prefix_len, it.first.size()), prefix) > 0)
951         return false;
952
953       if (it.first.size() <= conf_prefix_len)
954         continue;
955
956       string key = it.first.substr(conf_prefix_len, it.first.size() - conf_prefix_len);
957       auto cit = configs.find(key);
958       if (cit != configs.end()) {
959         cit->second = true;
960         res->insert(make_pair(key, it.second));
961       }
962     }
963     return true;
964   }
965
966   void ImageCtx::apply_metadata(const std::map<std::string, bufferlist> &meta) {
967     ldout(cct, 20) << __func__ << dendl;
968     std::map<string, bool> configs = boost::assign::map_list_of(
969         "rbd_non_blocking_aio", false)(
970         "rbd_cache", false)(
971         "rbd_cache_writethrough_until_flush", false)(
972         "rbd_cache_size", false)(
973         "rbd_cache_max_dirty", false)(
974         "rbd_cache_target_dirty", false)(
975         "rbd_cache_max_dirty_age", false)(
976         "rbd_cache_max_dirty_object", false)(
977         "rbd_cache_block_writes_upfront", false)(
978         "rbd_concurrent_management_ops", false)(
979         "rbd_balance_snap_reads", false)(
980         "rbd_localize_snap_reads", false)(
981         "rbd_balance_parent_reads", false)(
982         "rbd_localize_parent_reads", false)(
983         "rbd_readahead_trigger_requests", false)(
984         "rbd_readahead_max_bytes", false)(
985         "rbd_readahead_disable_after_bytes", false)(
986         "rbd_clone_copy_on_read", false)(
987         "rbd_blacklist_on_break_lock", false)(
988         "rbd_blacklist_expire_seconds", false)(
989         "rbd_request_timed_out_seconds", false)(
990         "rbd_journal_order", false)(
991         "rbd_journal_splay_width", false)(
992         "rbd_journal_commit_age", false)(
993         "rbd_journal_object_flush_interval", false)(
994         "rbd_journal_object_flush_bytes", false)(
995         "rbd_journal_object_flush_age", false)(
996         "rbd_journal_pool", false)(
997         "rbd_journal_max_payload_bytes", false)(
998         "rbd_journal_max_concurrent_object_sets", false)(
999         "rbd_mirroring_resync_after_disconnect", false)(
1000         "rbd_mirroring_replay_delay", false)(
1001         "rbd_skip_partial_discard", false);
1002
1003     md_config_t local_config_t;
1004     std::map<std::string, bufferlist> res;
1005
1006     _filter_metadata_confs(METADATA_CONF_PREFIX, configs, meta, &res);
1007     for (auto it : res) {
1008       std::string val(it.second.c_str(), it.second.length());
1009       int j = local_config_t.set_val(it.first.c_str(), val);
1010       if (j < 0) {
1011         lderr(cct) << __func__ << " failed to set config " << it.first
1012                    << " with value " << it.second.c_str() << ": " << j
1013                    << dendl;
1014       }
1015     }
1016
1017 #define ASSIGN_OPTION(config, type)                                            \
1018     do {                                                                       \
1019       string key = "rbd_";                                                     \
1020       key = key + #config;                                                     \
1021       if (configs[key])                                                        \
1022         config = local_config_t.get_val<type>("rbd_"#config);                  \
1023       else                                                                     \
1024         config = cct->_conf->get_val<type>("rbd_"#config);                     \
1025     } while (0);
1026
1027     ASSIGN_OPTION(non_blocking_aio, bool);
1028     ASSIGN_OPTION(cache, bool);
1029     ASSIGN_OPTION(cache_writethrough_until_flush, bool);
1030     ASSIGN_OPTION(cache_size, int64_t);
1031     ASSIGN_OPTION(cache_max_dirty, int64_t);
1032     ASSIGN_OPTION(cache_target_dirty, int64_t);
1033     ASSIGN_OPTION(cache_max_dirty_age, double);
1034     ASSIGN_OPTION(cache_max_dirty_object, int64_t);
1035     ASSIGN_OPTION(cache_block_writes_upfront, bool);
1036     ASSIGN_OPTION(concurrent_management_ops, int64_t);
1037     ASSIGN_OPTION(balance_snap_reads, bool);
1038     ASSIGN_OPTION(localize_snap_reads, bool);
1039     ASSIGN_OPTION(balance_parent_reads, bool);
1040     ASSIGN_OPTION(localize_parent_reads, bool);
1041     ASSIGN_OPTION(readahead_trigger_requests, int64_t);
1042     ASSIGN_OPTION(readahead_max_bytes, int64_t);
1043     ASSIGN_OPTION(readahead_disable_after_bytes, int64_t);
1044     ASSIGN_OPTION(clone_copy_on_read, bool);
1045     ASSIGN_OPTION(blacklist_on_break_lock, bool);
1046     ASSIGN_OPTION(blacklist_expire_seconds, int64_t);
1047     ASSIGN_OPTION(request_timed_out_seconds, int64_t);
1048     ASSIGN_OPTION(enable_alloc_hint, bool);
1049     ASSIGN_OPTION(journal_order, uint64_t);
1050     ASSIGN_OPTION(journal_splay_width, uint64_t);
1051     ASSIGN_OPTION(journal_commit_age, double);
1052     ASSIGN_OPTION(journal_object_flush_interval, int64_t);
1053     ASSIGN_OPTION(journal_object_flush_bytes, int64_t);
1054     ASSIGN_OPTION(journal_object_flush_age, double);
1055     ASSIGN_OPTION(journal_pool, std::string);
1056     ASSIGN_OPTION(journal_max_payload_bytes, uint64_t);
1057     ASSIGN_OPTION(journal_max_concurrent_object_sets, int64_t);
1058     ASSIGN_OPTION(mirroring_resync_after_disconnect, bool);
1059     ASSIGN_OPTION(mirroring_replay_delay, int64_t);
1060     ASSIGN_OPTION(skip_partial_discard, bool);
1061     ASSIGN_OPTION(blkin_trace_all, bool);
1062   }
1063
1064   ExclusiveLock<ImageCtx> *ImageCtx::create_exclusive_lock() {
1065     return new ExclusiveLock<ImageCtx>(*this);
1066   }
1067
1068   ObjectMap<ImageCtx> *ImageCtx::create_object_map(uint64_t snap_id) {
1069     return new ObjectMap<ImageCtx>(*this, snap_id);
1070   }
1071
1072   Journal<ImageCtx> *ImageCtx::create_journal() {
1073     return new Journal<ImageCtx>(*this);
1074   }
1075
1076   void ImageCtx::set_image_name(const std::string &image_name) {
1077     // update the name so rename can be invoked repeatedly
1078     RWLock::RLocker owner_locker(owner_lock);
1079     RWLock::WLocker snap_locker(snap_lock);
1080     name = image_name;
1081     if (old_format) {
1082       header_oid = util::old_header_name(image_name);
1083     }
1084   }
1085
1086   void ImageCtx::notify_update() {
1087     state->handle_update_notification();
1088     ImageWatcher<>::notify_header_update(md_ctx, header_oid);
1089   }
1090
1091   void ImageCtx::notify_update(Context *on_finish) {
1092     state->handle_update_notification();
1093     image_watcher->notify_header_update(on_finish);
1094   }
1095
1096   exclusive_lock::Policy *ImageCtx::get_exclusive_lock_policy() const {
1097     assert(owner_lock.is_locked());
1098     assert(exclusive_lock_policy != nullptr);
1099     return exclusive_lock_policy;
1100   }
1101
1102   void ImageCtx::set_exclusive_lock_policy(exclusive_lock::Policy *policy) {
1103     assert(owner_lock.is_wlocked());
1104     assert(policy != nullptr);
1105     delete exclusive_lock_policy;
1106     exclusive_lock_policy = policy;
1107   }
1108
1109   journal::Policy *ImageCtx::get_journal_policy() const {
1110     assert(snap_lock.is_locked());
1111     assert(journal_policy != nullptr);
1112     return journal_policy;
1113   }
1114
1115   void ImageCtx::set_journal_policy(journal::Policy *policy) {
1116     assert(snap_lock.is_wlocked());
1117     assert(policy != nullptr);
1118     delete journal_policy;
1119     journal_policy = policy;
1120   }
1121
1122   void ImageCtx::get_thread_pool_instance(CephContext *cct,
1123                                           ThreadPool **thread_pool,
1124                                           ContextWQ **op_work_queue) {
1125     ThreadPoolSingleton *thread_pool_singleton;
1126     cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
1127       thread_pool_singleton, "librbd::thread_pool");
1128     *thread_pool = thread_pool_singleton;
1129     *op_work_queue = thread_pool_singleton->op_work_queue;
1130   }
1131
1132   void ImageCtx::get_timer_instance(CephContext *cct, SafeTimer **timer,
1133                                     Mutex **timer_lock) {
1134     SafeTimerSingleton *safe_timer_singleton;
1135     cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
1136       safe_timer_singleton, "librbd::journal::safe_timer");
1137     *timer = safe_timer_singleton;
1138     *timer_lock = &safe_timer_singleton->lock;
1139   }
1140 }