Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / io / ImageRequestWQ.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/io/ImageRequestWQ.h"
5 #include "common/errno.h"
6 #include "common/zipkin_trace.h"
7 #include "librbd/ExclusiveLock.h"
8 #include "librbd/ImageCtx.h"
9 #include "librbd/ImageState.h"
10 #include "librbd/internal.h"
11 #include "librbd/Utils.h"
12 #include "librbd/exclusive_lock/Policy.h"
13 #include "librbd/io/AioCompletion.h"
14 #include "librbd/io/ImageRequest.h"
15
16 #define dout_subsys ceph_subsys_rbd
17 #undef dout_prefix
18 #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
19                            << " " << __func__ << ": "
20
21 namespace librbd {
22 namespace io {
23
24 template <typename I>
25 struct ImageRequestWQ<I>::C_AcquireLock : public Context {
26   ImageRequestWQ *work_queue;
27   ImageRequest<I> *image_request;
28
29   C_AcquireLock(ImageRequestWQ *work_queue, ImageRequest<I> *image_request)
30     : work_queue(work_queue), image_request(image_request) {
31   }
32
33   void finish(int r) override {
34     work_queue->handle_acquire_lock(r, image_request);
35   }
36 };
37
38 template <typename I>
39 struct ImageRequestWQ<I>::C_BlockedWrites : public Context {
40   ImageRequestWQ *work_queue;
41   C_BlockedWrites(ImageRequestWQ *_work_queue)
42     : work_queue(_work_queue) {
43   }
44
45   void finish(int r) override {
46     work_queue->handle_blocked_writes(r);
47   }
48 };
49
50 template <typename I>
51 struct ImageRequestWQ<I>::C_RefreshFinish : public Context {
52   ImageRequestWQ *work_queue;
53   ImageRequest<I> *image_request;
54
55   C_RefreshFinish(ImageRequestWQ *work_queue,
56                   ImageRequest<I> *image_request)
57     : work_queue(work_queue), image_request(image_request) {
58   }
59   void finish(int r) override {
60     work_queue->handle_refreshed(r, image_request);
61   }
62 };
63
64 template <typename I>
65 ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
66                                   time_t ti, ThreadPool *tp)
67   : ThreadPool::PointerWQ<ImageRequest<I> >(name, ti, 0, tp),
68     m_image_ctx(*image_ctx),
69     m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) {
70   CephContext *cct = m_image_ctx.cct;
71   ldout(cct, 5) << "ictx=" << image_ctx << dendl;
72   this->register_work_queue();
73 }
74
75 template <typename I>
76 ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
77                                 ReadResult &&read_result, int op_flags) {
78   CephContext *cct = m_image_ctx.cct;
79   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
80                  << "len = " << len << dendl;
81
82   C_SaferCond cond;
83   AioCompletion *c = AioCompletion::create(&cond);
84   aio_read(c, off, len, std::move(read_result), op_flags, false);
85   return cond.wait();
86 }
87
88 template <typename I>
89 ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len,
90                                  bufferlist &&bl, int op_flags) {
91   CephContext *cct = m_image_ctx.cct;
92   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
93                  << "len = " << len << dendl;
94
95   m_image_ctx.snap_lock.get_read();
96   int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
97   m_image_ctx.snap_lock.put_read();
98   if (r < 0) {
99     lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
100     return r;
101   }
102
103   C_SaferCond cond;
104   AioCompletion *c = AioCompletion::create(&cond);
105   aio_write(c, off, len, std::move(bl), op_flags, false);
106
107   r = cond.wait();
108   if (r < 0) {
109     return r;
110   }
111   return len;
112 }
113
114 template <typename I>
115 ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len,
116                                    bool skip_partial_discard) {
117   CephContext *cct = m_image_ctx.cct;
118   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
119                  << "len = " << len << dendl;
120
121   m_image_ctx.snap_lock.get_read();
122   int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
123   m_image_ctx.snap_lock.put_read();
124   if (r < 0) {
125     lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
126     return r;
127   }
128
129   C_SaferCond cond;
130   AioCompletion *c = AioCompletion::create(&cond);
131   aio_discard(c, off, len, skip_partial_discard, false);
132
133   r = cond.wait();
134   if (r < 0) {
135     return r;
136   }
137   return len;
138 }
139
140 template <typename I>
141 ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len,
142                                      bufferlist &&bl, int op_flags) {
143   CephContext *cct = m_image_ctx.cct;
144   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
145                  << "len = " << len << ", data_len " << bl.length() << dendl;
146
147   m_image_ctx.snap_lock.get_read();
148   int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
149   m_image_ctx.snap_lock.put_read();
150   if (r < 0) {
151     lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
152     return r;
153   }
154
155   C_SaferCond cond;
156   AioCompletion *c = AioCompletion::create(&cond);
157   aio_writesame(c, off, len, std::move(bl), op_flags, false);
158
159   r = cond.wait();
160   if (r < 0) {
161     return r;
162   }
163   return len;
164 }
165
166 template <typename I>
167 ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len,
168                                              bufferlist &&cmp_bl,
169                                              bufferlist &&bl,
170                                              uint64_t *mismatch_off,
171                                              int op_flags){
172   CephContext *cct = m_image_ctx.cct;
173   ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off="
174                  << off << ", " << "len = " << len << dendl;
175
176   m_image_ctx.snap_lock.get_read();
177   int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
178   m_image_ctx.snap_lock.put_read();
179   if (r < 0) {
180     lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
181     return r;
182   }
183
184   C_SaferCond cond;
185   AioCompletion *c = AioCompletion::create(&cond);
186   aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl),
187                         mismatch_off, op_flags, false);
188
189   r = cond.wait();
190   if (r < 0) {
191     return r;
192   }
193
194   return len;
195 }
196
197 template <typename I>
198 void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
199                                  ReadResult &&read_result, int op_flags,
200                                  bool native_async) {
201   CephContext *cct = m_image_ctx.cct;
202   ZTracer::Trace trace;
203   if (m_image_ctx.blkin_trace_all) {
204     trace.init("wq: read", &m_image_ctx.trace_endpoint);
205     trace.event("start");
206   }
207
208   c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
209   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
210                  << "completion=" << c << ", off=" << off << ", "
211                  << "len=" << len << ", " << "flags=" << op_flags << dendl;
212
213   if (native_async && m_image_ctx.event_socket.is_valid()) {
214     c->set_event_notify(true);
215   }
216
217   if (!start_in_flight_io(c)) {
218     return;
219   }
220
221   // if journaling is enabled -- we need to replay the journal because
222   // it might contain an uncommitted write
223   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
224   if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
225       require_lock_on_read()) {
226     queue(ImageRequest<I>::create_read_request(
227             m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags,
228             trace));
229   } else {
230     c->start_op();
231     ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
232                               std::move(read_result), op_flags, trace);
233     finish_in_flight_io();
234   }
235   trace.event("finish");
236 }
237
238 template <typename I>
239 void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
240                                   bufferlist &&bl, int op_flags,
241                                   bool native_async) {
242   CephContext *cct = m_image_ctx.cct;
243   ZTracer::Trace trace;
244   if (m_image_ctx.blkin_trace_all) {
245     trace.init("wq: write", &m_image_ctx.trace_endpoint);
246     trace.event("init");
247   }
248
249   c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
250   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
251                  << "completion=" << c << ", off=" << off << ", "
252                  << "len=" << len << ", flags=" << op_flags << dendl;
253
254   if (native_async && m_image_ctx.event_socket.is_valid()) {
255     c->set_event_notify(true);
256   }
257
258   if (!start_in_flight_io(c)) {
259     return;
260   }
261
262   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
263   if (m_image_ctx.non_blocking_aio || writes_blocked()) {
264     queue(ImageRequest<I>::create_write_request(
265             m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace));
266   } else {
267     c->start_op();
268     ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}},
269                                std::move(bl), op_flags, trace);
270     finish_in_flight_io();
271   }
272   trace.event("finish");
273 }
274
275 template <typename I>
276 void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
277                                     uint64_t len, bool skip_partial_discard,
278                                     bool native_async) {
279   CephContext *cct = m_image_ctx.cct;
280   ZTracer::Trace trace;
281   if (m_image_ctx.blkin_trace_all) {
282     trace.init("wq: discard", &m_image_ctx.trace_endpoint);
283     trace.event("init");
284   }
285
286   c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
287   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
288                  << "completion=" << c << ", off=" << off << ", len=" << len
289                  << dendl;
290
291   if (native_async && m_image_ctx.event_socket.is_valid()) {
292     c->set_event_notify(true);
293   }
294
295   if (!start_in_flight_io(c)) {
296     return;
297   }
298
299   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
300   if (m_image_ctx.non_blocking_aio || writes_blocked()) {
301     queue(ImageRequest<I>::create_discard_request(
302             m_image_ctx, c, off, len, skip_partial_discard, trace));
303   } else {
304     c->start_op();
305     ImageRequest<I>::aio_discard(&m_image_ctx, c, off, len,
306                                  skip_partial_discard, trace);
307     finish_in_flight_io();
308   }
309   trace.event("finish");
310 }
311
312 template <typename I>
313 void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
314   CephContext *cct = m_image_ctx.cct;
315   ZTracer::Trace trace;
316   if (m_image_ctx.blkin_trace_all) {
317     trace.init("wq: flush", &m_image_ctx.trace_endpoint);
318     trace.event("init");
319   }
320
321   c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
322   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
323                  << "completion=" << c << dendl;
324
325   if (native_async && m_image_ctx.event_socket.is_valid()) {
326     c->set_event_notify(true);
327   }
328
329   if (!start_in_flight_io(c)) {
330     return;
331   }
332
333   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
334   if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
335     queue(ImageRequest<I>::create_flush_request(m_image_ctx, c, trace));
336   } else {
337     ImageRequest<I>::aio_flush(&m_image_ctx, c, trace);
338     finish_in_flight_io();
339   }
340   trace.event("finish");
341 }
342
343 template <typename I>
344 void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
345                                       uint64_t len, bufferlist &&bl,
346                                       int op_flags, bool native_async) {
347   CephContext *cct = m_image_ctx.cct;
348   ZTracer::Trace trace;
349   if (m_image_ctx.blkin_trace_all) {
350     trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
351     trace.event("init");
352   }
353
354   c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
355   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
356                  << "completion=" << c << ", off=" << off << ", "
357                  << "len=" << len << ", data_len = " << bl.length() << ", "
358                  << "flags=" << op_flags << dendl;
359
360   if (native_async && m_image_ctx.event_socket.is_valid()) {
361     c->set_event_notify(true);
362   }
363
364   if (!start_in_flight_io(c)) {
365     return;
366   }
367
368   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
369   if (m_image_ctx.non_blocking_aio || writes_blocked()) {
370     queue(ImageRequest<I>::create_writesame_request(
371             m_image_ctx, c, off, len, std::move(bl), op_flags, trace));
372   } else {
373     c->start_op();
374     ImageRequest<I>::aio_writesame(&m_image_ctx, c, off, len, std::move(bl),
375                                    op_flags, trace);
376     finish_in_flight_io();
377   }
378   trace.event("finish");
379 }
380
381 template <typename I>
382 void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
383                                               uint64_t off, uint64_t len,
384                                               bufferlist &&cmp_bl,
385                                               bufferlist &&bl,
386                                               uint64_t *mismatch_off,
387                                               int op_flags, bool native_async) {
388   CephContext *cct = m_image_ctx.cct;
389   ZTracer::Trace trace;
390   if (m_image_ctx.blkin_trace_all) {
391     trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint);
392     trace.event("init");
393   }
394
395   c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE);
396   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
397                  << "completion=" << c << ", off=" << off << ", "
398                  << "len=" << len << dendl;
399
400   if (native_async && m_image_ctx.event_socket.is_valid()) {
401     c->set_event_notify(true);
402   }
403
404   if (!start_in_flight_io(c)) {
405     return;
406   }
407
408   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
409   if (m_image_ctx.non_blocking_aio || writes_blocked()) {
410     queue(ImageRequest<I>::create_compare_and_write_request(
411             m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
412             mismatch_off, op_flags, trace));
413   } else {
414     c->start_op();
415     ImageRequest<I>::aio_compare_and_write(&m_image_ctx, c, {{off, len}},
416                                            std::move(cmp_bl), std::move(bl),
417                                            mismatch_off, op_flags, trace);
418     finish_in_flight_io();
419   }
420   trace.event("finish");
421 }
422
423 template <typename I>
424 void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
425   assert(m_image_ctx.owner_lock.is_locked());
426
427   {
428     RWLock::WLocker locker(m_lock);
429     assert(!m_shutdown);
430     m_shutdown = true;
431
432     CephContext *cct = m_image_ctx.cct;
433     ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
434                   << dendl;
435     if (m_in_flight_ios > 0) {
436       m_on_shutdown = on_shutdown;
437       return;
438     }
439   }
440
441   // ensure that all in-flight IO is flushed
442   m_image_ctx.flush(on_shutdown);
443 }
444
445 template <typename I>
446 int ImageRequestWQ<I>::block_writes() {
447   C_SaferCond cond_ctx;
448   block_writes(&cond_ctx);
449   return cond_ctx.wait();
450 }
451
452 template <typename I>
453 void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
454   assert(m_image_ctx.owner_lock.is_locked());
455   CephContext *cct = m_image_ctx.cct;
456
457   {
458     RWLock::WLocker locker(m_lock);
459     ++m_write_blockers;
460     ldout(cct, 5) << &m_image_ctx << ", " << "num="
461                   << m_write_blockers << dendl;
462     if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
463       m_write_blocker_contexts.push_back(on_blocked);
464       return;
465     }
466   }
467
468   // ensure that all in-flight IO is flushed
469   m_image_ctx.flush(on_blocked);
470 }
471
472 template <typename I>
473 void ImageRequestWQ<I>::unblock_writes() {
474   CephContext *cct = m_image_ctx.cct;
475
476   bool wake_up = false;
477   {
478     RWLock::WLocker locker(m_lock);
479     assert(m_write_blockers > 0);
480     --m_write_blockers;
481
482     ldout(cct, 5) << &m_image_ctx << ", " << "num="
483                   << m_write_blockers << dendl;
484     if (m_write_blockers == 0) {
485       wake_up = true;
486     }
487   }
488
489   if (wake_up) {
490     this->signal();
491   }
492 }
493
494 template <typename I>
495 void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
496   CephContext *cct = m_image_ctx.cct;
497   ldout(cct, 20) << dendl;
498
499   bool wake_up = false;
500   {
501     RWLock::WLocker locker(m_lock);
502     switch (direction) {
503     case DIRECTION_READ:
504       wake_up = (enabled != m_require_lock_on_read);
505       m_require_lock_on_read = enabled;
506       break;
507     case DIRECTION_WRITE:
508       wake_up = (enabled != m_require_lock_on_write);
509       m_require_lock_on_write = enabled;
510       break;
511     case DIRECTION_BOTH:
512       wake_up = (enabled != m_require_lock_on_read ||
513                  enabled != m_require_lock_on_write);
514       m_require_lock_on_read = enabled;
515       m_require_lock_on_write = enabled;
516       break;
517     }
518   }
519
520   // wake up the thread pool whenever the state changes so that
521   // we can re-request the lock if required
522   if (wake_up) {
523     this->signal();
524   }
525 }
526
527 template <typename I>
528 void *ImageRequestWQ<I>::_void_dequeue() {
529   CephContext *cct = m_image_ctx.cct;
530   ImageRequest<I> *peek_item = this->front();
531
532   // no queued IO requests or all IO is blocked/stalled
533   if (peek_item == nullptr || m_io_blockers.load() > 0) {
534     return nullptr;
535   }
536
537   bool lock_required;
538   bool refresh_required = m_image_ctx.state->is_refresh_required();
539   {
540     RWLock::RLocker locker(m_lock);
541     bool write_op = peek_item->is_write_op();
542     lock_required = is_lock_required(write_op);
543     if (write_op) {
544       if (!lock_required && m_write_blockers > 0) {
545         // missing lock is not the write blocker
546         return nullptr;
547       }
548
549       if (!lock_required && !refresh_required) {
550         // completed ops will requeue the IO -- don't count it as in-progress
551         m_in_flight_writes++;
552       }
553     }
554   }
555
556   ImageRequest<I> *item = reinterpret_cast<ImageRequest<I> *>(
557     ThreadPool::PointerWQ<ImageRequest<I> >::_void_dequeue());
558   assert(peek_item == item);
559
560   if (lock_required) {
561     this->get_pool_lock().Unlock();
562     m_image_ctx.owner_lock.get_read();
563     if (m_image_ctx.exclusive_lock != nullptr) {
564       ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
565       if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
566         lderr(cct) << "op requires exclusive lock" << dendl;
567         fail_in_flight_io(-EROFS, item);
568
569         // wake up the IO since we won't be returning a request to process
570         this->signal();
571       } else {
572         // stall IO until the acquire completes
573         ++m_io_blockers;
574         m_image_ctx.exclusive_lock->acquire_lock(new C_AcquireLock(this, item));
575       }
576     } else {
577       // raced with the exclusive lock being disabled
578       lock_required = false;
579     }
580     m_image_ctx.owner_lock.put_read();
581     this->get_pool_lock().Lock();
582
583     if (lock_required) {
584       return nullptr;
585     }
586   }
587
588   if (refresh_required) {
589     ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;
590
591     // stall IO until the refresh completes
592     ++m_io_blockers;
593
594     this->get_pool_lock().Unlock();
595     m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
596     this->get_pool_lock().Lock();
597     return nullptr;
598   }
599
600   item->start_op();
601   return item;
602 }
603
604 template <typename I>
605 void ImageRequestWQ<I>::process(ImageRequest<I> *req) {
606   CephContext *cct = m_image_ctx.cct;
607   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
608                  << "req=" << req << dendl;
609
610   req->send();
611
612   finish_queued_io(req);
613   if (req->is_write_op()) {
614     finish_in_flight_write();
615   }
616   delete req;
617
618   finish_in_flight_io();
619 }
620
621 template <typename I>
622 void ImageRequestWQ<I>::finish_queued_io(ImageRequest<I> *req) {
623   RWLock::RLocker locker(m_lock);
624   if (req->is_write_op()) {
625     assert(m_queued_writes > 0);
626     m_queued_writes--;
627   } else {
628     assert(m_queued_reads > 0);
629     m_queued_reads--;
630   }
631 }
632
633 template <typename I>
634 void ImageRequestWQ<I>::finish_in_flight_write() {
635   bool writes_blocked = false;
636   {
637     RWLock::RLocker locker(m_lock);
638     assert(m_in_flight_writes > 0);
639     if (--m_in_flight_writes == 0 &&
640         !m_write_blocker_contexts.empty()) {
641       writes_blocked = true;
642     }
643   }
644
645   if (writes_blocked) {
646     m_image_ctx.flush(new C_BlockedWrites(this));
647   }
648 }
649
650 template <typename I>
651 int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
652   RWLock::RLocker locker(m_lock);
653
654   if (m_shutdown) {
655     CephContext *cct = m_image_ctx.cct;
656     lderr(cct) << "IO received on closed image" << dendl;
657
658     c->get();
659     c->fail(-ESHUTDOWN);
660     return false;
661   }
662
663   m_in_flight_ios++;
664   return true;
665 }
666
667 template <typename I>
668 void ImageRequestWQ<I>::finish_in_flight_io() {
669   Context *on_shutdown;
670   {
671     RWLock::RLocker locker(m_lock);
672     if (--m_in_flight_ios > 0 || !m_shutdown) {
673       return;
674     }
675     on_shutdown = m_on_shutdown;
676   }
677
678   CephContext *cct = m_image_ctx.cct;
679   ldout(cct, 5) << "completing shut down" << dendl;
680
681   assert(on_shutdown != nullptr);
682   m_image_ctx.flush(on_shutdown);
683 }
684
685 template <typename I>
686 void ImageRequestWQ<I>::fail_in_flight_io(int r, ImageRequest<I> *req) {
687   this->process_finish();
688   req->fail(r);
689   finish_queued_io(req);
690   delete req;
691   finish_in_flight_io();
692 }
693
694 template <typename I>
695 bool ImageRequestWQ<I>::is_lock_required(bool write_op) const {
696   assert(m_lock.is_locked());
697   return ((write_op && m_require_lock_on_write) ||
698           (!write_op && m_require_lock_on_read));
699 }
700
701 template <typename I>
702 void ImageRequestWQ<I>::queue(ImageRequest<I> *req) {
703   assert(m_image_ctx.owner_lock.is_locked());
704
705   CephContext *cct = m_image_ctx.cct;
706   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
707                  << "req=" << req << dendl;
708
709   if (req->is_write_op()) {
710     m_queued_writes++;
711   } else {
712     m_queued_reads++;
713   }
714
715   ThreadPool::PointerWQ<ImageRequest<I> >::queue(req);
716 }
717
718 template <typename I>
719 void ImageRequestWQ<I>::handle_acquire_lock(int r, ImageRequest<I> *req) {
720   CephContext *cct = m_image_ctx.cct;
721   ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;
722
723   if (r < 0) {
724     fail_in_flight_io(r, req);
725   } else {
726     // since IO was stalled for acquire -- original IO order is preserved
727     // if we requeue this op for work queue processing
728     this->requeue(req);
729   }
730
731   assert(m_io_blockers.load() > 0);
732   --m_io_blockers;
733   this->signal();
734 }
735
736 template <typename I>
737 void ImageRequestWQ<I>::handle_refreshed(int r, ImageRequest<I> *req) {
738   CephContext *cct = m_image_ctx.cct;
739   ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
740                 << "req=" << req << dendl;
741   if (r < 0) {
742     fail_in_flight_io(r, req);
743   } else {
744     // since IO was stalled for refresh -- original IO order is preserved
745     // if we requeue this op for work queue processing
746     this->requeue(req);
747   }
748
749   assert(m_io_blockers.load() > 0);
750   --m_io_blockers;
751   this->signal();
752 }
753
754 template <typename I>
755 void ImageRequestWQ<I>::handle_blocked_writes(int r) {
756   Contexts contexts;
757   {
758     RWLock::WLocker locker(m_lock);
759     contexts.swap(m_write_blocker_contexts);
760   }
761
762   for (auto ctx : contexts) {
763     ctx->complete(0);
764   }
765 }
766
767 template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;
768
769 } // namespace io
770 } // namespace librbd