// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef CEPH_MON_SESSION_H #define CEPH_MON_SESSION_H #include "include/xlist.h" #include "msg/msg_types.h" #include "mon/mon_types.h" #include "auth/AuthServiceHandler.h" #include "osd/OSDMap.h" #include "MonCap.h" struct MonSession; struct Subscription { MonSession *session; string type; xlist::item type_item; version_t next; bool onetime; bool incremental_onetime; // has CEPH_FEATURE_INCSUBOSDMAP Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this), next(0), onetime(false), incremental_onetime(false) {} }; struct MonSession : public RefCountedObject { ConnectionRef con; int con_type = 0; uint64_t con_features = 0; // zero if AnonConnection entity_inst_t inst; utime_t session_timeout; utime_t time_established; bool closed; xlist::item item; set routed_request_tids; MonCap caps; uint64_t auid; uint64_t global_id; map sub_map; epoch_t osd_epoch; // the osdmap epoch sent to the mon client AuthServiceHandler *auth_handler; EntityName entity_name; ConnectionRef proxy_con; uint64_t proxy_tid; MonSession(const entity_inst_t& i, Connection *c) : RefCountedObject(g_ceph_context), con(c), con_type(c->get_peer_type()), con_features(0), inst(i), closed(false), item(this), auid(0), global_id(0), osd_epoch(0), auth_handler(NULL), proxy_con(NULL), proxy_tid(0) { time_established = ceph_clock_now(); if (c->get_messenger()) { // only fill in features if this is a non-anonymous connection con_features = c->get_features(); } } ~MonSession() override { //generic_dout(0) << "~MonSession " << this << dendl; // we should have been removed before we get destructed; see MonSessionMap::remove_session() assert(!item.is_on_list()); assert(sub_map.empty()); delete auth_handler; } bool is_capable(string service, int mask) { map args; return caps.is_capable( g_ceph_context, CEPH_ENTITY_TYPE_MON, entity_name, service, "", args, mask & MON_CAP_R, mask & MON_CAP_W, mask & MON_CAP_X); } }; struct MonSessionMap { xlist sessions; map* > subs; multimap by_osd; FeatureMap feature_map; // type -> features -> count MonSessionMap() {} ~MonSessionMap() { while (!subs.empty()) { assert(subs.begin()->second->empty()); delete subs.begin()->second; subs.erase(subs.begin()); } } unsigned get_size() const { return sessions.size(); } void remove_session(MonSession *s) { assert(!s->closed); for (map::iterator p = s->sub_map.begin(); p != s->sub_map.end(); ++p) { p->second->type_item.remove_myself(); delete p->second; } s->sub_map.clear(); s->item.remove_myself(); if (s->inst.name.is_osd()) { for (multimap::iterator p = by_osd.find(s->inst.name.num()); p->first == s->inst.name.num(); ++p) if (p->second == s) { by_osd.erase(p); break; } } if (s->con_features) { feature_map.rm(s->con_type, s->con_features); } s->closed = true; s->put(); } MonSession *new_session(const entity_inst_t& i, Connection *c) { MonSession *s = new MonSession(i, c); assert(s); sessions.push_back(&s->item); if (i.name.is_osd()) by_osd.insert(pair(i.name.num(), s)); if (s->con_features) { feature_map.add(s->con_type, s->con_features); } s->get(); // caller gets a ref return s; } MonSession *get_random_osd_session(OSDMap *osdmap) { // ok, this isn't actually random, but close enough. if (by_osd.empty()) return 0; int n = by_osd.rbegin()->first + 1; int r = rand() % n; multimap::iterator p = by_osd.lower_bound(r); if (p == by_osd.end()) --p; if (!osdmap) { return p->second; } MonSession *s = NULL; multimap::iterator b = p, f = p; bool backward = true, forward = true; while (backward || forward) { if (backward) { if (osdmap->is_up(b->first) && osdmap->get_addr(b->first) == b->second->con->get_peer_addr()) { s = b->second; break; } if (b != by_osd.begin()) --b; else backward = false; } forward = (f != by_osd.end()); if (forward) { if (osdmap->is_up(f->first)) { s = f->second; break; } ++f; } } return s; } void add_update_sub(MonSession *s, const string& what, version_t start, bool onetime, bool incremental_onetime) { Subscription *sub = 0; if (s->sub_map.count(what)) { sub = s->sub_map[what]; } else { sub = new Subscription(s, what); s->sub_map[what] = sub; if (!subs.count(what)) subs[what] = new xlist; subs[what]->push_back(&sub->type_item); } sub->next = start; sub->onetime = onetime; sub->incremental_onetime = onetime && incremental_onetime; } void remove_sub(Subscription *sub) { sub->session->sub_map.erase(sub->type); sub->type_item.remove_myself(); delete sub; } }; inline ostream& operator<<(ostream& out, const MonSession& s) { out << "MonSession(" << s.inst << " is " << (s.closed ? "closed" : "open"); out << " " << s.caps << ")"; return out; } #endif