Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / PurgeQueue.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2015 Red Hat
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "common/debug.h"
16 #include "mds/mdstypes.h"
17 #include "mds/CInode.h"
18 #include "mds/MDCache.h"
19
20 #include "PurgeQueue.h"
21
22
23 #define dout_context cct
24 #define dout_subsys ceph_subsys_mds
25 #undef dout_prefix
26 #define dout_prefix _prefix(_dout, rank) << __func__ << ": "
27 static ostream& _prefix(std::ostream *_dout, mds_rank_t rank) {
28   return *_dout << "mds." << rank << ".purge_queue ";
29 }
30
31 void PurgeItem::encode(bufferlist &bl) const
32 {
33   ENCODE_START(1, 1, bl);
34   ::encode((uint8_t)action, bl);
35   ::encode(ino, bl);
36   ::encode(size, bl);
37   ::encode(layout, bl, CEPH_FEATURE_FS_FILE_LAYOUT_V2);
38   ::encode(old_pools, bl);
39   ::encode(snapc, bl);
40   ::encode(fragtree, bl);
41   ENCODE_FINISH(bl);
42 }
43
44 void PurgeItem::decode(bufferlist::iterator &p)
45 {
46   DECODE_START(1, p);
47   ::decode((uint8_t&)action, p);
48   ::decode(ino, p);
49   ::decode(size, p);
50   ::decode(layout, p);
51   ::decode(old_pools, p);
52   ::decode(snapc, p);
53   ::decode(fragtree, p);
54   DECODE_FINISH(p);
55 }
56
57 // TODO: if Objecter has any slow requests, take that as a hint and
58 // slow down our rate of purging (keep accepting pushes though)
59 PurgeQueue::PurgeQueue(
60       CephContext *cct_,
61       mds_rank_t rank_,
62       const int64_t metadata_pool_,
63       Objecter *objecter_,
64       Context *on_error_)
65   :
66     cct(cct_),
67     rank(rank_),
68     lock("PurgeQueue"),
69     metadata_pool(metadata_pool_),
70     finisher(cct, "PurgeQueue", "PQ_Finisher"),
71     timer(cct, lock),
72     filer(objecter_, &finisher),
73     objecter(objecter_),
74     journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
75       CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0,
76       &finisher),
77     on_error(on_error_),
78     ops_in_flight(0),
79     max_purge_ops(0),
80     drain_initial(0),
81     draining(false),
82     delayed_flush(nullptr),
83     recovered(false)
84 {
85   assert(cct != nullptr);
86   assert(on_error != nullptr);
87   assert(objecter != nullptr);
88   journaler.set_write_error_handler(on_error);
89 }
90
91 PurgeQueue::~PurgeQueue()
92 {
93   if (logger) {
94     g_ceph_context->get_perfcounters_collection()->remove(logger.get());
95   }
96 }
97
98 void PurgeQueue::create_logger()
99 {
100   PerfCountersBuilder pcb(g_ceph_context,
101           "purge_queue", l_pq_first, l_pq_last);
102   pcb.add_u64(l_pq_executing_ops, "pq_executing_ops", "Purge queue ops in flight");
103   pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
104   pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed", "purg",
105       PerfCountersBuilder::PRIO_INTERESTING);
106
107   logger.reset(pcb.create_perf_counters());
108   g_ceph_context->get_perfcounters_collection()->add(logger.get());
109 }
110
111 void PurgeQueue::init()
112 {
113   Mutex::Locker l(lock);
114
115   assert(logger != nullptr);
116
117   finisher.start();
118   timer.init();
119 }
120
121 void PurgeQueue::activate()
122 {
123   Mutex::Locker l(lock);
124   if (journaler.get_read_pos() == journaler.get_write_pos())
125     return;
126
127   if (in_flight.empty()) {
128     dout(4) << "start work (by drain)" << dendl;
129     finisher.queue(new FunctionContext([this](int r) {
130           Mutex::Locker l(lock);
131           _consume();
132           }));
133   }
134 }
135
136 void PurgeQueue::shutdown()
137 {
138   Mutex::Locker l(lock);
139
140   journaler.shutdown();
141   timer.shutdown();
142   finisher.stop();
143 }
144
145 void PurgeQueue::open(Context *completion)
146 {
147   dout(4) << "opening" << dendl;
148
149   Mutex::Locker l(lock);
150
151   if (completion)
152     waiting_for_recovery.push_back(completion);
153
154   journaler.recover(new FunctionContext([this](int r){
155     if (r == -ENOENT) {
156       dout(1) << "Purge Queue not found, assuming this is an upgrade and "
157                  "creating it." << dendl;
158       create(NULL);
159     } else if (r == 0) {
160       Mutex::Locker l(lock);
161       dout(4) << "open complete" << dendl;
162
163       // Journaler only guarantees entries before head write_pos have been
164       // fully flushed. Before appending new entries, we need to find and
165       // drop any partial written entry.
166       if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
167         dout(4) << "recovering write_pos" << dendl;
168         journaler.set_read_pos(journaler.last_committed.write_pos);
169         _recover();
170         return;
171       }
172
173       journaler.set_writeable();
174       recovered = true;
175       finish_contexts(g_ceph_context, waiting_for_recovery);
176     } else {
177       derr << "Error " << r << " loading Journaler" << dendl;
178       on_error->complete(r);
179     }
180   }));
181 }
182
183 void PurgeQueue::wait_for_recovery(Context* c)
184 {
185   Mutex::Locker l(lock);
186   if (recovered)
187     c->complete(0);
188   else
189     waiting_for_recovery.push_back(c);
190 }
191
192 void PurgeQueue::_recover()
193 {
194   assert(lock.is_locked_by_me());
195
196   // Journaler::is_readable() adjusts write_pos if partial entry is encountered
197   while (1) {
198     if (!journaler.is_readable() &&
199         !journaler.get_error() &&
200         journaler.get_read_pos() < journaler.get_write_pos()) {
201       journaler.wait_for_readable(new FunctionContext([this](int r) {
202         Mutex::Locker l(lock);
203         _recover();
204       }));
205       return;
206     }
207
208     if (journaler.get_error()) {
209       int r = journaler.get_error();
210       derr << "Error " << r << " recovering write_pos" << dendl;
211       on_error->complete(r);
212       return;
213     }
214
215     if (journaler.get_read_pos() == journaler.get_write_pos()) {
216       dout(4) << "write_pos recovered" << dendl;
217       // restore original read_pos
218       journaler.set_read_pos(journaler.last_committed.expire_pos);
219       journaler.set_writeable();
220       recovered = true;
221       finish_contexts(g_ceph_context, waiting_for_recovery);
222       return;
223     }
224
225     bufferlist bl;
226     bool readable = journaler.try_read_entry(bl);
227     assert(readable);  // we checked earlier
228   }
229 }
230
231 void PurgeQueue::create(Context *fin)
232 {
233   dout(4) << "creating" << dendl;
234   Mutex::Locker l(lock);
235
236   if (fin)
237     waiting_for_recovery.push_back(fin);
238
239   file_layout_t layout = file_layout_t::get_default();
240   layout.pool_id = metadata_pool;
241   journaler.set_writeable();
242   journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
243   journaler.write_head(new FunctionContext([this](int r) {
244     Mutex::Locker l(lock);
245     recovered = true;
246     finish_contexts(g_ceph_context, waiting_for_recovery);
247   }));
248 }
249
250 /**
251  * The `completion` context will always be called back via a Finisher
252  */
253 void PurgeQueue::push(const PurgeItem &pi, Context *completion)
254 {
255   dout(4) << "pushing inode 0x" << std::hex << pi.ino << std::dec << dendl;
256   Mutex::Locker l(lock);
257
258   // Callers should have waited for open() before using us
259   assert(!journaler.is_readonly());
260
261   bufferlist bl;
262
263   ::encode(pi, bl);
264   journaler.append_entry(bl);
265   journaler.wait_for_flush(completion);
266
267   // Maybe go ahead and do something with it right away
268   bool could_consume = _consume();
269   if (!could_consume) {
270     // Usually, it is not necessary to explicitly flush here, because the reader
271     // will get flushes generated inside Journaler::is_readable.  However,
272     // if we remain in a can_consume()==false state for a long period then
273     // we should flush in order to allow MDCache to drop its strays rather
274     // than having them wait for purgequeue to progress.
275     if (!delayed_flush) {
276       delayed_flush = new FunctionContext([this](int r){
277             delayed_flush = nullptr;
278             journaler.flush();
279           });
280
281       timer.add_event_after(
282           g_conf->mds_purge_queue_busy_flush_period,
283           delayed_flush);
284     }
285   }
286 }
287
288 uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
289 {
290   uint32_t ops_required = 0;
291   if (item.action == PurgeItem::PURGE_DIR) {
292     // Directory, count dirfrags to be deleted
293     std::list<frag_t> ls;
294     if (!item.fragtree.is_leaf(frag_t())) {
295       item.fragtree.get_leaves(ls);
296     }
297     // One for the root, plus any leaves
298     ops_required = 1 + ls.size();
299   } else {
300     // File, work out concurrent Filer::purge deletes
301     const uint64_t num = (item.size > 0) ?
302       Striper::get_num_objects(item.layout, item.size) : 1;
303
304     ops_required = MIN(num, g_conf->filer_max_purge_ops);
305
306     // Account for removing (or zeroing) backtrace
307     ops_required += 1;
308
309     // Account for deletions for old pools
310     if (item.action != PurgeItem::TRUNCATE_FILE) {
311       ops_required += item.old_pools.size();
312     }
313   }
314
315   return ops_required;
316 }
317
318 bool PurgeQueue::can_consume()
319 {
320   dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
321            << in_flight.size() << "/" << g_conf->mds_max_purge_files
322            << " files" << dendl;
323
324   if (in_flight.size() == 0 && cct->_conf->mds_max_purge_files > 0) {
325     // Always permit consumption if nothing is in flight, so that the ops
326     // limit can never be so low as to forbid all progress (unless
327     // administrator has deliberately paused purging by setting max
328     // purge files to zero).
329     return true;
330   }
331
332   if (ops_in_flight >= max_purge_ops) {
333     dout(20) << "Throttling on op limit " << ops_in_flight << "/"
334              << max_purge_ops << dendl;
335     return false;
336   }
337
338   if (in_flight.size() >= cct->_conf->mds_max_purge_files) {
339     dout(20) << "Throttling on item limit " << in_flight.size()
340              << "/" << cct->_conf->mds_max_purge_files << dendl;
341     return false;
342   } else {
343     return true;
344   }
345 }
346
347 bool PurgeQueue::_consume()
348 {
349   assert(lock.is_locked_by_me());
350
351   bool could_consume = false;
352   while(can_consume()) {
353     could_consume = true;
354
355     if (delayed_flush) {
356       // We are now going to read from the journal, so any proactive
357       // flush is no longer necessary.  This is not functionally necessary
358       // but it can avoid generating extra fragmented flush IOs.
359       timer.cancel_event(delayed_flush);
360       delayed_flush = nullptr;
361     }
362
363     if (!journaler.is_readable()) {
364       dout(10) << " not readable right now" << dendl;
365       // Because we are the writer and the reader of the journal
366       // via the same Journaler instance, we never need to reread_head
367       if (!journaler.have_waiter()) {
368         journaler.wait_for_readable(new FunctionContext([this](int r) {
369           Mutex::Locker l(lock);
370           if (r == 0) {
371             _consume();
372           }
373         }));
374       }
375
376       return could_consume;
377     }
378
379     // The journaler is readable: consume an entry
380     bufferlist bl;
381     bool readable = journaler.try_read_entry(bl);
382     assert(readable);  // we checked earlier
383
384     dout(20) << " decoding entry" << dendl;
385     PurgeItem item;
386     bufferlist::iterator q = bl.begin();
387     try {
388       ::decode(item, q);
389     } catch (const buffer::error &err) {
390       derr << "Decode error at read_pos=0x" << std::hex
391            << journaler.get_read_pos() << dendl;
392       on_error->complete(0);
393     }
394     dout(20) << " executing item (0x" << std::hex << item.ino
395              << std::dec << ")" << dendl;
396     _execute_item(item, journaler.get_read_pos());
397   }
398
399   dout(10) << " cannot consume right now" << dendl;
400
401   return could_consume;
402 }
403
404 void PurgeQueue::_execute_item(
405     const PurgeItem &item,
406     uint64_t expire_to)
407 {
408   assert(lock.is_locked_by_me());
409
410   in_flight[expire_to] = item;
411   logger->set(l_pq_executing, in_flight.size());
412   ops_in_flight += _calculate_ops(item);
413   logger->set(l_pq_executing_ops, ops_in_flight);
414
415   SnapContext nullsnapc;
416
417   C_GatherBuilder gather(cct);
418   if (item.action == PurgeItem::PURGE_FILE) {
419     if (item.size > 0) {
420       uint64_t num = Striper::get_num_objects(item.layout, item.size);
421       dout(10) << " 0~" << item.size << " objects 0~" << num
422                << " snapc " << item.snapc << " on " << item.ino << dendl;
423       filer.purge_range(item.ino, &item.layout, item.snapc,
424                         0, num, ceph::real_clock::now(), 0,
425                         gather.new_sub());
426     }
427
428     // remove the backtrace object if it was not purged
429     object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
430     if (!gather.has_subs() || !item.layout.pool_ns.empty()) {
431       object_locator_t oloc(item.layout.pool_id);
432       dout(10) << " remove backtrace object " << oid
433                << " pool " << oloc.pool << " snapc " << item.snapc << dendl;
434       objecter->remove(oid, oloc, item.snapc,
435                             ceph::real_clock::now(), 0,
436                             gather.new_sub());
437     }
438
439     // remove old backtrace objects
440     for (const auto &p : item.old_pools) {
441       object_locator_t oloc(p);
442       dout(10) << " remove backtrace object " << oid
443                << " old pool " << p << " snapc " << item.snapc << dendl;
444       objecter->remove(oid, oloc, item.snapc,
445                             ceph::real_clock::now(), 0,
446                             gather.new_sub());
447     }
448   } else if (item.action == PurgeItem::PURGE_DIR) {
449     object_locator_t oloc(metadata_pool);
450     std::list<frag_t> frags;
451     if (!item.fragtree.is_leaf(frag_t()))
452       item.fragtree.get_leaves(frags);
453     frags.push_back(frag_t());
454     for (const auto &frag : frags) {
455       object_t oid = CInode::get_object_name(item.ino, frag, "");
456       dout(10) << " remove dirfrag " << oid << dendl;
457       objecter->remove(oid, oloc, nullsnapc,
458                        ceph::real_clock::now(),
459                        0, gather.new_sub());
460     }
461   } else if (item.action == PurgeItem::TRUNCATE_FILE) {
462     const uint64_t num = Striper::get_num_objects(item.layout, item.size);
463     dout(10) << " 0~" << item.size << " objects 0~" << num
464              << " snapc " << item.snapc << " on " << item.ino << dendl;
465
466     // keep backtrace object
467     if (num > 1) {
468       filer.purge_range(item.ino, &item.layout, item.snapc,
469                         1, num - 1, ceph::real_clock::now(),
470                         0, gather.new_sub());
471     }
472     filer.zero(item.ino, &item.layout, item.snapc,
473                0, item.layout.object_size,
474                ceph::real_clock::now(),
475                0, true, gather.new_sub());
476   } else {
477     derr << "Invalid item (action=" << item.action << ") in purge queue, "
478             "dropping it" << dendl;
479     in_flight.erase(expire_to);
480     logger->set(l_pq_executing, in_flight.size());
481     return;
482   }
483   assert(gather.has_subs());
484
485   gather.set_finisher(new C_OnFinisher(
486                       new FunctionContext([this, expire_to](int r){
487     Mutex::Locker l(lock);
488     _execute_item_complete(expire_to);
489
490     _consume();
491
492     // Have we gone idle?  If so, do an extra write_head now instead of
493     // waiting for next flush after journaler_write_head_interval.
494     // Also do this periodically even if not idle, so that the persisted
495     // expire_pos doesn't fall too far behind our progress when consuming
496     // a very long queue.
497     if (in_flight.empty() || journaler.write_head_needed()) {
498       journaler.write_head(new FunctionContext([this](int r){
499             journaler.trim();
500             }));
501     }
502   }), &finisher));
503
504   gather.activate();
505 }
506
507 void PurgeQueue::_execute_item_complete(
508     uint64_t expire_to)
509 {
510   assert(lock.is_locked_by_me());
511   dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
512   assert(in_flight.count(expire_to) == 1);
513
514   auto iter = in_flight.find(expire_to);
515   assert(iter != in_flight.end());
516   if (iter == in_flight.begin()) {
517     // This was the lowest journal position in flight, so we can now
518     // safely expire the journal up to here.
519     dout(10) << "expiring to 0x" << std::hex << expire_to << std::dec << dendl;
520     journaler.set_expire_pos(expire_to);
521   } else {
522     // This is completely fine, we're not supposed to purge files in
523     // order when doing them in parallel.
524     dout(10) << "non-sequential completion, not expiring anything" << dendl;
525   }
526
527   ops_in_flight -= _calculate_ops(iter->second);
528   logger->set(l_pq_executing_ops, ops_in_flight);
529
530   dout(10) << "completed item for ino 0x" << std::hex << iter->second.ino
531            << std::dec << dendl;
532
533   in_flight.erase(iter);
534   logger->set(l_pq_executing, in_flight.size());
535   dout(10) << "in_flight.size() now " << in_flight.size() << dendl;
536
537   logger->inc(l_pq_executed);
538 }
539
540 void PurgeQueue::update_op_limit(const MDSMap &mds_map)
541 {
542   Mutex::Locker l(lock);
543
544   uint64_t pg_count = 0;
545   objecter->with_osdmap([&](const OSDMap& o) {
546     // Number of PGs across all data pools
547     const std::vector<int64_t> &data_pools = mds_map.get_data_pools();
548     for (const auto dp : data_pools) {
549       if (o.get_pg_pool(dp) == NULL) {
550         // It is possible that we have an older OSDMap than MDSMap,
551         // because we don't start watching every OSDMap until after
552         // MDSRank is initialized
553         dout(4) << " data pool " << dp << " not found in OSDMap" << dendl;
554         continue;
555       }
556       pg_count += o.get_pg_num(dp);
557     }
558   });
559
560   // Work out a limit based on n_pgs / n_mdss, multiplied by the user's
561   // preference for how many ops per PG
562   max_purge_ops = uint64_t(((double)pg_count / (double)mds_map.get_max_mds()) *
563                            cct->_conf->mds_max_purge_ops_per_pg);
564
565   // User may also specify a hard limit, apply this if so.
566   if (cct->_conf->mds_max_purge_ops) {
567     max_purge_ops = MIN(max_purge_ops, cct->_conf->mds_max_purge_ops);
568   }
569 }
570
571 void PurgeQueue::handle_conf_change(const struct md_config_t *conf,
572                              const std::set <std::string> &changed,
573                              const MDSMap &mds_map)
574 {
575   if (changed.count("mds_max_purge_ops")
576       || changed.count("mds_max_purge_ops_per_pg")) {
577     update_op_limit(mds_map);
578   } else if (changed.count("mds_max_purge_files")) {
579     Mutex::Locker l(lock);
580
581     if (in_flight.empty()) {
582       // We might have gone from zero to a finite limit, so
583       // might need to kick off consume.
584       dout(4) << "maybe start work again (max_purge_files="
585               << conf->mds_max_purge_files << dendl;
586       finisher.queue(new FunctionContext([this](int r){
587         Mutex::Locker l(lock);
588         _consume();
589       }));
590     }
591   }
592 }
593
594 bool PurgeQueue::drain(
595     uint64_t *progress,
596     uint64_t *progress_total,
597     size_t *in_flight_count
598     )
599 {
600   assert(progress != nullptr);
601   assert(progress_total != nullptr);
602   assert(in_flight_count != nullptr);
603
604   const bool done = in_flight.empty() && (
605       journaler.get_read_pos() == journaler.get_write_pos());
606   if (done) {
607     return true;
608   }
609
610   const uint64_t bytes_remaining = journaler.get_write_pos()
611                                    - journaler.get_read_pos();
612
613   if (!draining) {
614     // Start of draining: remember how much there was outstanding at
615     // this point so that we can give a progress percentage later
616     draining = true;
617
618     // Life the op throttle as this daemon now has nothing to do but
619     // drain the purge queue, so do it as fast as we can.
620     max_purge_ops = 0xffff;
621   }
622
623   drain_initial = max(bytes_remaining, drain_initial);
624
625   *progress = drain_initial - bytes_remaining;
626   *progress_total = drain_initial;
627   *in_flight_count = in_flight.size();
628
629   return false;
630 }
631