Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / SnapRealm.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "SnapRealm.h"
16 #include "MDCache.h"
17 #include "MDSRank.h"
18
19 #include "messages/MClientSnap.h"
20
21
22 /*
23  * SnapRealm
24  */
25
26 #define dout_context g_ceph_context
27 #define dout_subsys ceph_subsys_mds
28 #undef dout_prefix
29 #define dout_prefix _prefix(_dout, mdcache->mds->get_nodeid(), inode, srnode.seq, this)
30 static ostream& _prefix(std::ostream *_dout, int whoami, const CInode *inode,
31                         uint64_t seq, const SnapRealm *realm) {
32   return *_dout << " mds." << whoami
33                 << ".cache.snaprealm(" << inode->ino()
34                 << " seq " << seq << " " << realm << ") ";
35 }
36
37 ostream& operator<<(ostream& out, const SnapRealm& realm) 
38 {
39   out << "snaprealm(" << realm.inode->ino()
40       << " seq " << realm.srnode.seq
41       << " lc " << realm.srnode.last_created
42       << " cr " << realm.srnode.created;
43   if (realm.srnode.created != realm.srnode.current_parent_since)
44     out << " cps " << realm.srnode.current_parent_since;
45   out << " snaps=" << realm.srnode.snaps;
46   if (realm.srnode.past_parents.size()) {
47     out << " past_parents=(";
48     for (map<snapid_t, snaplink_t>::const_iterator p = realm.srnode.past_parents.begin(); 
49          p != realm.srnode.past_parents.end(); 
50          ++p) {
51       if (p != realm.srnode.past_parents.begin()) out << ",";
52       out << p->second.first << "-" << p->first
53           << "=" << p->second.ino;
54     }
55     out << ")";
56   }
57   out << " " << &realm << ")";
58   return out;
59 }
60
61
62 void SnapRealm::add_open_past_parent(SnapRealm *parent, snapid_t last)
63 {
64   auto p = open_past_parents.find(parent->inode->ino());
65   if (p != open_past_parents.end()) {
66     assert(p->second.second.count(last) == 0);
67     p->second.second.insert(last);
68   } else {
69     open_past_parents[parent->inode->ino()].first = parent;
70     open_past_parents[parent->inode->ino()].second.insert(last);
71     parent->open_past_children.insert(this);
72     parent->inode->get(CInode::PIN_PASTSNAPPARENT);
73   }
74   ++num_open_past_parents;
75 }
76
77 void SnapRealm::remove_open_past_parent(inodeno_t ino, snapid_t last)
78 {
79   auto p = open_past_parents.find(ino);
80   assert(p != open_past_parents.end());
81   auto q = p->second.second.find(last);
82   assert(q != p->second.second.end());
83   p->second.second.erase(q);
84   --num_open_past_parents;
85   if (p->second.second.empty()) {
86     SnapRealm *parent = p->second.first;
87     open_past_parents.erase(p);
88     parent->open_past_children.erase(this);
89     parent->inode->put(CInode::PIN_PASTSNAPPARENT);
90   }
91 }
92
93 struct C_SR_RetryOpenParents : public MDSInternalContextBase {
94   SnapRealm *sr;
95   snapid_t first, last, parent_last;
96   inodeno_t parent;
97   MDSInternalContextBase* fin;
98   C_SR_RetryOpenParents(SnapRealm *s, snapid_t f, snapid_t l, snapid_t pl,
99                         inodeno_t p, MDSInternalContextBase *c) :
100     sr(s), first(f), last(l), parent_last(pl),  parent(p), fin(c) {
101     sr->inode->get(CInode::PIN_OPENINGSNAPPARENTS);
102   }
103   MDSRank *get_mds() override { return sr->mdcache->mds; }
104   void finish(int r) override {
105     if (r < 0)
106       sr->_remove_missing_parent(parent_last, parent, r);
107     if (sr->_open_parents(fin, first, last))
108       fin->complete(0);
109     sr->inode->put(CInode::PIN_OPENINGSNAPPARENTS);
110   }
111 };
112
113 void SnapRealm::_remove_missing_parent(snapid_t snapid, inodeno_t parent, int err)
114 {
115   map<snapid_t, snaplink_t>::iterator p = srnode.past_parents.find(snapid);
116   if (p != srnode.past_parents.end()) {
117     dout(10) << __func__ << " " << parent << " [" << p->second.first << ","
118              << p->first << "]  errno " << err << dendl;
119     srnode.past_parents.erase(p);
120   } else {
121     dout(10) << __func__ << " " << parent << " not found" << dendl;
122   }
123 }
124
125 bool SnapRealm::_open_parents(MDSInternalContextBase *finish, snapid_t first, snapid_t last)
126 {
127   dout(10) << "open_parents [" << first << "," << last << "]" << dendl;
128   if (open) 
129     return true;
130
131   // make sure my current parents' parents are open...
132   if (parent) {
133     dout(10) << " current parent [" << srnode.current_parent_since << ",head] is " << *parent
134              << " on " << *parent->inode << dendl;
135     if (last >= srnode.current_parent_since &&
136         !parent->_open_parents(finish, MAX(first, srnode.current_parent_since), last))
137       return false;
138   }
139
140   // and my past parents too!
141   assert(srnode.past_parents.size() >= num_open_past_parents);
142   if (srnode.past_parents.size() > num_open_past_parents) {
143     for (map<snapid_t, snaplink_t>::iterator p = srnode.past_parents.begin();
144          p != srnode.past_parents.end(); ) {
145       dout(10) << " past_parent [" << p->second.first << "," << p->first << "] is "
146                << p->second.ino << dendl;
147       CInode *parent = mdcache->get_inode(p->second.ino);
148       if (!parent) {
149         C_SR_RetryOpenParents *fin = new C_SR_RetryOpenParents(this, first, last, p->first,
150                                                                p->second.ino, finish);
151         mdcache->open_ino(p->second.ino, mdcache->mds->mdsmap->get_metadata_pool(), fin);
152         return false;
153       }
154       if (parent->state_test(CInode::STATE_PURGING)) {
155         dout(10) << " skip purging past_parent " << *parent << dendl;
156         srnode.past_parents.erase(p++);
157         continue;
158       }
159       assert(parent->snaprealm);  // hmm!
160       if (!parent->snaprealm->_open_parents(finish, p->second.first, p->first))
161         return false;
162       auto q = open_past_parents.find(p->second.ino);
163       if (q == open_past_parents.end() ||
164           q->second.second.count(p->first) == 0) {
165         add_open_past_parent(parent->snaprealm, p->first);
166       }
167       ++p;
168     }
169   }
170
171   open = true;
172   return true;
173 }
174
175 bool SnapRealm::open_parents(MDSInternalContextBase *retryorfinish) {
176   if (!_open_parents(retryorfinish))
177     return false;
178   delete retryorfinish;
179   return true;
180 }
181
182 bool SnapRealm::have_past_parents_open(snapid_t first, snapid_t last)
183 {
184   dout(10) << "have_past_parents_open [" << first << "," << last << "]" << dendl;
185   if (open)
186     return true;
187
188   for (map<snapid_t, snaplink_t>::iterator p = srnode.past_parents.lower_bound(first);
189        p != srnode.past_parents.end();
190        ++p) {
191     if (p->second.first > last)
192       break;
193     dout(10) << " past parent [" << p->second.first << "," << p->first << "] was "
194              << p->second.ino << dendl;
195     if (open_past_parents.count(p->second.ino) == 0) {
196       dout(10) << " past parent " << p->second.ino << " is not open" << dendl;
197       return false;
198     }
199     SnapRealm *parent_realm = open_past_parents[p->second.ino].first;
200     if (!parent_realm->have_past_parents_open(MAX(first, p->second.first),
201                                               MIN(last, p->first)))
202       return false;
203   }
204
205   open = true;
206   return true;
207 }
208
209 void SnapRealm::close_parents()
210 {
211   for (auto p = open_past_parents.begin(); p != open_past_parents.end(); ++p) {
212     num_open_past_parents -= p->second.second.size();
213     p->second.first->inode->put(CInode::PIN_PASTSNAPPARENT);
214     p->second.first->open_past_children.erase(this);
215   }
216   open_past_parents.clear();
217 }
218
219
220 /*
221  * get list of snaps for this realm.  we must include parents' snaps
222  * for the intervals during which they were our parent.
223  */
224 void SnapRealm::build_snap_set(set<snapid_t> &s,
225                                snapid_t& max_seq, snapid_t& max_last_created, snapid_t& max_last_destroyed,
226                                snapid_t first, snapid_t last) const
227 {
228   dout(10) << "build_snap_set [" << first << "," << last << "] on " << *this << dendl;
229
230   if (srnode.seq > max_seq)
231     max_seq = srnode.seq;
232   if (srnode.last_created > max_last_created)
233     max_last_created = srnode.last_created;
234   if (srnode.last_destroyed > max_last_destroyed)
235     max_last_destroyed = srnode.last_destroyed;
236
237   // include my snaps within interval [first,last]
238   for (map<snapid_t, SnapInfo>::const_iterator p = srnode.snaps.lower_bound(first); // first element >= first
239        p != srnode.snaps.end() && p->first <= last;
240        ++p)
241     s.insert(p->first);
242
243   // include snaps for parents during intervals that intersect [first,last]
244   for (map<snapid_t, snaplink_t>::const_iterator p = srnode.past_parents.lower_bound(first);
245        p != srnode.past_parents.end() && p->first >= first && p->second.first <= last;
246        ++p) {
247     const CInode *oldparent = mdcache->get_inode(p->second.ino);
248     assert(oldparent);  // call open_parents first!
249     assert(oldparent->snaprealm);
250     oldparent->snaprealm->build_snap_set(s, max_seq, max_last_created, max_last_destroyed,
251                                          MAX(first, p->second.first),
252                                          MIN(last, p->first));
253   }
254   if (srnode.current_parent_since <= last && parent)
255     parent->build_snap_set(s, max_seq, max_last_created, max_last_destroyed,
256                            MAX(first, srnode.current_parent_since), last);
257 }
258
259
260 void SnapRealm::check_cache() const
261 {
262   assert(open);
263   if (cached_seq >= srnode.seq)
264     return;
265
266   cached_snaps.clear();
267   cached_snap_context.clear();
268
269   cached_last_created = srnode.last_created;
270   cached_last_destroyed = srnode.last_destroyed;
271   cached_seq = srnode.seq;
272   build_snap_set(cached_snaps, cached_seq, cached_last_created, cached_last_destroyed,
273                  0, CEPH_NOSNAP);
274
275   cached_snap_trace.clear();
276   build_snap_trace(cached_snap_trace);
277   
278   dout(10) << "check_cache rebuilt " << cached_snaps
279            << " seq " << srnode.seq
280            << " cached_seq " << cached_seq
281            << " cached_last_created " << cached_last_created
282            << " cached_last_destroyed " << cached_last_destroyed
283            << ")" << dendl;
284 }
285
286 const set<snapid_t>& SnapRealm::get_snaps() const
287 {
288   check_cache();
289   dout(10) << "get_snaps " << cached_snaps
290            << " (seq " << srnode.seq << " cached_seq " << cached_seq << ")"
291            << dendl;
292   return cached_snaps;
293 }
294
295 /*
296  * build vector in reverse sorted order
297  */
298 const SnapContext& SnapRealm::get_snap_context() const
299 {
300   check_cache();
301
302   if (!cached_snap_context.seq) {
303     cached_snap_context.seq = cached_seq;
304     cached_snap_context.snaps.resize(cached_snaps.size());
305     unsigned i = 0;
306     for (set<snapid_t>::reverse_iterator p = cached_snaps.rbegin();
307          p != cached_snaps.rend();
308          ++p)
309       cached_snap_context.snaps[i++] = *p;
310   }
311
312   return cached_snap_context;
313 }
314
315 void SnapRealm::get_snap_info(map<snapid_t,SnapInfo*>& infomap, snapid_t first, snapid_t last)
316 {
317   const set<snapid_t>& snaps = get_snaps();
318   dout(10) << "get_snap_info snaps " << snaps << dendl;
319
320   // include my snaps within interval [first,last]
321   for (map<snapid_t, SnapInfo>::iterator p = srnode.snaps.lower_bound(first); // first element >= first
322        p != srnode.snaps.end() && p->first <= last;
323        ++p)
324     infomap[p->first] = &p->second;
325
326   // include snaps for parents during intervals that intersect [first,last]
327   for (map<snapid_t, snaplink_t>::iterator p = srnode.past_parents.lower_bound(first);
328        p != srnode.past_parents.end() && p->first >= first && p->second.first <= last;
329        ++p) {
330     CInode *oldparent = mdcache->get_inode(p->second.ino);
331     assert(oldparent);  // call open_parents first!
332     assert(oldparent->snaprealm);
333     oldparent->snaprealm->get_snap_info(infomap,
334                                         MAX(first, p->second.first),
335                                         MIN(last, p->first));
336   }
337   if (srnode.current_parent_since <= last && parent)
338     parent->get_snap_info(infomap, MAX(first, srnode.current_parent_since), last);
339 }
340
341 const string& SnapRealm::get_snapname(snapid_t snapid, inodeno_t atino)
342 {
343   auto srnode_snaps_entry = srnode.snaps.find(snapid);
344   if (srnode_snaps_entry != srnode.snaps.end()) {
345     if (atino == inode->ino())
346       return srnode_snaps_entry->second.name;
347     else
348       return srnode_snaps_entry->second.get_long_name();
349   }
350
351   map<snapid_t,snaplink_t>::iterator p = srnode.past_parents.lower_bound(snapid);
352   if (p != srnode.past_parents.end() && p->second.first <= snapid) {
353     CInode *oldparent = mdcache->get_inode(p->second.ino);
354     assert(oldparent);  // call open_parents first!
355     assert(oldparent->snaprealm);    
356     return oldparent->snaprealm->get_snapname(snapid, atino);
357   }
358
359   assert(srnode.current_parent_since <= snapid);
360   assert(parent);
361   return parent->get_snapname(snapid, atino);
362 }
363
364 snapid_t SnapRealm::resolve_snapname(const string& n, inodeno_t atino, snapid_t first, snapid_t last)
365 {
366   // first try me
367   dout(10) << "resolve_snapname '" << n << "' in [" << first << "," << last << "]" << dendl;
368
369   //snapid_t num;
370   //if (n[0] == '~') num = atoll(n.c_str()+1);
371
372   bool actual = (atino == inode->ino());
373   string pname;
374   inodeno_t pino;
375   if (!actual) {
376     if (!n.length() ||
377         n[0] != '_') return 0;
378     int next_ = n.find('_', 1);
379     if (next_ < 0) return 0;
380     pname = n.substr(1, next_ - 1);
381     pino = atoll(n.c_str() + next_ + 1);
382     dout(10) << " " << n << " parses to name '" << pname << "' dirino " << pino << dendl;
383   }
384
385   for (map<snapid_t, SnapInfo>::iterator p = srnode.snaps.lower_bound(first); // first element >= first
386        p != srnode.snaps.end() && p->first <= last;
387        ++p) {
388     dout(15) << " ? " << p->second << dendl;
389     //if (num && p->second.snapid == num)
390     //return p->first;
391     if (actual && p->second.name == n)
392         return p->first;
393     if (!actual && p->second.name == pname && p->second.ino == pino)
394       return p->first;
395   }
396
397     // include snaps for parents during intervals that intersect [first,last]
398   for (map<snapid_t, snaplink_t>::iterator p = srnode.past_parents.lower_bound(first);
399        p != srnode.past_parents.end() && p->first >= first && p->second.first <= last;
400        ++p) {
401     CInode *oldparent = mdcache->get_inode(p->second.ino);
402     assert(oldparent);  // call open_parents first!
403     assert(oldparent->snaprealm);
404     snapid_t r = oldparent->snaprealm->resolve_snapname(n, atino,
405                                                         MAX(first, p->second.first),
406                                                         MIN(last, p->first));
407     if (r)
408       return r;
409   }
410   if (parent && srnode.current_parent_since <= last)
411     return parent->resolve_snapname(n, atino, MAX(first, srnode.current_parent_since), last);
412   return 0;
413 }
414
415
416 void SnapRealm::adjust_parent()
417 {
418   SnapRealm *newparent = inode->get_parent_dn()->get_dir()->get_inode()->find_snaprealm();
419   if (newparent != parent) {
420     dout(10) << "adjust_parent " << parent << " -> " << newparent << dendl;
421     if (parent)
422       parent->open_children.erase(this);
423     parent = newparent;
424     if (parent)
425       parent->open_children.insert(this);
426     
427     invalidate_cached_snaps();
428   }
429 }
430
431 void SnapRealm::split_at(SnapRealm *child)
432 {
433   dout(10) << "split_at " << *child 
434            << " on " << *child->inode << dendl;
435
436   if (inode->is_mdsdir() || !child->inode->is_dir()) {
437     // it's not a dir.
438     if (child->inode->containing_realm) {
439       //  - no open children.
440       //  - only need to move this child's inode's caps.
441       child->inode->move_to_realm(child);
442     } else {
443       // no caps, nothing to move/split.
444       dout(20) << " split no-op, no caps to move on file " << *child->inode << dendl;
445       assert(!child->inode->is_any_caps());
446     }
447     return;
448   }
449
450   // it's a dir.
451
452   // split open_children
453   dout(10) << " open_children are " << open_children << dendl;
454   for (set<SnapRealm*>::iterator p = open_children.begin();
455        p != open_children.end(); ) {
456     SnapRealm *realm = *p;
457     if (realm != child &&
458         child->inode->is_projected_ancestor_of(realm->inode)) {
459       dout(20) << " child gets child realm " << *realm << " on " << *realm->inode << dendl;
460       realm->parent = child;
461       child->open_children.insert(realm);
462       open_children.erase(p++);
463     } else {
464       dout(20) << "    keeping child realm " << *realm << " on " << *realm->inode << dendl;
465       ++p;
466     }
467   }
468
469   // split inodes_with_caps
470   elist<CInode*>::iterator p = inodes_with_caps.begin(member_offset(CInode, item_caps));
471   while (!p.end()) {
472     CInode *in = *p;
473     ++p;
474
475     // does inode fall within the child realm?
476     bool under_child = false;
477
478     if (in == child->inode) {
479       under_child = true;
480     } else {
481       CInode *t = in;
482       while (t->get_parent_dn()) {
483         t = t->get_parent_dn()->get_dir()->get_inode();
484         if (t == child->inode) {
485           under_child = true;
486           break;
487         }
488         if (t == in)
489           break;
490       }
491     }
492     if (under_child) {
493       dout(20) << " child gets " << *in << dendl;
494       in->move_to_realm(child);
495     } else {
496       dout(20) << "    keeping " << *in << dendl;
497     }
498   }
499
500 }
501
502 const bufferlist& SnapRealm::get_snap_trace()
503 {
504   check_cache();
505   return cached_snap_trace;
506 }
507
508 void SnapRealm::build_snap_trace(bufferlist& snapbl) const
509 {
510   SnapRealmInfo info(inode->ino(), srnode.created, srnode.seq, srnode.current_parent_since);
511
512   if (parent) {
513     info.h.parent = parent->inode->ino();
514     if (!srnode.past_parents.empty()) {
515       snapid_t last = srnode.past_parents.rbegin()->first;
516       set<snapid_t> past;
517       snapid_t max_seq, max_last_created, max_last_destroyed;
518       build_snap_set(past, max_seq, max_last_created, max_last_destroyed, 0, last);
519       info.prior_parent_snaps.reserve(past.size());
520       for (set<snapid_t>::reverse_iterator p = past.rbegin(); p != past.rend(); ++p)
521         info.prior_parent_snaps.push_back(*p);
522       dout(10) << "build_snap_trace prior_parent_snaps from [1," << last << "] "
523                << info.prior_parent_snaps << dendl;
524     }
525   } else 
526     info.h.parent = 0;
527
528   info.my_snaps.reserve(srnode.snaps.size());
529   for (map<snapid_t,SnapInfo>::const_reverse_iterator p = srnode.snaps.rbegin();
530        p != srnode.snaps.rend();
531        ++p)
532     info.my_snaps.push_back(p->first);
533   dout(10) << "build_snap_trace my_snaps " << info.my_snaps << dendl;
534
535   ::encode(info, snapbl);
536
537   if (parent)
538     parent->build_snap_trace(snapbl);
539 }
540
541
542
543 void SnapRealm::prune_past_parents()
544 {
545   dout(10) << "prune_past_parents" << dendl;
546   check_cache();
547   assert(open);
548
549   map<snapid_t, snaplink_t>::iterator p = srnode.past_parents.begin();
550   while (p != srnode.past_parents.end()) {
551     set<snapid_t>::iterator q = cached_snaps.lower_bound(p->second.first);
552     if (q == cached_snaps.end() ||
553         *q > p->first) {
554       dout(10) << "prune_past_parents pruning [" << p->second.first << "," << p->first 
555                << "] " << p->second.ino << dendl;
556       remove_open_past_parent(p->second.ino, p->first);
557       srnode.past_parents.erase(p++);
558     } else {
559       dout(10) << "prune_past_parents keeping [" << p->second.first << "," << p->first 
560                << "] " << p->second.ino << dendl;
561       ++p;
562     }
563   }
564 }
565