Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / SessionMap.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #ifndef CEPH_MDS_SESSIONMAP_H
16 #define CEPH_MDS_SESSIONMAP_H
17
18 #include <set>
19 using std::set;
20
21 #include "include/unordered_map.h"
22
23 #include "include/Context.h"
24 #include "include/xlist.h"
25 #include "include/elist.h"
26 #include "include/interval_set.h"
27 #include "mdstypes.h"
28 #include "mds/MDSAuthCaps.h"
29 #include "common/perf_counters.h"
30
31 class CInode;
32 struct MDRequestImpl;
33
34 #include "CInode.h"
35 #include "Capability.h"
36 #include "msg/Message.h"
37
38 enum {
39   l_mdssm_first = 5500,
40   l_mdssm_session_count,
41   l_mdssm_session_add,
42   l_mdssm_session_remove,
43   l_mdssm_last,
44 };
45
46 /* 
47  * session
48  */
49
50 class Session : public RefCountedObject {
51   // -- state etc --
52 public:
53   /*
54                     
55         <deleted> <-- closed <------------+
56              ^         |                  |
57              |         v                  |
58           killing <-- opening <----+      |
59              ^         |           |      |
60              |         v           |      |
61            stale <--> open --> closing ---+
62
63     + additional dimension of 'importing' (with counter)
64
65   */
66   enum {
67     STATE_CLOSED = 0,
68     STATE_OPENING = 1,   // journaling open
69     STATE_OPEN = 2,
70     STATE_CLOSING = 3,   // journaling close
71     STATE_STALE = 4,
72     STATE_KILLING = 5
73   };
74
75   const char *get_state_name(int s) const {
76     switch (s) {
77     case STATE_CLOSED: return "closed";
78     case STATE_OPENING: return "opening";
79     case STATE_OPEN: return "open";
80     case STATE_CLOSING: return "closing";
81     case STATE_STALE: return "stale";
82     case STATE_KILLING: return "killing";
83     default: return "???";
84     }
85   }
86
87 private:
88   int state;
89   uint64_t state_seq;
90   int importing_count;
91   friend class SessionMap;
92
93   // Human (friendly) name is soft state generated from client metadata
94   void _update_human_name();
95   std::string human_name;
96
97   // Versions in this session was projected: used to verify
98   // that appropriate mark_dirty calls follow.
99   std::deque<version_t> projected;
100
101
102
103 public:
104
105   void push_pv(version_t pv)
106   {
107     assert(projected.empty() || projected.back() != pv);
108     projected.push_back(pv);
109   }
110
111   void pop_pv(version_t v)
112   {
113     assert(!projected.empty());
114     assert(projected.front() == v);
115     projected.pop_front();
116   }
117
118   int get_state() const { return state; }
119   void set_state(int new_state)
120   {
121     if (state != new_state) {
122       state = new_state;
123       state_seq++;
124     }
125   }
126   void decode(bufferlist::iterator &p);
127   void set_client_metadata(std::map<std::string, std::string> const &meta);
128   std::string get_human_name() const {return human_name;}
129
130   // Ephemeral state for tracking progress of capability recalls
131   utime_t recalled_at;  // When was I asked to SESSION_RECALL?
132   utime_t last_recall_sent;
133   uint32_t recall_count;  // How many caps was I asked to SESSION_RECALL?
134   uint32_t recall_release_count;  // How many caps have I actually revoked?
135
136   session_info_t info;                         ///< durable bits
137
138   MDSAuthCaps auth_caps;
139
140   ConnectionRef connection;
141   xlist<Session*>::item item_session_list;
142
143   list<Message*> preopen_out_queue;  ///< messages for client, queued before they connect
144
145   elist<MDRequestImpl*> requests;
146   size_t get_request_count();
147
148   interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos
149
150   void notify_cap_release(size_t n_caps);
151   void notify_recall_sent(const size_t new_limit);
152   void clear_recalled_at();
153
154   inodeno_t next_ino() const {
155     if (info.prealloc_inos.empty())
156       return 0;
157     return info.prealloc_inos.range_start();
158   }
159   inodeno_t take_ino(inodeno_t ino = 0) {
160     assert(!info.prealloc_inos.empty());
161
162     if (ino) {
163       if (info.prealloc_inos.contains(ino))
164         info.prealloc_inos.erase(ino);
165       else
166         ino = 0;
167     }
168     if (!ino) {
169       ino = info.prealloc_inos.range_start();
170       info.prealloc_inos.erase(ino);
171     }
172     info.used_inos.insert(ino, 1);
173     return ino;
174   }
175   int get_num_projected_prealloc_inos() const {
176     return info.prealloc_inos.size() + pending_prealloc_inos.size();
177   }
178
179   client_t get_client() const {
180     return info.get_client();
181   }
182
183   const char *get_state_name() const { return get_state_name(state); }
184   uint64_t get_state_seq() const { return state_seq; }
185   bool is_closed() const { return state == STATE_CLOSED; }
186   bool is_opening() const { return state == STATE_OPENING; }
187   bool is_open() const { return state == STATE_OPEN; }
188   bool is_closing() const { return state == STATE_CLOSING; }
189   bool is_stale() const { return state == STATE_STALE; }
190   bool is_killing() const { return state == STATE_KILLING; }
191
192   void inc_importing() {
193     ++importing_count;
194   }
195   void dec_importing() {
196     assert(importing_count > 0);
197     --importing_count;
198   }
199   bool is_importing() const { return importing_count > 0; }
200
201   // -- caps --
202 private:
203   version_t cap_push_seq;        // cap push seq #
204   map<version_t, list<MDSInternalContextBase*> > waitfor_flush; // flush session messages
205
206 public:
207   xlist<Capability*> caps;     // inodes with caps; front=most recently used
208   xlist<ClientLease*> leases;  // metadata leases to clients
209   utime_t last_cap_renew;
210
211 public:
212   version_t inc_push_seq() { return ++cap_push_seq; }
213   version_t get_push_seq() const { return cap_push_seq; }
214
215   version_t wait_for_flush(MDSInternalContextBase* c) {
216     waitfor_flush[get_push_seq()].push_back(c);
217     return get_push_seq();
218   }
219   void finish_flush(version_t seq, list<MDSInternalContextBase*>& ls) {
220     while (!waitfor_flush.empty()) {
221       if (waitfor_flush.begin()->first > seq)
222         break;
223       ls.splice(ls.end(), waitfor_flush.begin()->second);
224       waitfor_flush.erase(waitfor_flush.begin());
225     }
226   }
227
228   void add_cap(Capability *cap) {
229     caps.push_back(&cap->item_session_caps);
230   }
231   void touch_lease(ClientLease *r) {
232     leases.push_back(&r->item_session_lease);
233   }
234
235   // -- leases --
236   uint32_t lease_seq;
237
238   // -- completed requests --
239 private:
240   // Has completed_requests been modified since the last time we
241   // wrote this session out?
242   bool completed_requests_dirty;
243
244   unsigned num_trim_flushes_warnings;
245   unsigned num_trim_requests_warnings;
246 public:
247   void add_completed_request(ceph_tid_t t, inodeno_t created) {
248     info.completed_requests[t] = created;
249     completed_requests_dirty = true;
250   }
251   bool trim_completed_requests(ceph_tid_t mintid) {
252     // trim
253     bool erased_any = false;
254     while (!info.completed_requests.empty() && 
255            (mintid == 0 || info.completed_requests.begin()->first < mintid)) {
256       info.completed_requests.erase(info.completed_requests.begin());
257       erased_any = true;
258     }
259
260     if (erased_any) {
261       completed_requests_dirty = true;
262     }
263     return erased_any;
264   }
265   bool have_completed_request(ceph_tid_t tid, inodeno_t *pcreated) const {
266     map<ceph_tid_t,inodeno_t>::const_iterator p = info.completed_requests.find(tid);
267     if (p == info.completed_requests.end())
268       return false;
269     if (pcreated)
270       *pcreated = p->second;
271     return true;
272   }
273
274   void add_completed_flush(ceph_tid_t tid) {
275     info.completed_flushes.insert(tid);
276   }
277   bool trim_completed_flushes(ceph_tid_t mintid) {
278     bool erased_any = false;
279     while (!info.completed_flushes.empty() &&
280         (mintid == 0 || *info.completed_flushes.begin() < mintid)) {
281       info.completed_flushes.erase(info.completed_flushes.begin());
282       erased_any = true;
283     }
284     if (erased_any) {
285       completed_requests_dirty = true;
286     }
287     return erased_any;
288   }
289   bool have_completed_flush(ceph_tid_t tid) const {
290     return info.completed_flushes.count(tid);
291   }
292
293   unsigned get_num_completed_flushes() const { return info.completed_flushes.size(); }
294   unsigned get_num_trim_flushes_warnings() const {
295     return num_trim_flushes_warnings;
296   }
297   void inc_num_trim_flushes_warnings() { ++num_trim_flushes_warnings; }
298   void reset_num_trim_flushes_warnings() { num_trim_flushes_warnings = 0; }
299
300   unsigned get_num_completed_requests() const { return info.completed_requests.size(); }
301   unsigned get_num_trim_requests_warnings() const {
302     return num_trim_requests_warnings;
303   }
304   void inc_num_trim_requests_warnings() { ++num_trim_requests_warnings; }
305   void reset_num_trim_requests_warnings() { num_trim_requests_warnings = 0; }
306
307   bool has_dirty_completed_requests() const
308   {
309     return completed_requests_dirty;
310   }
311
312   void clear_dirty_completed_requests()
313   {
314     completed_requests_dirty = false;
315   }
316
317   int check_access(CInode *in, unsigned mask, int caller_uid, int caller_gid,
318                    const vector<uint64_t> *gid_list, int new_uid, int new_gid);
319
320
321   Session() : 
322     state(STATE_CLOSED), state_seq(0), importing_count(0),
323     recall_count(0), recall_release_count(0),
324     auth_caps(g_ceph_context),
325     connection(NULL), item_session_list(this),
326     requests(0),  // member_offset passed to front() manually
327     cap_push_seq(0),
328     lease_seq(0),
329     completed_requests_dirty(false),
330     num_trim_flushes_warnings(0),
331     num_trim_requests_warnings(0) { }
332   ~Session() override {
333     assert(!item_session_list.is_on_list());
334     while (!preopen_out_queue.empty()) {
335       preopen_out_queue.front()->put();
336       preopen_out_queue.pop_front();
337     }
338   }
339
340   void clear() {
341     pending_prealloc_inos.clear();
342     info.clear_meta();
343
344     cap_push_seq = 0;
345     last_cap_renew = utime_t();
346
347   }
348 };
349
350 class SessionFilter
351 {
352 protected:
353   // First is whether to filter, second is filter value
354   std::pair<bool, bool> reconnecting;
355
356 public:
357   std::map<std::string, std::string> metadata;
358   std::string auth_name;
359   std::string state;
360   int64_t id;
361
362   SessionFilter()
363     : reconnecting(false, false), id(0)
364   {}
365
366   bool match(
367       const Session &session,
368       std::function<bool(client_t)> is_reconnecting) const;
369   int parse(const std::vector<std::string> &args, std::stringstream *ss);
370   void set_reconnecting(bool v)
371   {
372     reconnecting.first = true;
373     reconnecting.second = v;
374   }
375 };
376
377 /*
378  * session map
379  */
380
381 class MDSRank;
382
383 /**
384  * Encapsulate the serialized state associated with SessionMap.  Allows
385  * encode/decode outside of live MDS instance.
386  */
387 class SessionMapStore {
388 protected:
389   version_t version;
390   ceph::unordered_map<entity_name_t, Session*> session_map;
391   PerfCounters *logger;
392 public:
393   mds_rank_t rank;
394
395   version_t get_version() const {return version;}
396
397   virtual void encode_header(bufferlist *header_bl);
398   virtual void decode_header(bufferlist &header_bl);
399   virtual void decode_values(std::map<std::string, bufferlist> &session_vals);
400   virtual void decode_legacy(bufferlist::iterator& blp);
401   void dump(Formatter *f) const;
402
403   void set_rank(mds_rank_t r)
404   {
405     rank = r;
406   }
407
408   Session* get_or_add_session(const entity_inst_t& i) {
409     Session *s;
410     auto session_map_entry = session_map.find(i.name);
411     if (session_map_entry != session_map.end()) {
412       s = session_map_entry->second;
413     } else {
414       s = session_map[i.name] = new Session;
415       s->info.inst = i;
416       s->last_cap_renew = ceph_clock_now();
417       if (logger) {
418         logger->set(l_mdssm_session_count, session_map.size());
419         logger->inc(l_mdssm_session_add);
420       }
421     }
422
423     return s;
424   }
425
426   static void generate_test_instances(list<SessionMapStore*>& ls);
427
428   void reset_state()
429   {
430     session_map.clear();
431   }
432
433   SessionMapStore() : version(0), logger(nullptr), rank(MDS_RANK_NONE) {}
434   virtual ~SessionMapStore() {};
435 };
436
437 class SessionMap : public SessionMapStore {
438 public:
439   MDSRank *mds;
440
441 protected:
442   version_t projected, committing, committed;
443 public:
444   map<int,xlist<Session*>* > by_state;
445   uint64_t set_state(Session *session, int state);
446   map<version_t, list<MDSInternalContextBase*> > commit_waiters;
447
448   explicit SessionMap(MDSRank *m) : mds(m),
449                        projected(0), committing(0), committed(0),
450                        loaded_legacy(false)
451   { }
452
453   ~SessionMap() override
454   {
455     for (auto p : by_state)
456       delete p.second;
457
458     if (logger) {
459       g_ceph_context->get_perfcounters_collection()->remove(logger);
460     }
461
462     delete logger;
463   }
464
465   void register_perfcounters();
466
467   void set_version(const version_t v)
468   {
469     version = projected = v;
470   }
471
472   void set_projected(const version_t v)
473   {
474     projected = v;
475   }
476
477   version_t get_projected() const
478   {
479     return projected;
480   }
481
482   version_t get_committed() const
483   {
484     return committed;
485   }
486
487   version_t get_committing() const
488   {
489     return committing;
490   }
491
492   // sessions
493   void decode_legacy(bufferlist::iterator& blp) override;
494   bool empty() const { return session_map.empty(); }
495   const ceph::unordered_map<entity_name_t, Session*> &get_sessions() const
496   {
497     return session_map;
498   }
499
500   bool is_any_state(int state) const {
501     map<int,xlist<Session*>* >::const_iterator p = by_state.find(state);
502     if (p == by_state.end() || p->second->empty())
503       return false;
504     return true;
505   }
506
507   bool have_unclosed_sessions() const {
508     return
509       is_any_state(Session::STATE_OPENING) ||
510       is_any_state(Session::STATE_OPEN) ||
511       is_any_state(Session::STATE_CLOSING) ||
512       is_any_state(Session::STATE_STALE) ||
513       is_any_state(Session::STATE_KILLING);
514   }
515   bool have_session(entity_name_t w) const {
516     return session_map.count(w);
517   }
518   Session* get_session(entity_name_t w) {
519     auto session_map_entry = session_map.find(w);
520     return (session_map_entry != session_map.end() ?
521             session_map_entry-> second : nullptr);
522   }
523   const Session* get_session(entity_name_t w) const {
524     ceph::unordered_map<entity_name_t, Session*>::const_iterator p = session_map.find(w);
525     if (p == session_map.end()) {
526       return NULL;
527     } else {
528       return p->second;
529     }
530   }
531
532   void add_session(Session *s);
533   void remove_session(Session *s);
534   void touch_session(Session *session);
535
536   Session *get_oldest_session(int state) {
537     auto by_state_entry = by_state.find(state);
538     if (by_state_entry == by_state.end() || by_state_entry->second->empty())
539       return 0;
540     return by_state_entry->second->front();
541   }
542
543   void dump();
544
545   void get_client_set(set<client_t>& s) {
546     for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
547          p != session_map.end();
548          ++p)
549       if (p->second->info.inst.name.is_client())
550         s.insert(p->second->info.inst.name.num());
551   }
552   void get_client_session_set(set<Session*>& s) const {
553     for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
554          p != session_map.end();
555          ++p)
556       if (p->second->info.inst.name.is_client())
557         s.insert(p->second);
558   }
559
560   void open_sessions(map<client_t,entity_inst_t>& client_map) {
561     for (map<client_t,entity_inst_t>::iterator p = client_map.begin(); 
562          p != client_map.end(); 
563          ++p) {
564       Session *s = get_or_add_session(p->second);
565       set_state(s, Session::STATE_OPEN);
566       version++;
567     }
568   }
569
570   // helpers
571   entity_inst_t& get_inst(entity_name_t w) {
572     assert(session_map.count(w));
573     return session_map[w]->info.inst;
574   }
575   version_t inc_push_seq(client_t client) {
576     return get_session(entity_name_t::CLIENT(client.v))->inc_push_seq();
577   }
578   version_t get_push_seq(client_t client) {
579     return get_session(entity_name_t::CLIENT(client.v))->get_push_seq();
580   }
581   bool have_completed_request(metareqid_t rid) {
582     Session *session = get_session(rid.name);
583     return session && session->have_completed_request(rid.tid, NULL);
584   }
585   void trim_completed_requests(entity_name_t c, ceph_tid_t tid) {
586     Session *session = get_session(c);
587     assert(session);
588     session->trim_completed_requests(tid);
589   }
590
591   void wipe();
592   void wipe_ino_prealloc();
593
594   // -- loading, saving --
595   inodeno_t ino;
596   list<MDSInternalContextBase*> waiting_for_load;
597
598   object_t get_object_name() const;
599
600   void load(MDSInternalContextBase *onload);
601   void _load_finish(
602       int operation_r,
603       int header_r,
604       int values_r,
605       bool first,
606       bufferlist &header_bl,
607       std::map<std::string, bufferlist> &session_vals,
608       bool more_session_vals);
609
610   void load_legacy();
611   void _load_legacy_finish(int r, bufferlist &bl);
612
613   void save(MDSInternalContextBase *onsave, version_t needv=0);
614   void _save_finish(version_t v);
615
616 protected:
617   std::set<entity_name_t> dirty_sessions;
618   std::set<entity_name_t> null_sessions;
619   bool loaded_legacy;
620   void _mark_dirty(Session *session);
621 public:
622
623   /**
624    * Advance the version, and mark this session
625    * as dirty within the new version.
626    *
627    * Dirty means journalled but needing writeback
628    * to the backing store.  Must have called
629    * mark_projected previously for this session.
630    */
631   void mark_dirty(Session *session);
632
633   /**
634    * Advance the projected version, and mark this
635    * session as projected within the new version
636    *
637    * Projected means the session is updated in memory
638    * but we're waiting for the journal write of the update
639    * to finish.  Must subsequently call mark_dirty
640    * for sessions in the same global order as calls
641    * to mark_projected.
642    */
643   version_t mark_projected(Session *session);
644
645   /**
646    * During replay, advance versions to account
647    * for a session modification, and mark the
648    * session dirty.
649    */
650   void replay_dirty_session(Session *session);
651
652   /**
653    * During replay, if a session no longer present
654    * would have consumed a version, advance `version`
655    * and `projected` to account for that.
656    */
657   void replay_advance_version();
658
659   /**
660    * For these session IDs, if a session exists with this ID, and it has
661    * dirty completed_requests, then persist it immediately
662    * (ahead of usual project/dirty versioned writes
663    *  of the map).
664    */
665   void save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
666                      MDSGatherBuilder *gather_bld);
667 };
668
669 std::ostream& operator<<(std::ostream &out, const Session &s);
670
671
672 #endif