+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef CEPH_MDS_SESSIONMAP_H
-#define CEPH_MDS_SESSIONMAP_H
-
-#include <set>
-using std::set;
-
-#include "include/unordered_map.h"
-
-#include "include/Context.h"
-#include "include/xlist.h"
-#include "include/elist.h"
-#include "include/interval_set.h"
-#include "mdstypes.h"
-#include "mds/MDSAuthCaps.h"
-#include "common/perf_counters.h"
-
-class CInode;
-struct MDRequestImpl;
-
-#include "CInode.h"
-#include "Capability.h"
-#include "msg/Message.h"
-
-enum {
- l_mdssm_first = 5500,
- l_mdssm_session_count,
- l_mdssm_session_add,
- l_mdssm_session_remove,
- l_mdssm_last,
-};
-
-/*
- * session
- */
-
-class Session : public RefCountedObject {
- // -- state etc --
-public:
- /*
-
- <deleted> <-- closed <------------+
- ^ | |
- | v |
- killing <-- opening <----+ |
- ^ | | |
- | v | |
- stale <--> open --> closing ---+
-
- + additional dimension of 'importing' (with counter)
-
- */
- enum {
- STATE_CLOSED = 0,
- STATE_OPENING = 1, // journaling open
- STATE_OPEN = 2,
- STATE_CLOSING = 3, // journaling close
- STATE_STALE = 4,
- STATE_KILLING = 5
- };
-
- const char *get_state_name(int s) const {
- switch (s) {
- case STATE_CLOSED: return "closed";
- case STATE_OPENING: return "opening";
- case STATE_OPEN: return "open";
- case STATE_CLOSING: return "closing";
- case STATE_STALE: return "stale";
- case STATE_KILLING: return "killing";
- default: return "???";
- }
- }
-
-private:
- int state;
- uint64_t state_seq;
- int importing_count;
- friend class SessionMap;
-
- // Human (friendly) name is soft state generated from client metadata
- void _update_human_name();
- std::string human_name;
-
- // Versions in this session was projected: used to verify
- // that appropriate mark_dirty calls follow.
- std::deque<version_t> projected;
-
-
-
-public:
-
- void push_pv(version_t pv)
- {
- assert(projected.empty() || projected.back() != pv);
- projected.push_back(pv);
- }
-
- void pop_pv(version_t v)
- {
- assert(!projected.empty());
- assert(projected.front() == v);
- projected.pop_front();
- }
-
- int get_state() const { return state; }
- void set_state(int new_state)
- {
- if (state != new_state) {
- state = new_state;
- state_seq++;
- }
- }
- void decode(bufferlist::iterator &p);
- void set_client_metadata(std::map<std::string, std::string> const &meta);
- std::string get_human_name() const {return human_name;}
-
- // Ephemeral state for tracking progress of capability recalls
- utime_t recalled_at; // When was I asked to SESSION_RECALL?
- utime_t last_recall_sent;
- uint32_t recall_count; // How many caps was I asked to SESSION_RECALL?
- uint32_t recall_release_count; // How many caps have I actually revoked?
-
- session_info_t info; ///< durable bits
-
- MDSAuthCaps auth_caps;
-
- ConnectionRef connection;
- xlist<Session*>::item item_session_list;
-
- list<Message*> preopen_out_queue; ///< messages for client, queued before they connect
-
- elist<MDRequestImpl*> requests;
- size_t get_request_count();
-
- interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos
-
- void notify_cap_release(size_t n_caps);
- void notify_recall_sent(const size_t new_limit);
- void clear_recalled_at();
-
- inodeno_t next_ino() const {
- if (info.prealloc_inos.empty())
- return 0;
- return info.prealloc_inos.range_start();
- }
- inodeno_t take_ino(inodeno_t ino = 0) {
- assert(!info.prealloc_inos.empty());
-
- if (ino) {
- if (info.prealloc_inos.contains(ino))
- info.prealloc_inos.erase(ino);
- else
- ino = 0;
- }
- if (!ino) {
- ino = info.prealloc_inos.range_start();
- info.prealloc_inos.erase(ino);
- }
- info.used_inos.insert(ino, 1);
- return ino;
- }
- int get_num_projected_prealloc_inos() const {
- return info.prealloc_inos.size() + pending_prealloc_inos.size();
- }
-
- client_t get_client() const {
- return info.get_client();
- }
-
- const char *get_state_name() const { return get_state_name(state); }
- uint64_t get_state_seq() const { return state_seq; }
- bool is_closed() const { return state == STATE_CLOSED; }
- bool is_opening() const { return state == STATE_OPENING; }
- bool is_open() const { return state == STATE_OPEN; }
- bool is_closing() const { return state == STATE_CLOSING; }
- bool is_stale() const { return state == STATE_STALE; }
- bool is_killing() const { return state == STATE_KILLING; }
-
- void inc_importing() {
- ++importing_count;
- }
- void dec_importing() {
- assert(importing_count > 0);
- --importing_count;
- }
- bool is_importing() const { return importing_count > 0; }
-
- // -- caps --
-private:
- version_t cap_push_seq; // cap push seq #
- map<version_t, list<MDSInternalContextBase*> > waitfor_flush; // flush session messages
-
-public:
- xlist<Capability*> caps; // inodes with caps; front=most recently used
- xlist<ClientLease*> leases; // metadata leases to clients
- utime_t last_cap_renew;
-
-public:
- version_t inc_push_seq() { return ++cap_push_seq; }
- version_t get_push_seq() const { return cap_push_seq; }
-
- version_t wait_for_flush(MDSInternalContextBase* c) {
- waitfor_flush[get_push_seq()].push_back(c);
- return get_push_seq();
- }
- void finish_flush(version_t seq, list<MDSInternalContextBase*>& ls) {
- while (!waitfor_flush.empty()) {
- if (waitfor_flush.begin()->first > seq)
- break;
- ls.splice(ls.end(), waitfor_flush.begin()->second);
- waitfor_flush.erase(waitfor_flush.begin());
- }
- }
-
- void add_cap(Capability *cap) {
- caps.push_back(&cap->item_session_caps);
- }
- void touch_lease(ClientLease *r) {
- leases.push_back(&r->item_session_lease);
- }
-
- // -- leases --
- uint32_t lease_seq;
-
- // -- completed requests --
-private:
- // Has completed_requests been modified since the last time we
- // wrote this session out?
- bool completed_requests_dirty;
-
- unsigned num_trim_flushes_warnings;
- unsigned num_trim_requests_warnings;
-public:
- void add_completed_request(ceph_tid_t t, inodeno_t created) {
- info.completed_requests[t] = created;
- completed_requests_dirty = true;
- }
- bool trim_completed_requests(ceph_tid_t mintid) {
- // trim
- bool erased_any = false;
- while (!info.completed_requests.empty() &&
- (mintid == 0 || info.completed_requests.begin()->first < mintid)) {
- info.completed_requests.erase(info.completed_requests.begin());
- erased_any = true;
- }
-
- if (erased_any) {
- completed_requests_dirty = true;
- }
- return erased_any;
- }
- bool have_completed_request(ceph_tid_t tid, inodeno_t *pcreated) const {
- map<ceph_tid_t,inodeno_t>::const_iterator p = info.completed_requests.find(tid);
- if (p == info.completed_requests.end())
- return false;
- if (pcreated)
- *pcreated = p->second;
- return true;
- }
-
- void add_completed_flush(ceph_tid_t tid) {
- info.completed_flushes.insert(tid);
- }
- bool trim_completed_flushes(ceph_tid_t mintid) {
- bool erased_any = false;
- while (!info.completed_flushes.empty() &&
- (mintid == 0 || *info.completed_flushes.begin() < mintid)) {
- info.completed_flushes.erase(info.completed_flushes.begin());
- erased_any = true;
- }
- if (erased_any) {
- completed_requests_dirty = true;
- }
- return erased_any;
- }
- bool have_completed_flush(ceph_tid_t tid) const {
- return info.completed_flushes.count(tid);
- }
-
- unsigned get_num_completed_flushes() const { return info.completed_flushes.size(); }
- unsigned get_num_trim_flushes_warnings() const {
- return num_trim_flushes_warnings;
- }
- void inc_num_trim_flushes_warnings() { ++num_trim_flushes_warnings; }
- void reset_num_trim_flushes_warnings() { num_trim_flushes_warnings = 0; }
-
- unsigned get_num_completed_requests() const { return info.completed_requests.size(); }
- unsigned get_num_trim_requests_warnings() const {
- return num_trim_requests_warnings;
- }
- void inc_num_trim_requests_warnings() { ++num_trim_requests_warnings; }
- void reset_num_trim_requests_warnings() { num_trim_requests_warnings = 0; }
-
- bool has_dirty_completed_requests() const
- {
- return completed_requests_dirty;
- }
-
- void clear_dirty_completed_requests()
- {
- completed_requests_dirty = false;
- }
-
- int check_access(CInode *in, unsigned mask, int caller_uid, int caller_gid,
- const vector<uint64_t> *gid_list, int new_uid, int new_gid);
-
-
- Session() :
- state(STATE_CLOSED), state_seq(0), importing_count(0),
- recall_count(0), recall_release_count(0),
- auth_caps(g_ceph_context),
- connection(NULL), item_session_list(this),
- requests(0), // member_offset passed to front() manually
- cap_push_seq(0),
- lease_seq(0),
- completed_requests_dirty(false),
- num_trim_flushes_warnings(0),
- num_trim_requests_warnings(0) { }
- ~Session() override {
- assert(!item_session_list.is_on_list());
- while (!preopen_out_queue.empty()) {
- preopen_out_queue.front()->put();
- preopen_out_queue.pop_front();
- }
- }
-
- void clear() {
- pending_prealloc_inos.clear();
- info.clear_meta();
-
- cap_push_seq = 0;
- last_cap_renew = utime_t();
-
- }
-};
-
-class SessionFilter
-{
-protected:
- // First is whether to filter, second is filter value
- std::pair<bool, bool> reconnecting;
-
-public:
- std::map<std::string, std::string> metadata;
- std::string auth_name;
- std::string state;
- int64_t id;
-
- SessionFilter()
- : reconnecting(false, false), id(0)
- {}
-
- bool match(
- const Session &session,
- std::function<bool(client_t)> is_reconnecting) const;
- int parse(const std::vector<std::string> &args, std::stringstream *ss);
- void set_reconnecting(bool v)
- {
- reconnecting.first = true;
- reconnecting.second = v;
- }
-};
-
-/*
- * session map
- */
-
-class MDSRank;
-
-/**
- * Encapsulate the serialized state associated with SessionMap. Allows
- * encode/decode outside of live MDS instance.
- */
-class SessionMapStore {
-protected:
- version_t version;
- ceph::unordered_map<entity_name_t, Session*> session_map;
- PerfCounters *logger;
-public:
- mds_rank_t rank;
-
- version_t get_version() const {return version;}
-
- virtual void encode_header(bufferlist *header_bl);
- virtual void decode_header(bufferlist &header_bl);
- virtual void decode_values(std::map<std::string, bufferlist> &session_vals);
- virtual void decode_legacy(bufferlist::iterator& blp);
- void dump(Formatter *f) const;
-
- void set_rank(mds_rank_t r)
- {
- rank = r;
- }
-
- Session* get_or_add_session(const entity_inst_t& i) {
- Session *s;
- auto session_map_entry = session_map.find(i.name);
- if (session_map_entry != session_map.end()) {
- s = session_map_entry->second;
- } else {
- s = session_map[i.name] = new Session;
- s->info.inst = i;
- s->last_cap_renew = ceph_clock_now();
- if (logger) {
- logger->set(l_mdssm_session_count, session_map.size());
- logger->inc(l_mdssm_session_add);
- }
- }
-
- return s;
- }
-
- static void generate_test_instances(list<SessionMapStore*>& ls);
-
- void reset_state()
- {
- session_map.clear();
- }
-
- SessionMapStore() : version(0), logger(nullptr), rank(MDS_RANK_NONE) {}
- virtual ~SessionMapStore() {};
-};
-
-class SessionMap : public SessionMapStore {
-public:
- MDSRank *mds;
-
-protected:
- version_t projected, committing, committed;
-public:
- map<int,xlist<Session*>* > by_state;
- uint64_t set_state(Session *session, int state);
- map<version_t, list<MDSInternalContextBase*> > commit_waiters;
-
- explicit SessionMap(MDSRank *m) : mds(m),
- projected(0), committing(0), committed(0),
- loaded_legacy(false)
- { }
-
- ~SessionMap() override
- {
- for (auto p : by_state)
- delete p.second;
-
- if (logger) {
- g_ceph_context->get_perfcounters_collection()->remove(logger);
- }
-
- delete logger;
- }
-
- void register_perfcounters();
-
- void set_version(const version_t v)
- {
- version = projected = v;
- }
-
- void set_projected(const version_t v)
- {
- projected = v;
- }
-
- version_t get_projected() const
- {
- return projected;
- }
-
- version_t get_committed() const
- {
- return committed;
- }
-
- version_t get_committing() const
- {
- return committing;
- }
-
- // sessions
- void decode_legacy(bufferlist::iterator& blp) override;
- bool empty() const { return session_map.empty(); }
- const ceph::unordered_map<entity_name_t, Session*> &get_sessions() const
- {
- return session_map;
- }
-
- bool is_any_state(int state) const {
- map<int,xlist<Session*>* >::const_iterator p = by_state.find(state);
- if (p == by_state.end() || p->second->empty())
- return false;
- return true;
- }
-
- bool have_unclosed_sessions() const {
- return
- is_any_state(Session::STATE_OPENING) ||
- is_any_state(Session::STATE_OPEN) ||
- is_any_state(Session::STATE_CLOSING) ||
- is_any_state(Session::STATE_STALE) ||
- is_any_state(Session::STATE_KILLING);
- }
- bool have_session(entity_name_t w) const {
- return session_map.count(w);
- }
- Session* get_session(entity_name_t w) {
- auto session_map_entry = session_map.find(w);
- return (session_map_entry != session_map.end() ?
- session_map_entry-> second : nullptr);
- }
- const Session* get_session(entity_name_t w) const {
- ceph::unordered_map<entity_name_t, Session*>::const_iterator p = session_map.find(w);
- if (p == session_map.end()) {
- return NULL;
- } else {
- return p->second;
- }
- }
-
- void add_session(Session *s);
- void remove_session(Session *s);
- void touch_session(Session *session);
-
- Session *get_oldest_session(int state) {
- auto by_state_entry = by_state.find(state);
- if (by_state_entry == by_state.end() || by_state_entry->second->empty())
- return 0;
- return by_state_entry->second->front();
- }
-
- void dump();
-
- void get_client_set(set<client_t>& s) {
- for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
- p != session_map.end();
- ++p)
- if (p->second->info.inst.name.is_client())
- s.insert(p->second->info.inst.name.num());
- }
- void get_client_session_set(set<Session*>& s) const {
- for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
- p != session_map.end();
- ++p)
- if (p->second->info.inst.name.is_client())
- s.insert(p->second);
- }
-
- void open_sessions(map<client_t,entity_inst_t>& client_map) {
- for (map<client_t,entity_inst_t>::iterator p = client_map.begin();
- p != client_map.end();
- ++p) {
- Session *s = get_or_add_session(p->second);
- set_state(s, Session::STATE_OPEN);
- version++;
- }
- }
-
- // helpers
- entity_inst_t& get_inst(entity_name_t w) {
- assert(session_map.count(w));
- return session_map[w]->info.inst;
- }
- version_t inc_push_seq(client_t client) {
- return get_session(entity_name_t::CLIENT(client.v))->inc_push_seq();
- }
- version_t get_push_seq(client_t client) {
- return get_session(entity_name_t::CLIENT(client.v))->get_push_seq();
- }
- bool have_completed_request(metareqid_t rid) {
- Session *session = get_session(rid.name);
- return session && session->have_completed_request(rid.tid, NULL);
- }
- void trim_completed_requests(entity_name_t c, ceph_tid_t tid) {
- Session *session = get_session(c);
- assert(session);
- session->trim_completed_requests(tid);
- }
-
- void wipe();
- void wipe_ino_prealloc();
-
- // -- loading, saving --
- inodeno_t ino;
- list<MDSInternalContextBase*> waiting_for_load;
-
- object_t get_object_name() const;
-
- void load(MDSInternalContextBase *onload);
- void _load_finish(
- int operation_r,
- int header_r,
- int values_r,
- bool first,
- bufferlist &header_bl,
- std::map<std::string, bufferlist> &session_vals,
- bool more_session_vals);
-
- void load_legacy();
- void _load_legacy_finish(int r, bufferlist &bl);
-
- void save(MDSInternalContextBase *onsave, version_t needv=0);
- void _save_finish(version_t v);
-
-protected:
- std::set<entity_name_t> dirty_sessions;
- std::set<entity_name_t> null_sessions;
- bool loaded_legacy;
- void _mark_dirty(Session *session);
-public:
-
- /**
- * Advance the version, and mark this session
- * as dirty within the new version.
- *
- * Dirty means journalled but needing writeback
- * to the backing store. Must have called
- * mark_projected previously for this session.
- */
- void mark_dirty(Session *session);
-
- /**
- * Advance the projected version, and mark this
- * session as projected within the new version
- *
- * Projected means the session is updated in memory
- * but we're waiting for the journal write of the update
- * to finish. Must subsequently call mark_dirty
- * for sessions in the same global order as calls
- * to mark_projected.
- */
- version_t mark_projected(Session *session);
-
- /**
- * During replay, advance versions to account
- * for a session modification, and mark the
- * session dirty.
- */
- void replay_dirty_session(Session *session);
-
- /**
- * During replay, if a session no longer present
- * would have consumed a version, advance `version`
- * and `projected` to account for that.
- */
- void replay_advance_version();
-
- /**
- * For these session IDs, if a session exists with this ID, and it has
- * dirty completed_requests, then persist it immediately
- * (ahead of usual project/dirty versioned writes
- * of the map).
- */
- void save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
- MDSGatherBuilder *gather_bld);
-};
-
-std::ostream& operator<<(std::ostream &out, const Session &s);
-
-
-#endif