initial code repo
[stor4nfv.git] / src / ceph / src / mon / Session.h
diff --git a/src/ceph/src/mon/Session.h b/src/ceph/src/mon/Session.h
new file mode 100644 (file)
index 0000000..6dddee0
--- /dev/null
@@ -0,0 +1,234 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef CEPH_MON_SESSION_H
+#define CEPH_MON_SESSION_H
+
+#include "include/xlist.h"
+#include "msg/msg_types.h"
+#include "mon/mon_types.h"
+
+#include "auth/AuthServiceHandler.h"
+#include "osd/OSDMap.h"
+
+#include "MonCap.h"
+
+struct MonSession;
+
+struct Subscription {
+  MonSession *session;
+  string type;
+  xlist<Subscription*>::item type_item;
+  version_t next;
+  bool onetime;
+  bool incremental_onetime;  // has CEPH_FEATURE_INCSUBOSDMAP
+  
+  Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this),
+                                                next(0), onetime(false), incremental_onetime(false) {}
+};
+
+struct MonSession : public RefCountedObject {
+  ConnectionRef con;
+  int con_type = 0;
+  uint64_t con_features = 0;  // zero if AnonConnection
+  entity_inst_t inst;
+  utime_t session_timeout;
+  utime_t time_established;
+  bool closed;
+  xlist<MonSession*>::item item;
+  set<uint64_t> routed_request_tids;
+  MonCap caps;
+  uint64_t auid;
+  uint64_t global_id;
+
+  map<string, Subscription*> sub_map;
+  epoch_t osd_epoch;           // the osdmap epoch sent to the mon client
+
+  AuthServiceHandler *auth_handler;
+  EntityName entity_name;
+
+  ConnectionRef proxy_con;
+  uint64_t proxy_tid;
+
+  MonSession(const entity_inst_t& i, Connection *c) :
+    RefCountedObject(g_ceph_context),
+    con(c),
+    con_type(c->get_peer_type()),
+    con_features(0),
+    inst(i), closed(false), item(this),
+    auid(0),
+    global_id(0),
+    osd_epoch(0),
+    auth_handler(NULL),
+    proxy_con(NULL), proxy_tid(0) {
+    time_established = ceph_clock_now();
+    if (c->get_messenger()) {
+      // only fill in features if this is a non-anonymous connection
+      con_features = c->get_features();
+    }
+  }
+  ~MonSession() override {
+    //generic_dout(0) << "~MonSession " << this << dendl;
+    // we should have been removed before we get destructed; see MonSessionMap::remove_session()
+    assert(!item.is_on_list());
+    assert(sub_map.empty());
+    delete auth_handler;
+  }
+
+  bool is_capable(string service, int mask) {
+    map<string,string> args;
+    return caps.is_capable(
+      g_ceph_context,
+      CEPH_ENTITY_TYPE_MON,
+      entity_name,
+      service, "", args,
+      mask & MON_CAP_R, mask & MON_CAP_W, mask & MON_CAP_X);
+  }
+};
+
+
+struct MonSessionMap {
+  xlist<MonSession*> sessions;
+  map<string, xlist<Subscription*>* > subs;
+  multimap<int, MonSession*> by_osd;
+  FeatureMap feature_map; // type -> features -> count
+
+  MonSessionMap() {}
+  ~MonSessionMap() {
+    while (!subs.empty()) {
+      assert(subs.begin()->second->empty());
+      delete subs.begin()->second;
+      subs.erase(subs.begin());
+    }
+  }
+
+  unsigned get_size() const {
+    return sessions.size();
+  }
+
+  void remove_session(MonSession *s) {
+    assert(!s->closed);
+    for (map<string,Subscription*>::iterator p = s->sub_map.begin(); p != s->sub_map.end(); ++p) {
+      p->second->type_item.remove_myself();
+      delete p->second;
+    }
+    s->sub_map.clear();
+    s->item.remove_myself();
+    if (s->inst.name.is_osd()) {
+      for (multimap<int,MonSession*>::iterator p = by_osd.find(s->inst.name.num());
+          p->first == s->inst.name.num();
+          ++p)
+       if (p->second == s) {
+         by_osd.erase(p);
+         break;
+       }
+    }
+    if (s->con_features) {
+      feature_map.rm(s->con_type, s->con_features);
+    }
+    s->closed = true;
+    s->put();
+  }
+
+  MonSession *new_session(const entity_inst_t& i, Connection *c) {
+    MonSession *s = new MonSession(i, c);
+    assert(s);
+    sessions.push_back(&s->item);
+    if (i.name.is_osd())
+      by_osd.insert(pair<int,MonSession*>(i.name.num(), s));
+    if (s->con_features) {
+      feature_map.add(s->con_type, s->con_features);
+    }
+    s->get();  // caller gets a ref
+    return s;
+  }
+
+  MonSession *get_random_osd_session(OSDMap *osdmap) {
+    // ok, this isn't actually random, but close enough.
+    if (by_osd.empty())
+      return 0;
+    int n = by_osd.rbegin()->first + 1;
+    int r = rand() % n;
+
+    multimap<int,MonSession*>::iterator p = by_osd.lower_bound(r);
+    if (p == by_osd.end())
+      --p;
+
+    if (!osdmap) {
+      return p->second;
+    }
+
+    MonSession *s = NULL;
+
+    multimap<int,MonSession*>::iterator b = p, f = p;
+    bool backward = true, forward = true;
+    while (backward || forward) {
+      if (backward) {
+        if (osdmap->is_up(b->first) &&
+           osdmap->get_addr(b->first) == b->second->con->get_peer_addr()) {
+          s = b->second;
+          break;
+        }
+        if (b != by_osd.begin())
+          --b;
+        else
+          backward = false;
+      }
+
+      forward = (f != by_osd.end());
+      if (forward) {
+        if (osdmap->is_up(f->first)) {
+          s = f->second;
+          break;
+        }
+        ++f;
+      }
+    }
+
+    return s;
+  }
+
+  void add_update_sub(MonSession *s, const string& what, version_t start, bool onetime, bool incremental_onetime) {
+    Subscription *sub = 0;
+    if (s->sub_map.count(what)) {
+      sub = s->sub_map[what];
+    } else {
+      sub = new Subscription(s, what);
+      s->sub_map[what] = sub;
+
+      if (!subs.count(what))
+       subs[what] = new xlist<Subscription*>;
+      subs[what]->push_back(&sub->type_item);
+    }
+    sub->next = start;
+    sub->onetime = onetime;
+    sub->incremental_onetime = onetime && incremental_onetime;
+  }
+
+  void remove_sub(Subscription *sub) {
+    sub->session->sub_map.erase(sub->type);
+    sub->type_item.remove_myself();
+    delete sub;
+  }
+};
+
+inline ostream& operator<<(ostream& out, const MonSession& s)
+{
+  out << "MonSession(" << s.inst << " is "
+      << (s.closed ? "closed" : "open");
+  out << " " << s.caps << ")";
+  return out;
+}
+
+#endif