Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / filestore / WBThrottle.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2013 Inktank Storage, Inc.
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #ifndef WBTHROTTLE_H
16 #define WBTHROTTLE_H
17
18 #include "include/unordered_map.h"
19 #include <boost/tuple/tuple.hpp>
20 #include "include/memory.h"
21 #include "common/Formatter.h"
22 #include "common/hobject.h"
23 #include "include/interval_set.h"
24 #include "FDCache.h"
25 #include "common/Thread.h"
26 #include "common/ceph_context.h"
27
28 class PerfCounters;
29 enum {
30   l_wbthrottle_first = 999090,
31   l_wbthrottle_bytes_dirtied,
32   l_wbthrottle_bytes_wb,
33   l_wbthrottle_ios_dirtied,
34   l_wbthrottle_ios_wb,
35   l_wbthrottle_inodes_dirtied,
36   l_wbthrottle_inodes_wb,
37   l_wbthrottle_last
38 };
39
40 /**
41  * WBThrottle
42  *
43  * Tracks, throttles, and flushes outstanding IO
44  */
45 class WBThrottle : Thread, public md_config_obs_t {
46   ghobject_t clearing;
47   /* *_limits.first is the start_flusher limit and
48    * *_limits.second is the hard limit
49    */
50
51   /// Limits on unflushed bytes
52   pair<uint64_t, uint64_t> size_limits;
53
54   /// Limits on unflushed ios
55   pair<uint64_t, uint64_t> io_limits;
56
57   /// Limits on unflushed objects
58   pair<uint64_t, uint64_t> fd_limits;
59
60   uint64_t cur_ios;  /// Currently unflushed IOs
61   uint64_t cur_size; /// Currently unflushed bytes
62
63   /**
64    * PendingWB tracks the ios pending on an object.
65    */
66   class PendingWB {
67   public:
68     bool nocache;
69     uint64_t size;
70     uint64_t ios;
71     PendingWB() : nocache(true), size(0), ios(0) {}
72     void add(bool _nocache, uint64_t _size, uint64_t _ios) {
73       if (!_nocache)
74         nocache = false; // only nocache if all writes are nocache
75       size += _size;
76       ios += _ios;
77     }
78   };
79
80   CephContext *cct;
81   PerfCounters *logger;
82   bool stopping;
83   Mutex lock;
84   Cond cond;
85
86
87   /**
88    * Flush objects in lru order
89    */
90   list<ghobject_t> lru;
91   ceph::unordered_map<ghobject_t, list<ghobject_t>::iterator> rev_lru;
92   void remove_object(const ghobject_t &oid) {
93     assert(lock.is_locked());
94     ceph::unordered_map<ghobject_t, list<ghobject_t>::iterator>::iterator iter =
95       rev_lru.find(oid);
96     if (iter == rev_lru.end())
97       return;
98
99     lru.erase(iter->second);
100     rev_lru.erase(iter);
101   }
102   ghobject_t pop_object() {
103     assert(!lru.empty());
104     ghobject_t oid(lru.front());
105     lru.pop_front();
106     rev_lru.erase(oid);
107     return oid;
108   }
109   void insert_object(const ghobject_t &oid) {
110     assert(rev_lru.find(oid) == rev_lru.end());
111     lru.push_back(oid);
112     rev_lru.insert(make_pair(oid, --lru.end()));
113   }
114
115   ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> > pending_wbs;
116
117   /// get next flush to perform
118   bool get_next_should_flush(
119     boost::tuple<ghobject_t, FDRef, PendingWB> *next ///< [out] next to flush
120     ); ///< @return false if we are shutting down
121 public:
122   enum FS {
123     BTRFS,
124     XFS
125   };
126
127 private:
128   FS fs;
129
130   void set_from_conf();
131   bool beyond_limit() const {
132     if (cur_ios < io_limits.first &&
133         pending_wbs.size() < fd_limits.first &&
134         cur_size < size_limits.first)
135       return false;
136     else
137       return true;
138   }
139   bool need_flush() const {
140     if (cur_ios < io_limits.second &&
141         pending_wbs.size() < fd_limits.second &&
142         cur_size < size_limits.second)
143       return false;
144     else
145       return true;
146   }
147
148 public:
149   explicit WBThrottle(CephContext *cct);
150   ~WBThrottle() override;
151
152   void start();
153   void stop();
154   /// Set fs as XFS or BTRFS
155   void set_fs(FS new_fs) {
156     Mutex::Locker l(lock);
157     fs = new_fs;
158     set_from_conf();
159   }
160
161   /// Queue wb on oid, fd taking throttle (does not block)
162   void queue_wb(
163     FDRef fd,              ///< [in] FDRef to oid
164     const ghobject_t &oid, ///< [in] object
165     uint64_t offset,       ///< [in] offset written
166     uint64_t len,          ///< [in] length written
167     bool nocache           ///< [in] try to clear out of cache after write
168     );
169
170   /// Clear all wb (probably due to sync)
171   void clear();
172
173   /// Clear object
174   void clear_object(const ghobject_t &oid);
175
176   /// Block until there is throttle available
177   void throttle();
178
179   /// md_config_obs_t
180   const char** get_tracked_conf_keys() const override;
181   void handle_conf_change(const md_config_t *conf,
182                           const std::set<std::string> &changed) override;
183
184   /// Thread
185   void *entry() override;
186 };
187
188 #endif