+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "MDSRank.h"
-#include "MDCache.h"
-#include "Mutation.h"
-#include "SessionMap.h"
-#include "osdc/Filer.h"
-#include "common/Finisher.h"
-
-#include "common/config.h"
-#include "common/errno.h"
-#include "include/assert.h"
-#include "include/stringify.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_mds
-#undef dout_prefix
-#define dout_prefix *_dout << "mds." << rank << ".sessionmap "
-
-namespace {
-class SessionMapIOContext : public MDSIOContextBase
-{
- protected:
- SessionMap *sessionmap;
- MDSRank *get_mds() override {return sessionmap->mds;}
- public:
- explicit SessionMapIOContext(SessionMap *sessionmap_) : sessionmap(sessionmap_) {
- assert(sessionmap != NULL);
- }
-};
-};
-
-void SessionMap::register_perfcounters()
-{
- PerfCountersBuilder plb(g_ceph_context, "mds_sessions",
- l_mdssm_first, l_mdssm_last);
- plb.add_u64(l_mdssm_session_count, "session_count",
- "Session count");
- plb.add_u64_counter(l_mdssm_session_add, "session_add",
- "Sessions added");
- plb.add_u64_counter(l_mdssm_session_remove, "session_remove",
- "Sessions removed");
- logger = plb.create_perf_counters();
- g_ceph_context->get_perfcounters_collection()->add(logger);
-}
-
-void SessionMap::dump()
-{
- dout(10) << "dump" << dendl;
- for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
- p != session_map.end();
- ++p)
- dout(10) << p->first << " " << p->second
- << " state " << p->second->get_state_name()
- << " completed " << p->second->info.completed_requests
- << " prealloc_inos " << p->second->info.prealloc_inos
- << " used_inos " << p->second->info.used_inos
- << dendl;
-}
-
-
-// ----------------
-// LOAD
-
-
-object_t SessionMap::get_object_name() const
-{
- char s[30];
- snprintf(s, sizeof(s), "mds%d_sessionmap", int(mds->get_nodeid()));
- return object_t(s);
-}
-
-namespace {
-class C_IO_SM_Load : public SessionMapIOContext {
-public:
- const bool first; //< Am I the initial (header) load?
- int header_r; //< Return value from OMAP header read
- int values_r; //< Return value from OMAP value read
- bufferlist header_bl;
- std::map<std::string, bufferlist> session_vals;
- bool more_session_vals = false;
-
- C_IO_SM_Load(SessionMap *cm, const bool f)
- : SessionMapIOContext(cm), first(f), header_r(0), values_r(0) {}
-
- void finish(int r) override {
- sessionmap->_load_finish(r, header_r, values_r, first, header_bl, session_vals,
- more_session_vals);
- }
-};
-}
-
-
-/**
- * Decode OMAP header. Call this once when loading.
- */
-void SessionMapStore::decode_header(
- bufferlist &header_bl)
-{
- bufferlist::iterator q = header_bl.begin();
- DECODE_START(1, q)
- ::decode(version, q);
- DECODE_FINISH(q);
-}
-
-void SessionMapStore::encode_header(
- bufferlist *header_bl)
-{
- ENCODE_START(1, 1, *header_bl);
- ::encode(version, *header_bl);
- ENCODE_FINISH(*header_bl);
-}
-
-/**
- * Decode and insert some serialized OMAP values. Call this
- * repeatedly to insert batched loads.
- */
-void SessionMapStore::decode_values(std::map<std::string, bufferlist> &session_vals)
-{
- for (std::map<std::string, bufferlist>::iterator i = session_vals.begin();
- i != session_vals.end(); ++i) {
-
- entity_inst_t inst;
-
- bool parsed = inst.name.parse(i->first);
- if (!parsed) {
- derr << "Corrupt entity name '" << i->first << "' in sessionmap" << dendl;
- throw buffer::malformed_input("Corrupt entity name in sessionmap");
- }
-
- Session *s = get_or_add_session(inst);
- if (s->is_closed())
- s->set_state(Session::STATE_OPEN);
- bufferlist::iterator q = i->second.begin();
- s->decode(q);
- }
-}
-
-/**
- * An OMAP read finished.
- */
-void SessionMap::_load_finish(
- int operation_r,
- int header_r,
- int values_r,
- bool first,
- bufferlist &header_bl,
- std::map<std::string, bufferlist> &session_vals,
- bool more_session_vals)
-{
- if (operation_r < 0) {
- derr << "_load_finish got " << cpp_strerror(operation_r) << dendl;
- mds->clog->error() << "error reading sessionmap '" << get_object_name()
- << "' " << operation_r << " ("
- << cpp_strerror(operation_r) << ")";
- mds->damaged();
- ceph_abort(); // Should be unreachable because damaged() calls respawn()
- }
-
- // Decode header
- if (first) {
- if (header_r != 0) {
- derr << __func__ << ": header error: " << cpp_strerror(header_r) << dendl;
- mds->clog->error() << "error reading sessionmap header "
- << header_r << " (" << cpp_strerror(header_r) << ")";
- mds->damaged();
- ceph_abort(); // Should be unreachable because damaged() calls respawn()
- }
-
- if(header_bl.length() == 0) {
- dout(4) << __func__ << ": header missing, loading legacy..." << dendl;
- load_legacy();
- return;
- }
-
- try {
- decode_header(header_bl);
- } catch (buffer::error &e) {
- mds->clog->error() << "corrupt sessionmap header: " << e.what();
- mds->damaged();
- ceph_abort(); // Should be unreachable because damaged() calls respawn()
- }
- dout(10) << __func__ << " loaded version " << version << dendl;
- }
-
- if (values_r != 0) {
- derr << __func__ << ": error reading values: "
- << cpp_strerror(values_r) << dendl;
- mds->clog->error() << "error reading sessionmap values: "
- << values_r << " (" << cpp_strerror(values_r) << ")";
- mds->damaged();
- ceph_abort(); // Should be unreachable because damaged() calls respawn()
- }
-
- // Decode session_vals
- try {
- decode_values(session_vals);
- } catch (buffer::error &e) {
- mds->clog->error() << "corrupt sessionmap values: " << e.what();
- mds->damaged();
- ceph_abort(); // Should be unreachable because damaged() calls respawn()
- }
-
- if (more_session_vals) {
- // Issue another read if we're not at the end of the omap
- const std::string last_key = session_vals.rbegin()->first;
- dout(10) << __func__ << ": continue omap load from '"
- << last_key << "'" << dendl;
- object_t oid = get_object_name();
- object_locator_t oloc(mds->mdsmap->get_metadata_pool());
- C_IO_SM_Load *c = new C_IO_SM_Load(this, false);
- ObjectOperation op;
- op.omap_get_vals(last_key, "", g_conf->mds_sessionmap_keys_per_op,
- &c->session_vals, &c->more_session_vals, &c->values_r);
- mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0,
- new C_OnFinisher(c, mds->finisher));
- } else {
- // I/O is complete. Update `by_state`
- dout(10) << __func__ << ": omap load complete" << dendl;
- for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
- i != session_map.end(); ++i) {
- Session *s = i->second;
- auto by_state_entry = by_state.find(s->get_state());
- if (by_state_entry == by_state.end())
- by_state_entry = by_state.emplace(s->get_state(),
- new xlist<Session*>).first;
- by_state_entry->second->push_back(&s->item_session_list);
- }
-
- // Population is complete. Trigger load waiters.
- dout(10) << __func__ << ": v " << version
- << ", " << session_map.size() << " sessions" << dendl;
- projected = committing = committed = version;
- dump();
- finish_contexts(g_ceph_context, waiting_for_load);
- }
-}
-
-/**
- * Populate session state from OMAP records in this
- * rank's sessionmap object.
- */
-void SessionMap::load(MDSInternalContextBase *onload)
-{
- dout(10) << "load" << dendl;
-
- if (onload)
- waiting_for_load.push_back(onload);
-
- C_IO_SM_Load *c = new C_IO_SM_Load(this, true);
- object_t oid = get_object_name();
- object_locator_t oloc(mds->mdsmap->get_metadata_pool());
-
- ObjectOperation op;
- op.omap_get_header(&c->header_bl, &c->header_r);
- op.omap_get_vals("", "", g_conf->mds_sessionmap_keys_per_op,
- &c->session_vals, &c->more_session_vals, &c->values_r);
-
- mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, new C_OnFinisher(c, mds->finisher));
-}
-
-namespace {
-class C_IO_SM_LoadLegacy : public SessionMapIOContext {
-public:
- bufferlist bl;
- explicit C_IO_SM_LoadLegacy(SessionMap *cm) : SessionMapIOContext(cm) {}
- void finish(int r) override {
- sessionmap->_load_legacy_finish(r, bl);
- }
-};
-}
-
-
-/**
- * Load legacy (object data blob) SessionMap format, assuming
- * that waiting_for_load has already been populated with
- * the relevant completion. This is the fallback if we do not
- * find an OMAP header when attempting to load normally.
- */
-void SessionMap::load_legacy()
-{
- dout(10) << __func__ << dendl;
-
- C_IO_SM_LoadLegacy *c = new C_IO_SM_LoadLegacy(this);
- object_t oid = get_object_name();
- object_locator_t oloc(mds->mdsmap->get_metadata_pool());
-
- mds->objecter->read_full(oid, oloc, CEPH_NOSNAP, &c->bl, 0,
- new C_OnFinisher(c, mds->finisher));
-}
-
-void SessionMap::_load_legacy_finish(int r, bufferlist &bl)
-{
- bufferlist::iterator blp = bl.begin();
- if (r < 0) {
- derr << "_load_finish got " << cpp_strerror(r) << dendl;
- assert(0 == "failed to load sessionmap");
- }
- dump();
- decode_legacy(blp); // note: this sets last_cap_renew = now()
- dout(10) << "_load_finish v " << version
- << ", " << session_map.size() << " sessions, "
- << bl.length() << " bytes"
- << dendl;
- projected = committing = committed = version;
- dump();
-
- // Mark all sessions dirty, so that on next save() we will write
- // a complete OMAP version of the data loaded from the legacy format
- for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
- i != session_map.end(); ++i) {
- // Don't use mark_dirty because on this occasion we want to ignore the
- // keys_per_op limit and do one big write (upgrade must be atomic)
- dirty_sessions.insert(i->first);
- }
- loaded_legacy = true;
-
- finish_contexts(g_ceph_context, waiting_for_load);
-}
-
-
-// ----------------
-// SAVE
-
-namespace {
-class C_IO_SM_Save : public SessionMapIOContext {
- version_t version;
-public:
- C_IO_SM_Save(SessionMap *cm, version_t v) : SessionMapIOContext(cm), version(v) {}
- void finish(int r) override {
- if (r != 0) {
- get_mds()->handle_write_error(r);
- } else {
- sessionmap->_save_finish(version);
- }
- }
-};
-}
-
-void SessionMap::save(MDSInternalContextBase *onsave, version_t needv)
-{
- dout(10) << __func__ << ": needv " << needv << ", v " << version << dendl;
-
- if (needv && committing >= needv) {
- assert(committing > committed);
- commit_waiters[committing].push_back(onsave);
- return;
- }
-
- commit_waiters[version].push_back(onsave);
-
- committing = version;
- SnapContext snapc;
- object_t oid = get_object_name();
- object_locator_t oloc(mds->mdsmap->get_metadata_pool());
-
- ObjectOperation op;
-
- /* Compose OSD OMAP transaction for full write */
- bufferlist header_bl;
- encode_header(&header_bl);
- op.omap_set_header(header_bl);
-
- /* If we loaded a legacy sessionmap, then erase the old data. If
- * an old-versioned MDS tries to read it, it'll fail out safely
- * with an end_of_buffer exception */
- if (loaded_legacy) {
- dout(4) << __func__ << " erasing legacy sessionmap" << dendl;
- op.truncate(0);
- loaded_legacy = false; // only need to truncate once.
- }
-
- dout(20) << " updating keys:" << dendl;
- map<string, bufferlist> to_set;
- for(std::set<entity_name_t>::iterator i = dirty_sessions.begin();
- i != dirty_sessions.end(); ++i) {
- const entity_name_t name = *i;
- Session *session = session_map[name];
-
- if (session->is_open() ||
- session->is_closing() ||
- session->is_stale() ||
- session->is_killing()) {
- dout(20) << " " << name << dendl;
- // Serialize K
- std::ostringstream k;
- k << name;
-
- // Serialize V
- bufferlist bl;
- session->info.encode(bl, mds->mdsmap->get_up_features());
-
- // Add to RADOS op
- to_set[k.str()] = bl;
-
- session->clear_dirty_completed_requests();
- } else {
- dout(20) << " " << name << " (ignoring)" << dendl;
- }
- }
- if (!to_set.empty()) {
- op.omap_set(to_set);
- }
-
- dout(20) << " removing keys:" << dendl;
- set<string> to_remove;
- for(std::set<entity_name_t>::const_iterator i = null_sessions.begin();
- i != null_sessions.end(); ++i) {
- dout(20) << " " << *i << dendl;
- std::ostringstream k;
- k << *i;
- to_remove.insert(k.str());
- }
- if (!to_remove.empty()) {
- op.omap_rm_keys(to_remove);
- }
-
- dirty_sessions.clear();
- null_sessions.clear();
-
- mds->objecter->mutate(oid, oloc, op, snapc,
- ceph::real_clock::now(),
- 0,
- new C_OnFinisher(new C_IO_SM_Save(this, version),
- mds->finisher));
-}
-
-void SessionMap::_save_finish(version_t v)
-{
- dout(10) << "_save_finish v" << v << dendl;
- committed = v;
-
- finish_contexts(g_ceph_context, commit_waiters[v]);
- commit_waiters.erase(v);
-}
-
-
-/**
- * Deserialize sessions, and update by_state index
- */
-void SessionMap::decode_legacy(bufferlist::iterator &p)
-{
- // Populate `sessions`
- SessionMapStore::decode_legacy(p);
-
- // Update `by_state`
- for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
- i != session_map.end(); ++i) {
- Session *s = i->second;
- auto by_state_entry = by_state.find(s->get_state());
- if (by_state_entry == by_state.end())
- by_state_entry = by_state.emplace(s->get_state(),
- new xlist<Session*>).first;
- by_state_entry->second->push_back(&s->item_session_list);
- }
-}
-
-uint64_t SessionMap::set_state(Session *session, int s) {
- if (session->state != s) {
- session->set_state(s);
- auto by_state_entry = by_state.find(s);
- if (by_state_entry == by_state.end())
- by_state_entry = by_state.emplace(s, new xlist<Session*>).first;
- by_state_entry->second->push_back(&session->item_session_list);
- }
- return session->get_state_seq();
-}
-
-void SessionMapStore::decode_legacy(bufferlist::iterator& p)
-{
- utime_t now = ceph_clock_now();
- uint64_t pre;
- ::decode(pre, p);
- if (pre == (uint64_t)-1) {
- DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, p);
- assert(struct_v >= 2);
-
- ::decode(version, p);
-
- while (!p.end()) {
- entity_inst_t inst;
- ::decode(inst.name, p);
- Session *s = get_or_add_session(inst);
- if (s->is_closed())
- s->set_state(Session::STATE_OPEN);
- s->decode(p);
- }
-
- DECODE_FINISH(p);
- } else {
- // --- old format ----
- version = pre;
-
- // this is a meaningless upper bound. can be ignored.
- __u32 n;
- ::decode(n, p);
-
- while (n-- && !p.end()) {
- bufferlist::iterator p2 = p;
- Session *s = new Session;
- s->info.decode(p);
- if (session_map.count(s->info.inst.name)) {
- // eager client connected too fast! aie.
- dout(10) << " already had session for " << s->info.inst.name << ", recovering" << dendl;
- entity_name_t n = s->info.inst.name;
- delete s;
- s = session_map[n];
- p = p2;
- s->info.decode(p);
- } else {
- session_map[s->info.inst.name] = s;
- }
- s->set_state(Session::STATE_OPEN);
- s->last_cap_renew = now;
- }
- }
-}
-
-void SessionMapStore::dump(Formatter *f) const
-{
- f->open_array_section("Sessions");
- for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
- p != session_map.end();
- ++p) {
- f->open_object_section("Session");
- f->open_object_section("entity name");
- p->first.dump(f);
- f->close_section(); // entity name
- f->dump_string("state", p->second->get_state_name());
- f->open_object_section("Session info");
- p->second->info.dump(f);
- f->close_section(); // Session info
- f->close_section(); // Session
- }
- f->close_section(); // Sessions
-}
-
-void SessionMapStore::generate_test_instances(list<SessionMapStore*>& ls)
-{
- // pretty boring for now
- ls.push_back(new SessionMapStore());
-}
-
-void SessionMap::wipe()
-{
- dout(1) << "wipe start" << dendl;
- dump();
- while (!session_map.empty()) {
- Session *s = session_map.begin()->second;
- remove_session(s);
- }
- version = ++projected;
- dout(1) << "wipe result" << dendl;
- dump();
- dout(1) << "wipe done" << dendl;
-}
-
-void SessionMap::wipe_ino_prealloc()
-{
- for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
- p != session_map.end();
- ++p) {
- p->second->pending_prealloc_inos.clear();
- p->second->info.prealloc_inos.clear();
- p->second->info.used_inos.clear();
- }
- projected = ++version;
-}
-
-void SessionMap::add_session(Session *s)
-{
- dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl;
-
- assert(session_map.count(s->info.inst.name) == 0);
- session_map[s->info.inst.name] = s;
- auto by_state_entry = by_state.find(s->state);
- if (by_state_entry == by_state.end())
- by_state_entry = by_state.emplace(s->state, new xlist<Session*>).first;
- by_state_entry->second->push_back(&s->item_session_list);
- s->get();
-
- logger->set(l_mdssm_session_count, session_map.size());
- logger->inc(l_mdssm_session_add);
-}
-
-void SessionMap::remove_session(Session *s)
-{
- dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl;
-
- s->trim_completed_requests(0);
- s->item_session_list.remove_myself();
- session_map.erase(s->info.inst.name);
- dirty_sessions.erase(s->info.inst.name);
- null_sessions.insert(s->info.inst.name);
- s->put();
-
- logger->set(l_mdssm_session_count, session_map.size());
- logger->inc(l_mdssm_session_remove);
-}
-
-void SessionMap::touch_session(Session *session)
-{
- dout(10) << __func__ << " s=" << session << " name=" << session->info.inst.name << dendl;
-
- // Move to the back of the session list for this state (should
- // already be on a list courtesy of add_session and set_state)
- assert(session->item_session_list.is_on_list());
- auto by_state_entry = by_state.find(session->state);
- if (by_state_entry == by_state.end())
- by_state_entry = by_state.emplace(session->state,
- new xlist<Session*>).first;
- by_state_entry->second->push_back(&session->item_session_list);
-
- session->last_cap_renew = ceph_clock_now();
-}
-
-void SessionMap::_mark_dirty(Session *s)
-{
- if (dirty_sessions.count(s->info.inst.name))
- return;
-
- if (dirty_sessions.size() >= g_conf->mds_sessionmap_keys_per_op) {
- // Pre-empt the usual save() call from journal segment trim, in
- // order to avoid building up an oversized OMAP update operation
- // from too many sessions modified at once
- save(new C_MDSInternalNoop, version);
- }
-
- null_sessions.erase(s->info.inst.name);
- dirty_sessions.insert(s->info.inst.name);
-}
-
-void SessionMap::mark_dirty(Session *s)
-{
- dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
- << " v=" << version << dendl;
-
- _mark_dirty(s);
- version++;
- s->pop_pv(version);
-}
-
-void SessionMap::replay_dirty_session(Session *s)
-{
- dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
- << " v=" << version << dendl;
-
- _mark_dirty(s);
-
- replay_advance_version();
-}
-
-void SessionMap::replay_advance_version()
-{
- version++;
- projected = version;
-}
-
-version_t SessionMap::mark_projected(Session *s)
-{
- dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
- << " pv=" << projected << " -> " << projected + 1 << dendl;
- ++projected;
- s->push_pv(projected);
- return projected;
-}
-
-namespace {
-class C_IO_SM_Save_One : public SessionMapIOContext {
- MDSInternalContextBase *on_safe;
-public:
- C_IO_SM_Save_One(SessionMap *cm, MDSInternalContextBase *on_safe_)
- : SessionMapIOContext(cm), on_safe(on_safe_) {}
- void finish(int r) override {
- if (r != 0) {
- get_mds()->handle_write_error(r);
- } else {
- on_safe->complete(r);
- }
- }
-};
-}
-
-
-void SessionMap::save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
- MDSGatherBuilder *gather_bld)
-{
- assert(gather_bld != NULL);
-
- std::vector<entity_name_t> write_sessions;
-
- // Decide which sessions require a write
- for (std::set<entity_name_t>::iterator i = tgt_sessions.begin();
- i != tgt_sessions.end(); ++i) {
- const entity_name_t &session_id = *i;
-
- if (session_map.count(session_id) == 0) {
- // Session isn't around any more, never mind.
- continue;
- }
-
- Session *session = session_map[session_id];
- if (!session->has_dirty_completed_requests()) {
- // Session hasn't had completed_requests
- // modified since last write, no need to
- // write it now.
- continue;
- }
-
- if (dirty_sessions.count(session_id) > 0) {
- // Session is already dirtied, will be written, no
- // need to pre-empt that.
- continue;
- }
- // Okay, passed all our checks, now we write
- // this session out. The version we write
- // into the OMAP may now be higher-versioned
- // than the version in the header, but that's
- // okay because it's never a problem to have
- // an overly-fresh copy of a session.
- write_sessions.push_back(*i);
- }
-
- dout(4) << __func__ << ": writing " << write_sessions.size() << dendl;
-
- // Batch writes into mds_sessionmap_keys_per_op
- const uint32_t kpo = g_conf->mds_sessionmap_keys_per_op;
- map<string, bufferlist> to_set;
- for (uint32_t i = 0; i < write_sessions.size(); ++i) {
- // Start a new write transaction?
- if (i % g_conf->mds_sessionmap_keys_per_op == 0) {
- to_set.clear();
- }
-
- const entity_name_t &session_id = write_sessions[i];
- Session *session = session_map[session_id];
- session->clear_dirty_completed_requests();
-
- // Serialize K
- std::ostringstream k;
- k << session_id;
-
- // Serialize V
- bufferlist bl;
- session->info.encode(bl, mds->mdsmap->get_up_features());
-
- // Add to RADOS op
- to_set[k.str()] = bl;
-
- // Complete this write transaction?
- if (i == write_sessions.size() - 1
- || i % kpo == kpo - 1) {
- ObjectOperation op;
- op.omap_set(to_set);
-
- SnapContext snapc;
- object_t oid = get_object_name();
- object_locator_t oloc(mds->mdsmap->get_metadata_pool());
- MDSInternalContextBase *on_safe = gather_bld->new_sub();
- mds->objecter->mutate(oid, oloc, op, snapc,
- ceph::real_clock::now(),
- 0, new C_OnFinisher(
- new C_IO_SM_Save_One(this, on_safe),
- mds->finisher));
- }
- }
-}
-
-// =================
-// Session
-
-#undef dout_prefix
-#define dout_prefix *_dout << "Session "
-
-/**
- * Calculate the length of the `requests` member list,
- * because elist does not have a size() method.
- *
- * O(N) runtime. This would be const, but elist doesn't
- * have const iterators.
- */
-size_t Session::get_request_count()
-{
- size_t result = 0;
-
- elist<MDRequestImpl*>::iterator p = requests.begin(
- member_offset(MDRequestImpl, item_session_request));
- while (!p.end()) {
- ++result;
- ++p;
- }
-
- return result;
-}
-
-/**
- * Capped in response to a CEPH_MSG_CLIENT_CAPRELEASE message,
- * with n_caps equal to the number of caps that were released
- * in the message. Used to update state about how many caps a
- * client has released since it was last instructed to RECALL_STATE.
- */
-void Session::notify_cap_release(size_t n_caps)
-{
- if (!recalled_at.is_zero()) {
- recall_release_count += n_caps;
- if (recall_release_count >= recall_count)
- clear_recalled_at();
- }
-}
-
-/**
- * Called when a CEPH_MSG_CLIENT_SESSION->CEPH_SESSION_RECALL_STATE
- * message is sent to the client. Update our recall-related state
- * in order to generate health metrics if the session doesn't see
- * a commensurate number of calls to ::notify_cap_release
- */
-void Session::notify_recall_sent(const size_t new_limit)
-{
- if (recalled_at.is_zero()) {
- // Entering recall phase, set up counters so we can later
- // judge whether the client has respected the recall request
- recalled_at = last_recall_sent = ceph_clock_now();
- assert (new_limit < caps.size()); // Behaviour of Server::recall_client_state
- recall_count = caps.size() - new_limit;
- recall_release_count = 0;
- } else {
- last_recall_sent = ceph_clock_now();
- }
-}
-
-void Session::clear_recalled_at()
-{
- recalled_at = last_recall_sent = utime_t();
- recall_count = 0;
- recall_release_count = 0;
-}
-
-void Session::set_client_metadata(map<string, string> const &meta)
-{
- info.client_metadata = meta;
-
- _update_human_name();
-}
-
-/**
- * Use client metadata to generate a somewhat-friendlier
- * name for the client than its session ID.
- *
- * This is *not* guaranteed to be unique, and any machine
- * consumers of session-related output should always use
- * the session ID as a primary capacity and use this only
- * as a presentation hint.
- */
-void Session::_update_human_name()
-{
- auto info_client_metadata_entry = info.client_metadata.find("hostname");
- if (info_client_metadata_entry != info.client_metadata.end()) {
- // Happy path, refer to clients by hostname
- human_name = info_client_metadata_entry->second;
- if (!info.auth_name.has_default_id()) {
- // When a non-default entity ID is set by the user, assume they
- // would like to see it in references to the client, if it's
- // reasonable short. Limit the length because we don't want
- // to put e.g. uuid-generated names into a "human readable"
- // rendering.
- const int arbitrarily_short = 16;
- if (info.auth_name.get_id().size() < arbitrarily_short) {
- human_name += std::string(":") + info.auth_name.get_id();
- }
- }
- } else {
- // Fallback, refer to clients by ID e.g. client.4567
- human_name = stringify(info.inst.name.num());
- }
-}
-
-void Session::decode(bufferlist::iterator &p)
-{
- info.decode(p);
-
- _update_human_name();
-}
-
-int Session::check_access(CInode *in, unsigned mask,
- int caller_uid, int caller_gid,
- const vector<uint64_t> *caller_gid_list,
- int new_uid, int new_gid)
-{
- string path;
- CInode *diri = NULL;
- if (!in->is_base())
- diri = in->get_projected_parent_dn()->get_dir()->get_inode();
- if (diri && diri->is_stray()){
- path = in->get_projected_inode()->stray_prior_path;
- dout(20) << __func__ << " stray_prior_path " << path << dendl;
- } else {
- in->make_path_string(path, true);
- dout(20) << __func__ << " path " << path << dendl;
- }
- if (path.length())
- path = path.substr(1); // drop leading /
-
- if (in->inode.is_dir() &&
- in->inode.has_layout() &&
- in->inode.layout.pool_ns.length() &&
- !connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
- dout(10) << __func__ << " client doesn't support FS_FILE_LAYOUT_V2" << dendl;
- return -EIO;
- }
-
- if (!auth_caps.is_capable(path, in->inode.uid, in->inode.gid, in->inode.mode,
- caller_uid, caller_gid, caller_gid_list, mask,
- new_uid, new_gid)) {
- return -EACCES;
- }
- return 0;
-}
-
-int SessionFilter::parse(
- const std::vector<std::string> &args,
- std::stringstream *ss)
-{
- assert(ss != NULL);
-
- for (const auto &s : args) {
- dout(20) << __func__ << " parsing filter '" << s << "'" << dendl;
-
- auto eq = s.find("=");
- if (eq == std::string::npos || eq == s.size()) {
- *ss << "Invalid filter '" << s << "'";
- return -EINVAL;
- }
-
- // Keys that start with this are to be taken as referring
- // to freeform client metadata fields.
- const std::string metadata_prefix("client_metadata.");
-
- auto k = s.substr(0, eq);
- auto v = s.substr(eq + 1);
-
- dout(20) << __func__ << " parsed k='" << k << "', v='" << v << "'" << dendl;
-
- if (k.compare(0, metadata_prefix.size(), metadata_prefix) == 0
- && k.size() > metadata_prefix.size()) {
- // Filter on arbitrary metadata key (no fixed schema for this,
- // so anything after the dot is a valid field to filter on)
- auto metadata_key = k.substr(metadata_prefix.size());
- metadata.insert(std::make_pair(metadata_key, v));
- } else if (k == "auth_name") {
- // Filter on client entity name
- auth_name = v;
- } else if (k == "state") {
- state = v;
- } else if (k == "id") {
- std::string err;
- id = strict_strtoll(v.c_str(), 10, &err);
- if (!err.empty()) {
- *ss << err;
- return -EINVAL;
- }
- } else if (k == "reconnecting") {
-
- /**
- * Strict boolean parser. Allow true/false/0/1.
- * Anything else is -EINVAL.
- */
- auto is_true = [](const std::string &bstr, bool *out) -> bool
- {
- assert(out != nullptr);
-
- if (bstr == "true" || bstr == "1") {
- *out = true;
- return 0;
- } else if (bstr == "false" || bstr == "0") {
- *out = false;
- return 0;
- } else {
- return -EINVAL;
- }
- };
-
- bool bval;
- int r = is_true(v, &bval);
- if (r == 0) {
- set_reconnecting(bval);
- } else {
- *ss << "Invalid boolean value '" << v << "'";
- return -EINVAL;
- }
- } else {
- *ss << "Invalid filter key '" << k << "'";
- return -EINVAL;
- }
- }
-
- return 0;
-}
-
-bool SessionFilter::match(
- const Session &session,
- std::function<bool(client_t)> is_reconnecting) const
-{
- for (const auto &m : metadata) {
- const auto &k = m.first;
- const auto &v = m.second;
- if (session.info.client_metadata.count(k) == 0) {
- return false;
- }
- if (session.info.client_metadata.at(k) != v) {
- return false;
- }
- }
-
- if (!auth_name.empty() && auth_name != session.info.auth_name.get_id()) {
- return false;
- }
-
- if (!state.empty() && state != session.get_state_name()) {
- return false;
- }
-
- if (id != 0 && id != session.info.inst.name.num()) {
- return false;
- }
-
- if (reconnecting.first) {
- const bool am_reconnecting = is_reconnecting(session.info.inst.name.num());
- if (reconnecting.second != am_reconnecting) {
- return false;
- }
- }
-
- return true;
-}
-
-std::ostream& operator<<(std::ostream &out, const Session &s)
-{
- if (s.get_human_name() == stringify(s.info.inst.name.num())) {
- out << s.get_human_name();
- } else {
- out << s.get_human_name() << " (" << std::dec << s.info.inst.name.num() << ")";
- }
- return out;
-}
-