Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / Watch.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 #include "PG.h"
3
4 #include "include/types.h"
5 #include "messages/MWatchNotify.h"
6
7 #include <map>
8
9 #include "OSD.h"
10 #include "PrimaryLogPG.h"
11 #include "Watch.h"
12 #include "Session.h"
13
14 #include "common/config.h"
15
16 struct CancelableContext : public Context {
17   virtual void cancel() = 0;
18 };
19
20 #define dout_context osd->cct
21 #define dout_subsys ceph_subsys_osd
22 #undef dout_prefix
23 #define dout_prefix _prefix(_dout, this)
24
25 static ostream& _prefix(
26   std::ostream* _dout,
27   Notify *notify) {
28   return *_dout << notify->gen_dbg_prefix();
29 }
30
31 Notify::Notify(
32   ConnectionRef client,
33   uint64_t client_gid,
34   bufferlist &payload,
35   uint32_t timeout,
36   uint64_t cookie,
37   uint64_t notify_id,
38   uint64_t version,
39   OSDService *osd)
40   : client(client), client_gid(client_gid),
41     complete(false),
42     discarded(false),
43     timed_out(false),
44     payload(payload),
45     timeout(timeout),
46     cookie(cookie),
47     notify_id(notify_id),
48     version(version),
49     osd(osd),
50     cb(NULL),
51     lock("Notify::lock") {}
52
53 NotifyRef Notify::makeNotifyRef(
54   ConnectionRef client,
55   uint64_t client_gid,
56   bufferlist &payload,
57   uint32_t timeout,
58   uint64_t cookie,
59   uint64_t notify_id,
60   uint64_t version,
61   OSDService *osd) {
62   NotifyRef ret(
63     new Notify(
64       client, client_gid,
65       payload, timeout,
66       cookie, notify_id,
67       version, osd));
68   ret->set_self(ret);
69   return ret;
70 }
71
72 class NotifyTimeoutCB : public CancelableContext {
73   NotifyRef notif;
74   bool canceled; // protected by notif lock
75 public:
76   explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
77   void finish(int) override {
78     notif->osd->watch_lock.Unlock();
79     notif->lock.Lock();
80     if (!canceled)
81       notif->do_timeout(); // drops lock
82     else
83       notif->lock.Unlock();
84     notif->osd->watch_lock.Lock();
85   }
86   void cancel() override {
87     assert(notif->lock.is_locked_by_me());
88     canceled = true;
89   }
90 };
91
92 void Notify::do_timeout()
93 {
94   assert(lock.is_locked_by_me());
95   dout(10) << "timeout" << dendl;
96   cb = NULL;
97   if (is_discarded()) {
98     lock.Unlock();
99     return;
100   }
101
102   timed_out = true;         // we will send the client an error code
103   maybe_complete_notify();
104   assert(complete);
105   set<WatchRef> _watchers;
106   _watchers.swap(watchers);
107   lock.Unlock();
108
109   for (set<WatchRef>::iterator i = _watchers.begin();
110        i != _watchers.end();
111        ++i) {
112     boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
113     pg->lock();
114     if (!(*i)->is_discarded()) {
115       (*i)->cancel_notify(self.lock());
116     }
117     pg->unlock();
118   }
119 }
120
121 void Notify::register_cb()
122 {
123   assert(lock.is_locked_by_me());
124   {
125     osd->watch_lock.Lock();
126     cb = new NotifyTimeoutCB(self.lock());
127     if (!osd->watch_timer.add_event_after(timeout, cb)) {
128       cb = nullptr;
129     }
130     osd->watch_lock.Unlock();
131   }
132 }
133
134 void Notify::unregister_cb()
135 {
136   assert(lock.is_locked_by_me());
137   if (!cb)
138     return;
139   cb->cancel();
140   {
141     osd->watch_lock.Lock();
142     osd->watch_timer.cancel_event(cb);
143     cb = NULL;
144     osd->watch_lock.Unlock();
145   }
146 }
147
148 void Notify::start_watcher(WatchRef watch)
149 {
150   Mutex::Locker l(lock);
151   dout(10) << "start_watcher" << dendl;
152   watchers.insert(watch);
153 }
154
155 void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl)
156 {
157   Mutex::Locker l(lock);
158   dout(10) << "complete_watcher" << dendl;
159   if (is_discarded())
160     return;
161   assert(watchers.count(watch));
162   watchers.erase(watch);
163   notify_replies.insert(make_pair(make_pair(watch->get_watcher_gid(),
164                                             watch->get_cookie()),
165                                   reply_bl));
166   maybe_complete_notify();
167 }
168
169 void Notify::complete_watcher_remove(WatchRef watch)
170 {
171   Mutex::Locker l(lock);
172   dout(10) << __func__ << dendl;
173   if (is_discarded())
174     return;
175   assert(watchers.count(watch));
176   watchers.erase(watch);
177   maybe_complete_notify();
178 }
179
180 void Notify::maybe_complete_notify()
181 {
182   dout(10) << "maybe_complete_notify -- "
183            << watchers.size()
184            << " in progress watchers " << dendl;
185   if (watchers.empty() || timed_out) {
186     // prepare reply
187     bufferlist bl;
188     ::encode(notify_replies, bl);
189     list<pair<uint64_t,uint64_t> > missed;
190     for (set<WatchRef>::iterator p = watchers.begin(); p != watchers.end(); ++p) {
191       missed.push_back(make_pair((*p)->get_watcher_gid(),
192                                  (*p)->get_cookie()));
193     }
194     ::encode(missed, bl);
195
196     bufferlist empty;
197     MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
198                                          CEPH_WATCH_EVENT_NOTIFY_COMPLETE, empty));
199     reply->notifier_gid = client_gid;
200     reply->set_data(bl);
201     if (timed_out)
202       reply->return_code = -ETIMEDOUT;
203     client->send_message(reply);
204     unregister_cb();
205
206     complete = true;
207   }
208 }
209
210 void Notify::discard()
211 {
212   Mutex::Locker l(lock);
213   discarded = true;
214   unregister_cb();
215   watchers.clear();
216 }
217
218 void Notify::init()
219 {
220   Mutex::Locker l(lock);
221   register_cb();
222   maybe_complete_notify();
223 }
224
225 #define dout_subsys ceph_subsys_osd
226 #undef dout_prefix
227 #define dout_prefix _prefix(_dout, watch.get())
228
229 static ostream& _prefix(
230   std::ostream* _dout,
231   Watch *watch) {
232   return *_dout << watch->gen_dbg_prefix();
233 }
234
235 class HandleWatchTimeout : public CancelableContext {
236   WatchRef watch;
237 public:
238   bool canceled; // protected by watch->pg->lock
239   explicit HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
240   void cancel() override {
241     canceled = true;
242   }
243   void finish(int) override { ceph_abort(); /* not used */ }
244   void complete(int) override {
245     OSDService *osd(watch->osd);
246     ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl;
247     boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg);
248     osd->watch_lock.Unlock();
249     pg->lock();
250     watch->cb = NULL;
251     if (!watch->is_discarded() && !canceled)
252       watch->pg->handle_watch_timeout(watch);
253     delete this; // ~Watch requires pg lock!
254     pg->unlock();
255     osd->watch_lock.Lock();
256   }
257 };
258
259 class HandleDelayedWatchTimeout : public CancelableContext {
260   WatchRef watch;
261 public:
262   bool canceled;
263   explicit HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
264   void cancel() override {
265     canceled = true;
266   }
267   void finish(int) override {
268     OSDService *osd(watch->osd);
269     dout(10) << "HandleWatchTimeoutDelayed" << dendl;
270     assert(watch->pg->is_locked());
271     watch->cb = NULL;
272     if (!watch->is_discarded() && !canceled)
273       watch->pg->handle_watch_timeout(watch);
274   }
275 };
276
277 #define dout_subsys ceph_subsys_osd
278 #undef dout_prefix
279 #define dout_prefix _prefix(_dout, this)
280
281 string Watch::gen_dbg_prefix() {
282   stringstream ss;
283   ss << pg->gen_prefix() << " -- Watch(" 
284      << make_pair(cookie, entity) << ") ";
285   return ss.str();
286 }
287
288 Watch::Watch(
289   PrimaryLogPG *pg,
290   OSDService *osd,
291   ObjectContextRef obc,
292   uint32_t timeout,
293   uint64_t cookie,
294   entity_name_t entity,
295   const entity_addr_t &addr)
296   : cb(NULL),
297     osd(osd),
298     pg(pg),
299     obc(obc),
300     timeout(timeout),
301     cookie(cookie),
302     addr(addr),
303     will_ping(false),
304     entity(entity),
305     discarded(false) {
306   dout(10) << "Watch()" << dendl;
307 }
308
309 Watch::~Watch() {
310   dout(10) << "~Watch" << dendl;
311   // users must have called remove() or discard() prior to this point
312   assert(!obc);
313   assert(!conn);
314 }
315
316 bool Watch::connected() { return !!conn; }
317
318 Context *Watch::get_delayed_cb()
319 {
320   assert(!cb);
321   cb = new HandleDelayedWatchTimeout(self.lock());
322   return cb;
323 }
324
325 void Watch::register_cb()
326 {
327   Mutex::Locker l(osd->watch_lock);
328   if (cb) {
329     dout(15) << "re-registering callback, timeout: " << timeout << dendl;
330     cb->cancel();
331     osd->watch_timer.cancel_event(cb);
332   } else {
333     dout(15) << "registering callback, timeout: " << timeout << dendl;
334   }
335   cb = new HandleWatchTimeout(self.lock());
336   if (!osd->watch_timer.add_event_after(timeout, cb)) {
337     cb = nullptr;
338   }
339 }
340
341 void Watch::unregister_cb()
342 {
343   dout(15) << "unregister_cb" << dendl;
344   if (!cb)
345     return;
346   dout(15) << "actually registered, cancelling" << dendl;
347   cb->cancel();
348   {
349     Mutex::Locker l(osd->watch_lock);
350     osd->watch_timer.cancel_event(cb); // harmless if not registered with timer
351   }
352   cb = NULL;
353 }
354
355 void Watch::got_ping(utime_t t)
356 {
357   last_ping = t;
358   if (conn) {
359     register_cb();
360   }
361 }
362
363 void Watch::connect(ConnectionRef con, bool _will_ping)
364 {
365   if (conn == con) {
366     dout(10) << __func__ << " con " << con << " - already connected" << dendl;
367     return;
368   }
369   dout(10) << __func__ << " con " << con << dendl;
370   conn = con;
371   will_ping = _will_ping;
372   Session* sessionref(static_cast<Session*>(con->get_priv()));
373   if (sessionref) {
374     sessionref->wstate.addWatch(self.lock());
375     sessionref->put();
376     for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
377          i != in_progress_notifies.end();
378          ++i) {
379       send_notify(i->second);
380     }
381   }
382   if (will_ping) {
383     last_ping = ceph_clock_now();
384     register_cb();
385   } else {
386     unregister_cb();
387   }
388 }
389
390 void Watch::disconnect()
391 {
392   dout(10) << "disconnect (con was " << conn << ")" << dendl;
393   conn = ConnectionRef();
394   if (!will_ping)
395     register_cb();
396 }
397
398 void Watch::discard()
399 {
400   dout(10) << "discard" << dendl;
401   for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
402        i != in_progress_notifies.end();
403        ++i) {
404     i->second->discard();
405   }
406   discard_state();
407 }
408
409 void Watch::discard_state()
410 {
411   assert(pg->is_locked());
412   assert(!discarded);
413   assert(obc);
414   in_progress_notifies.clear();
415   unregister_cb();
416   discarded = true;
417   if (conn) {
418     Session* sessionref(static_cast<Session*>(conn->get_priv()));
419     if (sessionref) {
420       sessionref->wstate.removeWatch(self.lock());
421       sessionref->put();
422     }
423     conn = ConnectionRef();
424   }
425   obc = ObjectContextRef();
426 }
427
428 bool Watch::is_discarded() const
429 {
430   return discarded;
431 }
432
433 void Watch::remove(bool send_disconnect)
434 {
435   dout(10) << "remove" << dendl;
436   if (send_disconnect && conn) {
437     bufferlist empty;
438     MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
439                                          CEPH_WATCH_EVENT_DISCONNECT, empty));
440     conn->send_message(reply);
441   }
442   for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
443        i != in_progress_notifies.end();
444        ++i) {
445     i->second->complete_watcher_remove(self.lock());
446   }
447   discard_state();
448 }
449
450 void Watch::start_notify(NotifyRef notif)
451 {
452   assert(in_progress_notifies.find(notif->notify_id) ==
453          in_progress_notifies.end());
454   if (will_ping) {
455     utime_t cutoff = ceph_clock_now();
456     cutoff.sec_ref() -= timeout;
457     if (last_ping < cutoff) {
458       dout(10) << __func__ << " " << notif->notify_id
459                << " last_ping " << last_ping << " < cutoff " << cutoff
460                << ", disconnecting" << dendl;
461       disconnect();
462       return;
463     }
464   }
465   dout(10) << "start_notify " << notif->notify_id << dendl;
466   in_progress_notifies[notif->notify_id] = notif;
467   notif->start_watcher(self.lock());
468   if (connected())
469     send_notify(notif);
470 }
471
472 void Watch::cancel_notify(NotifyRef notif)
473 {
474   dout(10) << "cancel_notify " << notif->notify_id << dendl;
475   in_progress_notifies.erase(notif->notify_id);
476 }
477
478 void Watch::send_notify(NotifyRef notif)
479 {
480   dout(10) << "send_notify" << dendl;
481   MWatchNotify *notify_msg = new MWatchNotify(
482     cookie, notif->version, notif->notify_id,
483     CEPH_WATCH_EVENT_NOTIFY, notif->payload);
484   notify_msg->notifier_gid = notif->client_gid;
485   conn->send_message(notify_msg);
486 }
487
488 void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
489 {
490   dout(10) << "notify_ack" << dendl;
491   map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.find(notify_id);
492   if (i != in_progress_notifies.end()) {
493     i->second->complete_watcher(self.lock(), reply_bl);
494     in_progress_notifies.erase(i);
495   }
496 }
497
498 WatchRef Watch::makeWatchRef(
499   PrimaryLogPG *pg, OSDService *osd,
500   ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t& addr)
501 {
502   WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr));
503   ret->set_self(ret);
504   return ret;
505 }
506
507 void WatchConState::addWatch(WatchRef watch)
508 {
509   Mutex::Locker l(lock);
510   watches.insert(watch);
511 }
512
513 void WatchConState::removeWatch(WatchRef watch)
514 {
515   Mutex::Locker l(lock);
516   watches.erase(watch);
517 }
518
519 void WatchConState::reset(Connection *con)
520 {
521   set<WatchRef> _watches;
522   {
523     Mutex::Locker l(lock);
524     _watches.swap(watches);
525   }
526   for (set<WatchRef>::iterator i = _watches.begin();
527        i != _watches.end();
528        ++i) {
529     boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
530     pg->lock();
531     if (!(*i)->is_discarded()) {
532       if ((*i)->is_connected(con)) {
533         (*i)->disconnect();
534       } else {
535         lgeneric_derr(cct) << __func__ << " not still connected to " << (*i) << dendl;
536       }
537     }
538     pg->unlock();
539   }
540 }