Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mon / Session.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #ifndef CEPH_MON_SESSION_H
16 #define CEPH_MON_SESSION_H
17
18 #include "include/xlist.h"
19 #include "msg/msg_types.h"
20 #include "mon/mon_types.h"
21
22 #include "auth/AuthServiceHandler.h"
23 #include "osd/OSDMap.h"
24
25 #include "MonCap.h"
26
27 struct MonSession;
28
29 struct Subscription {
30   MonSession *session;
31   string type;
32   xlist<Subscription*>::item type_item;
33   version_t next;
34   bool onetime;
35   bool incremental_onetime;  // has CEPH_FEATURE_INCSUBOSDMAP
36   
37   Subscription(MonSession *s, const string& t) : session(s), type(t), type_item(this),
38                                                  next(0), onetime(false), incremental_onetime(false) {}
39 };
40
41 struct MonSession : public RefCountedObject {
42   ConnectionRef con;
43   int con_type = 0;
44   uint64_t con_features = 0;  // zero if AnonConnection
45   entity_inst_t inst;
46   utime_t session_timeout;
47   utime_t time_established;
48   bool closed;
49   xlist<MonSession*>::item item;
50   set<uint64_t> routed_request_tids;
51   MonCap caps;
52   uint64_t auid;
53   uint64_t global_id;
54
55   map<string, Subscription*> sub_map;
56   epoch_t osd_epoch;            // the osdmap epoch sent to the mon client
57
58   AuthServiceHandler *auth_handler;
59   EntityName entity_name;
60
61   ConnectionRef proxy_con;
62   uint64_t proxy_tid;
63
64   MonSession(const entity_inst_t& i, Connection *c) :
65     RefCountedObject(g_ceph_context),
66     con(c),
67     con_type(c->get_peer_type()),
68     con_features(0),
69     inst(i), closed(false), item(this),
70     auid(0),
71     global_id(0),
72     osd_epoch(0),
73     auth_handler(NULL),
74     proxy_con(NULL), proxy_tid(0) {
75     time_established = ceph_clock_now();
76     if (c->get_messenger()) {
77       // only fill in features if this is a non-anonymous connection
78       con_features = c->get_features();
79     }
80   }
81   ~MonSession() override {
82     //generic_dout(0) << "~MonSession " << this << dendl;
83     // we should have been removed before we get destructed; see MonSessionMap::remove_session()
84     assert(!item.is_on_list());
85     assert(sub_map.empty());
86     delete auth_handler;
87   }
88
89   bool is_capable(string service, int mask) {
90     map<string,string> args;
91     return caps.is_capable(
92       g_ceph_context,
93       CEPH_ENTITY_TYPE_MON,
94       entity_name,
95       service, "", args,
96       mask & MON_CAP_R, mask & MON_CAP_W, mask & MON_CAP_X);
97   }
98 };
99
100
101 struct MonSessionMap {
102   xlist<MonSession*> sessions;
103   map<string, xlist<Subscription*>* > subs;
104   multimap<int, MonSession*> by_osd;
105   FeatureMap feature_map; // type -> features -> count
106
107   MonSessionMap() {}
108   ~MonSessionMap() {
109     while (!subs.empty()) {
110       assert(subs.begin()->second->empty());
111       delete subs.begin()->second;
112       subs.erase(subs.begin());
113     }
114   }
115
116   unsigned get_size() const {
117     return sessions.size();
118   }
119
120   void remove_session(MonSession *s) {
121     assert(!s->closed);
122     for (map<string,Subscription*>::iterator p = s->sub_map.begin(); p != s->sub_map.end(); ++p) {
123       p->second->type_item.remove_myself();
124       delete p->second;
125     }
126     s->sub_map.clear();
127     s->item.remove_myself();
128     if (s->inst.name.is_osd()) {
129       for (multimap<int,MonSession*>::iterator p = by_osd.find(s->inst.name.num());
130            p->first == s->inst.name.num();
131            ++p)
132         if (p->second == s) {
133           by_osd.erase(p);
134           break;
135         }
136     }
137     if (s->con_features) {
138       feature_map.rm(s->con_type, s->con_features);
139     }
140     s->closed = true;
141     s->put();
142   }
143
144   MonSession *new_session(const entity_inst_t& i, Connection *c) {
145     MonSession *s = new MonSession(i, c);
146     assert(s);
147     sessions.push_back(&s->item);
148     if (i.name.is_osd())
149       by_osd.insert(pair<int,MonSession*>(i.name.num(), s));
150     if (s->con_features) {
151       feature_map.add(s->con_type, s->con_features);
152     }
153     s->get();  // caller gets a ref
154     return s;
155   }
156
157   MonSession *get_random_osd_session(OSDMap *osdmap) {
158     // ok, this isn't actually random, but close enough.
159     if (by_osd.empty())
160       return 0;
161     int n = by_osd.rbegin()->first + 1;
162     int r = rand() % n;
163
164     multimap<int,MonSession*>::iterator p = by_osd.lower_bound(r);
165     if (p == by_osd.end())
166       --p;
167
168     if (!osdmap) {
169       return p->second;
170     }
171
172     MonSession *s = NULL;
173
174     multimap<int,MonSession*>::iterator b = p, f = p;
175     bool backward = true, forward = true;
176     while (backward || forward) {
177       if (backward) {
178         if (osdmap->is_up(b->first) &&
179             osdmap->get_addr(b->first) == b->second->con->get_peer_addr()) {
180           s = b->second;
181           break;
182         }
183         if (b != by_osd.begin())
184           --b;
185         else
186           backward = false;
187       }
188
189       forward = (f != by_osd.end());
190       if (forward) {
191         if (osdmap->is_up(f->first)) {
192           s = f->second;
193           break;
194         }
195         ++f;
196       }
197     }
198
199     return s;
200   }
201
202   void add_update_sub(MonSession *s, const string& what, version_t start, bool onetime, bool incremental_onetime) {
203     Subscription *sub = 0;
204     if (s->sub_map.count(what)) {
205       sub = s->sub_map[what];
206     } else {
207       sub = new Subscription(s, what);
208       s->sub_map[what] = sub;
209
210       if (!subs.count(what))
211         subs[what] = new xlist<Subscription*>;
212       subs[what]->push_back(&sub->type_item);
213     }
214     sub->next = start;
215     sub->onetime = onetime;
216     sub->incremental_onetime = onetime && incremental_onetime;
217   }
218
219   void remove_sub(Subscription *sub) {
220     sub->session->sub_map.erase(sub->type);
221     sub->type_item.remove_myself();
222     delete sub;
223   }
224 };
225
226 inline ostream& operator<<(ostream& out, const MonSession& s)
227 {
228   out << "MonSession(" << s.inst << " is "
229       << (s.closed ? "closed" : "open");
230   out << " " << s.caps << ")";
231   return out;
232 }
233
234 #endif