1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
4 #include "include/types.h"
5 #include "messages/MWatchNotify.h"
10 #include "PrimaryLogPG.h"
14 #include "common/config.h"
// Context subclass that adds a cancel() hook. The timeout callbacks in this
// file (NotifyTimeoutCB, HandleWatchTimeout, HandleDelayedWatchTimeout)
// derive from it and override cancel().
16 struct CancelableContext : public Context {
// Pure virtual: each callback defines what cancelling means for it.
17 virtual void cancel() = 0;
// Logging macros for the code that follows.
// dout_context expands to osd->cct, so an `osd` name must be in scope
// wherever these macros are expanded.
20 #define dout_context osd->cct
21 #define dout_subsys ceph_subsys_osd
// The prefix is produced by calling _prefix(_dout, this), so `this` must be
// a type accepted by a _prefix overload.
23 #define dout_prefix _prefix(_dout, this)
// Log-prefix helper referenced by dout_prefix: writes
// notify->gen_dbg_prefix() to *_dout and returns the resulting stream.
25 static ostream& _prefix(
28 return *_dout << notify->gen_dbg_prefix();
// Member-initializer list: copies the `client` and `client_gid` arguments
// into the members of the same name.
40 : client(client), client_gid(client_gid),
// The mutex member is given the name "Notify::lock"; the body is empty.
51 lock("Notify::lock") {}
// Returns a NotifyRef.
// NOTE(review): presumably the Notify counterpart of Watch::makeWatchRef
// (allocate, wrap in a ref) -- confirm against the full definition.
53 NotifyRef Notify::makeNotifyRef(
// Timeout callback for a Notify. Notify::register_cb() allocates one and
// hands it to osd->watch_timer.add_event_after().
72 class NotifyTimeoutCB : public CancelableContext {
74 bool canceled; // protected by notif lock
// Binds the callback to `notif`; it starts out not canceled.
76 explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
// Releases osd->watch_lock, lets the Notify process the timeout, and takes
// watch_lock again before returning.
// NOTE(review): presumably entered with watch_lock held by the timer, since
// the first action is Unlock() -- confirm.
77 void finish(int) override {
78 notif->osd->watch_lock.Unlock();
81 notif->do_timeout(); // drops lock
84 notif->osd->watch_lock.Lock();
// Caller must hold notif->lock; this is asserted.
86 void cancel() override {
87 assert(notif->lock.is_locked_by_me());
// Handles expiry of this notify. Must be entered with `lock` held by the
// calling thread (asserted). Marks the notify as timed out, calls
// maybe_complete_notify(), then calls cancel_notify() on each watcher that
// was still outstanding and has not been discarded.
92 void Notify::do_timeout()
94 assert(lock.is_locked_by_me());
95 dout(10) << "timeout" << dendl;
102 timed_out = true; // we will send the client an error code
103 maybe_complete_notify();
// Move the outstanding watchers into a local set; `watchers` is left empty
// and the loop below walks the local copy.
105 set<WatchRef> _watchers;
106 _watchers.swap(watchers);
109 for (set<WatchRef>::iterator i = _watchers.begin();
110 i != _watchers.end();
// Hold a reference to the watch's PG while operating on the watch.
112 boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
// Discarded watches are skipped.
114 if (!(*i)->is_discarded()) {
115 (*i)->cancel_notify(self.lock());
// Schedules the timeout callback for this notify. Requires `lock` to be
// held by the calling thread (asserted).
121 void Notify::register_cb()
123 assert(lock.is_locked_by_me());
// The timer is manipulated with osd->watch_lock held.
125 osd->watch_lock.Lock();
126 cb = new NotifyTimeoutCB(self.lock());
// Schedule `cb` with delay `timeout`; the if-branch runs when
// add_event_after() returns a false value.
127 if (!osd->watch_timer.add_event_after(timeout, cb)) {
130 osd->watch_lock.Unlock();
// Cancels the timer event for `cb`. Requires `lock` to be held by the
// calling thread (asserted); osd->watch_lock is taken around the timer call.
134 void Notify::unregister_cb()
136 assert(lock.is_locked_by_me());
141 osd->watch_lock.Lock();
142 osd->watch_timer.cancel_event(cb);
144 osd->watch_lock.Unlock();
// Adds `watch` to the `watchers` set while holding `lock` through the
// Mutex::Locker local.
148 void Notify::start_watcher(WatchRef watch)
150 Mutex::Locker l(lock);
151 dout(10) << "start_watcher" << dendl;
152 watchers.insert(watch);
// Records that `watch` has answered this notify. With `lock` held: asserts
// the watch is in `watchers`, removes it, inserts an entry into
// notify_replies keyed by (watcher gid, cookie), then calls
// maybe_complete_notify().
// NOTE(review): the stored value is presumably reply_bl -- the line that
// supplies it is not shown here; confirm.
155 void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl)
157 Mutex::Locker l(lock);
158 dout(10) << "complete_watcher" << dendl;
161 assert(watchers.count(watch));
162 watchers.erase(watch);
163 notify_replies.insert(make_pair(make_pair(watch->get_watcher_gid(),
164 watch->get_cookie()),
166 maybe_complete_notify();
// Drops `watch` from `watchers` with `lock` held, asserting it was present,
// then calls maybe_complete_notify(). Called from Watch::remove() for each
// in-progress notify of the watch being removed.
169 void Notify::complete_watcher_remove(WatchRef watch)
171 Mutex::Locker l(lock);
172 dout(10) << __func__ << dendl;
175 assert(watchers.count(watch));
176 watchers.erase(watch);
177 maybe_complete_notify();
// Builds and sends the NOTIFY_COMPLETE message to `client` once no watchers
// remain outstanding or the notify has timed out.
// NOTE(review): every visible caller holds `lock`; presumably that is a
// requirement -- confirm.
180 void Notify::maybe_complete_notify()
182 dout(10) << "maybe_complete_notify -- "
184 << " in progress watchers " << dendl;
185 if (watchers.empty() || timed_out) {
// Encoded payload: the collected replies first, then the list of
// (watcher gid, cookie) pairs for watchers still in `watchers`.
188 ::encode(notify_replies, bl);
189 list<pair<uint64_t,uint64_t> > missed;
190 for (set<WatchRef>::iterator p = watchers.begin(); p != watchers.end(); ++p) {
191 missed.push_back(make_pair((*p)->get_watcher_gid(),
192 (*p)->get_cookie()));
194 ::encode(missed, bl);
197 MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
198 CEPH_WATCH_EVENT_NOTIFY_COMPLETE, empty));
199 reply->notifier_gid = client_gid;
// NOTE(review): presumably only applied when timed_out is set -- the
// guarding condition is not shown here; confirm.
202 reply->return_code = -ETIMEDOUT;
203 client->send_message(reply);
// Takes `lock` through a Mutex::Locker local. Called by Watch::discard() for
// each in-progress notify.
210 void Notify::discard()
212 Mutex::Locker l(lock);
// NOTE(review): the two statements below declare a second Locker with the
// same name, so they presumably belong to a different member function whose
// signature is not shown here -- confirm. That function takes `lock` and
// then calls maybe_complete_notify().
220 Mutex::Locker l(lock);
222 maybe_complete_notify();
// Logging macros for the callback classes below: the prefix is produced
// from watch.get(), so a `watch` member or variable must be in scope where
// dout is used.
225 #define dout_subsys ceph_subsys_osd
227 #define dout_prefix _prefix(_dout, watch.get())
// Log-prefix helper for watches: writes watch->gen_dbg_prefix() to *_dout
// and returns the resulting stream.
229 static ostream& _prefix(
232 return *_dout << watch->gen_dbg_prefix();
// Timeout callback for a Watch; Watch::register_cb() allocates one and
// schedules it on osd->watch_timer.
235 class HandleWatchTimeout : public CancelableContext {
238 bool canceled; // protected by watch->pg->lock
// Binds the callback to `watch`; it starts out not canceled.
239 explicit HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
240 void cancel() override {
// finish() aborts if it is ever reached; complete() is overridden instead
// and deletes the object itself.
243 void finish(int) override { ceph_abort(); /* not used */ }
244 void complete(int) override {
245 OSDService *osd(watch->osd);
246 ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl;
// Keep a reference to the PG for the duration of the call.
247 boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg);
// watch_lock is released here and taken again on the last line.
248 osd->watch_lock.Unlock();
// Forward the timeout to the PG only if the watch has not been discarded
// and this callback was not canceled.
251 if (!watch->is_discarded() && !canceled)
252 watch->pg->handle_watch_timeout(watch);
253 delete this; // ~Watch requires pg lock!
255 osd->watch_lock.Lock();
// Variant of the watch-timeout callback handed out by
// Watch::get_delayed_cb(). Its finish() asserts that the PG is already
// locked, unlike HandleWatchTimeout::complete(), which manages locks itself.
259 class HandleDelayedWatchTimeout : public CancelableContext {
// Binds the callback to `watch`; it starts out not canceled.
263 explicit HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
264 void cancel() override {
267 void finish(int) override {
268 OSDService *osd(watch->osd);
269 dout(10) << "HandleWatchTimeoutDelayed" << dendl;
270 assert(watch->pg->is_locked());
// Forward the timeout to the PG only if the watch has not been discarded
// and this callback was not canceled.
272 if (!watch->is_discarded() && !canceled)
273 watch->pg->handle_watch_timeout(watch);
// Logging macros for the member functions below: the prefix is again
// produced from `this`.
277 #define dout_subsys ceph_subsys_osd
279 #define dout_prefix _prefix(_dout, this)
// Builds the log prefix for this watch: the PG's own prefix, then
// " -- Watch(", the (cookie, entity) pair, and ") ".
281 string Watch::gen_dbg_prefix() {
283 ss << pg->gen_prefix() << " -- Watch("
284 << make_pair(cookie, entity) << ") ";
// Constructor parameters shown here: the object context, the watcher's
// entity name, and its address (taken by const reference).
291 ObjectContextRef obc,
294 entity_name_t entity,
295 const entity_addr_t &addr)
// Logs construction at dout level 10.
306 dout(10) << "Watch()" << dendl;
// NOTE(review): presumably the destructor body, judging by the log text --
// confirm. Logs at dout level 10.
310 dout(10) << "~Watch" << dendl;
311 // users must have called remove() or discard() prior to this point
// Returns true when `conn` converts to true, i.e. a connection is set.
316 bool Watch::connected() { return !!conn; }
// Allocates a HandleDelayedWatchTimeout bound to this watch (via
// self.lock()) and stores it in `cb`.
// NOTE(review): presumably returns `cb` -- the return statement is not
// shown here; confirm.
318 Context *Watch::get_delayed_cb()
321 cb = new HandleDelayedWatchTimeout(self.lock());
// Schedules the timeout callback for this watch, holding osd->watch_lock
// through the Mutex::Locker local.
325 void Watch::register_cb()
327 Mutex::Locker l(osd->watch_lock);
// Re-registration path: the existing timer event for `cb` is cancelled.
// NOTE(review): presumably taken when `cb` is already set -- the condition
// is not shown here; confirm.
329 dout(15) << "re-registering callback, timeout: " << timeout << dendl;
331 osd->watch_timer.cancel_event(cb);
333 dout(15) << "registering callback, timeout: " << timeout << dendl;
// Allocate a fresh callback and schedule it with delay `timeout`; the
// if-branch runs when add_event_after() returns a false value.
335 cb = new HandleWatchTimeout(self.lock());
336 if (!osd->watch_timer.add_event_after(timeout, cb)) {
// Cancels the timer event for `cb`, with osd->watch_lock held around the
// timer call.
341 void Watch::unregister_cb()
343 dout(15) << "unregister_cb" << dendl;
346 dout(15) << "actually registered, cancelling" << dendl;
349 Mutex::Locker l(osd->watch_lock);
350 osd->watch_timer.cancel_event(cb); // harmless if not registered with timer
// Takes the time `t` associated with a ping.
// NOTE(review): presumably updates last_ping, which Watch::start_notify()
// compares against its cutoff -- confirm against the body.
355 void Watch::got_ping(utime_t t)
// Attaches this watch to connection `con` and records whether the client
// will send pings.
363 void Watch::connect(ConnectionRef con, bool _will_ping)
// Logged on the path where the watch is already connected to `con`.
366 dout(10) << __func__ << " con " << con << " - already connected" << dendl;
369 dout(10) << __func__ << " con " << con << dendl;
371 will_ping = _will_ping;
// The connection's private data is treated as a Session (unchecked
// static_cast); this watch is added to that session's watch state.
372 Session* sessionref(static_cast<Session*>(con->get_priv()));
374 sessionref->wstate.addWatch(self.lock());
// Send every notify that is still in progress over the connection.
376 for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
377 i != in_progress_notifies.end();
379 send_notify(i->second);
// Set last_ping to the current time.
383 last_ping = ceph_clock_now();
// Logs the current connection, then resets `conn` to a default-constructed
// ConnectionRef.
390 void Watch::disconnect()
392 dout(10) << "disconnect (con was " << conn << ")" << dendl;
393 conn = ConnectionRef();
// Calls discard() on every notify in in_progress_notifies.
398 void Watch::discard()
400 dout(10) << "discard" << dendl;
401 for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
402 i != in_progress_notifies.end();
404 i->second->discard();
// Releases the state held by this watch. Requires the PG to be locked
// (asserted).
409 void Watch::discard_state()
411 assert(pg->is_locked());
414 in_progress_notifies.clear();
// Remove this watch from the watch state of the session behind `conn`.
// NOTE(review): presumably guarded by a check that `conn` is set -- the
// condition is not shown here; confirm.
418 Session* sessionref(static_cast<Session*>(conn->get_priv()));
420 sessionref->wstate.removeWatch(self.lock());
// Reset the connection and object-context references to empty values.
423 conn = ConnectionRef();
425 obc = ObjectContextRef();
// Const query used by the timeout callbacks and Notify::do_timeout() to
// skip watches that should no longer be acted on.
428 bool Watch::is_discarded() const
// Removes this watch. When `send_disconnect` is true and a connection is
// set, a CEPH_WATCH_EVENT_DISCONNECT message is sent first. Each in-progress
// notify is then told via complete_watcher_remove() that this watcher is
// gone.
433 void Watch::remove(bool send_disconnect)
435 dout(10) << "remove" << dendl;
436 if (send_disconnect && conn) {
438 MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
439 CEPH_WATCH_EVENT_DISCONNECT, empty));
440 conn->send_message(reply);
442 for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
443 i != in_progress_notifies.end();
445 i->second->complete_watcher_remove(self.lock());
// Begins tracking `notif` on this watch. Asserts that no notify with the
// same notify_id is already in progress.
450 void Watch::start_notify(NotifyRef notif)
452 assert(in_progress_notifies.find(notif->notify_id) ==
453 in_progress_notifies.end());
// cutoff = current time with `timeout` subtracted from its seconds field.
// A last_ping older than the cutoff takes the "disconnecting" branch.
455 utime_t cutoff = ceph_clock_now();
456 cutoff.sec_ref() -= timeout;
457 if (last_ping < cutoff) {
458 dout(10) << __func__ << " " << notif->notify_id
459 << " last_ping " << last_ping << " < cutoff " << cutoff
460 << ", disconnecting" << dendl;
// Record the notify under its id and register this watch with it.
465 dout(10) << "start_notify " << notif->notify_id << dendl;
466 in_progress_notifies[notif->notify_id] = notif;
467 notif->start_watcher(self.lock());
// Stops tracking `notif`: erases its notify_id from in_progress_notifies.
// Called from Notify::do_timeout() for each watcher still outstanding.
472 void Watch::cancel_notify(NotifyRef notif)
474 dout(10) << "cancel_notify " << notif->notify_id << dendl;
475 in_progress_notifies.erase(notif->notify_id);
// Sends `notif` to the watcher over `conn` as a CEPH_WATCH_EVENT_NOTIFY
// message. The message carries this watch's cookie, the notify's version,
// id and payload, and the notifier's gid.
// NOTE(review): `conn` is dereferenced on the last line; no null check is
// shown here -- confirm whether one exists.
478 void Watch::send_notify(NotifyRef notif)
480 dout(10) << "send_notify" << dendl;
481 MWatchNotify *notify_msg = new MWatchNotify(
482 cookie, notif->version, notif->notify_id,
483 CEPH_WATCH_EVENT_NOTIFY, notif->payload);
484 notify_msg->notifier_gid = notif->client_gid;
485 conn->send_message(notify_msg);
// Handles the watcher's acknowledgement of notify `notify_id`. If that
// notify is still in progress, reply_bl is passed to its complete_watcher()
// and the entry is erased; unknown ids are ignored.
488 void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
490 dout(10) << "notify_ack" << dendl;
491 map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.find(notify_id);
492 if (i != in_progress_notifies.end()) {
493 i->second->complete_watcher(self.lock(), reply_bl);
494 in_progress_notifies.erase(i);
// Allocates a Watch from the given arguments and wraps it in a WatchRef.
498 WatchRef Watch::makeWatchRef(
499 PrimaryLogPG *pg, OSDService *osd,
500 ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t& addr)
// All arguments are forwarded unchanged to the Watch constructor.
502 WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr));
// Inserts `watch` into the `watches` set with `lock` held. Called from
// Watch::connect().
507 void WatchConState::addWatch(WatchRef watch)
509 Mutex::Locker l(lock);
510 watches.insert(watch);
// Erases `watch` from the `watches` set with `lock` held. Called from
// Watch::discard_state().
513 void WatchConState::removeWatch(WatchRef watch)
515 Mutex::Locker l(lock);
516 watches.erase(watch);
519 void WatchConState::reset(Connection *con)
521 set<WatchRef> _watches;
523 Mutex::Locker l(lock);
524 _watches.swap(watches);
526 for (set<WatchRef>::iterator i = _watches.begin();
529 boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
531 if (!(*i)->is_discarded()) {
532 if ((*i)->is_connected(con)) {
535 lgeneric_derr(cct) << __func__ << " not still connected to " << (*i) << dendl;