Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / io / AioCompletion.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/io/AioCompletion.h"
5 #include <errno.h>
6
7 #include "common/ceph_context.h"
8 #include "common/dout.h"
9 #include "common/errno.h"
10 #include "common/perf_counters.h"
11 #include "common/WorkQueue.h"
12
13 #include "librbd/ImageCtx.h"
14 #include "librbd/internal.h"
15
16 #include "librbd/Journal.h"
17
18 #ifdef WITH_LTTNG
19 #include "tracing/librbd.h"
20 #else
21 #define tracepoint(...)
22 #endif
23
24 #define dout_subsys ceph_subsys_rbd
25 #undef dout_prefix
26 #define dout_prefix *_dout << "librbd::io::AioCompletion: " << this \
27                            << " " << __func__ << ": "
28
29 namespace librbd {
30 namespace io {
31
32 int AioCompletion::wait_for_complete() {
33   tracepoint(librbd, aio_wait_for_complete_enter, this);
34   lock.Lock();
35   while (state != AIO_STATE_COMPLETE)
36     cond.Wait(lock);
37   lock.Unlock();
38   tracepoint(librbd, aio_wait_for_complete_exit, 0);
39   return 0;
40 }
41
42 void AioCompletion::finalize(ssize_t rval)
43 {
44   assert(lock.is_locked());
45   assert(ictx != nullptr);
46   CephContext *cct = ictx->cct;
47
48   ldout(cct, 20) << "r=" << rval << dendl;
49   if (rval >= 0 && aio_type == AIO_TYPE_READ) {
50     read_result.assemble_result(cct);
51   }
52 }
53
54 void AioCompletion::complete() {
55   assert(lock.is_locked());
56   assert(ictx != nullptr);
57   CephContext *cct = ictx->cct;
58
59   tracepoint(librbd, aio_complete_enter, this, rval);
60   utime_t elapsed;
61   elapsed = ceph_clock_now() - start_time;
62   switch (aio_type) {
63   case AIO_TYPE_GENERIC:
64   case AIO_TYPE_OPEN:
65   case AIO_TYPE_CLOSE:
66     break;
67   case AIO_TYPE_READ:
68     ictx->perfcounter->tinc(l_librbd_rd_latency, elapsed); break;
69   case AIO_TYPE_WRITE:
70     ictx->perfcounter->tinc(l_librbd_wr_latency, elapsed); break;
71   case AIO_TYPE_DISCARD:
72     ictx->perfcounter->tinc(l_librbd_discard_latency, elapsed); break;
73   case AIO_TYPE_FLUSH:
74     ictx->perfcounter->tinc(l_librbd_aio_flush_latency, elapsed); break;
75   case AIO_TYPE_WRITESAME:
76     ictx->perfcounter->tinc(l_librbd_ws_latency, elapsed); break;
77   case AIO_TYPE_COMPARE_AND_WRITE:
78     ictx->perfcounter->tinc(l_librbd_cmp_latency, elapsed); break;
79   default:
80     lderr(cct) << "completed invalid aio_type: " << aio_type << dendl;
81     break;
82   }
83
84   // inform the journal that the op has successfully committed
85   if (journal_tid != 0) {
86     assert(ictx->journal != NULL);
87     ictx->journal->commit_io_event(journal_tid, rval);
88   }
89
90   state = AIO_STATE_CALLBACK;
91   if (complete_cb) {
92     lock.Unlock();
93     complete_cb(rbd_comp, complete_arg);
94     lock.Lock();
95   }
96
97   if (ictx && event_notify && ictx->event_socket.is_valid()) {
98     ictx->completed_reqs_lock.Lock();
99     ictx->completed_reqs.push_back(&m_xlist_item);
100     ictx->completed_reqs_lock.Unlock();
101     ictx->event_socket.notify();
102   }
103
104   state = AIO_STATE_COMPLETE;
105   cond.Signal();
106
107   // note: possible for image to be closed after op marked finished
108   if (async_op.started()) {
109     async_op.finish_op();
110   }
111   tracepoint(librbd, aio_complete_exit);
112 }
113
114 void AioCompletion::init_time(ImageCtx *i, aio_type_t t) {
115   Mutex::Locker locker(lock);
116   if (ictx == nullptr) {
117     ictx = i;
118     aio_type = t;
119     start_time = ceph_clock_now();
120   }
121 }
122
123 void AioCompletion::start_op(bool ignore_type) {
124   Mutex::Locker locker(lock);
125   assert(ictx != nullptr);
126   assert(!async_op.started());
127   if (state == AIO_STATE_PENDING &&
128       (ignore_type || aio_type != AIO_TYPE_FLUSH)) {
129     async_op.start_op(*ictx);
130   }
131 }
132
133 void AioCompletion::fail(int r)
134 {
135   lock.Lock();
136   assert(ictx != nullptr);
137   CephContext *cct = ictx->cct;
138
139   lderr(cct) << cpp_strerror(r) << dendl;
140   assert(pending_count == 0);
141   rval = r;
142   complete();
143   put_unlock();
144 }
145
146 void AioCompletion::set_request_count(uint32_t count) {
147   lock.Lock();
148   assert(ictx != nullptr);
149   CephContext *cct = ictx->cct;
150
151   ldout(cct, 20) << "pending=" << count << dendl;
152   assert(pending_count == 0);
153   pending_count = count;
154   lock.Unlock();
155
156   // if no pending requests, completion will fire now
157   unblock();
158 }
159
160 void AioCompletion::complete_request(ssize_t r)
161 {
162   lock.Lock();
163   assert(ictx != nullptr);
164   CephContext *cct = ictx->cct;
165
166   if (rval >= 0) {
167     if (r < 0 && r != -EEXIST)
168       rval = r;
169     else if (r > 0)
170       rval += r;
171   }
172   assert(pending_count);
173   int count = --pending_count;
174
175   ldout(cct, 20) << "cb=" << complete_cb << ", "
176                  << "pending=" << pending_count << dendl;
177   if (!count && blockers == 0) {
178     finalize(rval);
179     complete();
180   }
181   put_unlock();
182 }
183
184 void AioCompletion::associate_journal_event(uint64_t tid) {
185   Mutex::Locker l(lock);
186   assert(state == AIO_STATE_PENDING);
187   journal_tid = tid;
188 }
189
190 bool AioCompletion::is_complete() {
191   tracepoint(librbd, aio_is_complete_enter, this);
192   bool done;
193   {
194     Mutex::Locker l(lock);
195     done = this->state == AIO_STATE_COMPLETE;
196   }
197   tracepoint(librbd, aio_is_complete_exit, done);
198   return done;
199 }
200
201 ssize_t AioCompletion::get_return_value() {
202   tracepoint(librbd, aio_get_return_value_enter, this);
203   lock.Lock();
204   ssize_t r = rval;
205   lock.Unlock();
206   tracepoint(librbd, aio_get_return_value_exit, r);
207   return r;
208 }
209
210 } // namespace io
211 } // namespace librbd