Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osdc / Filer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15
16 #include <mutex>
17 #include <algorithm>
18 #include "Filer.h"
19 #include "osd/OSDMap.h"
20 #include "Striper.h"
21
22 #include "messages/MOSDOp.h"
23 #include "messages/MOSDOpReply.h"
24 #include "messages/MOSDMap.h"
25
26 #include "msg/Messenger.h"
27
28 #include "include/Context.h"
29
30 #include "common/Finisher.h"
31 #include "common/config.h"
32
33 #define dout_subsys ceph_subsys_filer
34 #undef dout_prefix
35 #define dout_prefix *_dout << objecter->messenger->get_myname() << ".filer "
36
37 class Filer::C_Probe : public Context {
38 public:
39   Filer *filer;
40   Probe *probe;
41   object_t oid;
42   uint64_t size;
43   ceph::real_time mtime;
44   C_Probe(Filer *f, Probe *p, object_t o) : filer(f), probe(p), oid(o),
45                                             size(0) {}
46   void finish(int r) override {
47     if (r == -ENOENT) {
48       r = 0;
49       assert(size == 0);
50     }
51
52     bool probe_complete;
53     {
54       Probe::unique_lock pl(probe->lock);
55       if (r != 0) {
56         probe->err = r;
57       }
58
59       probe_complete = filer->_probed(probe, oid, size, mtime, pl);
60       assert(!pl.owns_lock());
61     }
62     if (probe_complete) {
63       probe->onfinish->complete(probe->err);
64       delete probe;
65     }
66   }
67 };
68
69 int Filer::probe(inodeno_t ino,
70                  file_layout_t *layout,
71                  snapid_t snapid,
72                  uint64_t start_from,
73                  uint64_t *end, // LB, when !fwd
74                  ceph::real_time *pmtime,
75                  bool fwd,
76                  int flags,
77                  Context *onfinish)
78 {
79   ldout(cct, 10) << "probe " << (fwd ? "fwd ":"bwd ")
80            << hex << ino << dec
81            << " starting from " << start_from
82            << dendl;
83
84   assert(snapid);  // (until there is a non-NOSNAP write)
85
86   Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime,
87                            flags, fwd, onfinish);
88
89   return probe_impl(probe, layout, start_from, end);
90 }
91
92 int Filer::probe(inodeno_t ino,
93                  file_layout_t *layout,
94                  snapid_t snapid,
95                  uint64_t start_from,
96                  uint64_t *end, // LB, when !fwd
97                  utime_t *pmtime,
98                  bool fwd,
99                  int flags,
100                  Context *onfinish)
101 {
102   ldout(cct, 10) << "probe " << (fwd ? "fwd ":"bwd ")
103            << hex << ino << dec
104            << " starting from " << start_from
105            << dendl;
106
107   assert(snapid);  // (until there is a non-NOSNAP write)
108
109   Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime,
110                            flags, fwd, onfinish);
111   return probe_impl(probe, layout, start_from, end);
112 }
113
114 int Filer::probe_impl(Probe* probe, file_layout_t *layout,
115                       uint64_t start_from, uint64_t *end) // LB, when !fwd
116 {
117   // period (bytes before we jump unto a new set of object(s))
118   uint64_t period = layout->get_period();
119
120   // start with 1+ periods.
121   probe->probing_len = period;
122   if (probe->fwd) {
123     if (start_from % period)
124       probe->probing_len += period - (start_from % period);
125   } else {
126     assert(start_from > *end);
127     if (start_from % period)
128       probe->probing_len -= period - (start_from % period);
129     probe->probing_off -= probe->probing_len;
130   }
131
132   Probe::unique_lock pl(probe->lock);
133   _probe(probe, pl);
134   assert(!pl.owns_lock());
135
136   return 0;
137 }
138
139
140
141 /**
142  * probe->lock must be initially locked, this function will release it
143  */
144 void Filer::_probe(Probe *probe, Probe::unique_lock& pl)
145 {
146   assert(pl.owns_lock() && pl.mutex() == &probe->lock);
147
148   ldout(cct, 10) << "_probe " << hex << probe->ino << dec
149                  << " " << probe->probing_off << "~" << probe->probing_len
150                  << dendl;
151
152   // map range onto objects
153   probe->known_size.clear();
154   probe->probing.clear();
155   Striper::file_to_extents(cct, probe->ino, &probe->layout, probe->probing_off,
156                            probe->probing_len, 0, probe->probing);
157
158   std::vector<ObjectExtent> stat_extents;
159   for (vector<ObjectExtent>::iterator p = probe->probing.begin();
160        p != probe->probing.end();
161        ++p) {
162     ldout(cct, 10) << "_probe  probing " << p->oid << dendl;
163     probe->ops.insert(p->oid);
164     stat_extents.push_back(*p);
165   }
166
167   pl.unlock();
168   for (std::vector<ObjectExtent>::iterator i = stat_extents.begin();
169        i != stat_extents.end(); ++i) {
170     C_Probe *c = new C_Probe(this, probe, i->oid);
171     objecter->stat(i->oid, i->oloc, probe->snapid, &c->size, &c->mtime,
172                    probe->flags | CEPH_OSD_FLAG_RWORDERED,
173                    new C_OnFinisher(c, finisher));
174   }
175 }
176
177 /**
178  * probe->lock must be initially held, and will be released by this function.
179  *
180  * @return true if probe is complete and Probe object may be freed.
181  */
182 bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size,
183                     ceph::real_time mtime, Probe::unique_lock& pl)
184 {
185   assert(pl.owns_lock() && pl.mutex() == &probe->lock);
186
187   ldout(cct, 10) << "_probed " << probe->ino << " object " << oid
188            << " has size " << size << " mtime " << mtime << dendl;
189
190   probe->known_size[oid] = size;
191   if (mtime > probe->max_mtime)
192     probe->max_mtime = mtime;
193
194   assert(probe->ops.count(oid));
195   probe->ops.erase(oid);
196
197   if (!probe->ops.empty()) {
198     pl.unlock();
199     return false;  // waiting for more!
200   }
201
202   if (probe->err) { // we hit an error, propagate back up
203     pl.unlock();
204     return true;
205   }
206
207   // analyze!
208   uint64_t end = 0;
209
210   if (!probe->fwd) {
211     std::reverse(probe->probing.begin(), probe->probing.end());
212   }
213
214   for (vector<ObjectExtent>::iterator p = probe->probing.begin();
215        p != probe->probing.end();
216        ++p) {
217     uint64_t shouldbe = p->length + p->offset;
218     ldout(cct, 10) << "_probed  " << probe->ino << " object " << hex
219                    << p->oid << dec << " should be " << shouldbe
220                    << ", actual is " << probe->known_size[p->oid]
221                    << dendl;
222
223     if (!probe->found_size) {
224       assert(probe->known_size[p->oid] <= shouldbe);
225
226       if ((probe->fwd && probe->known_size[p->oid] == shouldbe) ||
227           (!probe->fwd && probe->known_size[p->oid] == 0 &&
228            probe->probing_off > 0))
229         continue;  // keep going
230
231       // aha, we found the end!
232       // calc offset into buffer_extent to get distance from probe->from.
233       uint64_t oleft = probe->known_size[p->oid] - p->offset;
234       for (vector<pair<uint64_t, uint64_t> >::iterator i
235              = p->buffer_extents.begin();
236            i != p->buffer_extents.end();
237            ++i) {
238         if (oleft <= (uint64_t)i->second) {
239           end = probe->probing_off + i->first + oleft;
240           ldout(cct, 10) << "_probed  end is in buffer_extent " << i->first
241                          << "~" << i->second << " off " << oleft
242                          << ", from was " << probe->probing_off << ", end is "
243                          << end << dendl;
244
245           probe->found_size = true;
246           ldout(cct, 10) << "_probed found size at " << end << dendl;
247           *probe->psize = end;
248
249           if (!probe->pmtime &&
250               !probe->pumtime)  // stop if we don't need mtime too
251             break;
252         }
253         oleft -= i->second;
254       }
255     }
256     break;
257   }
258
259   if (!probe->found_size || (probe->probing_off && (probe->pmtime ||
260                                                     probe->pumtime))) {
261     // keep probing!
262     ldout(cct, 10) << "_probed probing further" << dendl;
263
264     uint64_t period = probe->layout.get_period();
265     if (probe->fwd) {
266       probe->probing_off += probe->probing_len;
267       assert(probe->probing_off % period == 0);
268       probe->probing_len = period;
269     } else {
270       // previous period.
271       assert(probe->probing_off % period == 0);
272       probe->probing_len = period;
273       probe->probing_off -= period;
274     }
275     _probe(probe, pl);
276     assert(!pl.owns_lock());
277     return false;
278   } else if (probe->pmtime) {
279     ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
280     *probe->pmtime = probe->max_mtime;
281   } else if (probe->pumtime) {
282     ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
283     *probe->pumtime = ceph::real_clock::to_ceph_timespec(probe->max_mtime);
284   }
285   // done!
286   pl.unlock();
287   return true;
288 }
289
290
291 // -----------------------
292
293 struct PurgeRange {
294   std::mutex lock;
295   typedef std::lock_guard<std::mutex> lock_guard;
296   typedef std::unique_lock<std::mutex> unique_lock;
297   inodeno_t ino;
298   file_layout_t layout;
299   SnapContext snapc;
300   uint64_t first, num;
301   ceph::real_time mtime;
302   int flags;
303   Context *oncommit;
304   int uncommitted;
305   PurgeRange(inodeno_t i, const file_layout_t& l, const SnapContext& sc,
306              uint64_t fo, uint64_t no, ceph::real_time t, int fl,
307              Context *fin)
308     : ino(i), layout(l), snapc(sc), first(fo), num(no), mtime(t), flags(fl),
309       oncommit(fin), uncommitted(0) {}
310 };
311
312 int Filer::purge_range(inodeno_t ino,
313                        const file_layout_t *layout,
314                        const SnapContext& snapc,
315                        uint64_t first_obj, uint64_t num_obj,
316                        ceph::real_time mtime,
317                        int flags,
318                        Context *oncommit)
319 {
320   assert(num_obj > 0);
321
322   // single object?  easy!
323   if (num_obj == 1) {
324     object_t oid = file_object_t(ino, first_obj);
325     object_locator_t oloc = OSDMap::file_to_object_locator(*layout);
326     objecter->remove(oid, oloc, snapc, mtime, flags, oncommit);
327     return 0;
328   }
329
330   PurgeRange *pr = new PurgeRange(ino, *layout, snapc, first_obj,
331                                   num_obj, mtime, flags, oncommit);
332
333   _do_purge_range(pr, 0);
334   return 0;
335 }
336
337 struct C_PurgeRange : public Context {
338   Filer *filer;
339   PurgeRange *pr;
340   C_PurgeRange(Filer *f, PurgeRange *p) : filer(f), pr(p) {}
341   void finish(int r) override {
342     filer->_do_purge_range(pr, 1);
343   }
344 };
345
346 void Filer::_do_purge_range(PurgeRange *pr, int fin)
347 {
348   PurgeRange::unique_lock prl(pr->lock);
349   pr->uncommitted -= fin;
350   ldout(cct, 10) << "_do_purge_range " << pr->ino << " objects " << pr->first
351                  << "~" << pr->num << " uncommitted " << pr->uncommitted
352                  << dendl;
353
354   if (pr->num == 0 && pr->uncommitted == 0) {
355     pr->oncommit->complete(0);
356     prl.unlock();
357     delete pr;
358     return;
359   }
360
361   std::vector<object_t> remove_oids;
362
363   int max = cct->_conf->filer_max_purge_ops - pr->uncommitted;
364   while (pr->num > 0 && max > 0) {
365     remove_oids.push_back(file_object_t(pr->ino, pr->first));
366     pr->uncommitted++;
367     pr->first++;
368     pr->num--;
369     max--;
370   }
371   prl.unlock();
372
373   // Issue objecter ops outside pr->lock to avoid lock dependency loop
374   for (const auto& oid : remove_oids) {
375     object_locator_t oloc = OSDMap::file_to_object_locator(pr->layout);
376     objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags,
377                      new C_OnFinisher(new C_PurgeRange(this, pr), finisher));
378   }
379 }
380
381 // -----------------------
382 struct TruncRange {
383   std::mutex lock;
384   typedef std::lock_guard<std::mutex> lock_guard;
385   typedef std::unique_lock<std::mutex> unique_lock;
386   inodeno_t ino;
387   file_layout_t layout;
388   SnapContext snapc;
389   ceph::real_time mtime;
390   int flags;
391   Context *oncommit;
392   int uncommitted;
393   uint64_t offset;
394   uint64_t length;
395   uint32_t truncate_seq;
396   TruncRange(inodeno_t i, const file_layout_t& l, const SnapContext& sc,
397              ceph::real_time t, int fl, Context *fin,
398              uint64_t off, uint64_t len, uint32_t ts)
399     : ino(i), layout(l), snapc(sc), mtime(t), flags(fl), oncommit(fin),
400       uncommitted(0), offset(off), length(len), truncate_seq(ts) {}
401 };
402
403 void Filer::truncate(inodeno_t ino,
404                      file_layout_t *layout,
405                      const SnapContext& snapc,
406                      uint64_t offset,
407                      uint64_t len,
408                      __u32 truncate_seq,
409                      ceph::real_time mtime,
410                      int flags,
411                      Context *oncommit)
412 {
413   uint64_t period = layout->get_period();
414   uint64_t num_objs = Striper::get_num_objects(*layout, len + (offset % period));
415   if (num_objs == 1) {
416     vector<ObjectExtent> extents;
417     Striper::file_to_extents(cct, ino, layout, offset, len, 0, extents);
418     vector<OSDOp> ops(1);
419     ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC;
420     ops[0].op.extent.truncate_seq = truncate_seq;
421     ops[0].op.extent.truncate_size = extents[0].offset;
422     objecter->_modify(extents[0].oid, extents[0].oloc, ops, mtime, snapc,
423                       flags, oncommit);
424     return;
425   }
426
427   if (len > 0 && (offset + len) % period)
428     len += period - ((offset + len) % period);
429
430   TruncRange *tr = new TruncRange(ino, *layout, snapc, mtime, flags, oncommit,
431                                   offset, len, truncate_seq);
432   _do_truncate_range(tr, 0);
433 }
434
435 struct C_TruncRange : public Context {
436   Filer *filer;
437   TruncRange *tr;
438   C_TruncRange(Filer *f, TruncRange *t) : filer(f), tr(t) {}
439   void finish(int r) override {
440     filer->_do_truncate_range(tr, 1);
441   }
442 };
443
444 void Filer::_do_truncate_range(TruncRange *tr, int fin)
445 {
446   TruncRange::unique_lock trl(tr->lock);
447   tr->uncommitted -= fin;
448   ldout(cct, 10) << "_do_truncate_range " << tr->ino << " objects " << tr->offset
449                  << "~" << tr->length << " uncommitted " << tr->uncommitted
450                  << dendl;
451
452   if (tr->length == 0 && tr->uncommitted == 0) {
453     tr->oncommit->complete(0);
454     trl.unlock();
455     delete tr;
456     return;
457   }
458
459   vector<ObjectExtent> extents;
460
461   int max = cct->_conf->filer_max_truncate_ops - tr->uncommitted;
462   if (max > 0 && tr->length > 0) {
463     uint64_t len = tr->layout.get_period() * max;
464     if (len > tr->length)
465       len = tr->length;
466
467     uint64_t offset = tr->offset + tr->length - len;
468     Striper::file_to_extents(cct, tr->ino, &tr->layout, offset, len, 0, extents);
469     tr->uncommitted += extents.size();
470     tr->length -= len;
471   }
472
473   trl.unlock();
474
475   // Issue objecter ops outside tr->lock to avoid lock dependency loop
476   for (const auto& p : extents) {
477     vector<OSDOp> ops(1);
478     ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC;
479     ops[0].op.extent.truncate_size = p.offset;
480     ops[0].op.extent.truncate_seq = tr->truncate_seq;
481     objecter->_modify(p.oid, p.oloc, ops, tr->mtime, tr->snapc, tr->flags,
482                       new C_OnFinisher(new C_TruncRange(this, tr), finisher));
483   }
484 }