X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmon%2FSession.h;fp=src%2Fceph%2Fsrc%2Fmon%2FSession.h;h=6dddee0716b05a9fb367bd24fade7581405767bd;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/mon/Session.h b/src/ceph/src/mon/Session.h new file mode 100644 index 0000000..6dddee0 --- /dev/null +++ b/src/ceph/src/mon/Session.h @@ -0,0 +1,234 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MON_SESSION_H +#define CEPH_MON_SESSION_H + +#include "include/xlist.h" +#include "msg/msg_types.h" +#include "mon/mon_types.h" + +#include "auth/AuthServiceHandler.h" +#include "osd/OSDMap.h" + +#include "MonCap.h" + +struct MonSession; + +struct Subscription { + MonSession *session; + string type; + xlist::item type_item; + version_t next; + bool onetime; + bool incremental_onetime; // has CEPH_FEATURE_INCSUBOSDMAP + + Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this), + next(0), onetime(false), incremental_onetime(false) {} +}; + +struct MonSession : public RefCountedObject { + ConnectionRef con; + int con_type = 0; + uint64_t con_features = 0; // zero if AnonConnection + entity_inst_t inst; + utime_t session_timeout; + utime_t time_established; + bool closed; + xlist::item item; + set routed_request_tids; + MonCap caps; + uint64_t auid; + uint64_t global_id; + + map sub_map; + epoch_t osd_epoch; // the osdmap epoch sent to the mon client + + AuthServiceHandler *auth_handler; + EntityName entity_name; + + ConnectionRef proxy_con; + uint64_t proxy_tid; + + MonSession(const entity_inst_t& i, Connection *c) : + RefCountedObject(g_ceph_context), + con(c), + con_type(c->get_peer_type()), + con_features(0), + inst(i), closed(false), item(this), + auid(0), + global_id(0), + osd_epoch(0), + auth_handler(NULL), + proxy_con(NULL), proxy_tid(0) { + time_established = ceph_clock_now(); + if (c->get_messenger()) { + // only fill in features if this is a non-anonymous connection + con_features = c->get_features(); + } + } + ~MonSession() override { + //generic_dout(0) << "~MonSession " << this << dendl; + // we should have been removed before we get destructed; see MonSessionMap::remove_session() + assert(!item.is_on_list()); + assert(sub_map.empty()); + delete auth_handler; + } + + bool is_capable(string service, int mask) { + map args; + return caps.is_capable( + g_ceph_context, + CEPH_ENTITY_TYPE_MON, + entity_name, + service, "", args, + mask & MON_CAP_R, mask & MON_CAP_W, mask & MON_CAP_X); + } +}; + + +struct MonSessionMap { + xlist sessions; + map* > subs; + multimap by_osd; + FeatureMap feature_map; // type -> features -> count + + MonSessionMap() {} + ~MonSessionMap() { + while (!subs.empty()) { + assert(subs.begin()->second->empty()); + delete subs.begin()->second; + subs.erase(subs.begin()); + } + } + + unsigned get_size() const { + return sessions.size(); + } + + void remove_session(MonSession *s) { + assert(!s->closed); + for (map::iterator p = s->sub_map.begin(); p != s->sub_map.end(); ++p) { + p->second->type_item.remove_myself(); + delete p->second; + } + s->sub_map.clear(); + s->item.remove_myself(); + if (s->inst.name.is_osd()) { + for (multimap::iterator p = by_osd.find(s->inst.name.num()); + p->first == s->inst.name.num(); + ++p) + if (p->second == s) { + by_osd.erase(p); + break; + } + } + if (s->con_features) { + feature_map.rm(s->con_type, s->con_features); + } + s->closed = true; + s->put(); + } + + MonSession *new_session(const entity_inst_t& i, Connection *c) { + MonSession *s = new MonSession(i, c); + assert(s); + sessions.push_back(&s->item); + if (i.name.is_osd()) + by_osd.insert(pair(i.name.num(), s)); + if (s->con_features) { + feature_map.add(s->con_type, s->con_features); + } + s->get(); // caller gets a ref + return s; + } + + MonSession *get_random_osd_session(OSDMap *osdmap) { + // ok, this isn't actually random, but close enough. + if (by_osd.empty()) + return 0; + int n = by_osd.rbegin()->first + 1; + int r = rand() % n; + + multimap::iterator p = by_osd.lower_bound(r); + if (p == by_osd.end()) + --p; + + if (!osdmap) { + return p->second; + } + + MonSession *s = NULL; + + multimap::iterator b = p, f = p; + bool backward = true, forward = true; + while (backward || forward) { + if (backward) { + if (osdmap->is_up(b->first) && + osdmap->get_addr(b->first) == b->second->con->get_peer_addr()) { + s = b->second; + break; + } + if (b != by_osd.begin()) + --b; + else + backward = false; + } + + forward = (f != by_osd.end()); + if (forward) { + if (osdmap->is_up(f->first)) { + s = f->second; + break; + } + ++f; + } + } + + return s; + } + + void add_update_sub(MonSession *s, const string& what, version_t start, bool onetime, bool incremental_onetime) { + Subscription *sub = 0; + if (s->sub_map.count(what)) { + sub = s->sub_map[what]; + } else { + sub = new Subscription(s, what); + s->sub_map[what] = sub; + + if (!subs.count(what)) + subs[what] = new xlist; + subs[what]->push_back(&sub->type_item); + } + sub->next = start; + sub->onetime = onetime; + sub->incremental_onetime = onetime && incremental_onetime; + } + + void remove_sub(Subscription *sub) { + sub->session->sub_map.erase(sub->type); + sub->type_item.remove_myself(); + delete sub; + } +}; + +inline ostream& operator<<(ostream& out, const MonSession& s) +{ + out << "MonSession(" << s.inst << " is " + << (s.closed ? "closed" : "open"); + out << " " << s.caps << ")"; + return out; +} + +#endif