Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / OSDMapMapping.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4
5 #ifndef CEPH_OSDMAPMAPPING_H
6 #define CEPH_OSDMAPMAPPING_H
7
8 #include <vector>
9 #include <map>
10
11 #include "osd/osd_types.h"
12 #include "common/WorkQueue.h"
13
14 class OSDMap;
15
16 /// work queue to perform work on batches of pgids on multiple CPUs
17 class ParallelPGMapper {
18 public:
19   struct Job {
20     utime_t start, finish;
21     unsigned shards = 0;
22     const OSDMap *osdmap;
23     bool aborted = false;
24     Context *onfinish = nullptr;
25
26     Mutex lock = {"ParallelPGMapper::Job::lock"};
27     Cond cond;
28
29     Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
30     virtual ~Job() {
31       assert(shards == 0);
32     }
33
34     // child must implement this
35     virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
36     virtual void complete() = 0;
37
38     void set_finish_event(Context *fin) {
39       lock.Lock();
40       if (shards == 0) {
41         // already done.
42         lock.Unlock();
43         fin->complete(0);
44       } else {
45         // set finisher
46         onfinish = fin;
47         lock.Unlock();
48       }
49     }
50     bool is_done() {
51       Mutex::Locker l(lock);
52       return shards == 0;
53     }
54     utime_t get_duration() {
55       return finish - start;
56     }
57     void wait() {
58       Mutex::Locker l(lock);
59       while (shards > 0) {
60         cond.Wait(lock);
61       }
62     }
63     bool wait_for(double duration) {
64       utime_t until = start;
65       until += duration;
66       Mutex::Locker l(lock);
67       while (shards > 0) {
68         if (ceph_clock_now() >= until) {
69           return false;
70         }
71         cond.Wait(lock);
72       }
73       return true;
74     }
75     void abort() {
76       Context *fin = nullptr;
77       {
78         Mutex::Locker l(lock);
79         aborted = true;
80         fin = onfinish;
81         onfinish = nullptr;
82         while (shards > 0) {
83           cond.Wait(lock);
84         }
85       }
86       if (fin) {
87         fin->complete(-ECANCELED);
88       }
89     }
90
91     void start_one() {
92       Mutex::Locker l(lock);
93       ++shards;
94     }
95     void finish_one();
96   };
97
98 protected:
99   CephContext *cct;
100
101   struct Item {
102     Job *job;
103     int64_t pool;
104     unsigned begin, end;
105
106     Item(Job *j, int64_t p, unsigned b, unsigned e)
107       : job(j),
108         pool(p),
109         begin(b),
110         end(e) {}
111   };
112   std::deque<Item*> q;
113
114   struct WQ : public ThreadPool::WorkQueue<Item> {
115     ParallelPGMapper *m;
116
117     WQ(ParallelPGMapper *m_, ThreadPool *tp)
118       : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
119         m(m_) {}
120
121     bool _enqueue(Item *i) override {
122       m->q.push_back(i);
123       return true;
124     }
125     void _dequeue(Item *i) override {
126       ceph_abort();
127     }
128     Item *_dequeue() override {
129       while (!m->q.empty()) {
130         Item *i = m->q.front();
131         m->q.pop_front();
132         if (i->job->aborted) {
133           i->job->finish_one();
134           delete i;
135         } else {
136           return i;
137         }
138       }
139       return nullptr;
140     }
141
142     void _process(Item *i, ThreadPool::TPHandle &h) override;
143
144     void _clear() override {
145       assert(_empty());
146     }
147
148     bool _empty() override {
149       return m->q.empty();
150     }
151   } wq;
152
153 public:
154   ParallelPGMapper(CephContext *cct, ThreadPool *tp)
155     : cct(cct),
156       wq(this, tp) {}
157
158   void queue(
159     Job *job,
160     unsigned pgs_per_item);
161
162   void drain() {
163     wq.drain();
164   }
165 };
166
167
168 /// a precalculated mapping of every PG for a given OSDMap
169 class OSDMapMapping {
170 public:
171   MEMPOOL_CLASS_HELPERS();
172 private:
173
174   struct PoolMapping {
175     MEMPOOL_CLASS_HELPERS();
176
177     unsigned size = 0;
178     unsigned pg_num = 0;
179     mempool::osdmap_mapping::vector<int32_t> table;
180
181     size_t row_size() const {
182       return
183         1 + // acting_primary
184         1 + // up_primary
185         1 + // num acting
186         1 + // num up
187         size + // acting
188         size;  // up
189     }
190
191     PoolMapping(int s, int p)
192       : size(s),
193         pg_num(p),
194         table(pg_num * row_size()) {
195     }
196
197     void get(size_t ps,
198              std::vector<int> *up,
199              int *up_primary,
200              std::vector<int> *acting,
201              int *acting_primary) const {
202       const int32_t *row = &table[row_size() * ps];
203       if (acting_primary) {
204         *acting_primary = row[0];
205       }
206       if (up_primary) {
207         *up_primary = row[1];
208       }
209       if (acting) {
210         acting->resize(row[2]);
211         for (int i = 0; i < row[2]; ++i) {
212           (*acting)[i] = row[4 + i];
213         }
214       }
215       if (up) {
216         up->resize(row[3]);
217         for (int i = 0; i < row[3]; ++i) {
218           (*up)[i] = row[4 + size + i];
219         }
220       }
221     }
222
223     void set(size_t ps,
224              const std::vector<int>& up,
225              int up_primary,
226              const std::vector<int>& acting,
227              int acting_primary) {
228       int32_t *row = &table[row_size() * ps];
229       row[0] = acting_primary;
230       row[1] = up_primary;
231       row[2] = acting.size();
232       row[3] = up.size();
233       for (int i = 0; i < row[2]; ++i) {
234         row[4 + i] = acting[i];
235       }
236       for (int i = 0; i < row[3]; ++i) {
237         row[4 + size + i] = up[i];
238       }
239     }
240   };
241
242   mempool::osdmap_mapping::map<int64_t,PoolMapping> pools;
243   mempool::osdmap_mapping::vector<
244     mempool::osdmap_mapping::vector<pg_t>> acting_rmap;  // osd -> pg
245   //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap;  // osd -> pg
246   epoch_t epoch = 0;
247   uint64_t num_pgs = 0;
248
249   void _init_mappings(const OSDMap& osdmap);
250   void _update_range(
251     const OSDMap& map,
252     int64_t pool,
253     unsigned pg_begin, unsigned pg_end);
254
255   void _build_rmap(const OSDMap& osdmap);
256
257   void _start(const OSDMap& osdmap) {
258     _init_mappings(osdmap);
259   }
260   void _finish(const OSDMap& osdmap);
261
262   void _dump();
263
264   friend class ParallelPGMapper;
265
266   struct MappingJob : public ParallelPGMapper::Job {
267     OSDMapMapping *mapping;
268     MappingJob(const OSDMap *osdmap, OSDMapMapping *m)
269       : Job(osdmap), mapping(m) {
270       mapping->_start(*osdmap);
271     }
272     void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
273       mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
274     }
275     void complete() override {
276       mapping->_finish(*osdmap);
277     }
278   };
279
280 public:
281   void get(pg_t pgid,
282            std::vector<int> *up,
283            int *up_primary,
284            std::vector<int> *acting,
285            int *acting_primary) const {
286     auto p = pools.find(pgid.pool());
287     assert(p != pools.end());
288     assert(pgid.ps() < p->second.pg_num);
289     p->second.get(pgid.ps(), up, up_primary, acting, acting_primary);
290   }
291
292   const mempool::osdmap_mapping::vector<pg_t>& get_osd_acting_pgs(unsigned osd) {
293     assert(osd < acting_rmap.size());
294     return acting_rmap[osd];
295   }
296   /* unsued
297   const std::vector<pg_t>& get_osd_up_pgs(unsigned osd) {
298     assert(osd < up_rmap.size());
299     return up_rmap[osd];
300   }
301   */
302
303   void update(const OSDMap& map);
304   void update(const OSDMap& map, pg_t pgid);
305
306   std::unique_ptr<MappingJob> start_update(
307     const OSDMap& map,
308     ParallelPGMapper& mapper,
309     unsigned pgs_per_item) {
310     std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
311     mapper.queue(job.get(), pgs_per_item);
312     return job;
313   }
314
315   epoch_t get_epoch() const {
316     return epoch;
317   }
318
319   uint64_t get_num_pgs() const {
320     return num_pgs;
321   }
322 };
323
324
325 #endif