Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / RecoveryQueue.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "CInode.h"
16 #include "MDCache.h"
17 #include "MDSRank.h"
18 #include "Locker.h"
19 #include "osdc/Filer.h"
20
21 #include "RecoveryQueue.h"
22
23 #define dout_context g_ceph_context
24 #define dout_subsys ceph_subsys_mds
25 #undef dout_prefix
26 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << " RecoveryQueue::" << __func__ << " "
27
28 class C_MDC_Recover : public MDSIOContextBase {
29 protected:
30   RecoveryQueue *rq;
31   CInode *in;
32   void finish(int r) override {
33     rq->_recovered(in, r, size, mtime);
34   }
35
36   MDSRank *get_mds() override {
37     return rq->mds;
38   }
39
40 public:
41   uint64_t size;
42   utime_t mtime;
43
44   C_MDC_Recover(RecoveryQueue *rq_, CInode *i) : rq(rq_), in(i), size(0) {
45     assert(rq != NULL);
46   }
47 };
48
49
50 RecoveryQueue::RecoveryQueue(MDSRank *mds_)
51   : mds(mds_), logger(NULL), filer(mds_->objecter, mds_->finisher)
52 {}
53
54
55 /**
56  * Progress the queue.  Call this after enqueuing something or on
57  * completion of something.
58  */
59 void RecoveryQueue::advance()
60 {
61   dout(10) << file_recover_queue.size() << " queued, "
62            << file_recover_queue_front.size() << " prioritized, "
63            << file_recovering.size() << " recovering" << dendl;
64
65   while (file_recovering.size() < g_conf->mds_max_file_recover) {
66     if (!file_recover_queue_front.empty()) {
67       CInode *in = *file_recover_queue_front.begin();
68       file_recover_queue_front.erase(file_recover_queue_front.begin());
69       file_recover_queue.erase(in);
70       _start(in);
71     } else if (!file_recover_queue.empty()) {
72       CInode *in = *file_recover_queue.begin();
73       file_recover_queue.erase(file_recover_queue.begin());
74       _start(in);
75     } else {
76       break;
77     }
78   }
79
80   logger->set(l_mdc_num_recovering_processing, file_recovering.size());
81   logger->set(l_mdc_num_recovering_enqueued, file_recover_queue.size());
82   logger->set(l_mdc_num_recovering_prioritized, file_recover_queue_front.size());
83 }
84
85 void RecoveryQueue::_start(CInode *in)
86 {
87   inode_t *pi = in->get_projected_inode();
88
89   // blech
90   if (pi->client_ranges.size() && !pi->get_max_size()) {
91     mds->clog->warn() << "bad client_range " << pi->client_ranges
92                       << " on ino " << pi->ino;
93   }
94
95   if (pi->client_ranges.size() && pi->get_max_size()) {
96     dout(10) << "starting " << in->inode.size << " " << pi->client_ranges
97              << " " << *in << dendl;
98     file_recovering.insert(in);
99
100     C_MDC_Recover *fin = new C_MDC_Recover(this, in);
101     filer.probe(in->inode.ino, &in->inode.layout, in->last,
102                 pi->get_max_size(), &fin->size, &fin->mtime, false,
103                 0, fin);
104   } else {
105     dout(10) << "skipping " << in->inode.size << " " << *in << dendl;
106     in->state_clear(CInode::STATE_RECOVERING);
107     mds->locker->eval(in, CEPH_LOCK_IFILE);
108     in->auth_unpin(this);
109   }
110 }
111
112 void RecoveryQueue::prioritize(CInode *in)
113 {
114   if (file_recovering.count(in)) {
115     dout(10) << "already working on " << *in << dendl;
116     return;
117   }
118
119   if (file_recover_queue.count(in)) {
120     dout(20) << *in << dendl;
121     file_recover_queue_front.insert(in);
122     logger->set(l_mdc_num_recovering_prioritized, file_recover_queue_front.size());
123     return;
124   }
125
126   dout(10) << "not queued " << *in << dendl;
127 }
128
129
130 /**
131  * Given an authoritative inode which is in the cache,
132  * enqueue it for recovery.
133  */
134 void RecoveryQueue::enqueue(CInode *in)
135 {
136   dout(15) << "RecoveryQueue::enqueue " << *in << dendl;
137   assert(logger);  // Caller should have done set_logger before using me
138   assert(in->is_auth());
139
140   in->state_clear(CInode::STATE_NEEDSRECOVER);
141   if (!in->state_test(CInode::STATE_RECOVERING)) {
142     in->state_set(CInode::STATE_RECOVERING);
143     in->auth_pin(this);
144     logger->inc(l_mdc_recovery_started);
145   }
146   file_recover_queue.insert(in);
147   logger->set(l_mdc_num_recovering_enqueued, file_recover_queue.size());
148 }
149
150
151 /**
152  * Call back on completion of Filer probe on an inode.
153  */
154 void RecoveryQueue::_recovered(CInode *in, int r, uint64_t size, utime_t mtime)
155 {
156   dout(10) << "_recovered r=" << r << " size=" << size << " mtime=" << mtime
157            << " for " << *in << dendl;
158
159   if (r != 0) {
160     dout(0) << "recovery error! " << r << dendl;
161     if (r == -EBLACKLISTED) {
162       mds->respawn();
163       return;
164     } else {
165       // Something wrong on the OSD side trying to recover the size
166       // of this inode.  In principle we could record this as a piece
167       // of per-inode damage, but it's actually more likely that
168       // this indicates something wrong with the MDS (like maybe
169       // it has the wrong auth caps?)
170       mds->clog->error() << " OSD read error while recovering size for inode 0x"
171                          << std::hex << in->ino() << std::dec;
172       mds->damaged();
173     }
174   }
175
176   file_recovering.erase(in);
177   logger->set(l_mdc_num_recovering_processing, file_recovering.size());
178   logger->inc(l_mdc_recovery_completed);
179   in->state_clear(CInode::STATE_RECOVERING);
180
181   if (!in->get_parent_dn() && !in->get_projected_parent_dn()) {
182     dout(10) << " inode has no parents, killing it off" << dendl;
183     in->auth_unpin(this);
184     mds->mdcache->remove_inode(in);
185   } else {
186     // journal
187     mds->locker->check_inode_max_size(in, true, 0,  size, mtime);
188     mds->locker->eval(in, CEPH_LOCK_IFILE);
189     in->auth_unpin(this);
190   }
191
192   advance();
193 }
194