Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / LibrbdWriteback.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <errno.h>
5
6 #include "common/ceph_context.h"
7 #include "common/dout.h"
8 #include "common/Mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/Context.h"
11 #include "include/rados/librados.hpp"
12 #include "include/rbd/librbd.hpp"
13
14 #include "librbd/ExclusiveLock.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/internal.h"
17 #include "librbd/LibrbdWriteback.h"
18 #include "librbd/ObjectMap.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Utils.h"
21 #include "librbd/io/AioCompletion.h"
22 #include "librbd/io/ObjectRequest.h"
23
24 #include "include/assert.h"
25
26 #define dout_subsys ceph_subsys_rbd
27 #undef dout_prefix
28 #define dout_prefix *_dout << "librbdwriteback: "
29
30 namespace librbd {
31
32   /**
33    * callback to finish a rados completion as a Context
34    *
35    * @param c completion
36    * @param arg Context* recast as void*
37    */
38   void context_cb(rados_completion_t c, void *arg)
39   {
40     Context *con = reinterpret_cast<Context *>(arg);
41     con->complete(rados_aio_get_return_value(c));
42   }
43
44   /**
45    * context to wrap another context in a Mutex
46    *
47    * @param cct cct
48    * @param c context to finish
49    * @param l mutex to lock
50    */
51   class C_ReadRequest : public Context {
52   public:
53     C_ReadRequest(CephContext *cct, Context *c, Mutex *cache_lock)
54       : m_cct(cct), m_ctx(c), m_cache_lock(cache_lock) {
55     }
56     void finish(int r) override {
57       ldout(m_cct, 20) << "aio_cb completing " << dendl;
58       {
59         Mutex::Locker cache_locker(*m_cache_lock);
60         m_ctx->complete(r);
61       }
62       ldout(m_cct, 20) << "aio_cb finished" << dendl;
63     }
64   private:
65     CephContext *m_cct;
66     Context *m_ctx;
67     Mutex *m_cache_lock;
68   };
69
70   class C_OrderedWrite : public Context {
71   public:
72     C_OrderedWrite(CephContext *cct, LibrbdWriteback::write_result_d *result,
73                    const ZTracer::Trace &trace, LibrbdWriteback *wb)
74       : m_cct(cct), m_result(result), m_trace(trace), m_wb_handler(wb) {}
75     ~C_OrderedWrite() override {}
76     void finish(int r) override {
77       ldout(m_cct, 20) << "C_OrderedWrite completing " << m_result << dendl;
78       {
79         Mutex::Locker l(m_wb_handler->m_lock);
80         assert(!m_result->done);
81         m_result->done = true;
82         m_result->ret = r;
83         m_wb_handler->complete_writes(m_result->oid);
84       }
85       ldout(m_cct, 20) << "C_OrderedWrite finished " << m_result << dendl;
86       m_trace.event("finish");
87     }
88   private:
89     CephContext *m_cct;
90     LibrbdWriteback::write_result_d *m_result;
91     ZTracer::Trace m_trace;
92     LibrbdWriteback *m_wb_handler;
93   };
94
95   struct C_WriteJournalCommit : public Context {
96     typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
97
98     ImageCtx *image_ctx;
99     std::string oid;
100     uint64_t object_no;
101     uint64_t off;
102     bufferlist bl;
103     SnapContext snapc;
104     uint64_t journal_tid;
105     ZTracer::Trace trace;
106     Context *req_comp;
107     bool request_sent = false;
108
109     C_WriteJournalCommit(ImageCtx *_image_ctx, const std::string &_oid,
110                          uint64_t _object_no, uint64_t _off,
111                          const bufferlist &_bl, const SnapContext& _snapc,
112                          uint64_t _journal_tid,
113                          const ZTracer::Trace &trace, Context *_req_comp)
114       : image_ctx(_image_ctx), oid(_oid), object_no(_object_no), off(_off),
115         bl(_bl), snapc(_snapc), journal_tid(_journal_tid),
116         trace(trace), req_comp(_req_comp) {
117       CephContext *cct = image_ctx->cct;
118       ldout(cct, 20) << this << " C_WriteJournalCommit: "
119                      << "delaying write until journal tid "
120                      << journal_tid << " safe" << dendl;
121     }
122
123     void complete(int r) override {
124       if (request_sent || r < 0) {
125         if (request_sent && r == 0) {
126           // only commit IO events that are safely recorded to the backing image
127           // since the cache will retry all IOs that fail
128           commit_io_event_extent(0);
129         }
130
131         req_comp->complete(r);
132         delete this;
133       } else {
134         send_request();
135       }
136     }
137
138     void finish(int r) override {
139     }
140
141     void commit_io_event_extent(int r) {
142       CephContext *cct = image_ctx->cct;
143       ldout(cct, 20) << this << " C_WriteJournalCommit: "
144                      << "write committed: updating journal commit position"
145                      << dendl;
146
147       // all IO operations are flushed prior to closing the journal
148       assert(image_ctx->journal != NULL);
149
150       Extents file_extents;
151       Striper::extent_to_file(cct, &image_ctx->layout, object_no, off,
152                               bl.length(), file_extents);
153       for (Extents::iterator it = file_extents.begin();
154            it != file_extents.end(); ++it) {
155         image_ctx->journal->commit_io_event_extent(journal_tid, it->first,
156                                                    it->second, r);
157       }
158     }
159
160     void send_request() {
161       CephContext *cct = image_ctx->cct;
162       ldout(cct, 20) << this << " C_WriteJournalCommit: "
163                      << "journal committed: sending write request" << dendl;
164
165       assert(image_ctx->exclusive_lock->is_lock_owner());
166
167       request_sent = true;
168       auto req = new io::ObjectWriteRequest(image_ctx, oid, object_no, off,
169                                             bl, snapc, 0, trace, this);
170       req->send();
171     }
172   };
173
174   struct C_CommitIOEventExtent : public Context {
175     ImageCtx *image_ctx;
176     uint64_t journal_tid;
177     uint64_t offset;
178     uint64_t length;
179
180     C_CommitIOEventExtent(ImageCtx *image_ctx, uint64_t journal_tid,
181                           uint64_t offset, uint64_t length)
182       : image_ctx(image_ctx), journal_tid(journal_tid), offset(offset),
183         length(length) {
184     }
185
186     void finish(int r) override {
187       // all IO operations are flushed prior to closing the journal
188       assert(image_ctx->journal != nullptr);
189
190       image_ctx->journal->commit_io_event_extent(journal_tid, offset, length,
191                                                  r);
192     }
193   };
194
195   LibrbdWriteback::LibrbdWriteback(ImageCtx *ictx, Mutex& lock)
196     : m_tid(0), m_lock(lock), m_ictx(ictx) {
197   }
198
199   void LibrbdWriteback::read(const object_t& oid, uint64_t object_no,
200                              const object_locator_t& oloc,
201                              uint64_t off, uint64_t len, snapid_t snapid,
202                              bufferlist *pbl, uint64_t trunc_size,
203                              __u32 trunc_seq, int op_flags,
204                              const ZTracer::Trace &parent_trace,
205                              Context *onfinish)
206   {
207     // on completion, take the mutex and then call onfinish.
208     Context *req = new C_ReadRequest(m_ictx->cct, onfinish, &m_lock);
209
210     {
211       RWLock::RLocker snap_locker(m_ictx->snap_lock);
212       if (m_ictx->object_map != nullptr &&
213           !m_ictx->object_map->object_may_exist(object_no)) {
214         m_ictx->op_work_queue->queue(req, -ENOENT);
215         return;
216       }
217     }
218
219     librados::ObjectReadOperation op;
220     op.read(off, len, pbl, NULL);
221     op.set_op_flags2(op_flags);
222     int flags = m_ictx->get_read_flags(snapid);
223
224     librados::AioCompletion *rados_completion =
225       util::create_rados_callback(req);
226     int r = m_ictx->data_ctx.aio_operate(
227       oid.name, rados_completion, &op, flags, nullptr,
228       (parent_trace.valid() ? parent_trace.get_info() : nullptr));
229     rados_completion->release();
230     assert(r >= 0);
231   }
232
233   bool LibrbdWriteback::may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid)
234   {
235     m_ictx->snap_lock.get_read();
236     librados::snap_t snap_id = m_ictx->snap_id;
237     m_ictx->parent_lock.get_read();
238     uint64_t overlap = 0;
239     m_ictx->get_parent_overlap(snap_id, &overlap);
240     m_ictx->parent_lock.put_read();
241     m_ictx->snap_lock.put_read();
242
243     uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
244
245     // reverse map this object extent onto the parent
246     vector<pair<uint64_t,uint64_t> > objectx;
247     Striper::extent_to_file(m_ictx->cct, &m_ictx->layout,
248                           object_no, 0, m_ictx->layout.object_size,
249                           objectx);
250     uint64_t object_overlap = m_ictx->prune_parent_extents(objectx, overlap);
251     bool may = object_overlap > 0;
252     ldout(m_ictx->cct, 10) << "may_copy_on_write " << oid << " " << read_off
253                            << "~" << read_len << " = " << may << dendl;
254     return may;
255   }
256
257   ceph_tid_t LibrbdWriteback::write(const object_t& oid,
258                                     const object_locator_t& oloc,
259                                     uint64_t off, uint64_t len,
260                                     const SnapContext& snapc,
261                                     const bufferlist &bl,
262                                     ceph::real_time mtime, uint64_t trunc_size,
263                                     __u32 trunc_seq, ceph_tid_t journal_tid,
264                                     const ZTracer::Trace &parent_trace,
265                                     Context *oncommit)
266   {
267     ZTracer::Trace trace;
268     if (parent_trace.valid()) {
269       trace.init("", &m_ictx->trace_endpoint, &parent_trace);
270       trace.copy_name("writeback " + oid.name);
271       trace.event("start");
272     }
273
274     uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
275
276     write_result_d *result = new write_result_d(oid.name, oncommit);
277     m_writes[oid.name].push(result);
278     ldout(m_ictx->cct, 20) << "write will wait for result " << result << dendl;
279     C_OrderedWrite *req_comp = new C_OrderedWrite(m_ictx->cct, result, trace,
280                                                   this);
281
282     // all IO operations are flushed prior to closing the journal
283     assert(journal_tid == 0 || m_ictx->journal != NULL);
284     if (journal_tid != 0) {
285       m_ictx->journal->flush_event(
286         journal_tid, new C_WriteJournalCommit(
287           m_ictx, oid.name, object_no, off, bl, snapc, journal_tid, trace,
288           req_comp));
289     } else {
290       auto req = new io::ObjectWriteRequest(
291         m_ictx, oid.name, object_no, off, bl, snapc, 0, trace, req_comp);
292       req->send();
293     }
294     return ++m_tid;
295   }
296
297
298   void LibrbdWriteback::overwrite_extent(const object_t& oid, uint64_t off,
299                                          uint64_t len,
300                                          ceph_tid_t original_journal_tid,
301                                          ceph_tid_t new_journal_tid) {
302     typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
303
304     ldout(m_ictx->cct, 20) << __func__ << ": " << oid << " "
305                            << off << "~" << len << " "
306                            << "journal_tid=" << original_journal_tid << ", "
307                            << "new_journal_tid=" << new_journal_tid << dendl;
308
309     uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
310
311     // all IO operations are flushed prior to closing the journal
312     assert(original_journal_tid != 0 && m_ictx->journal != NULL);
313
314     Extents file_extents;
315     Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, object_no, off,
316                             len, file_extents);
317     for (Extents::iterator it = file_extents.begin();
318          it != file_extents.end(); ++it) {
319       if (new_journal_tid != 0) {
320         // ensure new journal event is safely committed to disk before
321         // committing old event
322         m_ictx->journal->flush_event(
323           new_journal_tid, new C_CommitIOEventExtent(m_ictx,
324                                                      original_journal_tid,
325                                                      it->first, it->second));
326       } else {
327         m_ictx->journal->commit_io_event_extent(original_journal_tid, it->first,
328                                                 it->second, 0);
329       }
330     }
331   }
332
333   void LibrbdWriteback::complete_writes(const std::string& oid)
334   {
335     assert(m_lock.is_locked());
336     std::queue<write_result_d*>& results = m_writes[oid];
337     ldout(m_ictx->cct, 20) << "complete_writes() oid " << oid << dendl;
338     std::list<write_result_d*> finished;
339
340     while (!results.empty()) {
341       write_result_d *result = results.front();
342       if (!result->done)
343         break;
344       finished.push_back(result);
345       results.pop();
346     }
347
348     if (results.empty())
349       m_writes.erase(oid);
350
351     for (std::list<write_result_d*>::iterator it = finished.begin();
352          it != finished.end(); ++it) {
353       write_result_d *result = *it;
354       ldout(m_ictx->cct, 20) << "complete_writes() completing " << result
355                              << dendl;
356       result->oncommit->complete(result->ret);
357       delete result;
358     }
359   }
360 }