Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / journal / ObjectRecorder.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/ObjectRecorder.h"
5 #include "journal/Future.h"
6 #include "journal/Utils.h"
7 #include "include/assert.h"
8 #include "common/Timer.h"
9 #include "cls/journal/cls_journal_client.h"
10
11 #define dout_subsys ceph_subsys_journaler
12 #undef dout_prefix
13 #define dout_prefix *_dout << "ObjectRecorder: " << this << " "
14
15 using namespace cls::journal;
16 using std::shared_ptr;
17
18 namespace journal {
19
20 ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
21                                uint64_t object_number, shared_ptr<Mutex> lock,
22                                ContextWQ *work_queue, SafeTimer &timer,
23                                Mutex &timer_lock, Handler *handler,
24                                uint8_t order, uint32_t flush_interval,
25                                uint64_t flush_bytes, double flush_age)
26   : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
27     m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer),
28     m_timer_lock(timer_lock), m_handler(handler), m_order(order),
29     m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
30     m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
31     m_lock(lock), m_append_tid(0), m_pending_bytes(0),
32     m_size(0), m_overflowed(false), m_object_closed(false),
33     m_in_flight_flushes(false), m_aio_scheduled(false) {
34   m_ioctx.dup(ioctx);
35   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
36   assert(m_handler != NULL);
37 }
38
39 ObjectRecorder::~ObjectRecorder() {
40   assert(m_append_task == NULL);
41   assert(m_append_buffers.empty());
42   assert(m_in_flight_tids.empty());
43   assert(m_in_flight_appends.empty());
44   assert(!m_aio_scheduled);
45 }
46
47 bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
48   assert(m_lock->is_locked());
49
50   FutureImplPtr last_flushed_future;
51   bool schedule_append = false;
52
53   if (m_overflowed) {
54     m_append_buffers.insert(m_append_buffers.end(),
55                             append_buffers.begin(), append_buffers.end());
56     m_lock->Unlock();
57     return false;
58   }
59
60   for (AppendBuffers::const_iterator iter = append_buffers.begin();
61        iter != append_buffers.end(); ++iter) {
62     if (append(*iter, &schedule_append)) {
63       last_flushed_future = iter->first;
64     }
65   }
66
67   if (last_flushed_future) {
68     flush(last_flushed_future);
69     m_lock->Unlock();
70   } else {
71     m_lock->Unlock();
72     if (schedule_append) {
73       schedule_append_task();
74     } else {
75       cancel_append_task();
76     }
77   }
78   return (!m_object_closed && !m_overflowed &&
79           m_size + m_pending_bytes >= m_soft_max_size);
80 }
81
82 void ObjectRecorder::flush(Context *on_safe) {
83   ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
84
85   cancel_append_task();
86   Future future;
87   {
88     Mutex::Locker locker(*m_lock);
89
90     // if currently handling flush notifications, wait so that
91     // we notify in the correct order (since lock is dropped on
92     // callback)
93     if (m_in_flight_flushes) {
94       m_in_flight_flushes_cond.Wait(*(m_lock.get()));
95     }
96
97     // attach the flush to the most recent append
98     if (!m_append_buffers.empty()) {
99       future = Future(m_append_buffers.rbegin()->first);
100
101       flush_appends(true);
102     } else if (!m_in_flight_appends.empty()) {
103       AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
104       assert(!append_buffers.empty());
105       future = Future(append_buffers.rbegin()->first);
106     }
107   }
108
109   if (future.is_valid()) {
110     future.flush(on_safe);
111   } else {
112     on_safe->complete(0);
113   }
114 }
115
116 void ObjectRecorder::flush(const FutureImplPtr &future) {
117   ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future
118                    << dendl;
119
120   assert(m_lock->is_locked());
121
122   if (future->get_flush_handler().get() != &m_flush_handler) {
123     // if we don't own this future, re-issue the flush so that it hits the
124     // correct journal object owner
125     future->flush();
126     return;
127   } else if (future->is_flush_in_progress()) {
128     return;
129   }
130
131   if (m_object_closed || m_overflowed) {
132     return;
133   }
134
135   AppendBuffers::reverse_iterator r_it;
136   for (r_it = m_append_buffers.rbegin(); r_it != m_append_buffers.rend();
137        ++r_it) {
138     if (r_it->first == future) {
139       break;
140     }
141   }
142   assert(r_it != m_append_buffers.rend());
143
144   auto it = (++r_it).base();
145   assert(it != m_append_buffers.end());
146   ++it;
147
148   AppendBuffers flush_buffers;
149   flush_buffers.splice(flush_buffers.end(), m_append_buffers,
150                        m_append_buffers.begin(), it);
151   send_appends(&flush_buffers);
152 }
153
154 void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
155   ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
156
157   assert(m_lock->is_locked());
158   assert(m_in_flight_tids.empty());
159   assert(m_in_flight_appends.empty());
160   assert(m_object_closed || m_overflowed);
161   append_buffers->splice(append_buffers->end(), m_append_buffers,
162                          m_append_buffers.begin(), m_append_buffers.end());
163 }
164
165 bool ObjectRecorder::close() {
166   assert (m_lock->is_locked());
167
168   ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
169
170   cancel_append_task();
171
172   flush_appends(true);
173
174   assert(!m_object_closed);
175   m_object_closed = true;
176   return (m_in_flight_tids.empty() && !m_in_flight_flushes && !m_aio_scheduled);
177 }
178
179 void ObjectRecorder::handle_append_task() {
180   assert(m_timer_lock.is_locked());
181   m_append_task = NULL;
182
183   Mutex::Locker locker(*m_lock);
184   flush_appends(true);
185 }
186
187 void ObjectRecorder::cancel_append_task() {
188   Mutex::Locker locker(m_timer_lock);
189   if (m_append_task != NULL) {
190     m_timer.cancel_event(m_append_task);
191     m_append_task = NULL;
192   }
193 }
194
195 void ObjectRecorder::schedule_append_task() {
196   Mutex::Locker locker(m_timer_lock);
197   if (m_append_task == nullptr && m_flush_age > 0) {
198     m_append_task = m_timer.add_event_after(
199       m_flush_age, new FunctionContext([this](int) {
200           handle_append_task();
201         }));
202   }
203 }
204
205 bool ObjectRecorder::append(const AppendBuffer &append_buffer,
206                             bool *schedule_append) {
207   assert(m_lock->is_locked());
208
209   bool flush_requested = false;
210   if (!m_object_closed && !m_overflowed) {
211     flush_requested = append_buffer.first->attach(&m_flush_handler);
212   }
213
214   m_append_buffers.push_back(append_buffer);
215   m_pending_bytes += append_buffer.second.length();
216
217   if (!flush_appends(false)) {
218     *schedule_append = true;
219   }
220   return flush_requested;
221 }
222
223 bool ObjectRecorder::flush_appends(bool force) {
224   assert(m_lock->is_locked());
225   if (m_object_closed || m_overflowed) {
226     return true;
227   }
228
229   if (m_append_buffers.empty() ||
230       (!force &&
231        m_size + m_pending_bytes < m_soft_max_size &&
232        (m_flush_interval > 0 && m_append_buffers.size() < m_flush_interval) &&
233        (m_flush_bytes > 0 && m_pending_bytes < m_flush_bytes))) {
234     return false;
235   }
236
237   m_pending_bytes = 0;
238   AppendBuffers append_buffers;
239   append_buffers.swap(m_append_buffers);
240   send_appends(&append_buffers);
241   return true;
242 }
243
244 void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
245   ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid
246                    << ", r=" << r << dendl;
247
248   AppendBuffers append_buffers;
249   {
250     m_lock->Lock();
251     auto tid_iter = m_in_flight_tids.find(tid);
252     assert(tid_iter != m_in_flight_tids.end());
253     m_in_flight_tids.erase(tid_iter);
254
255     InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
256     if (r == -EOVERFLOW || m_overflowed) {
257       if (iter != m_in_flight_appends.end()) {
258         m_overflowed = true;
259       } else {
260         // must have seen an overflow on a previous append op
261         assert(r == -EOVERFLOW && m_overflowed);
262       }
263
264       // notify of overflow once all in-flight ops are complete
265       if (m_in_flight_tids.empty() && !m_aio_scheduled) {
266         append_overflowed();
267         notify_handler_unlock();
268       } else {
269         m_lock->Unlock();
270       }
271       return;
272     }
273
274     assert(iter != m_in_flight_appends.end());
275     append_buffers.swap(iter->second);
276     assert(!append_buffers.empty());
277
278     m_in_flight_appends.erase(iter);
279     m_in_flight_flushes = true;
280     m_lock->Unlock();
281   }
282
283   // Flag the associated futures as complete.
284   for (AppendBuffers::iterator buf_it = append_buffers.begin();
285        buf_it != append_buffers.end(); ++buf_it) {
286     ldout(m_cct, 20) << __func__ << ": " << *buf_it->first << " marked safe"
287                      << dendl;
288     buf_it->first->safe(r);
289   }
290
291   // wake up any flush requests that raced with a RADOS callback
292   m_lock->Lock();
293   m_in_flight_flushes = false;
294   m_in_flight_flushes_cond.Signal();
295
296   if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
297     // all remaining unsent appends should be redirected to new object
298     notify_handler_unlock();
299   } else {
300     m_lock->Unlock();
301   }
302 }
303
304 void ObjectRecorder::append_overflowed() {
305   ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
306                    << dendl;
307
308   assert(m_lock->is_locked());
309   assert(!m_in_flight_appends.empty());
310
311   cancel_append_task();
312
313   InFlightAppends in_flight_appends;
314   in_flight_appends.swap(m_in_flight_appends);
315
316   AppendBuffers restart_append_buffers;
317   for (InFlightAppends::iterator it = in_flight_appends.begin();
318        it != in_flight_appends.end(); ++it) {
319     restart_append_buffers.insert(restart_append_buffers.end(),
320                                   it->second.begin(), it->second.end());
321   }
322
323   restart_append_buffers.splice(restart_append_buffers.end(),
324                                 m_append_buffers,
325                                 m_append_buffers.begin(),
326                                 m_append_buffers.end());
327   restart_append_buffers.swap(m_append_buffers);
328
329   for (AppendBuffers::const_iterator it = m_append_buffers.begin();
330        it != m_append_buffers.end(); ++it) {
331     ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
332                      << dendl;
333     it->first->detach();
334   }
335 }
336
337 void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
338   assert(m_lock->is_locked());
339   assert(!append_buffers->empty());
340
341   for (AppendBuffers::iterator it = append_buffers->begin();
342        it != append_buffers->end(); ++it) {
343     ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
344                      << dendl;
345     it->first->set_flush_in_progress();
346     m_size += it->second.length();
347   }
348
349   m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers,
350                            append_buffers->begin(), append_buffers->end());
351   if (!m_aio_scheduled) {
352     m_op_work_queue->queue(new FunctionContext([this] (int r) {
353         send_appends_aio();
354     }));
355     m_aio_scheduled = true;
356   }
357 }
358
359 void ObjectRecorder::send_appends_aio() {
360   AppendBuffers *append_buffers;
361   uint64_t append_tid;
362   {
363     Mutex::Locker locker(*m_lock);
364     append_tid = m_append_tid++;
365     m_in_flight_tids.insert(append_tid);
366
367     // safe to hold pointer outside lock until op is submitted
368     append_buffers = &m_in_flight_appends[append_tid];
369     append_buffers->swap(m_pending_buffers);
370   }
371
372   ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
373                    << append_tid << dendl;
374   C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid);
375   C_Gather *gather_ctx = new C_Gather(m_cct, append_flush);
376
377   librados::ObjectWriteOperation op;
378   client::guard_append(&op, m_soft_max_size);
379   for (AppendBuffers::iterator it = append_buffers->begin();
380        it != append_buffers->end(); ++it) {
381     ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
382                      << dendl;
383     op.append(it->second);
384     op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
385   }
386
387   librados::AioCompletion *rados_completion =
388     librados::Rados::aio_create_completion(gather_ctx->new_sub(), nullptr,
389                                            utils::rados_ctx_callback);
390   int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
391   assert(r == 0);
392   rados_completion->release();
393
394   {
395     m_lock->Lock();
396     if (m_pending_buffers.empty()) {
397       m_aio_scheduled = false;
398       if (m_in_flight_appends.empty() && m_object_closed) {
399         // all remaining unsent appends should be redirected to new object
400         notify_handler_unlock();
401       } else {
402         m_lock->Unlock();
403       }
404     } else {
405       // additional pending items -- reschedule
406       m_op_work_queue->queue(new FunctionContext([this] (int r) {
407           send_appends_aio();
408         }));
409       m_lock->Unlock();
410     }
411   }
412
413   // allow append op to complete
414   gather_ctx->activate();
415 }
416
417 void ObjectRecorder::notify_handler_unlock() {
418   assert(m_lock->is_locked());
419   if (m_object_closed) {
420     m_lock->Unlock();
421     m_handler->closed(this);
422   } else {
423     // TODO need to delay completion until after aio_notify completes
424     m_lock->Unlock();
425     m_handler->overflow(this);
426   }
427 }
428
429 } // namespace journal