1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #ifndef CEPH_OSDMAPMAPPING_H
6 #define CEPH_OSDMAPMAPPING_H
11 #include "osd/osd_types.h"
12 #include "common/WorkQueue.h"
16 /// work queue to perform work on batches of pgids on multiple CPUs
17 class ParallelPGMapper {
20 utime_t start, finish;
24 Context *onfinish = nullptr;
26 Mutex lock = {"ParallelPGMapper::Job::lock"};
29 Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
34 // child must implement this
35 virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
36 virtual void complete() = 0;
38 void set_finish_event(Context *fin) {
51 Mutex::Locker l(lock);
54 utime_t get_duration() {
55 return finish - start;
58 Mutex::Locker l(lock);
63 bool wait_for(double duration) {
64 utime_t until = start;
66 Mutex::Locker l(lock);
68 if (ceph_clock_now() >= until) {
76 Context *fin = nullptr;
78 Mutex::Locker l(lock);
87 fin->complete(-ECANCELED);
92 Mutex::Locker l(lock);
106 Item(Job *j, int64_t p, unsigned b, unsigned e)
114 struct WQ : public ThreadPool::WorkQueue<Item> {
117 WQ(ParallelPGMapper *m_, ThreadPool *tp)
118 : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
121 bool _enqueue(Item *i) override {
125 void _dequeue(Item *i) override {
128 Item *_dequeue() override {
129 while (!m->q.empty()) {
130 Item *i = m->q.front();
132 if (i->job->aborted) {
133 i->job->finish_one();
142 void _process(Item *i, ThreadPool::TPHandle &h) override;
144 void _clear() override {
148 bool _empty() override {
154 ParallelPGMapper(CephContext *cct, ThreadPool *tp)
160 unsigned pgs_per_item);
168 /// a precalculated mapping of every PG for a given OSDMap
169 class OSDMapMapping {
171 MEMPOOL_CLASS_HELPERS();
175 MEMPOOL_CLASS_HELPERS();
179 mempool::osdmap_mapping::vector<int32_t> table;
181 size_t row_size() const {
183 1 + // acting_primary
191 PoolMapping(int s, int p)
194 table(pg_num * row_size()) {
198 std::vector<int> *up,
200 std::vector<int> *acting,
201 int *acting_primary) const {
202 const int32_t *row = &table[row_size() * ps];
203 if (acting_primary) {
204 *acting_primary = row[0];
207 *up_primary = row[1];
210 acting->resize(row[2]);
211 for (int i = 0; i < row[2]; ++i) {
212 (*acting)[i] = row[4 + i];
217 for (int i = 0; i < row[3]; ++i) {
218 (*up)[i] = row[4 + size + i];
224 const std::vector<int>& up,
226 const std::vector<int>& acting,
227 int acting_primary) {
228 int32_t *row = &table[row_size() * ps];
229 row[0] = acting_primary;
231 row[2] = acting.size();
233 for (int i = 0; i < row[2]; ++i) {
234 row[4 + i] = acting[i];
236 for (int i = 0; i < row[3]; ++i) {
237 row[4 + size + i] = up[i];
242 mempool::osdmap_mapping::map<int64_t,PoolMapping> pools;
243 mempool::osdmap_mapping::vector<
244 mempool::osdmap_mapping::vector<pg_t>> acting_rmap; // osd -> pg
245 //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap; // osd -> pg
247 uint64_t num_pgs = 0;
249 void _init_mappings(const OSDMap& osdmap);
253 unsigned pg_begin, unsigned pg_end);
255 void _build_rmap(const OSDMap& osdmap);
257 void _start(const OSDMap& osdmap) {
258 _init_mappings(osdmap);
260 void _finish(const OSDMap& osdmap);
264 friend class ParallelPGMapper;
266 struct MappingJob : public ParallelPGMapper::Job {
267 OSDMapMapping *mapping;
268 MappingJob(const OSDMap *osdmap, OSDMapMapping *m)
269 : Job(osdmap), mapping(m) {
270 mapping->_start(*osdmap);
272 void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
273 mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
275 void complete() override {
276 mapping->_finish(*osdmap);
282 std::vector<int> *up,
284 std::vector<int> *acting,
285 int *acting_primary) const {
286 auto p = pools.find(pgid.pool());
287 assert(p != pools.end());
288 assert(pgid.ps() < p->second.pg_num);
289 p->second.get(pgid.ps(), up, up_primary, acting, acting_primary);
292 const mempool::osdmap_mapping::vector<pg_t>& get_osd_acting_pgs(unsigned osd) {
293 assert(osd < acting_rmap.size());
294 return acting_rmap[osd];
297 const std::vector<pg_t>& get_osd_up_pgs(unsigned osd) {
298 assert(osd < up_rmap.size());
303 void update(const OSDMap& map);
304 void update(const OSDMap& map, pg_t pgid);
306 std::unique_ptr<MappingJob> start_update(
308 ParallelPGMapper& mapper,
309 unsigned pgs_per_item) {
310 std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
311 mapper.queue(job.get(), pgs_per_item);
315 epoch_t get_epoch() const {
319 uint64_t get_num_pgs() const {