Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / Watcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/Watcher.h"
5 #include "librbd/watcher/RewatchRequest.h"
6 #include "librbd/Utils.h"
7 #include "librbd/TaskFinisher.h"
8 #include "include/encoding.h"
9 #include "common/errno.h"
10 #include "common/WorkQueue.h"
11 #include <boost/bind.hpp>
12
13 // re-include our assert to clobber the system one; fix dout:
14 #include "include/assert.h"
15
16 #define dout_subsys ceph_subsys_rbd
17
18 namespace librbd {
19
20 using namespace watcher;
21
22 using util::create_context_callback;
23 using util::create_rados_callback;
24 using std::string;
25
26 namespace {
27
28 struct C_UnwatchAndFlush : public Context {
29   librados::Rados rados;
30   Context *on_finish;
31   bool flushing = false;
32   int ret_val = 0;
33
34   C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
35     : rados(io_ctx), on_finish(on_finish) {
36   }
37
38   void complete(int r) override {
39     if (ret_val == 0 && r < 0) {
40       ret_val = r;
41     }
42
43     if (!flushing) {
44       flushing = true;
45
46       librados::AioCompletion *aio_comp = create_rados_callback(this);
47       r = rados.aio_watch_flush(aio_comp);
48       assert(r == 0);
49       aio_comp->release();
50       return;
51     }
52
53     // ensure our reference to the RadosClient is released prior
54     // to completing the callback to avoid racing an explicit
55     // librados shutdown
56     Context *ctx = on_finish;
57     r = ret_val;
58     delete this;
59
60     ctx->complete(r);
61   }
62
63   void finish(int r) override {
64   }
65 };
66
67 } // anonymous namespace
68
69 #undef dout_prefix
70 #define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \
71                            << __func__ << ": "
72
73 Watcher::C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id,
74                                   uint64_t handle)
75   : watcher(watcher), cct(watcher->m_cct), notify_id(notify_id),
76     handle(handle) {
77   ldout(cct, 10) << "id=" << notify_id << ", " << "handle=" << handle << dendl;
78 }
79
80 void Watcher::C_NotifyAck::finish(int r) {
81   ldout(cct, 10) << "r=" << r << dendl;
82   assert(r == 0);
83   watcher->acknowledge_notify(notify_id, handle, out);
84 }
85
86 #undef dout_prefix
87 #define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
88                            << ": "
89
90 Watcher::Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue,
91                           const string& oid)
92   : m_ioctx(ioctx), m_work_queue(work_queue), m_oid(oid),
93     m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
94     m_watch_lock(util::unique_lock_name("librbd::Watcher::m_watch_lock", this)),
95     m_watch_handle(0), m_notifier(work_queue, ioctx, oid),
96     m_watch_state(WATCH_STATE_UNREGISTERED), m_watch_ctx(*this) {
97 }
98
99 Watcher::~Watcher() {
100   RWLock::RLocker l(m_watch_lock);
101   assert(m_watch_state != WATCH_STATE_REGISTERED);
102 }
103
104 void Watcher::register_watch(Context *on_finish) {
105   ldout(m_cct, 10) << dendl;
106
107   RWLock::RLocker watch_locker(m_watch_lock);
108   assert(m_watch_state == WATCH_STATE_UNREGISTERED);
109   m_watch_state = WATCH_STATE_REGISTERING;
110
111   librados::AioCompletion *aio_comp = create_rados_callback(
112                                          new C_RegisterWatch(this, on_finish));
113   int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
114   assert(r == 0);
115   aio_comp->release();
116 }
117
118 void Watcher::handle_register_watch(int r, Context *on_finish) {
119   ldout(m_cct, 10) << "r=" << r << dendl;
120   Context *unregister_watch_ctx = nullptr;
121   {
122     RWLock::WLocker watch_locker(m_watch_lock);
123     assert(m_watch_state == WATCH_STATE_REGISTERING);
124
125     std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
126     if (r < 0) {
127       lderr(m_cct) << "failed to register watch: " << cpp_strerror(r)
128                    << dendl;
129       m_watch_handle = 0;
130       m_watch_state = WATCH_STATE_UNREGISTERED;
131     } else if (r >= 0) {
132       m_watch_state = WATCH_STATE_REGISTERED;
133     }
134   }
135
136   on_finish->complete(r);
137
138   // wake up pending unregister request
139   if (unregister_watch_ctx != nullptr) {
140     unregister_watch_ctx->complete(0);
141   }
142 }
143
144 void Watcher::unregister_watch(Context *on_finish) {
145   ldout(m_cct, 10) << dendl;
146
147   {
148     RWLock::WLocker watch_locker(m_watch_lock);
149     if (m_watch_state == WATCH_STATE_REGISTERING ||
150         m_watch_state == WATCH_STATE_REWATCHING) {
151       ldout(m_cct, 10) << "delaying unregister until register completed"
152                        << dendl;
153
154       assert(m_unregister_watch_ctx == nullptr);
155       m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) {
156           unregister_watch(on_finish);
157         });
158       return;
159     }
160
161     if (m_watch_state == WATCH_STATE_REGISTERED ||
162         m_watch_state == WATCH_STATE_ERROR) {
163       m_watch_state = WATCH_STATE_UNREGISTERED;
164
165       librados::AioCompletion *aio_comp = create_rados_callback(
166                         new C_UnwatchAndFlush(m_ioctx, on_finish));
167       int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
168       assert(r == 0);
169       aio_comp->release();
170       return;
171     }
172   }
173
174   on_finish->complete(0);
175 }
176
177 bool Watcher::notifications_blocked() const {
178   RWLock::RLocker locker(m_watch_lock);
179
180   bool blocked = (m_blocked_count > 0);
181   ldout(m_cct, 5) << "blocked=" << blocked << dendl;
182   return blocked;
183 }
184
185 void Watcher::block_notifies(Context *on_finish) {
186   {
187     RWLock::WLocker locker(m_watch_lock);
188     ++m_blocked_count;
189     ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
190   }
191   m_async_op_tracker.wait_for_ops(on_finish);
192 }
193
194 void Watcher::unblock_notifies() {
195   RWLock::WLocker locker(m_watch_lock);
196   assert(m_blocked_count > 0);
197   --m_blocked_count;
198   ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
199 }
200
201 void Watcher::flush(Context *on_finish) {
202   m_notifier.flush(on_finish);
203 }
204
205 std::string Watcher::get_oid() const {
206   RWLock::RLocker locker(m_watch_lock);
207   return m_oid;
208 }
209
210 void Watcher::set_oid(const string& oid) {
211   RWLock::WLocker l(m_watch_lock);
212   assert(m_watch_state == WATCH_STATE_UNREGISTERED);
213
214   m_oid = oid;
215 }
216
217 void Watcher::handle_error(uint64_t handle, int err) {
218   lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl;
219
220   RWLock::WLocker l(m_watch_lock);
221   if (m_watch_state == WATCH_STATE_REGISTERED) {
222     m_watch_state = WATCH_STATE_ERROR;
223
224     FunctionContext *ctx = new FunctionContext(
225         boost::bind(&Watcher::rewatch, this));
226     m_work_queue->queue(ctx);
227   }
228 }
229
230 void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
231                                  bufferlist &out) {
232   m_ioctx.notify_ack(m_oid, notify_id, handle, out);
233 }
234
235 void Watcher::rewatch() {
236   ldout(m_cct, 10) << dendl;
237
238   RWLock::WLocker l(m_watch_lock);
239   if (m_watch_state != WATCH_STATE_ERROR) {
240     return;
241   }
242   m_watch_state = WATCH_STATE_REWATCHING;
243
244   Context *ctx = create_context_callback<Watcher,
245                                          &Watcher::handle_rewatch>(this);
246   RewatchRequest *req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
247                                                &m_watch_ctx,
248                                                &m_watch_handle, ctx);
249   req->send();
250 }
251
252 void Watcher::handle_rewatch(int r) {
253   ldout(m_cct, 10) "r=" << r << dendl;
254
255   WatchState next_watch_state = WATCH_STATE_REGISTERED;
256   if (r < 0) {
257     // only EBLACKLISTED or ENOENT can be returned
258     assert(r == -EBLACKLISTED || r == -ENOENT);
259     next_watch_state = WATCH_STATE_UNREGISTERED;
260   }
261
262   Context *unregister_watch_ctx = nullptr;
263   {
264     RWLock::WLocker watch_locker(m_watch_lock);
265     assert(m_watch_state == WATCH_STATE_REWATCHING);
266     m_watch_state = next_watch_state;
267
268     std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
269
270     m_work_queue->queue(
271       create_context_callback<Watcher,
272                               &Watcher::handle_rewatch_complete>(this), r);
273   }
274
275   // wake up pending unregister request
276   if (unregister_watch_ctx != nullptr) {
277     unregister_watch_ctx->complete(0);
278   }
279 }
280
281 void Watcher::send_notify(bufferlist& payload,
282                           watcher::NotifyResponse *response,
283                           Context *on_finish) {
284   m_notifier.notify(payload, response, on_finish);
285 }
286
287 void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
288                                       uint64_t notifier_id, bufferlist& bl) {
289   // if notifications are blocked, finish the notification w/o
290   // bubbling the notification up to the derived class
291   watcher.m_async_op_tracker.start_op();
292   if (watcher.notifications_blocked()) {
293     bufferlist bl;
294     watcher.acknowledge_notify(notify_id, handle, bl);
295   } else {
296     watcher.handle_notify(notify_id, handle, notifier_id, bl);
297   }
298   watcher.m_async_op_tracker.finish_op();
299 }
300
301 void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
302   watcher.handle_error(handle, err);
303 }
304
305 } // namespace librbd