X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fcommon%2FTimer.cc;fp=src%2Fceph%2Fsrc%2Fcommon%2FTimer.cc;h=45305f553fa6c80ed45e23fd94a36ff598b68444;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/common/Timer.cc b/src/ceph/src/common/Timer.cc new file mode 100644 index 0000000..45305f5 --- /dev/null +++ b/src/ceph/src/common/Timer.cc @@ -0,0 +1,193 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "Cond.h" +#include "Timer.h" + + +#define dout_subsys ceph_subsys_timer +#undef dout_prefix +#define dout_prefix *_dout << "timer(" << this << ")." + + + +class SafeTimerThread : public Thread { + SafeTimer *parent; +public: + explicit SafeTimerThread(SafeTimer *s) : parent(s) {} + void *entry() override { + parent->timer_thread(); + return NULL; + } +}; + + + +typedef std::multimap < utime_t, Context *> scheduled_map_t; +typedef std::map < Context*, scheduled_map_t::iterator > event_lookup_map_t; + +SafeTimer::SafeTimer(CephContext *cct_, Mutex &l, bool safe_callbacks) + : cct(cct_), lock(l), + safe_callbacks(safe_callbacks), + thread(NULL), + stopping(false) +{ +} + +SafeTimer::~SafeTimer() +{ + assert(thread == NULL); +} + +void SafeTimer::init() +{ + ldout(cct,10) << "init" << dendl; + thread = new SafeTimerThread(this); + thread->create("safe_timer"); +} + +void SafeTimer::shutdown() +{ + ldout(cct,10) << "shutdown" << dendl; + if (thread) { + assert(lock.is_locked()); + cancel_all_events(); + stopping = true; + cond.Signal(); + lock.Unlock(); + thread->join(); + lock.Lock(); + delete thread; + thread = NULL; + } +} + +void SafeTimer::timer_thread() +{ + lock.Lock(); + ldout(cct,10) << "timer_thread starting" << dendl; + while (!stopping) { + utime_t now = ceph_clock_now(); + + while (!schedule.empty()) { + scheduled_map_t::iterator p = schedule.begin(); + + // is the future now? + if (p->first > now) + break; + + Context *callback = p->second; + events.erase(callback); + schedule.erase(p); + ldout(cct,10) << "timer_thread executing " << callback << dendl; + + if (!safe_callbacks) + lock.Unlock(); + callback->complete(0); + if (!safe_callbacks) + lock.Lock(); + } + + // recheck stopping if we dropped the lock + if (!safe_callbacks && stopping) + break; + + ldout(cct,20) << "timer_thread going to sleep" << dendl; + if (schedule.empty()) + cond.Wait(lock); + else + cond.WaitUntil(lock, schedule.begin()->first); + ldout(cct,20) << "timer_thread awake" << dendl; + } + ldout(cct,10) << "timer_thread exiting" << dendl; + lock.Unlock(); +} + +Context* SafeTimer::add_event_after(double seconds, Context *callback) +{ + assert(lock.is_locked()); + + utime_t when = ceph_clock_now(); + when += seconds; + return add_event_at(when, callback); +} + +Context* SafeTimer::add_event_at(utime_t when, Context *callback) +{ + assert(lock.is_locked()); + ldout(cct,10) << __func__ << " " << when << " -> " << callback << dendl; + if (stopping) { + ldout(cct,5) << __func__ << " already shutdown, event not added" << dendl; + delete callback; + return nullptr; + } + scheduled_map_t::value_type s_val(when, callback); + scheduled_map_t::iterator i = schedule.insert(s_val); + + event_lookup_map_t::value_type e_val(callback, i); + pair < event_lookup_map_t::iterator, bool > rval(events.insert(e_val)); + + /* If you hit this, you tried to insert the same Context* twice. */ + assert(rval.second); + + /* If the event we have just inserted comes before everything else, we need to + * adjust our timeout. */ + if (i == schedule.begin()) + cond.Signal(); + return callback; +} + +bool SafeTimer::cancel_event(Context *callback) +{ + assert(lock.is_locked()); + + auto p = events.find(callback); + if (p == events.end()) { + ldout(cct,10) << "cancel_event " << callback << " not found" << dendl; + return false; + } + + ldout(cct,10) << "cancel_event " << p->second->first << " -> " << callback << dendl; + delete p->first; + + schedule.erase(p->second); + events.erase(p); + return true; +} + +void SafeTimer::cancel_all_events() +{ + ldout(cct,10) << "cancel_all_events" << dendl; + assert(lock.is_locked()); + + while (!events.empty()) { + auto p = events.begin(); + ldout(cct,10) << " cancelled " << p->second->first << " -> " << p->first << dendl; + delete p->first; + schedule.erase(p->second); + events.erase(p); + } +} + +void SafeTimer::dump(const char *caller) const +{ + if (!caller) + caller = ""; + ldout(cct,10) << "dump " << caller << dendl; + + for (scheduled_map_t::const_iterator s = schedule.begin(); + s != schedule.end(); + ++s) + ldout(cct,10) << " " << s->first << "->" << s->second << dendl; +}