Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / journal / JournalRecorder.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/JournalRecorder.h"
5 #include "common/errno.h"
6 #include "journal/Entry.h"
7 #include "journal/Utils.h"
8
9 #include <atomic>
10
11 #define dout_subsys ceph_subsys_journaler
12 #undef dout_prefix
13 #define dout_prefix *_dout << "JournalRecorder: " << this << " "
14
15 using std::shared_ptr;
16
17 namespace journal {
18
19 namespace {
20
21 struct C_Flush : public Context {
22   JournalMetadataPtr journal_metadata;
23   Context *on_finish;
24   std::atomic<int64_t> pending_flushes = { 0 };
25   int ret_val;
26
27   C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish,
28           size_t _pending_flushes)
29     : journal_metadata(_journal_metadata), on_finish(_on_finish),
30       pending_flushes(_pending_flushes), ret_val(0) {
31   }
32
33   void complete(int r) override {
34     if (r < 0 && ret_val == 0) {
35       ret_val = r;
36     }
37     if (--pending_flushes == 0) {
38       // ensure all prior callback have been flushed as well
39       journal_metadata->queue(on_finish, ret_val);
40       delete this;
41     }
42   }
43   void finish(int r) override {
44   }
45 };
46
47 } // anonymous namespace
48
49 JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
50                                  const std::string &object_oid_prefix,
51                                  const JournalMetadataPtr& journal_metadata,
52                                  uint32_t flush_interval, uint64_t flush_bytes,
53                                  double flush_age)
54   : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
55     m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
56     m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this),
57     m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
58     m_current_set(m_journal_metadata->get_active_set()) {
59
60   Mutex::Locker locker(m_lock);
61   m_ioctx.dup(ioctx);
62   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
63
64   uint8_t splay_width = m_journal_metadata->get_splay_width();
65   for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
66     m_object_locks.push_back(shared_ptr<Mutex>(
67                                           new Mutex("ObjectRecorder::m_lock::"+
68                                           std::to_string(splay_offset))));
69     uint64_t object_number = splay_offset + (m_current_set * splay_width);
70     m_object_ptrs[splay_offset] = create_object_recorder(
71                                                 object_number,
72                                                 m_object_locks[splay_offset]);
73   }
74
75   m_journal_metadata->add_listener(&m_listener);
76 }
77
78 JournalRecorder::~JournalRecorder() {
79   m_journal_metadata->remove_listener(&m_listener);
80
81   Mutex::Locker locker(m_lock);
82   assert(m_in_flight_advance_sets == 0);
83   assert(m_in_flight_object_closes == 0);
84 }
85
86 Future JournalRecorder::append(uint64_t tag_tid,
87                                const bufferlist &payload_bl) {
88
89   m_lock.Lock();
90
91   uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid);
92   uint8_t splay_width = m_journal_metadata->get_splay_width();
93   uint8_t splay_offset = entry_tid % splay_width;
94
95   ObjectRecorderPtr object_ptr = get_object(splay_offset);
96   uint64_t commit_tid = m_journal_metadata->allocate_commit_tid(
97     object_ptr->get_object_number(), tag_tid, entry_tid);
98   FutureImplPtr future(new FutureImpl(tag_tid, entry_tid, commit_tid));
99   future->init(m_prev_future);
100   m_prev_future = future;
101
102   m_object_locks[splay_offset]->Lock();
103   m_lock.Unlock();
104
105   bufferlist entry_bl;
106   ::encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl),
107            entry_bl);
108   assert(entry_bl.length() <= m_journal_metadata->get_object_size());
109
110   bool object_full = object_ptr->append_unlock({{future, entry_bl}});
111   if (object_full) {
112     ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
113                      << dendl;
114     Mutex::Locker l(m_lock);
115     close_and_advance_object_set(object_ptr->get_object_number() / splay_width);
116   }
117   return Future(future);
118 }
119
120 void JournalRecorder::flush(Context *on_safe) {
121   C_Flush *ctx;
122   {
123     Mutex::Locker locker(m_lock);
124
125     ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1);
126     for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
127          it != m_object_ptrs.end(); ++it) {
128       it->second->flush(ctx);
129     }
130
131   }
132
133   // avoid holding the lock in case there is nothing to flush
134   ctx->complete(0);
135 }
136
137 ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
138   assert(m_lock.is_locked());
139
140   ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset];
141   assert(object_recoder != NULL);
142   return object_recoder;
143 }
144
145 void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
146   assert(m_lock.is_locked());
147
148   // entry overflow from open object
149   if (m_current_set != object_set) {
150     ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl;
151     return;
152   }
153
154   // we shouldn't overflow upon append if already closed and we
155   // shouldn't receive an overflowed callback if already closed
156   assert(m_in_flight_advance_sets == 0);
157   assert(m_in_flight_object_closes == 0);
158
159   uint64_t active_set = m_journal_metadata->get_active_set();
160   assert(m_current_set == active_set);
161   ++m_current_set;
162   ++m_in_flight_advance_sets;
163
164   ldout(m_cct, 20) << __func__ << ": closing active object set "
165                    << object_set << dendl;
166   if (close_object_set(m_current_set)) {
167     advance_object_set();
168   }
169 }
170
171 void JournalRecorder::advance_object_set() {
172   assert(m_lock.is_locked());
173
174   assert(m_in_flight_object_closes == 0);
175   ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set
176                    << dendl;
177   m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet(
178     this));
179 }
180
181 void JournalRecorder::handle_advance_object_set(int r) {
182   Mutex::Locker locker(m_lock);
183   ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
184
185   assert(m_in_flight_advance_sets > 0);
186   --m_in_flight_advance_sets;
187
188   if (r < 0 && r != -ESTALE) {
189     lderr(m_cct) << __func__ << ": failed to advance object set: "
190                  << cpp_strerror(r) << dendl;
191   }
192
193   if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
194     open_object_set();
195   }
196 }
197
198 void JournalRecorder::open_object_set() {
199   assert(m_lock.is_locked());
200
201   ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set
202                    << dendl;
203
204   uint8_t splay_width = m_journal_metadata->get_splay_width();
205
206   lock_object_recorders();
207   for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
208        it != m_object_ptrs.end(); ++it) {
209     ObjectRecorderPtr object_recorder = it->second;
210     uint64_t object_number = object_recorder->get_object_number();
211     if (object_number / splay_width != m_current_set) {
212       assert(object_recorder->is_closed());
213
214       // ready to close object and open object in active set
215       create_next_object_recorder_unlock(object_recorder);
216     } else {
217       uint8_t splay_offset = object_number % splay_width;
218       m_object_locks[splay_offset]->Unlock();
219     }
220   }
221 }
222
223 bool JournalRecorder::close_object_set(uint64_t active_set) {
224   assert(m_lock.is_locked());
225
226   // object recorders will invoke overflow handler as they complete
227   // closing the object to ensure correct order of future appends
228   uint8_t splay_width = m_journal_metadata->get_splay_width();
229   lock_object_recorders();
230   for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
231        it != m_object_ptrs.end(); ++it) {
232     ObjectRecorderPtr object_recorder = it->second;
233     if (object_recorder->get_object_number() / splay_width != active_set) {
234       ldout(m_cct, 10) << __func__ << ": closing object "
235                        << object_recorder->get_oid() << dendl;
236       // flush out all queued appends and hold future appends
237       if (!object_recorder->close()) {
238         ++m_in_flight_object_closes;
239       } else {
240         ldout(m_cct, 20) << __func__ << ": object "
241                          << object_recorder->get_oid() << " closed" << dendl;
242       }
243     }
244   }
245   unlock_object_recorders();
246   return (m_in_flight_object_closes == 0);
247 }
248
249 ObjectRecorderPtr JournalRecorder::create_object_recorder(
250     uint64_t object_number, shared_ptr<Mutex> lock) {
251   ObjectRecorderPtr object_recorder(new ObjectRecorder(
252     m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
253     object_number, lock, m_journal_metadata->get_work_queue(),
254     m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
255     &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
256     m_flush_bytes, m_flush_age));
257   return object_recorder;
258 }
259
260 void JournalRecorder::create_next_object_recorder_unlock(
261     ObjectRecorderPtr object_recorder) {
262   assert(m_lock.is_locked());
263
264   uint64_t object_number = object_recorder->get_object_number();
265   uint8_t splay_width = m_journal_metadata->get_splay_width();
266   uint8_t splay_offset = object_number % splay_width;
267
268   assert(m_object_locks[splay_offset]->is_locked());
269
270   ObjectRecorderPtr new_object_recorder = create_object_recorder(
271      (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);
272
273   ldout(m_cct, 10) << __func__ << ": "
274                    << "old oid=" << object_recorder->get_oid() << ", "
275                    << "new oid=" << new_object_recorder->get_oid() << dendl;
276   AppendBuffers append_buffers;
277   object_recorder->claim_append_buffers(&append_buffers);
278
279   // update the commit record to point to the correct object number
280   for (auto &append_buffer : append_buffers) {
281     m_journal_metadata->overflow_commit_tid(
282       append_buffer.first->get_commit_tid(),
283       new_object_recorder->get_object_number());
284   }
285
286   new_object_recorder->append_unlock(std::move(append_buffers));
287   m_object_ptrs[splay_offset] = new_object_recorder;
288 }
289
290 void JournalRecorder::handle_update() {
291   Mutex::Locker locker(m_lock);
292
293   uint64_t active_set = m_journal_metadata->get_active_set();
294   if (m_current_set < active_set) {
295     // peer journal client advanced the active set
296     ldout(m_cct, 20) << __func__ << ": "
297                      << "current_set=" << m_current_set << ", "
298                      << "active_set=" << active_set << dendl;
299
300     uint64_t current_set = m_current_set;
301     m_current_set = active_set;
302     if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
303       ldout(m_cct, 20) << __func__ << ": closing current object set "
304                        << current_set << dendl;
305       if (close_object_set(active_set)) {
306         open_object_set();
307       }
308     }
309   }
310 }
311
312 void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
313   ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
314
315   Mutex::Locker locker(m_lock);
316
317   uint64_t object_number = object_recorder->get_object_number();
318   uint8_t splay_width = m_journal_metadata->get_splay_width();
319   uint8_t splay_offset = object_number % splay_width;
320   ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
321   assert(active_object_recorder->get_object_number() == object_number);
322
323   assert(m_in_flight_object_closes > 0);
324   --m_in_flight_object_closes;
325
326   // object closed after advance active set committed
327   ldout(m_cct, 20) << __func__ << ": object "
328                    << active_object_recorder->get_oid() << " closed" << dendl;
329   if (m_in_flight_object_closes == 0) {
330     if (m_in_flight_advance_sets == 0) {
331       // peer forced closing of object set
332       open_object_set();
333     } else {
334       // local overflow advanced object set
335       advance_object_set();
336     }
337   }
338 }
339
340 void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
341   ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
342
343   Mutex::Locker locker(m_lock);
344
345   uint64_t object_number = object_recorder->get_object_number();
346   uint8_t splay_width = m_journal_metadata->get_splay_width();
347   uint8_t splay_offset = object_number % splay_width;
348   ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
349   assert(active_object_recorder->get_object_number() == object_number);
350
351   ldout(m_cct, 20) << __func__ << ": object "
352                    << active_object_recorder->get_oid() << " overflowed"
353                    << dendl;
354   close_and_advance_object_set(object_number / splay_width);
355 }
356
357 } // namespace journal