Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / CDentry.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15
16
17 #include "CDentry.h"
18 #include "CInode.h"
19 #include "CDir.h"
20
21 #include "MDSRank.h"
22 #include "MDCache.h"
23 #include "Locker.h"
24 #include "LogSegment.h"
25
26 #include "messages/MLock.h"
27
28 #define dout_context g_ceph_context
29 #define dout_subsys ceph_subsys_mds
30 #undef dout_prefix
31 #define dout_prefix *_dout << "mds." << dir->cache->mds->get_nodeid() << ".cache.den(" << dir->dirfrag() << " " << name << ") "
32
33
34 ostream& CDentry::print_db_line_prefix(ostream& out)
35 {
36   return out << ceph_clock_now() << " mds." << dir->cache->mds->get_nodeid() << ".cache.den(" << dir->ino() << " " << name << ") ";
37 }
38
39 LockType CDentry::lock_type(CEPH_LOCK_DN);
40 LockType CDentry::versionlock_type(CEPH_LOCK_DVERSION);
41
42
43 // CDentry
44
45 ostream& operator<<(ostream& out, const CDentry& dn)
46 {
47   filepath path;
48   dn.make_path(path);
49   
50   out << "[dentry " << path;
51   
52   if (true || dn.first != 0 || dn.last != CEPH_NOSNAP) {
53     out << " [" << dn.first << ",";
54     if (dn.last == CEPH_NOSNAP) 
55       out << "head";
56     else
57       out << dn.last;
58     out << ']';
59   }
60
61   if (dn.is_auth()) {
62     out << " auth";
63     if (dn.is_replicated()) 
64       out << dn.get_replicas();
65   } else {
66     out << " rep@" << dn.authority();
67     out << "." << dn.get_replica_nonce();
68   }
69
70   if (dn.get_linkage()->is_null()) out << " NULL";
71   if (dn.get_linkage()->is_remote()) {
72     out << " REMOTE(";
73     out << dn.get_linkage()->get_remote_d_type_string();
74     out << ")";
75   }
76
77   if (!dn.lock.is_sync_and_unlocked())
78     out << " " << dn.lock;
79   if (!dn.versionlock.is_sync_and_unlocked())
80     out << " " << dn.versionlock;
81
82   if (dn.get_projected_version() != dn.get_version())
83     out << " pv=" << dn.get_projected_version();
84   out << " v=" << dn.get_version();
85
86   if (dn.is_auth_pinned())
87     out << " ap=" << dn.get_num_auth_pins() << "+" << dn.get_num_nested_auth_pins();
88
89   out << " inode=" << dn.get_linkage()->get_inode();
90
91   out << " state=" << dn.get_state();
92   if (dn.is_new()) out << "|new";
93   if (dn.state_test(CDentry::STATE_BOTTOMLRU)) out << "|bottomlru";
94
95   if (dn.get_num_ref()) {
96     out << " |";
97     dn.print_pin_set(out);
98   }
99
100   out << " " << &dn;
101   out << "]";
102   return out;
103 }
104
105
106 bool operator<(const CDentry& l, const CDentry& r)
107 {
108   if ((l.get_dir()->ino() < r.get_dir()->ino()) ||
109       (l.get_dir()->ino() == r.get_dir()->ino() &&
110        (l.get_name() < r.get_name() ||
111         (l.get_name() == r.get_name() && l.last < r.last))))
112     return true;
113   return false;
114 }
115
116
117 void CDentry::print(ostream& out)
118 {
119   out << *this;
120 }
121
122
123 /*
124 inodeno_t CDentry::get_ino()
125 {
126   if (get_inode()) 
127     return get_inode()->ino();
128   return inodeno_t();
129 }
130 */
131
132 mds_authority_t CDentry::authority() const
133 {
134   return dir->authority();
135 }
136
137
138 void CDentry::add_waiter(uint64_t tag, MDSInternalContextBase *c)
139 {
140   // wait on the directory?
141   if (tag & (WAIT_UNFREEZE|WAIT_SINGLEAUTH)) {
142     dir->add_waiter(tag, c);
143     return;
144   }
145   MDSCacheObject::add_waiter(tag, c);
146 }
147
148
149 version_t CDentry::pre_dirty(version_t min)
150 {
151   projected_version = dir->pre_dirty(min);
152   dout(10) << " pre_dirty " << *this << dendl;
153   return projected_version;
154 }
155
156
157 void CDentry::_mark_dirty(LogSegment *ls)
158 {
159   // state+pin
160   if (!state_test(STATE_DIRTY)) {
161     state_set(STATE_DIRTY);
162     dir->inc_num_dirty();
163     get(PIN_DIRTY);
164     assert(ls);
165   }
166   if (ls) 
167     ls->dirty_dentries.push_back(&item_dirty);
168 }
169
170 void CDentry::mark_dirty(version_t pv, LogSegment *ls) 
171 {
172   dout(10) << " mark_dirty " << *this << dendl;
173
174   // i now live in this new dir version
175   assert(pv <= projected_version);
176   version = pv;
177   _mark_dirty(ls);
178
179   // mark dir too
180   dir->mark_dirty(pv, ls);
181 }
182
183
184 void CDentry::mark_clean() 
185 {
186   dout(10) << " mark_clean " << *this << dendl;
187   assert(is_dirty());
188
189   // not always true for recalc_auth_bits during resolve finish
190   //assert(dir->get_version() == 0 || version <= dir->get_version());  // hmm?
191
192   // state+pin
193   state_clear(STATE_DIRTY);
194   dir->dec_num_dirty();
195   put(PIN_DIRTY);
196   
197   item_dirty.remove_myself();
198
199   clear_new();
200 }    
201
202 void CDentry::mark_new() 
203 {
204   dout(10) << " mark_new " << *this << dendl;
205   state_set(STATE_NEW);
206 }
207
208 void CDentry::make_path_string(string& s, bool projected) const
209 {
210   if (dir) {
211     dir->inode->make_path_string(s, projected);
212   } else {
213     s = "???";
214   }
215   s += "/";
216   s.append(name.data(), name.length());
217 }
218
219 void CDentry::make_path(filepath& fp, bool projected) const
220 {
221   assert(dir);
222   dir->inode->make_path(fp, projected);
223   fp.push_dentry(name);
224 }
225
226 /*
227  * we only add ourselves to remote_parents when the linkage is
228  * active (no longer projected).  if the passed dnl is projected,
229  * don't link in, and do that work later in pop_projected_linkage().
230  */
231 void CDentry::link_remote(CDentry::linkage_t *dnl, CInode *in)
232 {
233   assert(dnl->is_remote());
234   assert(in->ino() == dnl->get_remote_ino());
235   dnl->inode = in;
236
237   if (dnl == &linkage)
238     in->add_remote_parent(this);
239 }
240
241 void CDentry::unlink_remote(CDentry::linkage_t *dnl)
242 {
243   assert(dnl->is_remote());
244   assert(dnl->inode);
245   
246   if (dnl == &linkage)
247     dnl->inode->remove_remote_parent(this);
248
249   dnl->inode = 0;
250 }
251
252 void CDentry::push_projected_linkage()
253 {
254   _project_linkage();
255
256   if (is_auth()) {
257     CInode *diri = dir->inode;
258     if (diri->is_stray())
259       diri->mdcache->notify_stray_removed();
260   }
261 }
262
263
264 void CDentry::push_projected_linkage(CInode *inode)
265 {
266   // dirty rstat tracking is in the projected plane
267   bool dirty_rstat = inode->is_dirty_rstat();
268   if (dirty_rstat)
269     inode->clear_dirty_rstat();
270
271   _project_linkage()->inode = inode;
272   inode->push_projected_parent(this);
273
274   if (dirty_rstat)
275     inode->mark_dirty_rstat();
276
277   if (is_auth()) {
278     CInode *diri = dir->inode;
279     if (diri->is_stray())
280       diri->mdcache->notify_stray_created();
281   }
282 }
283
284 CDentry::linkage_t *CDentry::pop_projected_linkage()
285 {
286   assert(projected.size());
287   
288   linkage_t& n = projected.front();
289
290   /*
291    * the idea here is that the link_remote_inode(), link_primary_inode(), 
292    * etc. calls should make linkage identical to &n (and we assert as
293    * much).
294    */
295
296   if (n.remote_ino) {
297     dir->link_remote_inode(this, n.remote_ino, n.remote_d_type);
298     if (n.inode) {
299       linkage.inode = n.inode;
300       linkage.inode->add_remote_parent(this);
301     }
302   } else if (n.inode) {
303     dir->link_primary_inode(this, n.inode);
304     n.inode->pop_projected_parent();
305   }
306
307   assert(n.inode == linkage.inode);
308   assert(n.remote_ino == linkage.remote_ino);
309   assert(n.remote_d_type == linkage.remote_d_type);
310
311   projected.pop_front();
312
313   return &linkage;
314 }
315
316
317
318 // ----------------------------
319 // auth pins
320
321 int CDentry::get_num_dir_auth_pins() const
322 {
323   assert(!is_projected());
324   if (get_linkage()->is_primary())
325     return auth_pins + get_linkage()->get_inode()->get_num_auth_pins();
326   return auth_pins;
327 }
328
329 bool CDentry::can_auth_pin() const
330 {
331   assert(dir);
332   return dir->can_auth_pin();
333 }
334
335 void CDentry::auth_pin(void *by)
336 {
337   if (auth_pins == 0)
338     get(PIN_AUTHPIN);
339   auth_pins++;
340
341 #ifdef MDS_AUTHPIN_SET
342   auth_pin_set.insert(by);
343 #endif
344
345   dout(10) << "auth_pin by " << by << " on " << *this 
346            << " now " << auth_pins << "+" << nested_auth_pins
347            << dendl;
348
349   dir->adjust_nested_auth_pins(1, 1, by);
350 }
351
352 void CDentry::auth_unpin(void *by)
353 {
354   auth_pins--;
355
356 #ifdef MDS_AUTHPIN_SET
357   assert(auth_pin_set.count(by));
358   auth_pin_set.erase(auth_pin_set.find(by));
359 #endif
360
361   if (auth_pins == 0)
362     put(PIN_AUTHPIN);
363
364   dout(10) << "auth_unpin by " << by << " on " << *this
365            << " now " << auth_pins << "+" << nested_auth_pins
366            << dendl;
367   assert(auth_pins >= 0);
368
369   dir->adjust_nested_auth_pins(-1, -1, by);
370 }
371
372 void CDentry::adjust_nested_auth_pins(int adjustment, int diradj, void *by)
373 {
374   nested_auth_pins += adjustment;
375
376   dout(35) << "adjust_nested_auth_pins by " << by 
377            << ", change " << adjustment << " yields "
378            << auth_pins << "+" << nested_auth_pins
379            << dendl;
380   assert(nested_auth_pins >= 0);
381
382   dir->adjust_nested_auth_pins(adjustment, diradj, by);
383 }
384
385 bool CDentry::is_frozen() const
386 {
387   return dir->is_frozen();
388 }
389
390 bool CDentry::is_freezing() const
391 {
392   return dir->is_freezing();
393 }
394
395 void CDentry::decode_replica(bufferlist::iterator& p, bool is_new)
396 {
397   __u32 nonce;
398   ::decode(nonce, p);
399   replica_nonce = nonce;
400   
401   ::decode(first, p);
402
403   inodeno_t rino;
404   unsigned char rdtype;
405   __s32 ls;
406   ::decode(rino, p);
407   ::decode(rdtype, p);
408   ::decode(ls, p);
409
410   if (is_new) {
411     if (rino)
412       dir->link_remote_inode(this, rino, rdtype);
413     lock.set_state(ls);
414   }
415 }
416
417 // ----------------------------
418 // locking
419
420 void CDentry::set_object_info(MDSCacheObjectInfo &info) 
421 {
422   info.dirfrag = dir->dirfrag();
423   info.dname = name;
424   info.snapid = last;
425 }
426
427 void CDentry::encode_lock_state(int type, bufferlist& bl)
428 {
429   ::encode(first, bl);
430
431   // null, ino, or remote_ino?
432   char c;
433   if (linkage.is_primary()) {
434     c = 1;
435     ::encode(c, bl);
436     ::encode(linkage.get_inode()->inode.ino, bl);
437   }
438   else if (linkage.is_remote()) {
439     c = 2;
440     ::encode(c, bl);
441     ::encode(linkage.get_remote_ino(), bl);
442   }
443   else if (linkage.is_null()) {
444     // encode nothing.
445   }
446   else ceph_abort();
447 }
448
449 void CDentry::decode_lock_state(int type, bufferlist& bl)
450 {  
451   bufferlist::iterator p = bl.begin();
452
453   snapid_t newfirst;
454   ::decode(newfirst, p);
455
456   if (!is_auth() && newfirst != first) {
457     dout(10) << "decode_lock_state first " << first << " -> " << newfirst << dendl;
458     assert(newfirst > first);
459     first = newfirst;
460   }
461
462   if (p.end()) {
463     // null
464     assert(linkage.is_null());
465     return;
466   }
467
468   char c;
469   inodeno_t ino;
470   ::decode(c, p);
471
472   switch (c) {
473   case 1:
474   case 2:
475     ::decode(ino, p);
476     // newly linked?
477     if (linkage.is_null() && !is_auth()) {
478       // force trim from cache!
479       dout(10) << "decode_lock_state replica dentry null -> non-null, must trim" << dendl;
480       //assert(get_num_ref() == 0);
481     } else {
482       // verify?
483       
484     }
485     break;
486   default: 
487     ceph_abort();
488   }
489 }
490
491
492 ClientLease *CDentry::add_client_lease(client_t c, Session *session) 
493 {
494   ClientLease *l;
495   if (client_lease_map.count(c))
496     l = client_lease_map[c];
497   else {
498     dout(20) << "add_client_lease client." << c << " on " << lock << dendl;
499     if (client_lease_map.empty())
500       get(PIN_CLIENTLEASE);
501     l = client_lease_map[c] = new ClientLease(c, this);
502     l->seq = ++session->lease_seq;
503   
504     lock.get_client_lease();
505   }
506   
507   return l;
508 }
509
510 void CDentry::remove_client_lease(ClientLease *l, Locker *locker) 
511 {
512   assert(l->parent == this);
513
514   bool gather = false;
515
516   dout(20) << "remove_client_lease client." << l->client << " on " << lock << dendl;
517   lock.put_client_lease();
518   if (lock.get_num_client_lease() == 0 && !lock.is_stable())
519     gather = true;
520
521   client_lease_map.erase(l->client);
522   l->item_lease.remove_myself();
523   l->item_session_lease.remove_myself();
524   delete l;
525
526   if (client_lease_map.empty())
527     put(PIN_CLIENTLEASE);
528
529   if (gather)
530     locker->eval_gather(&lock);
531 }
532
533 void CDentry::remove_client_leases(Locker *locker)
534 {
535   while (!client_lease_map.empty())
536     remove_client_lease(client_lease_map.begin()->second, locker);
537 }
538
539 void CDentry::_put()
540 {
541   if (get_num_ref() <= ((int)is_dirty() + 1)) {
542     CDentry::linkage_t *dnl = get_projected_linkage();
543     if (dnl->is_primary()) {
544       CInode *in = dnl->get_inode();
545       if (get_num_ref() == (int)is_dirty() + !!in->get_num_ref())
546         in->mdcache->maybe_eval_stray(in, true);
547     }
548   }
549 }
550
551 void CDentry::dump(Formatter *f) const
552 {
553   assert(f != NULL);
554
555   filepath path;
556   make_path(path);
557
558   f->dump_string("path", path.get_path());
559   f->dump_unsigned("path_ino", path.get_ino().val);
560   f->dump_unsigned("snap_first", first);
561   f->dump_unsigned("snap_last", last);
562   
563   f->dump_bool("is_primary", get_linkage()->is_primary());
564   f->dump_bool("is_remote", get_linkage()->is_remote());
565   f->dump_bool("is_null", get_linkage()->is_null());
566   f->dump_bool("is_new", is_new());
567   if (get_linkage()->get_inode()) {
568     f->dump_unsigned("inode", get_linkage()->get_inode()->ino());
569   } else {
570     f->dump_unsigned("inode", 0);
571   }
572
573   if (linkage.is_remote()) {
574     f->dump_string("remote_type", linkage.get_remote_d_type_string());
575   } else {
576     f->dump_string("remote_type", "");
577   }
578
579   f->dump_unsigned("version", get_version());
580   f->dump_unsigned("projected_version", get_projected_version());
581
582   f->dump_int("auth_pins", auth_pins);
583   f->dump_int("nested_auth_pins", nested_auth_pins);
584
585   MDSCacheObject::dump(f);
586
587   f->open_object_section("lock");
588   lock.dump(f);
589   f->close_section();
590
591   f->open_object_section("versionlock");
592   versionlock.dump(f);
593   f->close_section();
594
595   f->open_array_section("states");
596   MDSCacheObject::dump_states(f);
597   if (state_test(STATE_NEW))
598     f->dump_string("state", "new");
599   if (state_test(STATE_FRAGMENTING))
600     f->dump_string("state", "fragmenting");
601   if (state_test(STATE_PURGING))
602     f->dump_string("state", "purging");
603   if (state_test(STATE_BADREMOTEINO))
604     f->dump_string("state", "badremoteino");
605   if (state_test(STATE_STRAY))
606     f->dump_string("state", "stray");
607   f->close_section();
608 }
609
610 std::string CDentry::linkage_t::get_remote_d_type_string() const
611 {
612   switch (DTTOIF(remote_d_type)) {
613     case S_IFSOCK: return "sock";
614     case S_IFLNK: return "lnk";
615     case S_IFREG: return "reg";
616     case S_IFBLK: return "blk";
617     case S_IFDIR: return "dir";
618     case S_IFCHR: return "chr";
619     case S_IFIFO: return "fifo";
620     default: ceph_abort(); return "";
621   }
622 }
623
624 MEMPOOL_DEFINE_OBJECT_FACTORY(CDentry, co_dentry, mds_co);