1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "os/filestore/WBThrottle.h"
7 #include "common/perf_counters.h"
// Constructor: zero-initializes the cur_ios/cur_size accounting, constructs
// the mutex named "WBThrottle::lock", builds and registers the "WBThrottle"
// perf counters, and registers this object as a config observer.
// NOTE(review): this listing omits some of the original lines (other member
// initializers, braces, some statements); the comments below describe only
// the statements that are visible.
9 WBThrottle::WBThrottle(CephContext *cct) :
10 cur_ios(0), cur_size(0),
14 lock("WBThrottle::lock", false, true, false, cct),
// Scoped lock acquisition. The statement it protects is not visible here --
// presumably the initial set_from_conf(), which asserts the lock is held;
// TODO confirm.
18 Mutex::Locker l(lock);
// Declare the perf counter set "WBThrottle" covering the index range
// (l_wbthrottle_first, l_wbthrottle_last).
22 PerfCountersBuilder b(
23 cct, string("WBThrottle"),
24 l_wbthrottle_first, l_wbthrottle_last);
// One u64 counter per tracked quantity: a "dirtied" and a "wb" counter each
// for bytes, ios and inodes.
25 b.add_u64(l_wbthrottle_bytes_dirtied, "bytes_dirtied", "Dirty data");
26 b.add_u64(l_wbthrottle_bytes_wb, "bytes_wb", "Written data");
27 b.add_u64(l_wbthrottle_ios_dirtied, "ios_dirtied", "Dirty operations");
28 b.add_u64(l_wbthrottle_ios_wb, "ios_wb", "Written operations");
29 b.add_u64(l_wbthrottle_inodes_dirtied, "inodes_dirtied", "Entries waiting for write");
30 b.add_u64(l_wbthrottle_inodes_wb, "inodes_wb", "Written entries");
// Materialize the counters and add them to the context's perf counter
// collection.
31 logger = b.create_perf_counters();
32 cct->get_perfcounters_collection()->add(logger);
// Visits every counter index strictly between l_wbthrottle_first and
// l_wbthrottle_last. The loop body is not visible in this listing --
// presumably it resets each counter to 0; TODO confirm.
33 for (unsigned i = l_wbthrottle_first + 1; i != l_wbthrottle_last; ++i)
// Register this object as a config observer on the context's configuration.
36 cct->_conf->add_observer(this);
// Destructor: undoes the two registrations made by the constructor.
// NOTE(review): lines between the visible statements are omitted from this
// listing (e.g. whether `logger` is freed is not shown here).
39 WBThrottle::~WBThrottle() {
// Remove the perf counters from the context's collection.
41 cct->get_perfcounters_collection()->remove(logger);
// Remove this object from the configuration's observers.
43 cct->_conf->remove_observer(this);
// Takes the lock and calls create("wb_throttle").
// NOTE(review): create() is not defined in this listing -- presumably the
// inherited thread-start call with "wb_throttle" as the thread name, making
// entry() the thread body; TODO confirm. Statements between the lock
// acquisition and create() are omitted from this listing.
46 void WBThrottle::start()
49 Mutex::Locker l(lock);
52 create("wb_throttle");
// Takes the lock; the remainder of the body is omitted from this listing.
// NOTE(review): presumably sets `stopping` (which the loops in
// get_next_should_flush() and throttle() test) and wakes/joins the worker;
// TODO confirm against the full source.
55 void WBThrottle::stop()
58 Mutex::Locker l(lock);
// Provides the list of configuration option names this object tracks.
// handle_conf_change() walks this array until it reaches a null entry, so
// the array must be null-terminated.
// NOTE(review): the terminator and the return statement are omitted from
// this listing.
66 const char** WBThrottle::get_tracked_conf_keys() const
68 static const char* KEYS[] = {
// btrfs thresholds: start_flusher / hard_limit for bytes, ios and inodes.
69 "filestore_wbthrottle_btrfs_bytes_start_flusher",
70 "filestore_wbthrottle_btrfs_bytes_hard_limit",
71 "filestore_wbthrottle_btrfs_ios_start_flusher",
72 "filestore_wbthrottle_btrfs_ios_hard_limit",
73 "filestore_wbthrottle_btrfs_inodes_start_flusher",
74 "filestore_wbthrottle_btrfs_inodes_hard_limit",
// xfs thresholds: the same six options for the XFS backend.
75 "filestore_wbthrottle_xfs_bytes_start_flusher",
76 "filestore_wbthrottle_xfs_bytes_hard_limit",
77 "filestore_wbthrottle_xfs_ios_start_flusher",
78 "filestore_wbthrottle_xfs_ios_hard_limit",
79 "filestore_wbthrottle_xfs_inodes_start_flusher",
80 "filestore_wbthrottle_xfs_inodes_hard_limit",
// Reads the six throttle thresholds (bytes/ios/inodes x start_flusher/
// hard_limit) from the configuration, choosing the btrfs or xfs option set
// according to `fs`. The caller must already hold `lock`.
// NOTE(review): the left-hand sides of the assignments and the first branch
// condition are omitted from this listing; only the configuration values
// being read are visible.
86 void WBThrottle::set_from_conf()
// Precondition: lock is held by the caller.
88 assert(lock.is_locked());
// First branch: the btrfs option set (condition not shown -- presumably
// fs == BTRFS; TODO confirm).
91 cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher;
93 cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit;
95 cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher;
97 cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit;
99 cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher;
101 cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit;
// Second branch: the xfs option set.
102 } else if (fs == XFS) {
104 cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher;
106 cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit;
108 cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher;
110 cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit;
112 cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher;
114 cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit;
// Any other value of `fs` is a programming error: the comparison of 0 with
// a string literal is always false, so this assert always fires and the
// literal serves as the failure message.
116 assert(0 == "invalid value for fs");
// Config-change callback: takes the lock and checks whether any tracked
// option name appears in `changed`.
// NOTE(review): the action taken on a match is omitted from this listing --
// presumably a call to set_from_conf(), which requires the lock held here;
// TODO confirm.
121 void WBThrottle::handle_conf_change(const md_config_t *conf,
122 const std::set<std::string> &changed)
124 Mutex::Locker l(lock);
// Walk the tracked-key array until its null entry.
125 for (const char** i = get_tracked_conf_keys(); *i; ++i) {
126 if (changed.count(*i)) {
// Selects the next pending writeback entry and hands it to the caller via
// *next as (object, FDRef, PendingWB), removing it from pending_wbs.
// The caller must hold `lock`.
// NOTE(review): the wait-loop body and the return statements are omitted
// from this listing; what is returned in each case is not visible here.
133 bool WBThrottle::get_next_should_flush(
134 boost::tuple<ghobject_t, FDRef, PendingWB> *next)
// Precondition: lock is held by the caller.
136 assert(lock.is_locked());
// Loop while not stopping and not beyond_limit(). The body is not shown --
// presumably a condition-variable wait; TODO confirm.
138 while (!stopping && !beyond_limit())
// Past this point there must be at least one pending entry.
142 assert(!pending_wbs.empty());
// pop_object() supplies the object to flush next.
143 ghobject_t obj(pop_object());
145 ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
146 pending_wbs.find(obj);
// The map value is pair<PendingWB, FDRef>, so .second is the FDRef and
// .first the PendingWB; the tuple order is (object, FDRef, PendingWB).
// Note that `i` is dereferenced without an end() check visible here.
147 *next = boost::make_tuple(obj, i->second.second, i->second.first);
148 pending_wbs.erase(i);
// Flusher loop: repeatedly takes the next entry from
// get_next_should_flush(), updates the accounting and perf counters for it,
// syncs its file descriptor, optionally drops its pages from the cache, and
// then clears the in-progress marker.
// NOTE(review): several original lines are omitted from this listing
// (between the accounting and the sync calls, the #else/#endif lines, and
// the tail of the function), so whether `lock` is released around the sync
// is not visible here; TODO confirm against the full source.
153 void *WBThrottle::entry()
155 Mutex::Locker l(lock);
// wb holds (object, FDRef, PendingWB) for the entry being flushed.
156 boost::tuple<ghobject_t, FDRef, PendingWB> wb;
157 while (get_next_should_flush(&wb)) {
// Record which object is being flushed; clear_object() loops while
// `clearing` equals the object it was asked to clear.
158 clearing = wb.get<0>();
// Move this entry's ios and bytes from the "dirtied" totals to the "wb"
// totals, and likewise one inode.
159 cur_ios -= wb.get<2>().ios;
160 logger->dec(l_wbthrottle_ios_dirtied, wb.get<2>().ios);
161 logger->inc(l_wbthrottle_ios_wb, wb.get<2>().ios);
162 cur_size -= wb.get<2>().size;
163 logger->dec(l_wbthrottle_bytes_dirtied, wb.get<2>().size);
164 logger->inc(l_wbthrottle_bytes_wb, wb.get<2>().size);
165 logger->dec(l_wbthrottle_inodes_dirtied);
166 logger->inc(l_wbthrottle_inodes_wb);
// Sync the descriptor obtained by double-dereferencing the FDRef:
// fdatasync when HAVE_FDATASYNC is defined; the fsync line below is
// presumably the #else branch (the #else line itself is not shown). The
// return value of the sync call is not checked on these lines.
168 #ifdef HAVE_FDATASYNC
169 ::fdatasync(**wb.get<1>());
171 ::fsync(**wb.get<1>());
// When fadvise is enabled in the configuration and the entry is marked
// nocache, advise the kernel that the whole file (offset 0, len 0) is no
// longer needed. Handling of fa_r is not shown in this listing.
173 #ifdef HAVE_POSIX_FADVISE
174 if (cct->_conf->filestore_fadvise && wb.get<2>().nocache) {
175 int fa_r = posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED);
// Flush finished: reset the in-progress marker to a default-constructed
// object id.
180 clearing = ghobject_t();
// Reset wb to a default-constructed tuple, replacing the FDRef and
// PendingWB it held.
182 wb = boost::tuple<ghobject_t, FDRef, PendingWB>();
// Records one dirty write of `len` bytes against object `hoid`, creating a
// pending_wbs entry for the object if none exists, and bumps the "dirtied"
// perf counters.
// NOTE(review): the end of the parameter list is omitted from this listing
// (`nocache`, used below, is presumably the remaining parameter), as are the
// insert arguments and the cur_ios/cur_size updates; `fd` and `offset` are
// not used on any visible line.
187 void WBThrottle::queue_wb(
188 FDRef fd, const ghobject_t &hoid, uint64_t offset, uint64_t len,
191 Mutex::Locker l(lock);
192 ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator wbiter =
193 pending_wbs.find(hoid);
// First write for this object: insert a new entry and count one more dirty
// inode.
194 if (wbiter == pending_wbs.end()) {
195 wbiter = pending_wbs.insert(
200 logger->inc(l_wbthrottle_inodes_dirtied);
// Every call counts as one dirty io and `len` dirty bytes.
206 logger->inc(l_wbthrottle_ios_dirtied);
208 logger->inc(l_wbthrottle_bytes_dirtied, len);
// Fold this write into the object's PendingWB via add(nocache, len, 1).
210 wbiter->second.first.add(nocache, len, 1);
// Walks all pending writebacks under the lock, optionally advising the
// kernel to drop cached pages for nocache entries, then zeroes the
// accounting and the "dirtied" perf counters. No sync call appears on the
// visible lines.
// NOTE(review): the loop initializer/increment, the handling of fa_r, and
// the tail of the function (e.g. emptying pending_wbs) are omitted from this
// listing.
216 void WBThrottle::clear()
218 Mutex::Locker l(lock);
219 for (ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
221 i != pending_wbs.end();
// Only when fadvise is enabled in the configuration and the entry is marked
// nocache: advise DONTNEED for the whole file (offset 0, len 0).
223 #ifdef HAVE_POSIX_FADVISE
224 if (cct->_conf->filestore_fadvise && i->second.first.nocache) {
225 int fa_r = posix_fadvise(**i->second.second, 0, 0, POSIX_FADV_DONTNEED);
// Nothing is pending any more: reset the in-memory totals and the dirtied
// counters. The "wb" counters are not touched here.
231 cur_ios = cur_size = 0;
232 logger->set(l_wbthrottle_ios_dirtied, 0);
233 logger->set(l_wbthrottle_bytes_dirtied, 0);
234 logger->set(l_wbthrottle_inodes_dirtied, 0);
// Drops any pending writeback for `hoid`, subtracting its ios/bytes/inode
// from the accounting and the "dirtied" perf counters.
// NOTE(review): the body of the while loop and of the not-found branch are
// omitted from this listing, as are any lines between the counter updates
// and the erase.
241 void WBThrottle::clear_object(const ghobject_t &hoid)
243 Mutex::Locker l(lock);
// Loop while entry() has `hoid` marked as the object being flushed. The
// body is not shown -- presumably a condition-variable wait; TODO confirm.
244 while (clearing == hoid)
246 ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i =
247 pending_wbs.find(hoid);
// Nothing pending for this object (branch body not shown -- presumably an
// early return; TODO confirm).
248 if (i == pending_wbs.end())
// Remove the entry's contribution from the totals and the dirtied counters.
251 cur_ios -= i->second.first.ios;
252 logger->dec(l_wbthrottle_ios_dirtied, i->second.first.ios);
253 cur_size -= i->second.first.size;
254 logger->dec(l_wbthrottle_bytes_dirtied, i->second.first.size);
255 logger->dec(l_wbthrottle_inodes_dirtied);
257 pending_wbs.erase(i);
262 void WBThrottle::throttle()
264 Mutex::Locker l(lock);
265 while (!stopping && need_flush())