Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / client / barrier.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  *
5  * Copyright (C) 2012 CohortFS, LLC.
6  *
7  * This is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License version 2.1, as published by the Free Software
10  * Foundation.  See file COPYING.
11  *
12  */
13
14 #if defined(__FreeBSD__)
15 #include <sys/param.h>
16 #endif
17
18 #include "include/Context.h"
19 #include "Client.h"
20 #include "barrier.h"
21 #include "include/assert.h"
22
23 #undef dout_prefix
24 #define dout_prefix *_dout << "client." << whoami << " "
25
26 #define dout_subsys ceph_subsys_client
27
28 #define cldout(cl, v)  dout_impl((cl)->cct, dout_subsys, v) \
29   *_dout << "client." << cl->whoami << " "
30
31 /* C_Block_Sync */
32 class C_Block_Sync : public Context {
33 private:
34   Client *cl;
35   uint64_t ino;
36   barrier_interval iv;
37   enum CBlockSync_State state;
38   Barrier *barrier;
39   int *rval; /* see Cond.h */
40
41 public:
42   boost::intrusive::list_member_hook<> intervals_hook;
43   C_Block_Sync(Client *c, uint64_t i, barrier_interval iv, int *r);
44   void finish(int rval);
45
46   friend class Barrier;
47   friend class BarrierContext;
48 };
49
50 C_Block_Sync::C_Block_Sync(Client *c, uint64_t i, barrier_interval iv,
51                            int *r=0) :
52   cl(c), ino(i), iv(iv), rval(r)
53 {
54   state = CBlockSync_State_None;
55   barrier = NULL;
56
57   cldout(cl, 1) << "C_Block_Sync for " << ino << dendl;
58
59   if (!cl->barriers[ino]) {
60     cl->barriers[ino] = new BarrierContext(cl, ino);
61   }
62   /* XXX current semantics aren't commit-ordered */
63   cl->barriers[ino]->write_nobarrier(*this);
64 }
65
66 void C_Block_Sync::finish(int r) {
67   cldout(cl, 1) << "C_Block_Sync::finish() for " << ino << " "
68                 << iv << " r==" << r << dendl;
69   if (rval)
70     *rval = r;
71   cl->barriers[ino]->complete(*this);
72 }
73
74 /* Barrier */
75 Barrier::Barrier()
76 { }
77
78 Barrier::~Barrier()
79 { }
80
81 /* BarrierContext */
82 BarrierContext::BarrierContext(Client *c, uint64_t ino) :
83   cl(c), ino(ino), lock("BarrierContext")
84 { };
85
86 void BarrierContext::write_nobarrier(C_Block_Sync &cbs)
87 {
88   Mutex::Locker locker(lock);
89   cbs.state = CBlockSync_State_Unclaimed;
90   outstanding_writes.push_back(cbs);
91 }
92
93 void BarrierContext::write_barrier(C_Block_Sync &cbs)
94 {
95   Mutex::Locker locker(lock);
96   barrier_interval &iv = cbs.iv;
97
98   { /* find blocking commit--intrusive no help here */
99     BarrierList::iterator iter;
100     bool done = false;
101     for (iter = active_commits.begin();
102          !done && (iter != active_commits.end());
103          ++iter) {
104       Barrier &barrier = *iter;
105       while (boost::icl::intersects(barrier.span, iv)) {
106         /*  wait on this */
107         barrier.cond.Wait(lock);
108         done = true;
109       }
110     }
111   }
112
113   cbs.state = CBlockSync_State_Unclaimed;
114   outstanding_writes.push_back(cbs);
115
116 } /* write_barrier */
117
118 void BarrierContext::commit_barrier(barrier_interval &civ)
119 {
120     Mutex::Locker locker(lock);
121
122     /* we commit outstanding writes--if none exist, we don't care */
123     if (outstanding_writes.size() == 0)
124       return;
125
126     boost::icl::interval_set<uint64_t> cvs;
127     cvs.insert(civ);
128
129     Barrier *barrier = NULL;
130     BlockSyncList::iterator iter, iter2;
131
132     iter = outstanding_writes.begin();
133     while (iter != outstanding_writes.end()) {
134       barrier_interval &iv = iter->iv;
135       if (boost::icl::intersects(cvs, iv)) {
136         C_Block_Sync &a_write = *iter;
137         if (! barrier)
138           barrier = new Barrier();
139         /* mark the callback */
140         a_write.state = CBlockSync_State_Committing;
141         a_write.barrier = barrier;
142         iter2 = iter++;
143         outstanding_writes.erase(iter2);
144         barrier->write_list.push_back(a_write);
145         barrier->span.insert(iv);
146         /* avoid iter invalidate */
147       } else {
148         ++iter;
149       }
150     }
151
152     if (barrier) {
153       active_commits.push_back(*barrier);
154       /* and wait on this */
155       barrier->cond.Wait(lock);
156     }
157
158 } /* commit_barrier */
159
160 void BarrierContext::complete(C_Block_Sync &cbs)
161 {
162     Mutex::Locker locker(lock);
163     BlockSyncList::iterator iter =
164       BlockSyncList::s_iterator_to(cbs);
165
166     switch (cbs.state) {
167     case CBlockSync_State_Unclaimed:
168       /* cool, no waiting */
169       outstanding_writes.erase(iter);
170       break;
171     case CBlockSync_State_Committing:
172     {
173       Barrier *barrier = iter->barrier;
174       barrier->write_list.erase(iter);
175       /* signal waiters */
176       barrier->cond.Signal();
177         /* dispose cleared barrier */
178       if (barrier->write_list.size() == 0) {
179         BarrierList::iterator iter2 =
180           BarrierList::s_iterator_to(*barrier);
181         active_commits.erase(iter2);
182         delete barrier;
183       }
184     }
185     break;
186     default:
187       assert(false);
188       break;
189     }
190
191     cbs.state = CBlockSync_State_Completed;
192
193 } /* complete */
194
195 BarrierContext::~BarrierContext()
196 { }