Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / filestore / WBThrottle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "acconfig.h"
5
6 #include "os/filestore/WBThrottle.h"
7 #include "common/perf_counters.h"
8
9 WBThrottle::WBThrottle(CephContext *cct) :
10   cur_ios(0), cur_size(0),
11   cct(cct),
12   logger(NULL),
13   stopping(true),
14   lock("WBThrottle::lock", false, true, false, cct),
15   fs(XFS)
16 {
17   {
18     Mutex::Locker l(lock);
19     set_from_conf();
20   }
21   assert(cct);
22   PerfCountersBuilder b(
23     cct, string("WBThrottle"),
24     l_wbthrottle_first, l_wbthrottle_last);
25   b.add_u64(l_wbthrottle_bytes_dirtied, "bytes_dirtied", "Dirty data");
26   b.add_u64(l_wbthrottle_bytes_wb, "bytes_wb", "Written data");
27   b.add_u64(l_wbthrottle_ios_dirtied, "ios_dirtied", "Dirty operations");
28   b.add_u64(l_wbthrottle_ios_wb, "ios_wb", "Written operations");
29   b.add_u64(l_wbthrottle_inodes_dirtied, "inodes_dirtied", "Entries waiting for write");
30   b.add_u64(l_wbthrottle_inodes_wb, "inodes_wb", "Written entries");
31   logger = b.create_perf_counters();
32   cct->get_perfcounters_collection()->add(logger);
33   for (unsigned i = l_wbthrottle_first + 1; i != l_wbthrottle_last; ++i)
34     logger->set(i, 0);
35
36   cct->_conf->add_observer(this);
37 }
38
39 WBThrottle::~WBThrottle() {
40   assert(cct);
41   cct->get_perfcounters_collection()->remove(logger);
42   delete logger;
43   cct->_conf->remove_observer(this);
44 }
45
46 void WBThrottle::start()
47 {
48   {
49     Mutex::Locker l(lock);
50     stopping = false;
51   }
52   create("wb_throttle");
53 }
54
55 void WBThrottle::stop()
56 {
57   {
58     Mutex::Locker l(lock);
59     stopping = true;
60     cond.Signal();
61   }
62
63   join();
64 }
65
66 const char** WBThrottle::get_tracked_conf_keys() const
67 {
68   static const char* KEYS[] = {
69     "filestore_wbthrottle_btrfs_bytes_start_flusher",
70     "filestore_wbthrottle_btrfs_bytes_hard_limit",
71     "filestore_wbthrottle_btrfs_ios_start_flusher",
72     "filestore_wbthrottle_btrfs_ios_hard_limit",
73     "filestore_wbthrottle_btrfs_inodes_start_flusher",
74     "filestore_wbthrottle_btrfs_inodes_hard_limit",
75     "filestore_wbthrottle_xfs_bytes_start_flusher",
76     "filestore_wbthrottle_xfs_bytes_hard_limit",
77     "filestore_wbthrottle_xfs_ios_start_flusher",
78     "filestore_wbthrottle_xfs_ios_hard_limit",
79     "filestore_wbthrottle_xfs_inodes_start_flusher",
80     "filestore_wbthrottle_xfs_inodes_hard_limit",
81     NULL
82   };
83   return KEYS;
84 }
85
86 void WBThrottle::set_from_conf()
87 {
88   assert(lock.is_locked());
89   if (fs == BTRFS) {
90     size_limits.first =
91       cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher;
92     size_limits.second =
93       cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit;
94     io_limits.first =
95       cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher;
96     io_limits.second =
97       cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit;
98     fd_limits.first =
99       cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher;
100     fd_limits.second =
101       cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit;
102   } else if (fs == XFS) {
103     size_limits.first =
104       cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher;
105     size_limits.second =
106       cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit;
107     io_limits.first =
108       cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher;
109     io_limits.second =
110       cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit;
111     fd_limits.first =
112       cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher;
113     fd_limits.second =
114       cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit;
115   } else {
116     assert(0 == "invalid value for fs");
117   }
118   cond.Signal();
119 }
120
121 void WBThrottle::handle_conf_change(const md_config_t *conf,
122                                     const std::set<std::string> &changed)
123 {
124   Mutex::Locker l(lock);
125   for (const char** i = get_tracked_conf_keys(); *i; ++i) {
126     if (changed.count(*i)) {
127       set_from_conf();
128       return;
129     }
130   }
131 }
132
133 bool WBThrottle::get_next_should_flush(
134   boost::tuple<ghobject_t, FDRef, PendingWB> *next)
135 {
136   assert(lock.is_locked());
137   assert(next);
138   while (!stopping && !beyond_limit())
139          cond.Wait(lock);
140   if (stopping)
141     return false;
142   assert(!pending_wbs.empty());
143   ghobject_t obj(pop_object());
144
145   ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
146     pending_wbs.find(obj);
147   *next = boost::make_tuple(obj, i->second.second, i->second.first);
148   pending_wbs.erase(i);
149   return true;
150 }
151
152
153 void *WBThrottle::entry()
154 {
155   Mutex::Locker l(lock);
156   boost::tuple<ghobject_t, FDRef, PendingWB> wb;
157   while (get_next_should_flush(&wb)) {
158     clearing = wb.get<0>();
159     cur_ios -= wb.get<2>().ios;
160     logger->dec(l_wbthrottle_ios_dirtied, wb.get<2>().ios);
161     logger->inc(l_wbthrottle_ios_wb, wb.get<2>().ios);
162     cur_size -= wb.get<2>().size;
163     logger->dec(l_wbthrottle_bytes_dirtied, wb.get<2>().size);
164     logger->inc(l_wbthrottle_bytes_wb, wb.get<2>().size);
165     logger->dec(l_wbthrottle_inodes_dirtied);
166     logger->inc(l_wbthrottle_inodes_wb);
167     lock.Unlock();
168 #ifdef HAVE_FDATASYNC
169     ::fdatasync(**wb.get<1>());
170 #else
171     ::fsync(**wb.get<1>());
172 #endif
173 #ifdef HAVE_POSIX_FADVISE
174     if (cct->_conf->filestore_fadvise && wb.get<2>().nocache) {
175       int fa_r = posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED);
176       assert(fa_r == 0);
177     }
178 #endif
179     lock.Lock();
180     clearing = ghobject_t();
181     cond.Signal();
182     wb = boost::tuple<ghobject_t, FDRef, PendingWB>();
183   }
184   return 0;
185 }
186
187 void WBThrottle::queue_wb(
188   FDRef fd, const ghobject_t &hoid, uint64_t offset, uint64_t len,
189   bool nocache)
190 {
191   Mutex::Locker l(lock);
192   ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator wbiter =
193     pending_wbs.find(hoid);
194   if (wbiter == pending_wbs.end()) {
195     wbiter = pending_wbs.insert(
196       make_pair(hoid,
197         make_pair(
198           PendingWB(),
199           fd))).first;
200     logger->inc(l_wbthrottle_inodes_dirtied);
201   } else {
202     remove_object(hoid);
203   }
204
205   cur_ios++;
206   logger->inc(l_wbthrottle_ios_dirtied);
207   cur_size += len;
208   logger->inc(l_wbthrottle_bytes_dirtied, len);
209
210   wbiter->second.first.add(nocache, len, 1);
211   insert_object(hoid);
212   if (beyond_limit())
213     cond.Signal();
214 }
215
216 void WBThrottle::clear()
217 {
218   Mutex::Locker l(lock);
219   for (ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
220          pending_wbs.begin();
221        i != pending_wbs.end();
222        ++i) {
223 #ifdef HAVE_POSIX_FADVISE
224     if (cct->_conf->filestore_fadvise && i->second.first.nocache) {
225       int fa_r = posix_fadvise(**i->second.second, 0, 0, POSIX_FADV_DONTNEED);
226       assert(fa_r == 0);
227     }
228 #endif
229
230   }
231   cur_ios = cur_size = 0;
232   logger->set(l_wbthrottle_ios_dirtied, 0);
233   logger->set(l_wbthrottle_bytes_dirtied, 0);
234   logger->set(l_wbthrottle_inodes_dirtied, 0);
235   pending_wbs.clear();
236   lru.clear();
237   rev_lru.clear();
238   cond.Signal();
239 }
240
241 void WBThrottle::clear_object(const ghobject_t &hoid)
242 {
243   Mutex::Locker l(lock);
244   while (clearing == hoid)
245     cond.Wait(lock);
246   ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
247     pending_wbs.find(hoid);
248   if (i == pending_wbs.end())
249     return;
250
251   cur_ios -= i->second.first.ios;
252   logger->dec(l_wbthrottle_ios_dirtied, i->second.first.ios);
253   cur_size -= i->second.first.size;
254   logger->dec(l_wbthrottle_bytes_dirtied, i->second.first.size);
255   logger->dec(l_wbthrottle_inodes_dirtied);
256
257   pending_wbs.erase(i);
258   remove_object(hoid);
259   cond.Signal();
260 }
261
262 void WBThrottle::throttle()
263 {
264   Mutex::Locker l(lock);
265   while (!stopping && need_flush())
266     cond.Wait(lock);
267 }