Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / SessionMap.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "MDSRank.h"
16 #include "MDCache.h"
17 #include "Mutation.h"
18 #include "SessionMap.h"
19 #include "osdc/Filer.h"
20 #include "common/Finisher.h"
21
22 #include "common/config.h"
23 #include "common/errno.h"
24 #include "include/assert.h"
25 #include "include/stringify.h"
26
27 #define dout_context g_ceph_context
28 #define dout_subsys ceph_subsys_mds
29 #undef dout_prefix
30 #define dout_prefix *_dout << "mds." << rank << ".sessionmap "
31
32 namespace {
33 class SessionMapIOContext : public MDSIOContextBase
34 {
35   protected:
36     SessionMap *sessionmap;
37     MDSRank *get_mds() override {return sessionmap->mds;}
38   public:
39     explicit SessionMapIOContext(SessionMap *sessionmap_) : sessionmap(sessionmap_) {
40       assert(sessionmap != NULL);
41     }
42 };
43 };
44
45 void SessionMap::register_perfcounters()
46 {
47   PerfCountersBuilder plb(g_ceph_context, "mds_sessions",
48       l_mdssm_first, l_mdssm_last);
49   plb.add_u64(l_mdssm_session_count, "session_count",
50       "Session count");
51   plb.add_u64_counter(l_mdssm_session_add, "session_add",
52       "Sessions added");
53   plb.add_u64_counter(l_mdssm_session_remove, "session_remove",
54       "Sessions removed");
55   logger = plb.create_perf_counters();
56   g_ceph_context->get_perfcounters_collection()->add(logger);
57 }
58
59 void SessionMap::dump()
60 {
61   dout(10) << "dump" << dendl;
62   for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
63        p != session_map.end();
64        ++p) 
65     dout(10) << p->first << " " << p->second
66              << " state " << p->second->get_state_name()
67              << " completed " << p->second->info.completed_requests
68              << " prealloc_inos " << p->second->info.prealloc_inos
69              << " used_inos " << p->second->info.used_inos
70              << dendl;
71 }
72
73
74 // ----------------
75 // LOAD
76
77
78 object_t SessionMap::get_object_name() const
79 {
80   char s[30];
81   snprintf(s, sizeof(s), "mds%d_sessionmap", int(mds->get_nodeid()));
82   return object_t(s);
83 }
84
85 namespace {
86 class C_IO_SM_Load : public SessionMapIOContext {
87 public:
88   const bool first;  //< Am I the initial (header) load?
89   int header_r;  //< Return value from OMAP header read
90   int values_r;  //< Return value from OMAP value read
91   bufferlist header_bl;
92   std::map<std::string, bufferlist> session_vals;
93   bool more_session_vals = false;
94
95   C_IO_SM_Load(SessionMap *cm, const bool f)
96     : SessionMapIOContext(cm), first(f), header_r(0), values_r(0) {}
97
98   void finish(int r) override {
99     sessionmap->_load_finish(r, header_r, values_r, first, header_bl, session_vals,
100       more_session_vals);
101   }
102 };
103 }
104
105
106 /**
107  * Decode OMAP header.  Call this once when loading.
108  */
109 void SessionMapStore::decode_header(
110       bufferlist &header_bl)
111 {
112   bufferlist::iterator q = header_bl.begin();
113   DECODE_START(1, q)
114   ::decode(version, q);
115   DECODE_FINISH(q);
116 }
117
118 void SessionMapStore::encode_header(
119     bufferlist *header_bl)
120 {
121   ENCODE_START(1, 1, *header_bl);
122   ::encode(version, *header_bl);
123   ENCODE_FINISH(*header_bl);
124 }
125
126 /**
127  * Decode and insert some serialized OMAP values.  Call this
128  * repeatedly to insert batched loads.
129  */
130 void SessionMapStore::decode_values(std::map<std::string, bufferlist> &session_vals)
131 {
132   for (std::map<std::string, bufferlist>::iterator i = session_vals.begin();
133        i != session_vals.end(); ++i) {
134
135     entity_inst_t inst;
136
137     bool parsed = inst.name.parse(i->first);
138     if (!parsed) {
139       derr << "Corrupt entity name '" << i->first << "' in sessionmap" << dendl;
140       throw buffer::malformed_input("Corrupt entity name in sessionmap");
141     }
142
143     Session *s = get_or_add_session(inst);
144     if (s->is_closed())
145       s->set_state(Session::STATE_OPEN);
146     bufferlist::iterator q = i->second.begin();
147     s->decode(q);
148   }
149 }
150
151 /**
152  * An OMAP read finished.
153  */
154 void SessionMap::_load_finish(
155     int operation_r,
156     int header_r,
157     int values_r,
158     bool first,
159     bufferlist &header_bl,
160     std::map<std::string, bufferlist> &session_vals,
161     bool more_session_vals)
162 {
163   if (operation_r < 0) {
164     derr << "_load_finish got " << cpp_strerror(operation_r) << dendl;
165     mds->clog->error() << "error reading sessionmap '" << get_object_name()
166                        << "' " << operation_r << " ("
167                        << cpp_strerror(operation_r) << ")";
168     mds->damaged();
169     ceph_abort();  // Should be unreachable because damaged() calls respawn()
170   }
171
172   // Decode header
173   if (first) {
174     if (header_r != 0) {
175       derr << __func__ << ": header error: " << cpp_strerror(header_r) << dendl;
176       mds->clog->error() << "error reading sessionmap header "
177                          << header_r << " (" << cpp_strerror(header_r) << ")";
178       mds->damaged();
179       ceph_abort();  // Should be unreachable because damaged() calls respawn()
180     }
181
182     if(header_bl.length() == 0) {
183       dout(4) << __func__ << ": header missing, loading legacy..." << dendl;
184       load_legacy();
185       return;
186     }
187
188     try {
189       decode_header(header_bl);
190     } catch (buffer::error &e) {
191       mds->clog->error() << "corrupt sessionmap header: " << e.what();
192       mds->damaged();
193       ceph_abort();  // Should be unreachable because damaged() calls respawn()
194     }
195     dout(10) << __func__ << " loaded version " << version << dendl;
196   }
197
198   if (values_r != 0) {
199     derr << __func__ << ": error reading values: "
200       << cpp_strerror(values_r) << dendl;
201     mds->clog->error() << "error reading sessionmap values: " 
202                        << values_r << " (" << cpp_strerror(values_r) << ")";
203     mds->damaged();
204     ceph_abort();  // Should be unreachable because damaged() calls respawn()
205   }
206
207   // Decode session_vals
208   try {
209     decode_values(session_vals);
210   } catch (buffer::error &e) {
211     mds->clog->error() << "corrupt sessionmap values: " << e.what();
212     mds->damaged();
213     ceph_abort();  // Should be unreachable because damaged() calls respawn()
214   }
215
216   if (more_session_vals) {
217     // Issue another read if we're not at the end of the omap
218     const std::string last_key = session_vals.rbegin()->first;
219     dout(10) << __func__ << ": continue omap load from '"
220              << last_key << "'" << dendl;
221     object_t oid = get_object_name();
222     object_locator_t oloc(mds->mdsmap->get_metadata_pool());
223     C_IO_SM_Load *c = new C_IO_SM_Load(this, false);
224     ObjectOperation op;
225     op.omap_get_vals(last_key, "", g_conf->mds_sessionmap_keys_per_op,
226                      &c->session_vals, &c->more_session_vals, &c->values_r);
227     mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0,
228         new C_OnFinisher(c, mds->finisher));
229   } else {
230     // I/O is complete.  Update `by_state`
231     dout(10) << __func__ << ": omap load complete" << dendl;
232     for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
233          i != session_map.end(); ++i) {
234       Session *s = i->second;
235       auto by_state_entry = by_state.find(s->get_state());
236       if (by_state_entry == by_state.end())
237         by_state_entry = by_state.emplace(s->get_state(),
238                                           new xlist<Session*>).first;
239       by_state_entry->second->push_back(&s->item_session_list);
240     }
241
242     // Population is complete.  Trigger load waiters.
243     dout(10) << __func__ << ": v " << version 
244            << ", " << session_map.size() << " sessions" << dendl;
245     projected = committing = committed = version;
246     dump();
247     finish_contexts(g_ceph_context, waiting_for_load);
248   }
249 }
250
251 /**
252  * Populate session state from OMAP records in this
253  * rank's sessionmap object.
254  */
255 void SessionMap::load(MDSInternalContextBase *onload)
256 {
257   dout(10) << "load" << dendl;
258
259   if (onload)
260     waiting_for_load.push_back(onload);
261   
262   C_IO_SM_Load *c = new C_IO_SM_Load(this, true);
263   object_t oid = get_object_name();
264   object_locator_t oloc(mds->mdsmap->get_metadata_pool());
265
266   ObjectOperation op;
267   op.omap_get_header(&c->header_bl, &c->header_r);
268   op.omap_get_vals("", "", g_conf->mds_sessionmap_keys_per_op,
269                    &c->session_vals, &c->more_session_vals, &c->values_r);
270
271   mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, new C_OnFinisher(c, mds->finisher));
272 }
273
274 namespace {
275 class C_IO_SM_LoadLegacy : public SessionMapIOContext {
276 public:
277   bufferlist bl;
278   explicit C_IO_SM_LoadLegacy(SessionMap *cm) : SessionMapIOContext(cm) {}
279   void finish(int r) override {
280     sessionmap->_load_legacy_finish(r, bl);
281   }
282 };
283 }
284
285
286 /**
287  * Load legacy (object data blob) SessionMap format, assuming
288  * that waiting_for_load has already been populated with
289  * the relevant completion.  This is the fallback if we do not
290  * find an OMAP header when attempting to load normally.
291  */
292 void SessionMap::load_legacy()
293 {
294   dout(10) << __func__ << dendl;
295
296   C_IO_SM_LoadLegacy *c = new C_IO_SM_LoadLegacy(this);
297   object_t oid = get_object_name();
298   object_locator_t oloc(mds->mdsmap->get_metadata_pool());
299
300   mds->objecter->read_full(oid, oloc, CEPH_NOSNAP, &c->bl, 0,
301                            new C_OnFinisher(c, mds->finisher));
302 }
303
304 void SessionMap::_load_legacy_finish(int r, bufferlist &bl)
305
306   bufferlist::iterator blp = bl.begin();
307   if (r < 0) {
308     derr << "_load_finish got " << cpp_strerror(r) << dendl;
309     assert(0 == "failed to load sessionmap");
310   }
311   dump();
312   decode_legacy(blp);  // note: this sets last_cap_renew = now()
313   dout(10) << "_load_finish v " << version 
314            << ", " << session_map.size() << " sessions, "
315            << bl.length() << " bytes"
316            << dendl;
317   projected = committing = committed = version;
318   dump();
319
320   // Mark all sessions dirty, so that on next save() we will write
321   // a complete OMAP version of the data loaded from the legacy format
322   for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
323        i != session_map.end(); ++i) {
324     // Don't use mark_dirty because on this occasion we want to ignore the
325     // keys_per_op limit and do one big write (upgrade must be atomic)
326     dirty_sessions.insert(i->first);
327   }
328   loaded_legacy = true;
329
330   finish_contexts(g_ceph_context, waiting_for_load);
331 }
332
333
334 // ----------------
335 // SAVE
336
337 namespace {
338 class C_IO_SM_Save : public SessionMapIOContext {
339   version_t version;
340 public:
341   C_IO_SM_Save(SessionMap *cm, version_t v) : SessionMapIOContext(cm), version(v) {}
342   void finish(int r) override {
343     if (r != 0) {
344       get_mds()->handle_write_error(r);
345     } else {
346       sessionmap->_save_finish(version);
347     }
348   }
349 };
350 }
351
352 void SessionMap::save(MDSInternalContextBase *onsave, version_t needv)
353 {
354   dout(10) << __func__ << ": needv " << needv << ", v " << version << dendl;
355  
356   if (needv && committing >= needv) {
357     assert(committing > committed);
358     commit_waiters[committing].push_back(onsave);
359     return;
360   }
361
362   commit_waiters[version].push_back(onsave);
363
364   committing = version;
365   SnapContext snapc;
366   object_t oid = get_object_name();
367   object_locator_t oloc(mds->mdsmap->get_metadata_pool());
368
369   ObjectOperation op;
370
371   /* Compose OSD OMAP transaction for full write */
372   bufferlist header_bl;
373   encode_header(&header_bl);
374   op.omap_set_header(header_bl);
375
376   /* If we loaded a legacy sessionmap, then erase the old data.  If
377    * an old-versioned MDS tries to read it, it'll fail out safely
378    * with an end_of_buffer exception */
379   if (loaded_legacy) {
380     dout(4) << __func__ << " erasing legacy sessionmap" << dendl;
381     op.truncate(0);
382     loaded_legacy = false;  // only need to truncate once.
383   }
384
385   dout(20) << " updating keys:" << dendl;
386   map<string, bufferlist> to_set;
387   for(std::set<entity_name_t>::iterator i = dirty_sessions.begin();
388       i != dirty_sessions.end(); ++i) {
389     const entity_name_t name = *i;
390     Session *session = session_map[name];
391
392     if (session->is_open() ||
393         session->is_closing() ||
394         session->is_stale() ||
395         session->is_killing()) {
396       dout(20) << "  " << name << dendl;
397       // Serialize K
398       std::ostringstream k;
399       k << name;
400
401       // Serialize V
402       bufferlist bl;
403       session->info.encode(bl, mds->mdsmap->get_up_features());
404
405       // Add to RADOS op
406       to_set[k.str()] = bl;
407
408       session->clear_dirty_completed_requests();
409     } else {
410       dout(20) << "  " << name << " (ignoring)" << dendl;
411     }
412   }
413   if (!to_set.empty()) {
414     op.omap_set(to_set);
415   }
416
417   dout(20) << " removing keys:" << dendl;
418   set<string> to_remove;
419   for(std::set<entity_name_t>::const_iterator i = null_sessions.begin();
420       i != null_sessions.end(); ++i) {
421     dout(20) << "  " << *i << dendl;
422     std::ostringstream k;
423     k << *i;
424     to_remove.insert(k.str());
425   }
426   if (!to_remove.empty()) {
427     op.omap_rm_keys(to_remove);
428   }
429
430   dirty_sessions.clear();
431   null_sessions.clear();
432
433   mds->objecter->mutate(oid, oloc, op, snapc,
434                         ceph::real_clock::now(),
435                         0,
436                         new C_OnFinisher(new C_IO_SM_Save(this, version),
437                                          mds->finisher));
438 }
439
440 void SessionMap::_save_finish(version_t v)
441 {
442   dout(10) << "_save_finish v" << v << dendl;
443   committed = v;
444
445   finish_contexts(g_ceph_context, commit_waiters[v]);
446   commit_waiters.erase(v);
447 }
448
449
450 /**
451  * Deserialize sessions, and update by_state index
452  */
453 void SessionMap::decode_legacy(bufferlist::iterator &p)
454 {
455   // Populate `sessions`
456   SessionMapStore::decode_legacy(p);
457
458   // Update `by_state`
459   for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
460        i != session_map.end(); ++i) {
461     Session *s = i->second;
462     auto by_state_entry = by_state.find(s->get_state());
463     if (by_state_entry == by_state.end())
464       by_state_entry = by_state.emplace(s->get_state(),
465                                         new xlist<Session*>).first;
466     by_state_entry->second->push_back(&s->item_session_list);
467   }
468 }
469
470 uint64_t SessionMap::set_state(Session *session, int s) {
471   if (session->state != s) {
472     session->set_state(s);
473     auto by_state_entry = by_state.find(s);
474     if (by_state_entry == by_state.end())
475       by_state_entry = by_state.emplace(s, new xlist<Session*>).first;
476     by_state_entry->second->push_back(&session->item_session_list);
477   }
478   return session->get_state_seq();
479 }
480
481 void SessionMapStore::decode_legacy(bufferlist::iterator& p)
482 {
483   utime_t now = ceph_clock_now();
484   uint64_t pre;
485   ::decode(pre, p);
486   if (pre == (uint64_t)-1) {
487     DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, p);
488     assert(struct_v >= 2);
489     
490     ::decode(version, p);
491     
492     while (!p.end()) {
493       entity_inst_t inst;
494       ::decode(inst.name, p);
495       Session *s = get_or_add_session(inst);
496       if (s->is_closed())
497         s->set_state(Session::STATE_OPEN);
498       s->decode(p);
499     }
500
501     DECODE_FINISH(p);
502   } else {
503     // --- old format ----
504     version = pre;
505
506     // this is a meaningless upper bound.  can be ignored.
507     __u32 n;
508     ::decode(n, p);
509     
510     while (n-- && !p.end()) {
511       bufferlist::iterator p2 = p;
512       Session *s = new Session;
513       s->info.decode(p);
514       if (session_map.count(s->info.inst.name)) {
515         // eager client connected too fast!  aie.
516         dout(10) << " already had session for " << s->info.inst.name << ", recovering" << dendl;
517         entity_name_t n = s->info.inst.name;
518         delete s;
519         s = session_map[n];
520         p = p2;
521         s->info.decode(p);
522       } else {
523         session_map[s->info.inst.name] = s;
524       }
525       s->set_state(Session::STATE_OPEN);
526       s->last_cap_renew = now;
527     }
528   }
529 }
530
531 void SessionMapStore::dump(Formatter *f) const
532 {
533   f->open_array_section("Sessions");
534   for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
535        p != session_map.end();
536        ++p)  {
537     f->open_object_section("Session");
538     f->open_object_section("entity name");
539     p->first.dump(f);
540     f->close_section(); // entity name
541     f->dump_string("state", p->second->get_state_name());
542     f->open_object_section("Session info");
543     p->second->info.dump(f);
544     f->close_section(); // Session info
545     f->close_section(); // Session
546   }
547   f->close_section(); // Sessions
548 }
549
550 void SessionMapStore::generate_test_instances(list<SessionMapStore*>& ls)
551 {
552   // pretty boring for now
553   ls.push_back(new SessionMapStore());
554 }
555
556 void SessionMap::wipe()
557 {
558   dout(1) << "wipe start" << dendl;
559   dump();
560   while (!session_map.empty()) {
561     Session *s = session_map.begin()->second;
562     remove_session(s);
563   }
564   version = ++projected;
565   dout(1) << "wipe result" << dendl;
566   dump();
567   dout(1) << "wipe done" << dendl;
568 }
569
570 void SessionMap::wipe_ino_prealloc()
571 {
572   for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin(); 
573        p != session_map.end(); 
574        ++p) {
575     p->second->pending_prealloc_inos.clear();
576     p->second->info.prealloc_inos.clear();
577     p->second->info.used_inos.clear();
578   }
579   projected = ++version;
580 }
581
582 void SessionMap::add_session(Session *s)
583 {
584   dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl;
585
586   assert(session_map.count(s->info.inst.name) == 0);
587   session_map[s->info.inst.name] = s;
588   auto by_state_entry = by_state.find(s->state);
589   if (by_state_entry == by_state.end())
590     by_state_entry = by_state.emplace(s->state, new xlist<Session*>).first;
591   by_state_entry->second->push_back(&s->item_session_list);
592   s->get();
593
594   logger->set(l_mdssm_session_count, session_map.size());
595   logger->inc(l_mdssm_session_add);
596 }
597
598 void SessionMap::remove_session(Session *s)
599 {
600   dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl;
601
602   s->trim_completed_requests(0);
603   s->item_session_list.remove_myself();
604   session_map.erase(s->info.inst.name);
605   dirty_sessions.erase(s->info.inst.name);
606   null_sessions.insert(s->info.inst.name);
607   s->put();
608
609   logger->set(l_mdssm_session_count, session_map.size());
610   logger->inc(l_mdssm_session_remove);
611 }
612
613 void SessionMap::touch_session(Session *session)
614 {
615   dout(10) << __func__ << " s=" << session << " name=" << session->info.inst.name << dendl;
616
617   // Move to the back of the session list for this state (should
618   // already be on a list courtesy of add_session and set_state)
619   assert(session->item_session_list.is_on_list());
620   auto by_state_entry = by_state.find(session->state);
621   if (by_state_entry == by_state.end())
622     by_state_entry = by_state.emplace(session->state,
623                                       new xlist<Session*>).first;
624   by_state_entry->second->push_back(&session->item_session_list);
625
626   session->last_cap_renew = ceph_clock_now();
627 }
628
629 void SessionMap::_mark_dirty(Session *s)
630 {
631   if (dirty_sessions.count(s->info.inst.name))
632     return;
633
634   if (dirty_sessions.size() >= g_conf->mds_sessionmap_keys_per_op) {
635     // Pre-empt the usual save() call from journal segment trim, in
636     // order to avoid building up an oversized OMAP update operation
637     // from too many sessions modified at once
638     save(new C_MDSInternalNoop, version);
639   }
640
641   null_sessions.erase(s->info.inst.name);
642   dirty_sessions.insert(s->info.inst.name);
643 }
644
645 void SessionMap::mark_dirty(Session *s)
646 {
647   dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
648     << " v=" << version << dendl;
649
650   _mark_dirty(s);
651   version++;
652   s->pop_pv(version);
653 }
654
655 void SessionMap::replay_dirty_session(Session *s)
656 {
657   dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
658     << " v=" << version << dendl;
659
660   _mark_dirty(s);
661
662   replay_advance_version();
663 }
664
665 void SessionMap::replay_advance_version()
666 {
667   version++;
668   projected = version;
669 }
670
671 version_t SessionMap::mark_projected(Session *s)
672 {
673   dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
674     << " pv=" << projected << " -> " << projected + 1 << dendl;
675   ++projected;
676   s->push_pv(projected);
677   return projected;
678 }
679
680 namespace {
681 class C_IO_SM_Save_One : public SessionMapIOContext {
682   MDSInternalContextBase *on_safe;
683 public:
684   C_IO_SM_Save_One(SessionMap *cm, MDSInternalContextBase *on_safe_)
685     : SessionMapIOContext(cm), on_safe(on_safe_) {}
686   void finish(int r) override {
687     if (r != 0) {
688       get_mds()->handle_write_error(r);
689     } else {
690       on_safe->complete(r);
691     }
692   }
693 };
694 }
695
696
697 void SessionMap::save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
698                                MDSGatherBuilder *gather_bld)
699 {
700   assert(gather_bld != NULL);
701
702   std::vector<entity_name_t> write_sessions;
703
704   // Decide which sessions require a write
705   for (std::set<entity_name_t>::iterator i = tgt_sessions.begin();
706        i != tgt_sessions.end(); ++i) {
707     const entity_name_t &session_id = *i;
708
709     if (session_map.count(session_id) == 0) {
710       // Session isn't around any more, never mind.
711       continue;
712     }
713
714     Session *session = session_map[session_id];
715     if (!session->has_dirty_completed_requests()) {
716       // Session hasn't had completed_requests
717       // modified since last write, no need to
718       // write it now.
719       continue;
720     }
721
722     if (dirty_sessions.count(session_id) > 0) {
723       // Session is already dirtied, will be written, no
724       // need to pre-empt that.
725       continue;
726     }
727     // Okay, passed all our checks, now we write
728     // this session out.  The version we write
729     // into the OMAP may now be higher-versioned
730     // than the version in the header, but that's
731     // okay because it's never a problem to have
732     // an overly-fresh copy of a session.
733     write_sessions.push_back(*i);
734   }
735
736   dout(4) << __func__ << ": writing " << write_sessions.size() << dendl;
737
738   // Batch writes into mds_sessionmap_keys_per_op
739   const uint32_t kpo = g_conf->mds_sessionmap_keys_per_op;
740   map<string, bufferlist> to_set;
741   for (uint32_t i = 0; i < write_sessions.size(); ++i) {
742     // Start a new write transaction?
743     if (i % g_conf->mds_sessionmap_keys_per_op == 0) {
744       to_set.clear();
745     }
746
747     const entity_name_t &session_id = write_sessions[i];
748     Session *session = session_map[session_id];
749     session->clear_dirty_completed_requests();
750
751     // Serialize K
752     std::ostringstream k;
753     k << session_id;
754
755     // Serialize V
756     bufferlist bl;
757     session->info.encode(bl, mds->mdsmap->get_up_features());
758
759     // Add to RADOS op
760     to_set[k.str()] = bl;
761
762     // Complete this write transaction?
763     if (i == write_sessions.size() - 1
764         || i % kpo == kpo - 1) {
765       ObjectOperation op;
766       op.omap_set(to_set);
767
768       SnapContext snapc;
769       object_t oid = get_object_name();
770       object_locator_t oloc(mds->mdsmap->get_metadata_pool());
771       MDSInternalContextBase *on_safe = gather_bld->new_sub();
772       mds->objecter->mutate(oid, oloc, op, snapc,
773                             ceph::real_clock::now(),
774                             0, new C_OnFinisher(
775                               new C_IO_SM_Save_One(this, on_safe),
776                               mds->finisher));
777     }
778   }
779 }
780
781 // =================
782 // Session
783
784 #undef dout_prefix
785 #define dout_prefix *_dout << "Session "
786
787 /**
788  * Calculate the length of the `requests` member list,
789  * because elist does not have a size() method.
790  *
791  * O(N) runtime.  This would be const, but elist doesn't
792  * have const iterators.
793  */
794 size_t Session::get_request_count()
795 {
796   size_t result = 0;
797
798   elist<MDRequestImpl*>::iterator p = requests.begin(
799       member_offset(MDRequestImpl, item_session_request));
800   while (!p.end()) {
801     ++result;
802     ++p;
803   }
804
805   return result;
806 }
807
808 /**
809  * Capped in response to a CEPH_MSG_CLIENT_CAPRELEASE message,
810  * with n_caps equal to the number of caps that were released
811  * in the message.  Used to update state about how many caps a
812  * client has released since it was last instructed to RECALL_STATE.
813  */
814 void Session::notify_cap_release(size_t n_caps)
815 {
816   if (!recalled_at.is_zero()) {
817     recall_release_count += n_caps;
818     if (recall_release_count >= recall_count)
819       clear_recalled_at();
820   }
821 }
822
823 /**
824  * Called when a CEPH_MSG_CLIENT_SESSION->CEPH_SESSION_RECALL_STATE
825  * message is sent to the client.  Update our recall-related state
826  * in order to generate health metrics if the session doesn't see
827  * a commensurate number of calls to ::notify_cap_release
828  */
829 void Session::notify_recall_sent(const size_t new_limit)
830 {
831   if (recalled_at.is_zero()) {
832     // Entering recall phase, set up counters so we can later
833     // judge whether the client has respected the recall request
834     recalled_at = last_recall_sent = ceph_clock_now();
835     assert (new_limit < caps.size());  // Behaviour of Server::recall_client_state
836     recall_count = caps.size() - new_limit;
837     recall_release_count = 0;
838   } else {
839     last_recall_sent = ceph_clock_now();
840   }
841 }
842
843 void Session::clear_recalled_at()
844 {
845   recalled_at = last_recall_sent = utime_t();
846   recall_count = 0;
847   recall_release_count = 0;
848 }
849
850 void Session::set_client_metadata(map<string, string> const &meta)
851 {
852   info.client_metadata = meta;
853
854   _update_human_name();
855 }
856
857 /**
858  * Use client metadata to generate a somewhat-friendlier
859  * name for the client than its session ID.
860  *
861  * This is *not* guaranteed to be unique, and any machine
862  * consumers of session-related output should always use
863  * the session ID as a primary capacity and use this only
864  * as a presentation hint.
865  */
866 void Session::_update_human_name()
867 {
868   auto info_client_metadata_entry = info.client_metadata.find("hostname");
869   if (info_client_metadata_entry != info.client_metadata.end()) {
870     // Happy path, refer to clients by hostname
871     human_name = info_client_metadata_entry->second;
872     if (!info.auth_name.has_default_id()) {
873       // When a non-default entity ID is set by the user, assume they
874       // would like to see it in references to the client, if it's
875       // reasonable short.  Limit the length because we don't want
876       // to put e.g. uuid-generated names into a "human readable"
877       // rendering.
878       const int arbitrarily_short = 16;
879       if (info.auth_name.get_id().size() < arbitrarily_short) {
880         human_name += std::string(":") + info.auth_name.get_id();
881       }
882     }
883   } else {
884     // Fallback, refer to clients by ID e.g. client.4567
885     human_name = stringify(info.inst.name.num());
886   }
887 }
888
889 void Session::decode(bufferlist::iterator &p)
890 {
891   info.decode(p);
892
893   _update_human_name();
894 }
895
896 int Session::check_access(CInode *in, unsigned mask,
897                           int caller_uid, int caller_gid,
898                           const vector<uint64_t> *caller_gid_list,
899                           int new_uid, int new_gid)
900 {
901   string path;
902   CInode *diri = NULL;
903   if (!in->is_base())
904     diri = in->get_projected_parent_dn()->get_dir()->get_inode();
905   if (diri && diri->is_stray()){
906     path = in->get_projected_inode()->stray_prior_path;
907     dout(20) << __func__ << " stray_prior_path " << path << dendl;
908   } else {
909     in->make_path_string(path, true);
910     dout(20) << __func__ << " path " << path << dendl;
911   }
912   if (path.length())
913     path = path.substr(1);    // drop leading /
914
915   if (in->inode.is_dir() &&
916       in->inode.has_layout() &&
917       in->inode.layout.pool_ns.length() &&
918       !connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
919     dout(10) << __func__ << " client doesn't support FS_FILE_LAYOUT_V2" << dendl;
920     return -EIO;
921   }
922
923   if (!auth_caps.is_capable(path, in->inode.uid, in->inode.gid, in->inode.mode,
924                             caller_uid, caller_gid, caller_gid_list, mask,
925                             new_uid, new_gid)) {
926     return -EACCES;
927   }
928   return 0;
929 }
930
931 int SessionFilter::parse(
932     const std::vector<std::string> &args,
933     std::stringstream *ss)
934 {
935   assert(ss != NULL);
936
937   for (const auto &s : args) {
938     dout(20) << __func__ << " parsing filter '" << s << "'" << dendl;
939
940     auto eq = s.find("=");
941     if (eq == std::string::npos || eq == s.size()) {
942       *ss << "Invalid filter '" << s << "'";
943       return -EINVAL;
944     }
945
946     // Keys that start with this are to be taken as referring
947     // to freeform client metadata fields.
948     const std::string metadata_prefix("client_metadata.");
949
950     auto k = s.substr(0, eq);
951     auto v = s.substr(eq + 1);
952
953     dout(20) << __func__ << " parsed k='" << k << "', v='" << v << "'" << dendl;
954
955     if (k.compare(0, metadata_prefix.size(), metadata_prefix) == 0
956         && k.size() > metadata_prefix.size()) {
957       // Filter on arbitrary metadata key (no fixed schema for this,
958       // so anything after the dot is a valid field to filter on)
959       auto metadata_key = k.substr(metadata_prefix.size());
960       metadata.insert(std::make_pair(metadata_key, v));
961     } else if (k == "auth_name") {
962       // Filter on client entity name
963       auth_name = v;
964     } else if (k == "state") {
965       state = v;
966     } else if (k == "id") {
967       std::string err;
968       id = strict_strtoll(v.c_str(), 10, &err);
969       if (!err.empty()) {
970         *ss << err;
971         return -EINVAL;
972       }
973     } else if (k == "reconnecting") {
974
975       /**
976        * Strict boolean parser.  Allow true/false/0/1.
977        * Anything else is -EINVAL.
978        */
979       auto is_true = [](const std::string &bstr, bool *out) -> bool
980       {
981         assert(out != nullptr);
982
983         if (bstr == "true" || bstr == "1") {
984           *out = true;
985           return 0;
986         } else if (bstr == "false" || bstr == "0") {
987           *out = false;
988           return 0;
989         } else {
990           return -EINVAL;
991         }
992       };
993
994       bool bval;
995       int r = is_true(v, &bval);
996       if (r == 0) {
997         set_reconnecting(bval);
998       } else {
999         *ss << "Invalid boolean value '" << v << "'";
1000         return -EINVAL;
1001       }
1002     } else {
1003       *ss << "Invalid filter key '" << k << "'";
1004       return -EINVAL;
1005     }
1006   }
1007
1008   return 0;
1009 }
1010
1011 bool SessionFilter::match(
1012     const Session &session,
1013     std::function<bool(client_t)> is_reconnecting) const
1014 {
1015   for (const auto &m : metadata) {
1016     const auto &k = m.first;
1017     const auto &v = m.second;
1018     if (session.info.client_metadata.count(k) == 0) {
1019       return false;
1020     }
1021     if (session.info.client_metadata.at(k) != v) {
1022       return false;
1023     }
1024   }
1025
1026   if (!auth_name.empty() && auth_name != session.info.auth_name.get_id()) {
1027     return false;
1028   }
1029
1030   if (!state.empty() && state != session.get_state_name()) {
1031     return false;
1032   }
1033
1034   if (id != 0 && id != session.info.inst.name.num()) {
1035     return false;
1036   }
1037
1038   if (reconnecting.first) {
1039     const bool am_reconnecting = is_reconnecting(session.info.inst.name.num());
1040     if (reconnecting.second != am_reconnecting) {
1041       return false;
1042     }
1043   }
1044
1045   return true;
1046 }
1047
1048 std::ostream& operator<<(std::ostream &out, const Session &s)
1049 {
1050  if (s.get_human_name() == stringify(s.info.inst.name.num())) {
1051    out << s.get_human_name();
1052  } else {
1053    out << s.get_human_name() << " (" << std::dec << s.info.inst.name.num() << ")";
1054  }
1055  return out;
1056 }
1057