Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / io / ObjectRequest.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/io/ObjectRequest.h"
5 #include "common/ceph_context.h"
6 #include "common/dout.h"
7 #include "common/errno.h"
8 #include "common/Mutex.h"
9 #include "common/RWLock.h"
10 #include "common/WorkQueue.h"
11 #include "include/Context.h"
12 #include "include/err.h"
13
14 #include "librbd/ExclusiveLock.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/ObjectMap.h"
17 #include "librbd/Utils.h"
18 #include "librbd/io/AioCompletion.h"
19 #include "librbd/io/CopyupRequest.h"
20 #include "librbd/io/ImageRequest.h"
21 #include "librbd/io/ReadResult.h"
22
23 #include <boost/bind.hpp>
24 #include <boost/optional.hpp>
25
26 #define dout_subsys ceph_subsys_rbd
27 #undef dout_prefix
28 #define dout_prefix *_dout << "librbd::io::ObjectRequest: " << this \
29                            << " " << __func__ << ": "
30
31 namespace librbd {
32 namespace io {
33
34 template <typename I>
35 ObjectRequest<I>*
36 ObjectRequest<I>::create_remove(I *ictx, const std::string &oid,
37                                 uint64_t object_no,
38                                 const ::SnapContext &snapc,
39                                 const ZTracer::Trace &parent_trace,
40                                 Context *completion) {
41   return new ObjectRemoveRequest(util::get_image_ctx(ictx), oid, object_no,
42                                  snapc, parent_trace, completion);
43 }
44
45 template <typename I>
46 ObjectRequest<I>*
47 ObjectRequest<I>::create_truncate(I *ictx, const std::string &oid,
48                                   uint64_t object_no, uint64_t object_off,
49                                   const ::SnapContext &snapc,
50                                   const ZTracer::Trace &parent_trace,
51                                   Context *completion) {
52   return new ObjectTruncateRequest(util::get_image_ctx(ictx), oid, object_no,
53                                    object_off, snapc, parent_trace, completion);
54 }
55
56 template <typename I>
57 ObjectRequest<I>*
58 ObjectRequest<I>::create_trim(I *ictx, const std::string &oid,
59                               uint64_t object_no, const ::SnapContext &snapc,
60                               bool post_object_map_update,
61                               Context *completion) {
62   return new ObjectTrimRequest(util::get_image_ctx(ictx), oid, object_no,
63                                snapc, post_object_map_update, completion);
64 }
65
66 template <typename I>
67 ObjectRequest<I>*
68 ObjectRequest<I>::create_write(I *ictx, const std::string &oid,
69                                uint64_t object_no, uint64_t object_off,
70                                const ceph::bufferlist &data,
71                                const ::SnapContext &snapc, int op_flags,
72                                const ZTracer::Trace &parent_trace,
73                                Context *completion) {
74   return new ObjectWriteRequest(util::get_image_ctx(ictx), oid, object_no,
75                                 object_off, data, snapc, op_flags, parent_trace,
76                                 completion);
77 }
78
79 template <typename I>
80 ObjectRequest<I>*
81 ObjectRequest<I>::create_zero(I *ictx, const std::string &oid,
82                               uint64_t object_no, uint64_t object_off,
83                               uint64_t object_len,
84                               const ::SnapContext &snapc,
85                               const ZTracer::Trace &parent_trace,
86                               Context *completion) {
87   return new ObjectZeroRequest(util::get_image_ctx(ictx), oid, object_no,
88                                object_off, object_len, snapc, parent_trace,
89                                completion);
90 }
91
92 template <typename I>
93 ObjectRequest<I>*
94 ObjectRequest<I>::create_writesame(I *ictx, const std::string &oid,
95                                    uint64_t object_no, uint64_t object_off,
96                                    uint64_t object_len,
97                                    const ceph::bufferlist &data,
98                                    const ::SnapContext &snapc, int op_flags,
99                                    const ZTracer::Trace &parent_trace,
100                                    Context *completion) {
101   return new ObjectWriteSameRequest(util::get_image_ctx(ictx), oid, object_no,
102                                     object_off, object_len, data, snapc,
103                                     op_flags, parent_trace, completion);
104 }
105
106 template <typename I>
107 ObjectRequest<I>*
108 ObjectRequest<I>::create_compare_and_write(I *ictx, const std::string &oid,
109                                            uint64_t object_no, uint64_t object_off,
110                                            const ceph::bufferlist &cmp_data,
111                                            const ceph::bufferlist &write_data,
112                                            const ::SnapContext &snapc,
113                                            uint64_t *mismatch_offset,
114                                            int op_flags,
115                                            const ZTracer::Trace &parent_trace,
116                                            Context *completion) {
117   return new ObjectCompareAndWriteRequest(util::get_image_ctx(ictx), oid,
118                                           object_no, object_off, cmp_data,
119                                           write_data, snapc, mismatch_offset,
120                                           op_flags, parent_trace, completion);
121 }
122
123 template <typename I>
124 ObjectRequest<I>::ObjectRequest(ImageCtx *ictx, const std::string &oid,
125                                 uint64_t objectno, uint64_t off,
126                                 uint64_t len, librados::snap_t snap_id,
127                                 bool hide_enoent, const char *trace_name,
128                                 const ZTracer::Trace &trace,
129                                 Context *completion)
130   : m_ictx(ictx), m_oid(oid), m_object_no(objectno), m_object_off(off),
131     m_object_len(len), m_snap_id(snap_id), m_completion(completion),
132     m_hide_enoent(hide_enoent),
133     m_trace(util::create_trace(*ictx, "", trace)) {
134   if (m_trace.valid()) {
135     m_trace.copy_name(trace_name + std::string(" ") + oid);
136     m_trace.event("start");
137   }
138
139   Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, m_object_no,
140                           0, m_ictx->layout.object_size, m_parent_extents);
141
142   RWLock::RLocker snap_locker(m_ictx->snap_lock);
143   RWLock::RLocker parent_locker(m_ictx->parent_lock);
144   compute_parent_extents();
145 }
146
147 template <typename I>
148 void ObjectRequest<I>::complete(int r)
149 {
150   if (should_complete(r)) {
151     ldout(m_ictx->cct, 20) << dendl;
152     if (m_hide_enoent && r == -ENOENT) {
153       r = 0;
154     }
155     m_completion->complete(r);
156     delete this;
157   }
158 }
159
160 template <typename I>
161 bool ObjectRequest<I>::compute_parent_extents() {
162   assert(m_ictx->snap_lock.is_locked());
163   assert(m_ictx->parent_lock.is_locked());
164
165   uint64_t parent_overlap;
166   int r = m_ictx->get_parent_overlap(m_snap_id, &parent_overlap);
167   if (r < 0) {
168     // NOTE: it's possible for a snapshot to be deleted while we are
169     // still reading from it
170     lderr(m_ictx->cct) << "failed to retrieve parent overlap: "
171                        << cpp_strerror(r)
172                        << dendl;
173     m_has_parent = false;
174     m_parent_extents.clear();
175     return false;
176   }
177
178   uint64_t object_overlap = m_ictx->prune_parent_extents(
179     m_parent_extents, parent_overlap);
180   if (object_overlap > 0) {
181     ldout(m_ictx->cct, 20) << "overlap " << parent_overlap << " "
182                            << "extents " << m_parent_extents << dendl;
183     m_has_parent = !m_parent_extents.empty();
184     return true;
185   }
186   return false;
187 }
188
189 static inline bool is_copy_on_read(ImageCtx *ictx, librados::snap_t snap_id) {
190   assert(ictx->snap_lock.is_locked());
191   return (ictx->clone_copy_on_read &&
192           !ictx->read_only && snap_id == CEPH_NOSNAP &&
193           (ictx->exclusive_lock == nullptr ||
194            ictx->exclusive_lock->is_lock_owner()));
195 }
196
197 /** read **/
198
199 template <typename I>
200 ObjectReadRequest<I>::ObjectReadRequest(I *ictx, const std::string &oid,
201                                         uint64_t objectno, uint64_t offset,
202                                         uint64_t len, Extents& be,
203                                         librados::snap_t snap_id, bool sparse,
204                                         int op_flags,
205                                         const ZTracer::Trace &parent_trace,
206                                         Context *completion)
207   : ObjectRequest<I>(util::get_image_ctx(ictx), oid, objectno, offset, len,
208                      snap_id, false, "read", parent_trace, completion),
209     m_buffer_extents(be), m_tried_parent(false), m_sparse(sparse),
210     m_op_flags(op_flags), m_state(LIBRBD_AIO_READ_FLAT) {
211   guard_read();
212 }
213
214 template <typename I>
215 void ObjectReadRequest<I>::guard_read()
216 {
217   ImageCtx *image_ctx = this->m_ictx;
218   RWLock::RLocker snap_locker(image_ctx->snap_lock);
219   RWLock::RLocker parent_locker(image_ctx->parent_lock);
220
221   if (this->has_parent()) {
222     ldout(image_ctx->cct, 20) << "guarding read" << dendl;
223     m_state = LIBRBD_AIO_READ_GUARD;
224   }
225 }
226
227 template <typename I>
228 bool ObjectReadRequest<I>::should_complete(int r)
229 {
230   ImageCtx *image_ctx = this->m_ictx;
231   ldout(image_ctx->cct, 20) << this->m_oid << " "
232                             << this->m_object_off << "~" << this->m_object_len
233                             << " r = " << r << dendl;
234
235   bool finished = true;
236
237   switch (m_state) {
238   case LIBRBD_AIO_READ_GUARD:
239     ldout(image_ctx->cct, 20) << "READ_CHECK_GUARD" << dendl;
240
241     // This is the step to read from parent
242     if (!m_tried_parent && r == -ENOENT) {
243       {
244         RWLock::RLocker snap_locker(image_ctx->snap_lock);
245         RWLock::RLocker parent_locker(image_ctx->parent_lock);
246         if (image_ctx->parent == NULL) {
247           ldout(image_ctx->cct, 20) << "parent is gone; do nothing" << dendl;
248           break;
249         }
250
251         // calculate reverse mapping onto the image
252         vector<pair<uint64_t,uint64_t> > parent_extents;
253         Striper::extent_to_file(image_ctx->cct, &image_ctx->layout,
254                                 this->m_object_no, this->m_object_off,
255                                 this->m_object_len, parent_extents);
256
257         uint64_t parent_overlap = 0;
258         uint64_t object_overlap = 0;
259         r = image_ctx->get_parent_overlap(this->m_snap_id, &parent_overlap);
260         if (r == 0) {
261           object_overlap = image_ctx->prune_parent_extents(parent_extents,
262                                                            parent_overlap);
263         }
264
265         if (object_overlap > 0) {
266           m_tried_parent = true;
267           if (is_copy_on_read(image_ctx, this->m_snap_id)) {
268             m_state = LIBRBD_AIO_READ_COPYUP;
269           }
270
271           read_from_parent(std::move(parent_extents));
272           finished = false;
273         }
274       }
275     }
276     break;
277   case LIBRBD_AIO_READ_COPYUP:
278     ldout(image_ctx->cct, 20) << "READ_COPYUP" << dendl;
279     // This is the extra step for copy-on-read: kick off an asynchronous copyup.
280     // It is different from copy-on-write as asynchronous copyup will finish
281     // by itself so state won't go back to LIBRBD_AIO_READ_GUARD.
282
283     assert(m_tried_parent);
284     if (r > 0) {
285       // If read entire object from parent success and CoR is possible, kick
286       // off a asynchronous copyup. This approach minimizes the latency
287       // impact.
288       send_copyup();
289     }
290     break;
291   case LIBRBD_AIO_READ_FLAT:
292     ldout(image_ctx->cct, 20) << "READ_FLAT" << dendl;
293     // The read content should be deposit in m_read_data
294     break;
295   default:
296     lderr(image_ctx->cct) << "invalid request state: " << m_state << dendl;
297     ceph_abort();
298   }
299
300   return finished;
301 }
302
303 template <typename I>
304 void ObjectReadRequest<I>::send() {
305   ImageCtx *image_ctx = this->m_ictx;
306   ldout(image_ctx->cct, 20) << this->m_oid << " " << this->m_object_off
307                             << "~" << this->m_object_len
308                             << dendl;
309
310   {
311     RWLock::RLocker snap_locker(image_ctx->snap_lock);
312
313     // send read request to parent if the object doesn't exist locally
314     if (image_ctx->object_map != nullptr &&
315         !image_ctx->object_map->object_may_exist(this->m_object_no)) {
316       image_ctx->op_work_queue->queue(util::create_context_callback<
317         ObjectRequest<I> >(this), -ENOENT);
318       return;
319     }
320   }
321
322   librados::ObjectReadOperation op;
323   int flags = image_ctx->get_read_flags(this->m_snap_id);
324   if (m_sparse) {
325     op.sparse_read(this->m_object_off, this->m_object_len, &m_ext_map,
326                    &m_read_data, nullptr);
327   } else {
328     op.read(this->m_object_off, this->m_object_len, &m_read_data, nullptr);
329   }
330   op.set_op_flags2(m_op_flags);
331
332   librados::AioCompletion *rados_completion =
333     util::create_rados_callback(this);
334   int r = image_ctx->data_ctx.aio_operate(
335     this->m_oid, rados_completion, &op, flags, nullptr,
336     (this->m_trace.valid() ? this->m_trace.get_info() : nullptr));
337   assert(r == 0);
338
339   rados_completion->release();
340 }
341
342 template <typename I>
343 void ObjectReadRequest<I>::send_copyup()
344 {
345   ImageCtx *image_ctx = this->m_ictx;
346   ldout(image_ctx->cct, 20) << this->m_oid << " " << this->m_object_off
347                             << "~" << this->m_object_len << dendl;
348
349   {
350     RWLock::RLocker snap_locker(image_ctx->snap_lock);
351     RWLock::RLocker parent_locker(image_ctx->parent_lock);
352     if (!this->compute_parent_extents() ||
353         (image_ctx->exclusive_lock != nullptr &&
354          !image_ctx->exclusive_lock->is_lock_owner())) {
355       return;
356     }
357   }
358
359   Mutex::Locker copyup_locker(image_ctx->copyup_list_lock);
360   map<uint64_t, CopyupRequest*>::iterator it =
361     image_ctx->copyup_list.find(this->m_object_no);
362   if (it == image_ctx->copyup_list.end()) {
363     // create and kick off a CopyupRequest
364     CopyupRequest *new_req = new CopyupRequest(
365       image_ctx, this->m_oid, this->m_object_no,
366       std::move(this->m_parent_extents), this->m_trace);
367     this->m_parent_extents.clear();
368
369     image_ctx->copyup_list[this->m_object_no] = new_req;
370     new_req->send();
371   }
372 }
373
374 template <typename I>
375 void ObjectReadRequest<I>::read_from_parent(Extents&& parent_extents)
376 {
377   ImageCtx *image_ctx = this->m_ictx;
378   AioCompletion *parent_completion = AioCompletion::create_and_start<
379     ObjectRequest<I> >(this, image_ctx, AIO_TYPE_READ);
380
381   ldout(image_ctx->cct, 20) << "parent completion " << parent_completion
382                             << " extents " << parent_extents << dendl;
383   ImageRequest<>::aio_read(image_ctx->parent, parent_completion,
384                            std::move(parent_extents),
385                            ReadResult{&m_read_data}, 0, this->m_trace);
386 }
387
388 /** write **/
389
390 AbstractObjectWriteRequest::AbstractObjectWriteRequest(ImageCtx *ictx,
391                                                        const std::string &oid,
392                                                        uint64_t object_no,
393                                                        uint64_t object_off,
394                                                        uint64_t len,
395                                                        const ::SnapContext &snapc,
396                                                        bool hide_enoent,
397                                                        const char *trace_name,
398                                                        const ZTracer::Trace &parent_trace,
399                                                        Context *completion)
400   : ObjectRequest(ictx, oid, object_no, object_off, len, CEPH_NOSNAP,
401                   hide_enoent, trace_name, parent_trace, completion),
402     m_state(LIBRBD_AIO_WRITE_FLAT), m_snap_seq(snapc.seq.val)
403 {
404   m_snaps.insert(m_snaps.end(), snapc.snaps.begin(), snapc.snaps.end());
405 }
406
407 void AbstractObjectWriteRequest::guard_write()
408 {
409   if (has_parent()) {
410     m_state = LIBRBD_AIO_WRITE_GUARD;
411     m_write.assert_exists();
412     ldout(m_ictx->cct, 20) << "guarding write" << dendl;
413   }
414 }
415
416 bool AbstractObjectWriteRequest::should_complete(int r)
417 {
418   ldout(m_ictx->cct, 20) << get_op_type() << m_oid << " "
419                          << m_object_off << "~" << m_object_len
420                          << " r = " << r << dendl;
421
422   bool finished = true;
423   switch (m_state) {
424   case LIBRBD_AIO_WRITE_PRE:
425     ldout(m_ictx->cct, 20) << "WRITE_PRE" << dendl;
426     if (r < 0) {
427       return true;
428     }
429
430     send_write_op();
431     finished = false;
432     break;
433
434   case LIBRBD_AIO_WRITE_POST:
435     ldout(m_ictx->cct, 20) << "WRITE_POST" << dendl;
436     finished = true;
437     break;
438
439   case LIBRBD_AIO_WRITE_GUARD:
440     ldout(m_ictx->cct, 20) << "WRITE_CHECK_GUARD" << dendl;
441
442     if (r == -ENOENT) {
443       handle_write_guard();
444       finished = false;
445       break;
446     } else if (r < 0) {
447       // pass the error code to the finish context
448       m_state = LIBRBD_AIO_WRITE_ERROR;
449       complete(r);
450       finished = false;
451       break;
452     }
453
454     finished = send_post_object_map_update();
455     break;
456
457   case LIBRBD_AIO_WRITE_COPYUP:
458     ldout(m_ictx->cct, 20) << "WRITE_COPYUP" << dendl;
459     if (r < 0) {
460       m_state = LIBRBD_AIO_WRITE_ERROR;
461       complete(r);
462       finished = false;
463     } else {
464       finished = send_post_object_map_update();
465     }
466     break;
467
468   case LIBRBD_AIO_WRITE_FLAT:
469     ldout(m_ictx->cct, 20) << "WRITE_FLAT" << dendl;
470
471     finished = send_post_object_map_update();
472     break;
473
474   case LIBRBD_AIO_WRITE_ERROR:
475     assert(r < 0);
476     lderr(m_ictx->cct) << "WRITE_ERROR: " << cpp_strerror(r) << dendl;
477     break;
478
479   default:
480     lderr(m_ictx->cct) << "invalid request state: " << m_state << dendl;
481     ceph_abort();
482   }
483
484   return finished;
485 }
486
487 void AbstractObjectWriteRequest::send() {
488   ldout(m_ictx->cct, 20) << get_op_type() << " " << m_oid << " "
489                          << m_object_off << "~" << m_object_len << dendl;
490   {
491     RWLock::RLocker snap_lock(m_ictx->snap_lock);
492     if (m_ictx->object_map == nullptr) {
493       m_object_exist = true;
494     } else {
495       // should have been flushed prior to releasing lock
496       assert(m_ictx->exclusive_lock->is_lock_owner());
497       m_object_exist = m_ictx->object_map->object_may_exist(m_object_no);
498     }
499   }
500
501   send_write();
502 }
503
504 void AbstractObjectWriteRequest::send_pre_object_map_update() {
505   ldout(m_ictx->cct, 20) << dendl;
506
507   {
508     RWLock::RLocker snap_lock(m_ictx->snap_lock);
509     if (m_ictx->object_map != nullptr) {
510       uint8_t new_state;
511       pre_object_map_update(&new_state);
512       RWLock::WLocker object_map_locker(m_ictx->object_map_lock);
513       ldout(m_ictx->cct, 20) << m_oid << " " << m_object_off
514                              << "~" << m_object_len << dendl;
515       m_state = LIBRBD_AIO_WRITE_PRE;
516
517       if (m_ictx->object_map->aio_update<ObjectRequest>(
518             CEPH_NOSNAP, m_object_no, new_state, {}, this->m_trace, this)) {
519         return;
520       }
521     }
522   }
523
524   send_write_op();
525 }
526
527 bool AbstractObjectWriteRequest::send_post_object_map_update() {
528   ldout(m_ictx->cct, 20) << dendl;
529
530   RWLock::RLocker snap_locker(m_ictx->snap_lock);
531   if (m_ictx->object_map == nullptr || !post_object_map_update()) {
532     return true;
533   }
534
535   // should have been flushed prior to releasing lock
536   assert(m_ictx->exclusive_lock->is_lock_owner());
537
538   RWLock::WLocker object_map_locker(m_ictx->object_map_lock);
539   ldout(m_ictx->cct, 20) << m_oid << " " << m_object_off
540                          << "~" << m_object_len << dendl;
541   m_state = LIBRBD_AIO_WRITE_POST;
542
543   if (m_ictx->object_map->aio_update<ObjectRequest>(
544         CEPH_NOSNAP, m_object_no, OBJECT_NONEXISTENT, OBJECT_PENDING,
545         this->m_trace, this)) {
546     return false;
547   }
548
549   return true;
550 }
551
552 void AbstractObjectWriteRequest::send_write() {
553   ldout(m_ictx->cct, 20) << m_oid << " " << m_object_off << "~" << m_object_len
554                          << " object exist " << m_object_exist << dendl;
555
556   if (!m_object_exist && has_parent()) {
557     m_state = LIBRBD_AIO_WRITE_GUARD;
558     handle_write_guard();
559   } else {
560     send_pre_object_map_update();
561   }
562 }
563
564 void AbstractObjectWriteRequest::send_copyup()
565 {
566   ldout(m_ictx->cct, 20) << m_oid << " " << m_object_off
567                          << "~" << m_object_len << dendl;
568   m_state = LIBRBD_AIO_WRITE_COPYUP;
569
570   m_ictx->copyup_list_lock.Lock();
571   map<uint64_t, CopyupRequest*>::iterator it =
572     m_ictx->copyup_list.find(m_object_no);
573   if (it == m_ictx->copyup_list.end()) {
574     CopyupRequest *new_req = new CopyupRequest(m_ictx, m_oid,
575                                                m_object_no,
576                                                std::move(m_parent_extents),
577                                                this->m_trace);
578     m_parent_extents.clear();
579
580     // make sure to wait on this CopyupRequest
581     new_req->append_request(this);
582     m_ictx->copyup_list[m_object_no] = new_req;
583
584     m_ictx->copyup_list_lock.Unlock();
585     new_req->send();
586   } else {
587     it->second->append_request(this);
588     m_ictx->copyup_list_lock.Unlock();
589   }
590 }
591 void AbstractObjectWriteRequest::send_write_op()
592 {
593   m_state = LIBRBD_AIO_WRITE_FLAT;
594   if (m_guard) {
595     guard_write();
596   }
597
598   add_write_ops(&m_write, true);
599   assert(m_write.size() != 0);
600
601   librados::AioCompletion *rados_completion =
602     util::create_rados_callback(this);
603   int r = m_ictx->data_ctx.aio_operate(
604     m_oid, rados_completion, &m_write, m_snap_seq, m_snaps,
605     (this->m_trace.valid() ? this->m_trace.get_info() : nullptr));
606   assert(r == 0);
607   rados_completion->release();
608 }
609 void AbstractObjectWriteRequest::handle_write_guard()
610 {
611   bool has_parent;
612   {
613     RWLock::RLocker snap_locker(m_ictx->snap_lock);
614     RWLock::RLocker parent_locker(m_ictx->parent_lock);
615     has_parent = compute_parent_extents();
616   }
617   // If parent still exists, overlap might also have changed.
618   if (has_parent) {
619     send_copyup();
620   } else {
621     // parent may have disappeared -- send original write again
622     ldout(m_ictx->cct, 20) << "should_complete(" << this
623                            << "): parent overlap now 0" << dendl;
624     send_write();
625   }
626 }
627
628 void ObjectWriteRequest::add_write_ops(librados::ObjectWriteOperation *wr,
629                                        bool set_hints) {
630   RWLock::RLocker snap_locker(m_ictx->snap_lock);
631   if (set_hints && m_ictx->enable_alloc_hint &&
632       (m_ictx->object_map == nullptr || !m_object_exist)) {
633     wr->set_alloc_hint(m_ictx->get_object_size(), m_ictx->get_object_size());
634   }
635
636   if (m_object_off == 0 && m_object_len == m_ictx->get_object_size()) {
637     wr->write_full(m_write_data);
638   } else {
639     wr->write(m_object_off, m_write_data);
640   }
641   wr->set_op_flags2(m_op_flags);
642 }
643
644 void ObjectWriteRequest::send_write() {
645   bool write_full = (m_object_off == 0 && m_object_len == m_ictx->get_object_size());
646   ldout(m_ictx->cct, 20) << m_oid << " " << m_object_off << "~" << m_object_len
647                          << " object exist " << m_object_exist
648                          << " write_full " << write_full << dendl;
649   if (write_full && !has_parent()) {
650     m_guard = false;
651   }
652
653   AbstractObjectWriteRequest::send_write();
654 }
655
656 void ObjectRemoveRequest::guard_write() {
657   // do nothing to disable write guard only if deep-copyup not required
658   RWLock::RLocker snap_locker(m_ictx->snap_lock);
659   if (!m_ictx->snaps.empty()) {
660     AbstractObjectWriteRequest::guard_write();
661   }
662 }
663 void ObjectRemoveRequest::send_write() {
664   ldout(m_ictx->cct, 20) << m_oid << " remove " << " object exist "
665                          << m_object_exist << dendl;
666   if (!m_object_exist && !has_parent()) {
667     m_state = LIBRBD_AIO_WRITE_FLAT;
668     Context *ctx = util::create_context_callback<ObjectRequest>(this);
669     m_ictx->op_work_queue->queue(ctx, 0);
670   } else {
671     send_pre_object_map_update();
672   }
673 }
674
675 void ObjectTruncateRequest::send_write() {
676   ldout(m_ictx->cct, 20) << m_oid << " truncate " << m_object_off
677                          << " object exist " << m_object_exist << dendl;
678   if (!m_object_exist && !has_parent()) {
679     m_state = LIBRBD_AIO_WRITE_FLAT;
680     Context *ctx = util::create_context_callback<ObjectRequest>(this);
681     m_ictx->op_work_queue->queue(ctx, 0);
682   } else {
683     AbstractObjectWriteRequest::send_write();
684   }
685 }
686
687 void ObjectZeroRequest::send_write() {
688   ldout(m_ictx->cct, 20) << m_oid << " zero " << m_object_off << "~"
689                          << m_object_len << " object exist " << m_object_exist
690                          << dendl;
691   if (!m_object_exist && !has_parent()) {
692     m_state = LIBRBD_AIO_WRITE_FLAT;
693     Context *ctx = util::create_context_callback<ObjectRequest>(this);
694     m_ictx->op_work_queue->queue(ctx, 0);
695   } else {
696     AbstractObjectWriteRequest::send_write();
697   }
698 }
699
700 void ObjectWriteSameRequest::add_write_ops(librados::ObjectWriteOperation *wr,
701                                            bool set_hints) {
702   RWLock::RLocker snap_locker(m_ictx->snap_lock);
703   if (set_hints && m_ictx->enable_alloc_hint &&
704       (m_ictx->object_map == nullptr || !m_object_exist)) {
705     wr->set_alloc_hint(m_ictx->get_object_size(), m_ictx->get_object_size());
706   }
707
708   wr->writesame(m_object_off, m_object_len, m_write_data);
709   wr->set_op_flags2(m_op_flags);
710 }
711
712 void ObjectWriteSameRequest::send_write() {
713   bool write_full = (m_object_off == 0 && m_object_len == m_ictx->get_object_size());
714   ldout(m_ictx->cct, 20) << m_oid << " " << m_object_off << "~" << m_object_len
715                          << " write_full " << write_full << dendl;
716   if (write_full && !has_parent()) {
717     m_guard = false;
718   }
719
720   AbstractObjectWriteRequest::send_write();
721 }
722
723 void ObjectCompareAndWriteRequest::add_write_ops(librados::ObjectWriteOperation *wr,
724                                                  bool set_hints) {
725   RWLock::RLocker snap_locker(m_ictx->snap_lock);
726
727   if (set_hints && m_ictx->enable_alloc_hint &&
728       (m_ictx->object_map == nullptr || !m_object_exist)) {
729     wr->set_alloc_hint(m_ictx->get_object_size(), m_ictx->get_object_size());
730   }
731
732   // add cmpext ops
733   wr->cmpext(m_object_off, m_cmp_bl, nullptr);
734
735   if (m_object_off == 0 && m_object_len == m_ictx->get_object_size()) {
736     wr->write_full(m_write_bl);
737   } else {
738     wr->write(m_object_off, m_write_bl);
739   }
740   wr->set_op_flags2(m_op_flags);
741 }
742
743 void ObjectCompareAndWriteRequest::send_write() {
744   bool write_full = (m_object_off == 0 &&
745                      m_object_len == m_ictx->get_object_size());
746   ldout(m_ictx->cct, 20) << "send_write " << this << " " << m_oid << " "
747                          << m_object_off << "~" << m_object_len
748                          << " object exist " << m_object_exist
749                          << " write_full " << write_full << dendl;
750   if (write_full && !has_parent()) {
751     m_guard = false;
752   }
753
754   AbstractObjectWriteRequest::send_write();
755 }
756
757 void ObjectCompareAndWriteRequest::complete(int r)
758 {
759   if (should_complete(r)) {
760     ImageCtx *image_ctx = this->m_ictx;
761     ldout(m_ictx->cct, 20) << "complete " << this << dendl;
762
763     if (this->m_hide_enoent && r == -ENOENT) {
764       r = 0;
765     }
766
767     vector<pair<uint64_t,uint64_t> > file_extents;
768     if (r <= -MAX_ERRNO) {
769       // object extent compare mismatch
770       uint64_t offset = -MAX_ERRNO - r;
771       Striper::extent_to_file(image_ctx->cct, &image_ctx->layout,
772                               this->m_object_no, offset, this->m_object_len,
773                               file_extents);
774
775       assert(file_extents.size() == 1);
776
777       uint64_t mismatch_offset = file_extents[0].first;
778       if (this->m_mismatch_offset)
779         *this->m_mismatch_offset = mismatch_offset;
780       r = -EILSEQ;
781     }
782
783     //compare and write object extent error
784     m_completion->complete(r);
785     delete this;
786   }
787 }
788
789 } // namespace io
790 } // namespace librbd
791
792 template class librbd::io::ObjectRequest<librbd::ImageCtx>;
793 template class librbd::io::ObjectReadRequest<librbd::ImageCtx>;