initial code repo
[stor4nfv.git] / src / ceph / src / osd / Watch.cc
diff --git a/src/ceph/src/osd/Watch.cc b/src/ceph/src/osd/Watch.cc
new file mode 100644 (file)
index 0000000..7ff9f99
--- /dev/null
@@ -0,0 +1,540 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+#include "PG.h"
+
+#include "include/types.h"
+#include "messages/MWatchNotify.h"
+
+#include <map>
+
+#include "OSD.h"
+#include "PrimaryLogPG.h"
+#include "Watch.h"
+#include "Session.h"
+
+#include "common/config.h"
+
+/**
+ * A Context that can additionally be told it has been canceled.
+ *
+ * The implementations in this file record a "canceled" flag in cancel()
+ * and consult it when the callback eventually runs.
+ */
+struct CancelableContext : Context {
+  /// Mark this callback as canceled.
+  virtual void cancel() = 0;
+};
+
+#define dout_context osd->cct
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+
+/**
+ * Debug-log prefix helper for Notify; dout_prefix expands to a call to
+ * this with the current Notify as @p notify.
+ *
+ * @param _dout   stream the log line is being written to
+ * @param notify  notify whose gen_dbg_prefix() is emitted
+ * @return the stream that was written to
+ */
+static ostream& _prefix(std::ostream* _dout, Notify *notify)
+{
+  std::ostream &out = *_dout;
+  out << notify->gen_dbg_prefix();
+  return out;
+}
+
+/**
+ * Record the parameters of a notify.
+ *
+ * A freshly built notify is not complete, not discarded and not timed
+ * out, and has no timeout callback registered (cb is null).
+ */
+Notify::Notify(ConnectionRef client, uint64_t client_gid,
+               bufferlist &payload, uint32_t timeout,
+               uint64_t cookie, uint64_t notify_id,
+               uint64_t version, OSDService *osd)
+  : client(client),
+    client_gid(client_gid),
+    complete(false),
+    discarded(false),
+    timed_out(false),
+    payload(payload),
+    timeout(timeout),
+    cookie(cookie),
+    notify_id(notify_id),
+    version(version),
+    osd(osd),
+    cb(nullptr),
+    lock("Notify::lock")
+{
+}
+
+/**
+ * Factory for Notify objects.
+ *
+ * Allocates the Notify, then passes the owning reference back to the
+ * object through set_self() so that it can later produce references to
+ * itself (the methods in this file do so with self.lock()).
+ *
+ * @return the owning reference to the new Notify
+ */
+NotifyRef Notify::makeNotifyRef(ConnectionRef client, uint64_t client_gid,
+                                bufferlist &payload, uint32_t timeout,
+                                uint64_t cookie, uint64_t notify_id,
+                                uint64_t version, OSDService *osd)
+{
+  NotifyRef notify(new Notify(client, client_gid, payload, timeout,
+                              cookie, notify_id, version, osd));
+  notify->set_self(notify);
+  return notify;
+}
+
+/**
+ * Timer callback that expires a Notify.
+ *
+ * finish() releases osd->watch_lock on entry and re-takes it before
+ * returning, so it is presumably invoked by the watch timer with that
+ * lock held (TODO confirm against the timer implementation).
+ */
+class NotifyTimeoutCB : public CancelableContext {
+  NotifyRef notif;
+  bool canceled; // protected by notif lock
+public:
+  explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
+
+  /// Flag the callback as canceled; the caller must hold notif->lock.
+  void cancel() override {
+    assert(notif->lock.is_locked_by_me());
+    canceled = true;
+  }
+
+  /// Run the timeout unless cancel() was called first.
+  void finish(int) override {
+    // Swap the timer lock for the notify lock before looking at 'canceled'.
+    notif->osd->watch_lock.Unlock();
+    notif->lock.Lock();
+    if (canceled) {
+      notif->lock.Unlock();
+    } else {
+      notif->do_timeout(); // drops lock
+    }
+    notif->osd->watch_lock.Lock();
+  }
+};
+
+void Notify::do_timeout()
+{
+  assert(lock.is_locked_by_me());
+  dout(10) << "timeout" << dendl;
+  cb = NULL;
+  if (is_discarded()) {
+    lock.Unlock();
+    return;
+  }
+
+  timed_out = true;         // we will send the client an error code
+  maybe_complete_notify();
+  assert(complete);
+  set<WatchRef> _watchers;
+  _watchers.swap(watchers);
+  lock.Unlock();
+
+  for (set<WatchRef>::iterator i = _watchers.begin();
+       i != _watchers.end();
+       ++i) {
+    boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
+    pg->lock();
+    if (!(*i)->is_discarded()) {
+      (*i)->cancel_notify(self.lock());
+    }
+    pg->unlock();
+  }
+}
+
+/**
+ * Arm the timeout callback for this notify.
+ *
+ * Caller must hold lock.  A new NotifyTimeoutCB is scheduled on the OSD
+ * watch timer 'timeout' from now; if the timer refuses the event, cb is
+ * reset to null.
+ */
+void Notify::register_cb()
+{
+  assert(lock.is_locked_by_me());
+  Mutex::Locker timer_guard(osd->watch_lock);
+  cb = new NotifyTimeoutCB(self.lock());
+  if (!osd->watch_timer.add_event_after(timeout, cb))
+    cb = nullptr;
+}
+
+/**
+ * Disarm the timeout callback, if one is registered.
+ *
+ * Caller must hold lock.  The callback is flagged canceled before it is
+ * removed from the OSD watch timer.
+ */
+void Notify::unregister_cb()
+{
+  assert(lock.is_locked_by_me());
+  if (cb) {
+    cb->cancel();
+    Mutex::Locker timer_guard(osd->watch_lock);
+    osd->watch_timer.cancel_event(cb);
+    cb = nullptr;
+  }
+}
+
+/**
+ * Add @p watch to the set of watchers this notify is still waiting on.
+ * Takes lock for the duration of the call.
+ */
+void Notify::start_watcher(WatchRef watch)
+{
+  Mutex::Locker guard(lock);
+  dout(10) << "start_watcher" << dendl;
+  watchers.insert(watch);
+}
+
+void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl)
+{
+  Mutex::Locker l(lock);
+  dout(10) << "complete_watcher" << dendl;
+  if (is_discarded())
+    return;
+  assert(watchers.count(watch));
+  watchers.erase(watch);
+  notify_replies.insert(make_pair(make_pair(watch->get_watcher_gid(),
+                                           watch->get_cookie()),
+                                 reply_bl));
+  maybe_complete_notify();
+}
+
+/**
+ * Stop waiting on @p watch without recording a reply from it.
+ *
+ * Does nothing if the notify has been discarded.  Otherwise @p watch
+ * (which must currently be outstanding) is dropped from the waiting set
+ * and the notify is completed if nothing else is outstanding.
+ */
+void Notify::complete_watcher_remove(WatchRef watch)
+{
+  Mutex::Locker guard(lock);
+  dout(10) << __func__ << dendl;
+  if (!is_discarded()) {
+    assert(watchers.count(watch));
+    watchers.erase(watch);
+    maybe_complete_notify();
+  }
+}
+
+void Notify::maybe_complete_notify()
+{
+  dout(10) << "maybe_complete_notify -- "
+          << watchers.size()
+          << " in progress watchers " << dendl;
+  if (watchers.empty() || timed_out) {
+    // prepare reply
+    bufferlist bl;
+    ::encode(notify_replies, bl);
+    list<pair<uint64_t,uint64_t> > missed;
+    for (set<WatchRef>::iterator p = watchers.begin(); p != watchers.end(); ++p) {
+      missed.push_back(make_pair((*p)->get_watcher_gid(),
+                                (*p)->get_cookie()));
+    }
+    ::encode(missed, bl);
+
+    bufferlist empty;
+    MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
+                                        CEPH_WATCH_EVENT_NOTIFY_COMPLETE, empty));
+    reply->notifier_gid = client_gid;
+    reply->set_data(bl);
+    if (timed_out)
+      reply->return_code = -ETIMEDOUT;
+    client->send_message(reply);
+    unregister_cb();
+
+    complete = true;
+  }
+}
+
+/**
+ * Abandon this notify: mark it discarded, disarm the timeout callback
+ * and forget all outstanding watchers.  Takes lock for the duration.
+ */
+void Notify::discard()
+{
+  Mutex::Locker guard(lock);
+  discarded = true;
+  unregister_cb();
+  watchers.clear();
+}
+
+/**
+ * Start the notify: arm the timeout callback, then complete right away
+ * if there are no outstanding watchers.  Takes lock for the duration.
+ */
+void Notify::init()
+{
+  Mutex::Locker guard(lock);
+  register_cb();
+  maybe_complete_notify();
+}
+
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, watch.get())
+
+/**
+ * Debug-log prefix helper for Watch; dout_prefix expands to a call to
+ * this with a Watch pointer.
+ *
+ * @param _dout  stream the log line is being written to
+ * @param watch  watch whose gen_dbg_prefix() is emitted
+ * @return the stream that was written to
+ */
+static ostream& _prefix(std::ostream* _dout, Watch *watch)
+{
+  std::ostream &out = *_dout;
+  out << watch->gen_dbg_prefix();
+  return out;
+}
+
+class HandleWatchTimeout : public CancelableContext {
+  WatchRef watch;
+public:
+  bool canceled; // protected by watch->pg->lock
+  explicit HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
+  void cancel() override {
+    canceled = true;
+  }
+  void finish(int) override { ceph_abort(); /* not used */ }
+  void complete(int) override {
+    OSDService *osd(watch->osd);
+    ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl;
+    boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg);
+    osd->watch_lock.Unlock();
+    pg->lock();
+    watch->cb = NULL;
+    if (!watch->is_discarded() && !canceled)
+      watch->pg->handle_watch_timeout(watch);
+    delete this; // ~Watch requires pg lock!
+    pg->unlock();
+    osd->watch_lock.Lock();
+  }
+};
+
+class HandleDelayedWatchTimeout : public CancelableContext {
+  WatchRef watch;
+public:
+  bool canceled;
+  explicit HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
+  void cancel() override {
+    canceled = true;
+  }
+  void finish(int) override {
+    OSDService *osd(watch->osd);
+    dout(10) << "HandleWatchTimeoutDelayed" << dendl;
+    assert(watch->pg->is_locked());
+    watch->cb = NULL;
+    if (!watch->is_discarded() && !canceled)
+      watch->pg->handle_watch_timeout(watch);
+  }
+};
+
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+
+/**
+ * Build the debug-log prefix for this watch: the PG's own prefix
+ * followed by " -- Watch(<cookie,entity>) ".
+ */
+string Watch::gen_dbg_prefix()
+{
+  stringstream prefix;
+  prefix << pg->gen_prefix();
+  prefix << " -- Watch(" << make_pair(cookie, entity) << ") ";
+  return prefix.str();
+}
+
+/**
+ * Record the parameters of a watch.
+ *
+ * A freshly built watch has no timeout callback (cb is null), is not
+ * discarded, and has will_ping cleared.
+ */
+Watch::Watch(PrimaryLogPG *pg, OSDService *osd, ObjectContextRef obc,
+             uint32_t timeout, uint64_t cookie, entity_name_t entity,
+             const entity_addr_t &addr)
+  : cb(nullptr),
+    osd(osd),
+    pg(pg),
+    obc(obc),
+    timeout(timeout),
+    cookie(cookie),
+    addr(addr),
+    will_ping(false),
+    entity(entity),
+    discarded(false)
+{
+  dout(10) << "Watch()" << dendl;
+}
+
+/**
+ * Destructor.  Asserts that the object context and the connection have
+ * already been released.
+ */
+Watch::~Watch()
+{
+  dout(10) << "~Watch" << dendl;
+  // users must have called remove() or discard() prior to this point
+  assert(!obc);
+  assert(!conn);
+}
+
+/// @return true when this watch currently holds a connection.
+bool Watch::connected()
+{
+  return conn ? true : false;
+}
+
+/**
+ * Create a HandleDelayedWatchTimeout for this watch and remember it as
+ * the current callback.  No callback may be set when this is called.
+ *
+ * @return the newly created callback
+ */
+Context *Watch::get_delayed_cb()
+{
+  assert(!cb);
+  HandleDelayedWatchTimeout *delayed =
+    new HandleDelayedWatchTimeout(self.lock());
+  cb = delayed;
+  return delayed;
+}
+
+/**
+ * (Re)arm the watch timeout.
+ *
+ * Under osd->watch_lock: any existing callback is canceled and removed
+ * from the timer, then a new HandleWatchTimeout is scheduled 'timeout'
+ * from now.  If the timer refuses the event, cb is reset to null.
+ */
+void Watch::register_cb()
+{
+  Mutex::Locker l(osd->watch_lock);
+  if (!cb) {
+    dout(15) << "registering callback, timeout: " << timeout << dendl;
+  } else {
+    dout(15) << "re-registering callback, timeout: " << timeout << dendl;
+    cb->cancel();
+    osd->watch_timer.cancel_event(cb);
+  }
+  cb = new HandleWatchTimeout(self.lock());
+  if (!osd->watch_timer.add_event_after(timeout, cb))
+    cb = nullptr;
+}
+
+/**
+ * Disarm the watch timeout, if a callback is set: flag it canceled,
+ * remove it from the timer under osd->watch_lock, and clear cb.
+ */
+void Watch::unregister_cb()
+{
+  dout(15) << "unregister_cb" << dendl;
+  if (cb) {
+    dout(15) << "actually registered, cancelling" << dendl;
+    cb->cancel();
+    {
+      Mutex::Locker l(osd->watch_lock);
+      osd->watch_timer.cancel_event(cb); // harmless if not registered with timer
+    }
+    cb = nullptr;
+  }
+}
+
+/**
+ * Record a ping received at time @p t and, if the watch is connected,
+ * restart the timeout.
+ */
+void Watch::got_ping(utime_t t)
+{
+  last_ping = t;
+  if (!conn)
+    return;
+  register_cb();
+}
+
+void Watch::connect(ConnectionRef con, bool _will_ping)
+{
+  if (conn == con) {
+    dout(10) << __func__ << " con " << con << " - already connected" << dendl;
+    return;
+  }
+  dout(10) << __func__ << " con " << con << dendl;
+  conn = con;
+  will_ping = _will_ping;
+  Session* sessionref(static_cast<Session*>(con->get_priv()));
+  if (sessionref) {
+    sessionref->wstate.addWatch(self.lock());
+    sessionref->put();
+    for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
+        i != in_progress_notifies.end();
+        ++i) {
+      send_notify(i->second);
+    }
+  }
+  if (will_ping) {
+    last_ping = ceph_clock_now();
+    register_cb();
+  } else {
+    unregister_cb();
+  }
+}
+
+/**
+ * Drop the current connection.  For a client that does not ping, the
+ * timeout is armed at this point.
+ */
+void Watch::disconnect()
+{
+  dout(10) << "disconnect (con was " << conn << ")" << dendl;
+  conn = ConnectionRef();
+  if (will_ping)
+    return;
+  register_cb();
+}
+
+/**
+ * Discard this watch: discard every in-progress notify, then tear down
+ * the watch's own state.
+ */
+void Watch::discard()
+{
+  dout(10) << "discard" << dendl;
+  for (const auto &entry : in_progress_notifies)
+    entry.second->discard();
+  discard_state();
+}
+
+/**
+ * Tear down this watch's state.
+ *
+ * Requires the PG lock, a watch that has not been discarded yet, and a
+ * live object context.  Forgets in-progress notifies, disarms the
+ * timeout, marks the watch discarded, detaches it from the session of
+ * its connection (if any), and releases the connection and the object
+ * context.
+ */
+void Watch::discard_state()
+{
+  assert(pg->is_locked());
+  assert(!discarded);
+  assert(obc);
+  in_progress_notifies.clear();
+  unregister_cb();
+  discarded = true;
+  if (conn) {
+    Session *session = static_cast<Session*>(conn->get_priv());
+    if (session) {
+      session->wstate.removeWatch(self.lock());
+      session->put();  // release the session reference obtained from get_priv()
+    }
+    conn.reset();
+  }
+  obc.reset();
+}
+
+/// @return the value of the 'discarded' flag set by discard_state().
+bool Watch::is_discarded() const {
+  return discarded;
+}
+
+void Watch::remove(bool send_disconnect)
+{
+  dout(10) << "remove" << dendl;
+  if (send_disconnect && conn) {
+    bufferlist empty;
+    MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
+                                        CEPH_WATCH_EVENT_DISCONNECT, empty));
+    conn->send_message(reply);
+  }
+  for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
+       i != in_progress_notifies.end();
+       ++i) {
+    i->second->complete_watcher_remove(self.lock());
+  }
+  discard_state();
+}
+
+void Watch::start_notify(NotifyRef notif)
+{
+  assert(in_progress_notifies.find(notif->notify_id) ==
+        in_progress_notifies.end());
+  if (will_ping) {
+    utime_t cutoff = ceph_clock_now();
+    cutoff.sec_ref() -= timeout;
+    if (last_ping < cutoff) {
+      dout(10) << __func__ << " " << notif->notify_id
+              << " last_ping " << last_ping << " < cutoff " << cutoff
+              << ", disconnecting" << dendl;
+      disconnect();
+      return;
+    }
+  }
+  dout(10) << "start_notify " << notif->notify_id << dendl;
+  in_progress_notifies[notif->notify_id] = notif;
+  notif->start_watcher(self.lock());
+  if (connected())
+    send_notify(notif);
+}
+
+/// Forget the in-progress notify identified by @p notif's notify_id.
+void Watch::cancel_notify(NotifyRef notif) {
+  dout(10) << "cancel_notify " << notif->notify_id << dendl;
+  in_progress_notifies.erase(notif->notify_id);
+}
+
+/**
+ * Send @p notif to the watching client as a CEPH_WATCH_EVENT_NOTIFY
+ * message on the current connection.  conn is dereferenced without a
+ * check; the callers in this file only call this while connected.
+ */
+void Watch::send_notify(NotifyRef notif)
+{
+  dout(10) << "send_notify" << dendl;
+  MWatchNotify *msg = new MWatchNotify(cookie, notif->version,
+                                       notif->notify_id,
+                                       CEPH_WATCH_EVENT_NOTIFY,
+                                       notif->payload);
+  msg->notifier_gid = notif->client_gid;
+  conn->send_message(msg);
+}
+
+/**
+ * Handle the client's acknowledgement of notify @p notify_id.
+ *
+ * If that notify is in progress on this watch, @p reply_bl is handed to
+ * it as this watcher's reply and the notify is forgotten here; unknown
+ * ids are ignored.
+ */
+void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
+{
+  dout(10) << "notify_ack" << dendl;
+  auto found = in_progress_notifies.find(notify_id);
+  if (found == in_progress_notifies.end())
+    return;
+  found->second->complete_watcher(self.lock(), reply_bl);
+  in_progress_notifies.erase(found);
+}
+
+/**
+ * Factory for Watch objects.
+ *
+ * Allocates the Watch, then passes the owning reference back to the
+ * object through set_self() so that it can later produce references to
+ * itself (the methods in this file do so with self.lock()).
+ *
+ * @return the owning reference to the new Watch
+ */
+WatchRef Watch::makeWatchRef(
+  PrimaryLogPG *pg, OSDService *osd,
+  ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t& addr)
+{
+  WatchRef watch(new Watch(pg, osd, obc, timeout, cookie, entity, addr));
+  watch->set_self(watch);
+  return watch;
+}
+
+/// Add @p watch to this connection's watch set, under lock.
+void WatchConState::addWatch(WatchRef watch) {
+  Mutex::Locker guard(lock);
+  watches.insert(watch);
+}
+
+/// Remove @p watch from this connection's watch set, under lock.
+void WatchConState::removeWatch(WatchRef watch) {
+  Mutex::Locker guard(lock);
+  watches.erase(watch);
+}
+
+void WatchConState::reset(Connection *con)
+{
+  set<WatchRef> _watches;
+  {
+    Mutex::Locker l(lock);
+    _watches.swap(watches);
+  }
+  for (set<WatchRef>::iterator i = _watches.begin();
+       i != _watches.end();
+       ++i) {
+    boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
+    pg->lock();
+    if (!(*i)->is_discarded()) {
+      if ((*i)->is_connected(con)) {
+       (*i)->disconnect();
+      } else {
+       lgeneric_derr(cct) << __func__ << " not still connected to " << (*i) << dendl;
+      }
+    }
+    pg->unlock();
+  }
+}