Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / filestore / JournalingObjectStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2
3 #include "JournalingObjectStore.h"
4
5 #include "common/errno.h"
6 #include "common/debug.h"
7
8 #define dout_context cct
9 #define dout_subsys ceph_subsys_journal
10 #undef dout_prefix
11 #define dout_prefix *_dout << "journal "
12
13
14
15 void JournalingObjectStore::journal_start()
16 {
17   dout(10) << "journal_start" << dendl;
18   finisher.start();
19 }
20
21 void JournalingObjectStore::journal_stop()
22 {
23   dout(10) << "journal_stop" << dendl;
24   finisher.wait_for_empty();
25   finisher.stop();
26 }
27
28 // A journal_replay() makes journal writeable, this closes that out.
29 void JournalingObjectStore::journal_write_close()
30 {
31   if (journal) {
32     journal->close();
33     delete journal;
34     journal = 0;
35   }
36   apply_manager.reset();
37 }
38
39 int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
40 {
41   dout(10) << "journal_replay fs op_seq " << fs_op_seq << dendl;
42
43   if (cct->_conf->journal_replay_from) {
44     dout(0) << "journal_replay forcing replay from "
45             << cct->_conf->journal_replay_from
46             << " instead of " << fs_op_seq << dendl;
47     // the previous op is the last one committed
48     fs_op_seq = cct->_conf->journal_replay_from - 1;
49   }
50
51   uint64_t op_seq = fs_op_seq;
52   apply_manager.init_seq(fs_op_seq);
53
54   if (!journal) {
55     submit_manager.set_op_seq(op_seq);
56     return 0;
57   }
58
59   int err = journal->open(op_seq);
60   if (err < 0) {
61     dout(3) << "journal_replay open failed with "
62             << cpp_strerror(err) << dendl;
63     delete journal;
64     journal = 0;
65     return err;
66   }
67
68   replaying = true;
69
70   int count = 0;
71   while (1) {
72     bufferlist bl;
73     uint64_t seq = op_seq + 1;
74     if (!journal->read_entry(bl, seq)) {
75       dout(3) << "journal_replay: end of journal, done." << dendl;
76       break;
77     }
78
79     if (seq <= op_seq) {
80       dout(3) << "journal_replay: skipping old op seq " << seq << " <= " << op_seq << dendl;
81       continue;
82     }
83     assert(op_seq == seq-1);
84
85     dout(3) << "journal_replay: applying op seq " << seq << dendl;
86     bufferlist::iterator p = bl.begin();
87     vector<ObjectStore::Transaction> tls;
88     while (!p.end()) {
89       tls.emplace_back(Transaction(p));
90     }
91
92     apply_manager.op_apply_start(seq);
93     int r = do_transactions(tls, seq);
94     apply_manager.op_apply_finish(seq);
95
96     op_seq = seq;
97     count++;
98
99     dout(3) << "journal_replay: r = " << r << ", op_seq now " << op_seq << dendl;
100   }
101
102   if (count)
103     dout(3) << "journal_replay: total = " << count << dendl;
104
105   replaying = false;
106
107   submit_manager.set_op_seq(op_seq);
108
109   // done reading, make writeable.
110   err = journal->make_writeable();
111   if (err < 0)
112     return err;
113
114   if (!count)
115     journal->committed_thru(fs_op_seq);
116
117   return count;
118 }
119
120
121 // ------------------------------------
122
123 uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
124 {
125   Mutex::Locker l(apply_lock);
126   while (blocked) {
127     dout(10) << "op_apply_start blocked, waiting" << dendl;
128     blocked_cond.Wait(apply_lock);
129   }
130   dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> "
131            << (open_ops+1) << dendl;
132   assert(!blocked);
133   assert(op > committed_seq);
134   open_ops++;
135   return op;
136 }
137
138 void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
139 {
140   Mutex::Locker l(apply_lock);
141   dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> "
142            << (open_ops-1) << ", max_applied_seq " << max_applied_seq << " -> "
143            << MAX(op, max_applied_seq) << dendl;
144   --open_ops;
145   assert(open_ops >= 0);
146
147   // signal a blocked commit_start
148   if (blocked) {
149     blocked_cond.Signal();
150   }
151
152   // there can be multiple applies in flight; track the max value we
153   // note.  note that we can't _read_ this value and learn anything
154   // meaningful unless/until we've quiesced all in-flight applies.
155   if (op > max_applied_seq)
156     max_applied_seq = op;
157 }
158
159 uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
160 {
161   lock.Lock();
162   uint64_t op = ++op_seq;
163   dout(10) << "op_submit_start " << op << dendl;
164   return op;
165 }
166
167 void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op)
168 {
169   dout(10) << "op_submit_finish " << op << dendl;
170   if (op != op_submitted + 1) {
171     dout(0) << "op_submit_finish " << op << " expected " << (op_submitted + 1)
172             << ", OUT OF ORDER" << dendl;
173     assert(0 == "out of order op_submit_finish");
174   }
175   op_submitted = op;
176   lock.Unlock();
177 }
178
179
180 // ------------------------------------------
181
182 void JournalingObjectStore::ApplyManager::add_waiter(uint64_t op, Context *c)
183 {
184   Mutex::Locker l(com_lock);
185   assert(c);
186   commit_waiters[op].push_back(c);
187 }
188
189 bool JournalingObjectStore::ApplyManager::commit_start()
190 {
191   bool ret = false;
192
193   {
194     Mutex::Locker l(apply_lock);
195     dout(10) << "commit_start max_applied_seq " << max_applied_seq
196              << ", open_ops " << open_ops << dendl;
197     blocked = true;
198     while (open_ops > 0) {
199       dout(10) << "commit_start waiting for " << open_ops
200                << " open ops to drain" << dendl;
201       blocked_cond.Wait(apply_lock);
202     }
203     assert(open_ops == 0);
204     dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
205     {
206       Mutex::Locker l(com_lock);
207       if (max_applied_seq == committed_seq) {
208         dout(10) << "commit_start nothing to do" << dendl;
209         blocked = false;
210         assert(commit_waiters.empty());
211         goto out;
212       }
213
214       committing_seq = max_applied_seq;
215
216       dout(10) << "commit_start committing " << committing_seq
217                << ", still blocked" << dendl;
218     }
219   }
220   ret = true;
221
222   if (journal)
223     journal->commit_start(committing_seq);  // tell the journal too
224  out:
225   return ret;
226 }
227
228 void JournalingObjectStore::ApplyManager::commit_started()
229 {
230   Mutex::Locker l(apply_lock);
231   // allow new ops. (underlying fs should now be committing all prior ops)
232   dout(10) << "commit_started committing " << committing_seq << ", unblocking"
233            << dendl;
234   blocked = false;
235   blocked_cond.Signal();
236 }
237
238 void JournalingObjectStore::ApplyManager::commit_finish()
239 {
240   Mutex::Locker l(com_lock);
241   dout(10) << "commit_finish thru " << committing_seq << dendl;
242
243   if (journal)
244     journal->committed_thru(committing_seq);
245
246   committed_seq = committing_seq;
247
248   map<version_t, vector<Context*> >::iterator p = commit_waiters.begin();
249   while (p != commit_waiters.end() &&
250     p->first <= committing_seq) {
251     finisher.queue(p->second);
252     commit_waiters.erase(p++);
253   }
254 }
255
256 void JournalingObjectStore::_op_journal_transactions(
257   bufferlist& tbl, uint32_t orig_len, uint64_t op,
258   Context *onjournal, TrackedOpRef osd_op)
259 {
260   if (osd_op.get())
261     dout(10) << "op_journal_transactions " << op << " reqid_t "
262              << (static_cast<OpRequest *>(osd_op.get()))->get_reqid() << dendl;
263   else
264     dout(10) << "op_journal_transactions " << op  << dendl;
265
266   if (journal && journal->is_writeable()) {
267     journal->submit_entry(op, tbl, orig_len, onjournal, osd_op);
268   } else if (onjournal) {
269     apply_manager.add_waiter(op, onjournal);
270   }
271 }