1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/ObjectRecorder.h"
5 #include "journal/Future.h"
6 #include "journal/Utils.h"
7 #include "include/assert.h"
8 #include "common/Timer.h"
9 #include "cls/journal/cls_journal_client.h"
11 #define dout_subsys ceph_subsys_journaler
13 #define dout_prefix *_dout << "ObjectRecorder: " << this << " "
15 using namespace cls::journal;
16 using std::shared_ptr;
// Construct a recorder for a single journal data object.  Appends are
// batched and flushed to RADOS when any of the configured thresholds is
// reached: flush_interval (buffer count), flush_bytes (pending bytes) or
// flush_age (seconds, via the SafeTimer).  m_soft_max_size = 2^order is the
// soft cap on the object size used by the guard_append overflow check.
// NOTE(review): this dump elides some original lines (e.g. the m_ioctx
// initialization — presumably an ioctx dup — and the closing brace).
20 ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
21 uint64_t object_number, shared_ptr<Mutex> lock,
22 ContextWQ *work_queue, SafeTimer &timer,
23 Mutex &timer_lock, Handler *handler,
24 uint8_t order, uint32_t flush_interval,
25 uint64_t flush_bytes, double flush_age)
26 : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
27 m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer),
28 m_timer_lock(timer_lock), m_handler(handler), m_order(order),
29 m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
30 m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
31 m_lock(lock), m_append_tid(0), m_pending_bytes(0),
32 m_size(0), m_overflowed(false), m_object_closed(false),
33 m_in_flight_flushes(false), m_aio_scheduled(false) {
// Derive the CephContext from the io context so ldout/dout logging works.
35 m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
36 assert(m_handler != NULL);
// Destructor: the recorder must be fully quiesced before destruction —
// no pending timer task, no buffered appends, and no in-flight RADOS ops.
// Callers are expected to have drained/closed the object first.
39 ObjectRecorder::~ObjectRecorder() {
40 assert(m_append_task == NULL);
41 assert(m_append_buffers.empty());
42 assert(m_in_flight_tids.empty());
43 assert(m_in_flight_appends.empty());
44 assert(!m_aio_scheduled);
// Queue a batch of append buffers on this object.  Each buffer's future is
// attached to the flush handler; if any future already had a flush request
// pending, the most recent such future is flushed immediately.  Returns
// true when the object should be closed (soft size limit reached and the
// object is still open and not overflowed).
// NOTE(review): the lock-drop implied by the "_unlock" suffix and several
// branch/closing lines are elided from this dump — confirm against the
// complete source before relying on the exact locking protocol.
47 bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
48 assert(m_lock->is_locked());
50 FutureImplPtr last_flushed_future;
51 bool schedule_append = false;
54 m_append_buffers.insert(m_append_buffers.end(),
55 append_buffers.begin(), append_buffers.end());
// Attach each buffer; remember the last one whose future had a flush
// already requested by the client.
60 for (AppendBuffers::const_iterator iter = append_buffers.begin();
61 iter != append_buffers.end(); ++iter) {
62 if (append(*iter, &schedule_append)) {
63 last_flushed_future = iter->first;
67 if (last_flushed_future) {
68 flush(last_flushed_future);
// No immediate flush needed — arm the age-based flush timer instead.
72 if (schedule_append) {
73 schedule_append_task();
// Tell the caller whether this object has reached its soft size cap.
78 return (!m_object_closed && !m_overflowed &&
79 m_size + m_pending_bytes >= m_soft_max_size);
// Flush all pending appends, invoking on_safe once everything queued at the
// time of the call is safe on disk.  The completion is attached to the most
// recent append's future (buffered first, then in-flight); if nothing is
// pending, on_safe completes immediately with 0.
// NOTE(review): the declaration of `future` (presumably `Future future;`)
// is on an elided line in this dump.
82 void ObjectRecorder::flush(Context *on_safe) {
83 ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
88 Mutex::Locker locker(*m_lock);
90 // if currently handling flush notifications, wait so that
91 // we notify in the correct order (since lock is dropped on
93 if (m_in_flight_flushes) {
94 m_in_flight_flushes_cond.Wait(*(m_lock.get()));
97 // attach the flush to the most recent append
98 if (!m_append_buffers.empty()) {
99 future = Future(m_append_buffers.rbegin()->first);
102 } else if (!m_in_flight_appends.empty()) {
// Nothing buffered, but ops are in flight — chain onto the newest one.
103 AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
104 assert(!append_buffers.empty());
105 future = Future(append_buffers.rbegin()->first);
109 if (future.is_valid()) {
110 future.flush(on_safe);
// Nothing pending at all: report success right away.
112 on_safe->complete(0);
// Flush every buffered append up to and including the given future.  If the
// future is owned by a different object (journal object was switched), the
// flush is re-issued through that future's own handler; futures already
// being flushed, or on a closed/overflowed object, are left alone.
// NOTE(review): the bodies of the early-out branches are elided in this
// dump — presumably each ends in a `return`.
116 void ObjectRecorder::flush(const FutureImplPtr &future) {
117 ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future
120 assert(m_lock->is_locked());
122 if (future->get_flush_handler().get() != &m_flush_handler) {
123 // if we don't own this future, re-issue the flush so that it hits the
124 // correct journal object owner
127 } else if (future->is_flush_in_progress()) {
131 if (m_object_closed || m_overflowed) {
// Locate the future within the buffered appends (search newest-first).
135 AppendBuffers::reverse_iterator r_it;
136 for (r_it = m_append_buffers.rbegin(); r_it != m_append_buffers.rend();
138 if (r_it->first == future) {
142 assert(r_it != m_append_buffers.rend());
// Convert the reverse iterator to a forward iterator one past the match,
// so the splice below includes the matched buffer.
144 auto it = (++r_it).base();
145 assert(it != m_append_buffers.end());
// Move everything up to (and including) the target buffer out of the
// pending list and send it to RADOS.
148 AppendBuffers flush_buffers;
149 flush_buffers.splice(flush_buffers.end(), m_append_buffers,
150 m_append_buffers.begin(), it);
151 send_appends(&flush_buffers);
// Transfer all unsent append buffers to the caller (used when appends must
// be redirected to a new journal object).  Only legal once the object is
// closed or overflowed and no RADOS ops remain in flight.
154 void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
155 ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
157 assert(m_lock->is_locked());
158 assert(m_in_flight_tids.empty());
159 assert(m_in_flight_appends.empty());
160 assert(m_object_closed || m_overflowed);
// splice transfers ownership of the list nodes without copying buffers.
161 append_buffers->splice(append_buffers->end(), m_append_buffers,
162 m_append_buffers.begin(), m_append_buffers.end());
// Mark the object closed and cancel the pending age-based flush.  Returns
// true when the close is already complete (nothing in flight); otherwise
// the caller must wait for the handler notification.
// NOTE(review): an elided line between 170 and 174 likely flushes any
// remaining buffered appends — confirm against the complete source.
165 bool ObjectRecorder::close() {
166 assert (m_lock->is_locked());
168 ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
170 cancel_append_task();
174 assert(!m_object_closed);
175 m_object_closed = true;
176 return (m_in_flight_tids.empty() && !m_in_flight_flushes && !m_aio_scheduled);
// Timer callback fired after m_flush_age seconds: clears the task handle
// and (on elided lines) force-flushes the buffered appends under m_lock.
179 void ObjectRecorder::handle_append_task() {
180 assert(m_timer_lock.is_locked());
// The timer has consumed the event; drop our reference to it.
181 m_append_task = NULL;
183 Mutex::Locker locker(*m_lock);
// Cancel the pending age-based flush task, if one is armed.  Safe to call
// when no task is scheduled.
187 void ObjectRecorder::cancel_append_task() {
188 Mutex::Locker locker(m_timer_lock);
189 if (m_append_task != NULL) {
190 m_timer.cancel_event(m_append_task);
191 m_append_task = NULL;
// Arm the age-based flush timer (idempotent: does nothing if a task is
// already scheduled or age-based flushing is disabled via m_flush_age <= 0).
195 void ObjectRecorder::schedule_append_task() {
196 Mutex::Locker locker(m_timer_lock);
197 if (m_append_task == nullptr && m_flush_age > 0) {
// The timer invokes the context with m_timer_lock held, matching the
// assertion in handle_append_task().
198 m_append_task = m_timer.add_event_after(
199 m_flush_age, new FunctionContext([this](int) {
200 handle_append_task();
// Buffer a single append and attach its future to this recorder's flush
// handler.  Returns true if the future already had a client flush request
// pending (caller then flushes up to it).  Sets *schedule_append when the
// size/count thresholds were not yet reached, so the caller arms the timer.
// NOTE(review): elided lines between 211 and 214 may contain additional
// branch structure — confirm against the complete source.
205 bool ObjectRecorder::append(const AppendBuffer &append_buffer,
206 bool *schedule_append) {
207 assert(m_lock->is_locked());
209 bool flush_requested = false;
210 if (!m_object_closed && !m_overflowed) {
// attach() reports whether a flush was already requested on this future.
211 flush_requested = append_buffer.first->attach(&m_flush_handler);
214 m_append_buffers.push_back(append_buffer);
215 m_pending_bytes += append_buffer.second.length();
// If no threshold-triggered flush happened, fall back to the age timer.
217 if (!flush_appends(false)) {
218 *schedule_append = true;
220 return flush_requested;
// Send buffered appends to RADOS if forced or if any enabled threshold
// (soft object size, buffer count, pending bytes) has been crossed.
// Returns whether a flush was issued (the early-return bodies and a
// presumed `(!force &&` line are elided from this dump — verify the exact
// condition against the complete source).
223 bool ObjectRecorder::flush_appends(bool force) {
224 assert(m_lock->is_locked());
225 if (m_object_closed || m_overflowed) {
229 if (m_append_buffers.empty() ||
231 m_size + m_pending_bytes < m_soft_max_size &&
232 (m_flush_interval > 0 && m_append_buffers.size() < m_flush_interval) &&
233 (m_flush_bytes > 0 && m_pending_bytes < m_flush_bytes))) {
// Thresholds reached (or forced): hand the whole batch to send_appends.
238 AppendBuffers append_buffers;
239 append_buffers.swap(m_append_buffers);
240 send_appends(&append_buffers);
// RADOS completion callback for the append op with the given tid.  On
// -EOVERFLOW the object is full: in-flight appends are collected for
// redirection and the handler is notified once all ops complete.  On
// success, every future in the batch is marked safe with r; m_lock is
// presumably dropped around the safe() callbacks (elided lines), hence the
// m_in_flight_flushes flag/condvar used to serialize with flush().
244 void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
245 ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid
246 << ", r=" << r << dendl;
248 AppendBuffers append_buffers;
// The tid must correspond to an op we issued in send_appends_aio().
251 auto tid_iter = m_in_flight_tids.find(tid);
252 assert(tid_iter != m_in_flight_tids.end());
253 m_in_flight_tids.erase(tid_iter);
255 InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
256 if (r == -EOVERFLOW || m_overflowed) {
257 if (iter != m_in_flight_appends.end()) {
260 // must have seen an overflow on a previous append op
261 assert(r == -EOVERFLOW && m_overflowed);
264 // notify of overflow once all in-flight ops are complete
265 if (m_in_flight_tids.empty() && !m_aio_scheduled) {
267 notify_handler_unlock();
// Success path: take ownership of this tid's buffers before completing
// their futures.
274 assert(iter != m_in_flight_appends.end());
275 append_buffers.swap(iter->second);
276 assert(!append_buffers.empty());
278 m_in_flight_appends.erase(iter);
279 m_in_flight_flushes = true;
283 // Flag the associated futures as complete.
284 for (AppendBuffers::iterator buf_it = append_buffers.begin();
285 buf_it != append_buffers.end(); ++buf_it) {
286 ldout(m_cct, 20) << __func__ << ": " << *buf_it->first << " marked safe"
288 buf_it->first->safe(r);
291 // wake up any flush requests that raced with a RADOS callback
293 m_in_flight_flushes = false;
294 m_in_flight_flushes_cond.Signal();
296 if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
297 // all remaining unsent appends should be redirected to new object
298 notify_handler_unlock();
// Handle object overflow: rebuild m_append_buffers so that all in-flight
// appends (in tid order) come before any still-buffered appends, preserving
// overall append order for redirection to the next journal object.
304 void ObjectRecorder::append_overflowed() {
305 ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
308 assert(m_lock->is_locked());
309 assert(!m_in_flight_appends.empty());
// No point flushing on age anymore; this object is done.
311 cancel_append_task();
313 InFlightAppends in_flight_appends;
314 in_flight_appends.swap(m_in_flight_appends);
// Collect in-flight buffers first — they were submitted before anything
// still sitting in m_append_buffers.
316 AppendBuffers restart_append_buffers;
317 for (InFlightAppends::iterator it = in_flight_appends.begin();
318 it != in_flight_appends.end(); ++it) {
319 restart_append_buffers.insert(restart_append_buffers.end(),
320 it->second.begin(), it->second.end());
// Append the not-yet-sent buffers after the in-flight ones, then make the
// combined, ordered list the new pending set.
323 restart_append_buffers.splice(restart_append_buffers.end(),
325 m_append_buffers.begin(),
326 m_append_buffers.end());
327 restart_append_buffers.swap(m_append_buffers);
// Debug-log every buffer that will be redirected.
329 for (AppendBuffers::const_iterator it = m_append_buffers.begin();
330 it != m_append_buffers.end(); ++it) {
331 ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
// Stage a batch of buffers for asynchronous submission.  Marks each future
// flush-in-progress, accounts the bytes into m_size, and queues a single
// send_appends_aio pass on the work queue (m_aio_scheduled dedupes so only
// one pass is pending at a time).
337 void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
338 assert(m_lock->is_locked());
339 assert(!append_buffers->empty());
341 for (AppendBuffers::iterator it = append_buffers->begin();
342 it != append_buffers->end(); ++it) {
343 ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
345 it->first->set_flush_in_progress();
346 m_size += it->second.length();
// Move (not copy) the batch onto the pending queue for the aio pass.
349 m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers,
350 append_buffers->begin(), append_buffers->end());
351 if (!m_aio_scheduled) {
// Defer the actual RADOS submission to the work queue context.
352 m_op_work_queue->queue(new FunctionContext([this] (int r) {
355 m_aio_scheduled = true;
// Work-queue callback: submit all pending buffers as a single RADOS write
// op guarded by the soft size limit.  Assigns a new tid, moves the pending
// buffers into m_in_flight_appends[tid], and issues one aio_operate whose
// completion (via C_Gather/C_AppendFlush) calls handle_append_flushed(tid).
// NOTE(review): the declaration of `append_tid` and some brace/return
// lines are elided from this dump.
359 void ObjectRecorder::send_appends_aio() {
360 AppendBuffers *append_buffers;
363 Mutex::Locker locker(*m_lock);
364 append_tid = m_append_tid++;
365 m_in_flight_tids.insert(append_tid);
367 // safe to hold pointer outside lock until op is submitted
368 append_buffers = &m_in_flight_appends[append_tid];
369 append_buffers->swap(m_pending_buffers);
372 ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
373 << append_tid << dendl;
// C_Gather lets additional subs be attached before activate(); the final
// callback routes the result to handle_append_flushed(tid, r).
374 C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid);
375 C_Gather *gather_ctx = new C_Gather(m_cct, append_flush);
377 librados::ObjectWriteOperation op;
// guard_append makes the whole op fail with -EOVERFLOW once the object
// exceeds the soft max size, triggering the overflow path.
378 client::guard_append(&op, m_soft_max_size);
379 for (AppendBuffers::iterator it = append_buffers->begin();
380 it != append_buffers->end(); ++it) {
381 ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
383 op.append(it->second);
// Journal data is written once and re-read rarely — hint the OSD cache.
384 op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
387 librados::AioCompletion *rados_completion =
388 librados::Rados::aio_create_completion(gather_ctx->new_sub(), nullptr,
389 utils::rados_ctx_callback);
390 int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
// aio_operate took its own reference; drop ours.
392 rados_completion->release();
396 if (m_pending_buffers.empty()) {
397 m_aio_scheduled = false;
398 if (m_in_flight_appends.empty() && m_object_closed) {
399 // all remaining unsent appends should be redirected to new object
400 notify_handler_unlock();
405 // additional pending items -- reschedule
406 m_op_work_queue->queue(new FunctionContext([this] (int r) {
413 // allow append op to complete
414 gather_ctx->activate();
// Notify the handler that this object is finished: closed() if it was
// explicitly closed, otherwise overflow().  The "_unlock" suffix implies
// m_lock is released around/after the callback on elided lines — confirm
// against the complete source.
417 void ObjectRecorder::notify_handler_unlock() {
418 assert(m_lock->is_locked());
419 if (m_object_closed) {
421 m_handler->closed(this);
423 // TODO need to delay completion until after aio_notify completes
425 m_handler->overflow(this);
429 } // namespace journal