Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd_ggate / Server.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "common/debug.h"
5 #include "common/errno.h"
6 #include "Driver.h"
7 #include "Server.h"
8 #include "Request.h"
9
10 #define dout_context g_ceph_context
11 #define dout_subsys ceph_subsys_rbd
12 #undef dout_prefix
13 #define dout_prefix *_dout << "rbd::ggate::Server: " << this \
14                            << " " << __func__ << ": "
15
16 namespace rbd {
17 namespace ggate {
18
19 Server::Server(Driver *drv, librbd::Image& image)
20   : m_drv(drv), m_image(image), m_lock("rbd::ggate::Server::m_lock"),
21     m_reader_thread(this, &Server::reader_entry),
22     m_writer_thread(this, &Server::writer_entry) {
23 }
24
25 void Server::run() {
26   dout(10) << dendl;
27
28   int r = start();
29   assert(r == 0);
30
31   dout(20) << "entering run loop" << dendl;
32
33   {
34     Mutex::Locker locker(m_lock);
35     while (!m_stopping) {
36       m_cond.WaitInterval(m_lock, utime_t(1, 0));
37     }
38   }
39
40   dout(20) << "exiting run loop" << dendl;
41
42   stop();
43 }
44
45 int Server::start() {
46   dout(10) << dendl;
47
48   m_reader_thread.create("rbd_reader");
49   m_writer_thread.create("rbd_writer");
50   return 0;
51 }
52
53 void Server::stop() {
54   dout(10) << dendl;
55
56   {
57     Mutex::Locker locker(m_lock);
58     assert(m_stopping);
59   }
60
61   m_reader_thread.join();
62   m_writer_thread.join();
63
64   wait_clean();
65 }
66
67 void Server::io_start(IOContext *ctx) {
68   dout(20) << ctx << dendl;
69
70   Mutex::Locker locker(m_lock);
71   m_io_pending.push_back(&ctx->item);
72 }
73
74 void Server::io_finish(IOContext *ctx) {
75   dout(20) << ctx << dendl;
76
77   Mutex::Locker locker(m_lock);
78   assert(ctx->item.is_on_list());
79
80   ctx->item.remove_myself();
81   m_io_finished.push_back(&ctx->item);
82   m_cond.Signal();
83 }
84
85 Server::IOContext *Server::wait_io_finish() {
86   dout(20) << dendl;
87
88   Mutex::Locker locker(m_lock);
89
90   while (m_io_finished.empty() && !m_stopping) {
91     m_cond.Wait(m_lock);
92   }
93
94   if (m_io_finished.empty()) {
95     return nullptr;
96   }
97
98   IOContext *ret = m_io_finished.front();
99   m_io_finished.pop_front();
100
101   return ret;
102 }
103
104 void Server::wait_clean() {
105   dout(20) << dendl;
106
107   assert(!m_reader_thread.is_started());
108
109   Mutex::Locker locker(m_lock);
110
111   while (!m_io_pending.empty()) {
112     m_cond.Wait(m_lock);
113   }
114
115   while (!m_io_finished.empty()) {
116     ceph::unique_ptr<IOContext> free_ctx(m_io_finished.front());
117     m_io_finished.pop_front();
118   }
119 }
120
121 void Server::aio_callback(librbd::completion_t cb, void *arg) {
122   librbd::RBD::AioCompletion *aio_completion =
123     reinterpret_cast<librbd::RBD::AioCompletion*>(cb);
124
125   IOContext *ctx = reinterpret_cast<IOContext *>(arg);
126   int r = aio_completion->get_return_value();
127
128   ctx->server->handle_aio(ctx, r);
129   aio_completion->release();
130 }
131
132 void Server::handle_aio(IOContext *ctx, int r) {
133   dout(20) << ctx << ": r=" << r << dendl;
134
135   if (r == -EINVAL) {
136     // if shrinking an image, a pagecache writeback might reference
137     // extents outside of the range of the new image extents
138     dout(5) << "masking IO out-of-bounds error" << dendl;
139     ctx->req->bl.clear();
140     r = 0;
141   }
142
143   if (r < 0) {
144     ctx->req->set_error(-r);
145   } else if ((ctx->req->get_cmd() == Request::Read) &&
146              r != static_cast<int>(ctx->req->get_length())) {
147     int pad_byte_count = static_cast<int> (ctx->req->get_length()) - r;
148     ctx->req->bl.append_zero(pad_byte_count);
149     dout(20) << ctx << ": pad byte count: " << pad_byte_count << dendl;
150     ctx->req->set_error(0);
151   } else {
152     ctx->req->set_error(0);
153   }
154   io_finish(ctx);
155 }
156
157 void Server::reader_entry() {
158   dout(20) << dendl;
159
160   while (!m_stopping) {
161     ceph::unique_ptr<IOContext> ctx(new IOContext(this));
162
163     dout(20) << "waiting for ggate request" << dendl;
164
165     int r = m_drv->recv(&ctx->req);
166     if (r < 0) {
167       if (r != -ECANCELED) {
168         derr << "recv: " << cpp_strerror(r) << dendl;
169       }
170       Mutex::Locker locker(m_lock);
171       m_stopping = true;
172       m_cond.Signal();
173       return;
174     }
175
176     IOContext *pctx = ctx.release();
177
178     dout(20) << pctx << ": start: " << *pctx << dendl;
179
180     io_start(pctx);
181     librbd::RBD::AioCompletion *c =
182       new librbd::RBD::AioCompletion(pctx, aio_callback);
183     switch (pctx->req->get_cmd())
184     {
185     case rbd::ggate::Request::Write:
186       m_image.aio_write(pctx->req->get_offset(), pctx->req->get_length(),
187                         pctx->req->bl, c);
188       break;
189     case rbd::ggate::Request::Read:
190       m_image.aio_read(pctx->req->get_offset(), pctx->req->get_length(),
191                        pctx->req->bl, c);
192       break;
193     case rbd::ggate::Request::Flush:
194       m_image.aio_flush(c);
195       break;
196     case rbd::ggate::Request::Discard:
197       m_image.aio_discard(pctx->req->get_offset(), pctx->req->get_length(), c);
198       break;
199     default:
200       derr << pctx << ": invalid request command: " << pctx->req->get_cmd()
201            << dendl;
202       c->release();
203       Mutex::Locker locker(m_lock);
204       m_stopping = true;
205       m_cond.Signal();
206       return;
207     }
208   }
209   dout(20) << "terminated" << dendl;
210 }
211
212 void Server::writer_entry() {
213   dout(20) << dendl;
214
215   while (!m_stopping) {
216     dout(20) << "waiting for io request" << dendl;
217
218     ceph::unique_ptr<IOContext> ctx(wait_io_finish());
219     if (!ctx) {
220       dout(20) << "no io requests, terminating" << dendl;
221       return;
222     }
223
224     dout(20) << ctx.get() << ": got: " << *ctx << dendl;
225
226     int r = m_drv->send(ctx->req);
227     if (r < 0) {
228       derr << ctx.get() << ": send: " << cpp_strerror(r) << dendl;
229       Mutex::Locker locker(m_lock);
230       m_stopping = true;
231       m_cond.Signal();
232       return;
233     }
234     dout(20) << ctx.get() << " finish" << dendl;
235   }
236   dout(20) << "terminated" << dendl;
237 }
238
239 std::ostream &operator<<(std::ostream &os, const Server::IOContext &ctx) {
240
241   os << "[" << ctx.req->get_id();
242
243   switch (ctx.req->get_cmd())
244   {
245   case rbd::ggate::Request::Write:
246     os << " Write ";
247     break;
248   case rbd::ggate::Request::Read:
249     os << " Read ";
250     break;
251   case rbd::ggate::Request::Flush:
252     os << " Flush ";
253     break;
254   case rbd::ggate::Request::Discard:
255     os << " Discard ";
256     break;
257   default:
258     os << " Unknow(" << ctx.req->get_cmd() << ") ";
259     break;
260   }
261
262   os << ctx.req->get_offset() << "~" << ctx.req->get_length() << " "
263      << ctx.req->get_error() << "]";
264
265   return os;
266 }
267
268 } // namespace ggate
269 } // namespace rbd
270