remove ceph code
[stor4nfv.git] / src / ceph / src / osd / Watch.cc
diff --git a/src/ceph/src/osd/Watch.cc b/src/ceph/src/osd/Watch.cc
deleted file mode 100644 (file)
index 7ff9f99..0000000
+++ /dev/null
@@ -1,540 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-#include "PG.h"
-
-#include "include/types.h"
-#include "messages/MWatchNotify.h"
-
-#include <map>
-
-#include "OSD.h"
-#include "PrimaryLogPG.h"
-#include "Watch.h"
-#include "Session.h"
-
-#include "common/config.h"
-
-struct CancelableContext : public Context {
-  virtual void cancel() = 0;
-};
-
-#define dout_context osd->cct
-#define dout_subsys ceph_subsys_osd
-#undef dout_prefix
-#define dout_prefix _prefix(_dout, this)
-
-static ostream& _prefix(
-  std::ostream* _dout,
-  Notify *notify) {
-  return *_dout << notify->gen_dbg_prefix();
-}
-
-Notify::Notify(
-  ConnectionRef client,
-  uint64_t client_gid,
-  bufferlist &payload,
-  uint32_t timeout,
-  uint64_t cookie,
-  uint64_t notify_id,
-  uint64_t version,
-  OSDService *osd)
-  : client(client), client_gid(client_gid),
-    complete(false),
-    discarded(false),
-    timed_out(false),
-    payload(payload),
-    timeout(timeout),
-    cookie(cookie),
-    notify_id(notify_id),
-    version(version),
-    osd(osd),
-    cb(NULL),
-    lock("Notify::lock") {}
-
-NotifyRef Notify::makeNotifyRef(
-  ConnectionRef client,
-  uint64_t client_gid,
-  bufferlist &payload,
-  uint32_t timeout,
-  uint64_t cookie,
-  uint64_t notify_id,
-  uint64_t version,
-  OSDService *osd) {
-  NotifyRef ret(
-    new Notify(
-      client, client_gid,
-      payload, timeout,
-      cookie, notify_id,
-      version, osd));
-  ret->set_self(ret);
-  return ret;
-}
-
-class NotifyTimeoutCB : public CancelableContext {
-  NotifyRef notif;
-  bool canceled; // protected by notif lock
-public:
-  explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
-  void finish(int) override {
-    notif->osd->watch_lock.Unlock();
-    notif->lock.Lock();
-    if (!canceled)
-      notif->do_timeout(); // drops lock
-    else
-      notif->lock.Unlock();
-    notif->osd->watch_lock.Lock();
-  }
-  void cancel() override {
-    assert(notif->lock.is_locked_by_me());
-    canceled = true;
-  }
-};
-
-void Notify::do_timeout()
-{
-  assert(lock.is_locked_by_me());
-  dout(10) << "timeout" << dendl;
-  cb = NULL;
-  if (is_discarded()) {
-    lock.Unlock();
-    return;
-  }
-
-  timed_out = true;         // we will send the client an error code
-  maybe_complete_notify();
-  assert(complete);
-  set<WatchRef> _watchers;
-  _watchers.swap(watchers);
-  lock.Unlock();
-
-  for (set<WatchRef>::iterator i = _watchers.begin();
-       i != _watchers.end();
-       ++i) {
-    boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
-    pg->lock();
-    if (!(*i)->is_discarded()) {
-      (*i)->cancel_notify(self.lock());
-    }
-    pg->unlock();
-  }
-}
-
-void Notify::register_cb()
-{
-  assert(lock.is_locked_by_me());
-  {
-    osd->watch_lock.Lock();
-    cb = new NotifyTimeoutCB(self.lock());
-    if (!osd->watch_timer.add_event_after(timeout, cb)) {
-      cb = nullptr;
-    }
-    osd->watch_lock.Unlock();
-  }
-}
-
-void Notify::unregister_cb()
-{
-  assert(lock.is_locked_by_me());
-  if (!cb)
-    return;
-  cb->cancel();
-  {
-    osd->watch_lock.Lock();
-    osd->watch_timer.cancel_event(cb);
-    cb = NULL;
-    osd->watch_lock.Unlock();
-  }
-}
-
-void Notify::start_watcher(WatchRef watch)
-{
-  Mutex::Locker l(lock);
-  dout(10) << "start_watcher" << dendl;
-  watchers.insert(watch);
-}
-
-void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl)
-{
-  Mutex::Locker l(lock);
-  dout(10) << "complete_watcher" << dendl;
-  if (is_discarded())
-    return;
-  assert(watchers.count(watch));
-  watchers.erase(watch);
-  notify_replies.insert(make_pair(make_pair(watch->get_watcher_gid(),
-                                           watch->get_cookie()),
-                                 reply_bl));
-  maybe_complete_notify();
-}
-
-void Notify::complete_watcher_remove(WatchRef watch)
-{
-  Mutex::Locker l(lock);
-  dout(10) << __func__ << dendl;
-  if (is_discarded())
-    return;
-  assert(watchers.count(watch));
-  watchers.erase(watch);
-  maybe_complete_notify();
-}
-
-void Notify::maybe_complete_notify()
-{
-  dout(10) << "maybe_complete_notify -- "
-          << watchers.size()
-          << " in progress watchers " << dendl;
-  if (watchers.empty() || timed_out) {
-    // prepare reply
-    bufferlist bl;
-    ::encode(notify_replies, bl);
-    list<pair<uint64_t,uint64_t> > missed;
-    for (set<WatchRef>::iterator p = watchers.begin(); p != watchers.end(); ++p) {
-      missed.push_back(make_pair((*p)->get_watcher_gid(),
-                                (*p)->get_cookie()));
-    }
-    ::encode(missed, bl);
-
-    bufferlist empty;
-    MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
-                                        CEPH_WATCH_EVENT_NOTIFY_COMPLETE, empty));
-    reply->notifier_gid = client_gid;
-    reply->set_data(bl);
-    if (timed_out)
-      reply->return_code = -ETIMEDOUT;
-    client->send_message(reply);
-    unregister_cb();
-
-    complete = true;
-  }
-}
-
-void Notify::discard()
-{
-  Mutex::Locker l(lock);
-  discarded = true;
-  unregister_cb();
-  watchers.clear();
-}
-
-void Notify::init()
-{
-  Mutex::Locker l(lock);
-  register_cb();
-  maybe_complete_notify();
-}
-
-#define dout_subsys ceph_subsys_osd
-#undef dout_prefix
-#define dout_prefix _prefix(_dout, watch.get())
-
-static ostream& _prefix(
-  std::ostream* _dout,
-  Watch *watch) {
-  return *_dout << watch->gen_dbg_prefix();
-}
-
-class HandleWatchTimeout : public CancelableContext {
-  WatchRef watch;
-public:
-  bool canceled; // protected by watch->pg->lock
-  explicit HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
-  void cancel() override {
-    canceled = true;
-  }
-  void finish(int) override { ceph_abort(); /* not used */ }
-  void complete(int) override {
-    OSDService *osd(watch->osd);
-    ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl;
-    boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg);
-    osd->watch_lock.Unlock();
-    pg->lock();
-    watch->cb = NULL;
-    if (!watch->is_discarded() && !canceled)
-      watch->pg->handle_watch_timeout(watch);
-    delete this; // ~Watch requires pg lock!
-    pg->unlock();
-    osd->watch_lock.Lock();
-  }
-};
-
-class HandleDelayedWatchTimeout : public CancelableContext {
-  WatchRef watch;
-public:
-  bool canceled;
-  explicit HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
-  void cancel() override {
-    canceled = true;
-  }
-  void finish(int) override {
-    OSDService *osd(watch->osd);
-    dout(10) << "HandleWatchTimeoutDelayed" << dendl;
-    assert(watch->pg->is_locked());
-    watch->cb = NULL;
-    if (!watch->is_discarded() && !canceled)
-      watch->pg->handle_watch_timeout(watch);
-  }
-};
-
-#define dout_subsys ceph_subsys_osd
-#undef dout_prefix
-#define dout_prefix _prefix(_dout, this)
-
-string Watch::gen_dbg_prefix() {
-  stringstream ss;
-  ss << pg->gen_prefix() << " -- Watch(" 
-     << make_pair(cookie, entity) << ") ";
-  return ss.str();
-}
-
-Watch::Watch(
-  PrimaryLogPG *pg,
-  OSDService *osd,
-  ObjectContextRef obc,
-  uint32_t timeout,
-  uint64_t cookie,
-  entity_name_t entity,
-  const entity_addr_t &addr)
-  : cb(NULL),
-    osd(osd),
-    pg(pg),
-    obc(obc),
-    timeout(timeout),
-    cookie(cookie),
-    addr(addr),
-    will_ping(false),
-    entity(entity),
-    discarded(false) {
-  dout(10) << "Watch()" << dendl;
-}
-
-Watch::~Watch() {
-  dout(10) << "~Watch" << dendl;
-  // users must have called remove() or discard() prior to this point
-  assert(!obc);
-  assert(!conn);
-}
-
-bool Watch::connected() { return !!conn; }
-
-Context *Watch::get_delayed_cb()
-{
-  assert(!cb);
-  cb = new HandleDelayedWatchTimeout(self.lock());
-  return cb;
-}
-
-void Watch::register_cb()
-{
-  Mutex::Locker l(osd->watch_lock);
-  if (cb) {
-    dout(15) << "re-registering callback, timeout: " << timeout << dendl;
-    cb->cancel();
-    osd->watch_timer.cancel_event(cb);
-  } else {
-    dout(15) << "registering callback, timeout: " << timeout << dendl;
-  }
-  cb = new HandleWatchTimeout(self.lock());
-  if (!osd->watch_timer.add_event_after(timeout, cb)) {
-    cb = nullptr;
-  }
-}
-
-void Watch::unregister_cb()
-{
-  dout(15) << "unregister_cb" << dendl;
-  if (!cb)
-    return;
-  dout(15) << "actually registered, cancelling" << dendl;
-  cb->cancel();
-  {
-    Mutex::Locker l(osd->watch_lock);
-    osd->watch_timer.cancel_event(cb); // harmless if not registered with timer
-  }
-  cb = NULL;
-}
-
-void Watch::got_ping(utime_t t)
-{
-  last_ping = t;
-  if (conn) {
-    register_cb();
-  }
-}
-
-void Watch::connect(ConnectionRef con, bool _will_ping)
-{
-  if (conn == con) {
-    dout(10) << __func__ << " con " << con << " - already connected" << dendl;
-    return;
-  }
-  dout(10) << __func__ << " con " << con << dendl;
-  conn = con;
-  will_ping = _will_ping;
-  Session* sessionref(static_cast<Session*>(con->get_priv()));
-  if (sessionref) {
-    sessionref->wstate.addWatch(self.lock());
-    sessionref->put();
-    for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
-        i != in_progress_notifies.end();
-        ++i) {
-      send_notify(i->second);
-    }
-  }
-  if (will_ping) {
-    last_ping = ceph_clock_now();
-    register_cb();
-  } else {
-    unregister_cb();
-  }
-}
-
-void Watch::disconnect()
-{
-  dout(10) << "disconnect (con was " << conn << ")" << dendl;
-  conn = ConnectionRef();
-  if (!will_ping)
-    register_cb();
-}
-
-void Watch::discard()
-{
-  dout(10) << "discard" << dendl;
-  for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
-       i != in_progress_notifies.end();
-       ++i) {
-    i->second->discard();
-  }
-  discard_state();
-}
-
-void Watch::discard_state()
-{
-  assert(pg->is_locked());
-  assert(!discarded);
-  assert(obc);
-  in_progress_notifies.clear();
-  unregister_cb();
-  discarded = true;
-  if (conn) {
-    Session* sessionref(static_cast<Session*>(conn->get_priv()));
-    if (sessionref) {
-      sessionref->wstate.removeWatch(self.lock());
-      sessionref->put();
-    }
-    conn = ConnectionRef();
-  }
-  obc = ObjectContextRef();
-}
-
-bool Watch::is_discarded() const
-{
-  return discarded;
-}
-
-void Watch::remove(bool send_disconnect)
-{
-  dout(10) << "remove" << dendl;
-  if (send_disconnect && conn) {
-    bufferlist empty;
-    MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
-                                        CEPH_WATCH_EVENT_DISCONNECT, empty));
-    conn->send_message(reply);
-  }
-  for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
-       i != in_progress_notifies.end();
-       ++i) {
-    i->second->complete_watcher_remove(self.lock());
-  }
-  discard_state();
-}
-
-void Watch::start_notify(NotifyRef notif)
-{
-  assert(in_progress_notifies.find(notif->notify_id) ==
-        in_progress_notifies.end());
-  if (will_ping) {
-    utime_t cutoff = ceph_clock_now();
-    cutoff.sec_ref() -= timeout;
-    if (last_ping < cutoff) {
-      dout(10) << __func__ << " " << notif->notify_id
-              << " last_ping " << last_ping << " < cutoff " << cutoff
-              << ", disconnecting" << dendl;
-      disconnect();
-      return;
-    }
-  }
-  dout(10) << "start_notify " << notif->notify_id << dendl;
-  in_progress_notifies[notif->notify_id] = notif;
-  notif->start_watcher(self.lock());
-  if (connected())
-    send_notify(notif);
-}
-
-void Watch::cancel_notify(NotifyRef notif)
-{
-  dout(10) << "cancel_notify " << notif->notify_id << dendl;
-  in_progress_notifies.erase(notif->notify_id);
-}
-
-void Watch::send_notify(NotifyRef notif)
-{
-  dout(10) << "send_notify" << dendl;
-  MWatchNotify *notify_msg = new MWatchNotify(
-    cookie, notif->version, notif->notify_id,
-    CEPH_WATCH_EVENT_NOTIFY, notif->payload);
-  notify_msg->notifier_gid = notif->client_gid;
-  conn->send_message(notify_msg);
-}
-
-void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
-{
-  dout(10) << "notify_ack" << dendl;
-  map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.find(notify_id);
-  if (i != in_progress_notifies.end()) {
-    i->second->complete_watcher(self.lock(), reply_bl);
-    in_progress_notifies.erase(i);
-  }
-}
-
-WatchRef Watch::makeWatchRef(
-  PrimaryLogPG *pg, OSDService *osd,
-  ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t& addr)
-{
-  WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr));
-  ret->set_self(ret);
-  return ret;
-}
-
-void WatchConState::addWatch(WatchRef watch)
-{
-  Mutex::Locker l(lock);
-  watches.insert(watch);
-}
-
-void WatchConState::removeWatch(WatchRef watch)
-{
-  Mutex::Locker l(lock);
-  watches.erase(watch);
-}
-
-void WatchConState::reset(Connection *con)
-{
-  set<WatchRef> _watches;
-  {
-    Mutex::Locker l(lock);
-    _watches.swap(watches);
-  }
-  for (set<WatchRef>::iterator i = _watches.begin();
-       i != _watches.end();
-       ++i) {
-    boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg());
-    pg->lock();
-    if (!(*i)->is_discarded()) {
-      if ((*i)->is_connected(con)) {
-       (*i)->disconnect();
-      } else {
-       lgeneric_derr(cct) << __func__ << " not still connected to " << (*i) << dendl;
-      }
-    }
-    pg->unlock();
-  }
-}