X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frbd_replay%2FReplayer.cc;fp=src%2Fceph%2Fsrc%2Frbd_replay%2FReplayer.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=ca1eaba84416fd74a4d4ad009a35b7be104b33e0;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/rbd_replay/Replayer.cc b/src/ceph/src/rbd_replay/Replayer.cc deleted file mode 100644 index ca1eaba..0000000 --- a/src/ceph/src/rbd_replay/Replayer.cc +++ /dev/null @@ -1,402 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2014 Adam Crume - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "Replayer.hpp" -#include "common/errno.h" -#include "rbd_replay/ActionTypes.h" -#include "rbd_replay/BufferReader.h" -#include -#include -#include -#include -#include "global/global_context.h" -#include "rbd_replay_debug.hpp" - -#define dout_context g_ceph_context - -using namespace std; -using namespace rbd_replay; - -namespace { - -bool is_versioned_replay(BufferReader &buffer_reader) { - bufferlist::iterator *it; - int r = buffer_reader.fetch(&it); - if (r < 0) { - return false; - } - - if (it->get_remaining() < action::BANNER.size()) { - return false; - } - - std::string banner; - it->copy(action::BANNER.size(), banner); - bool versioned = (banner == action::BANNER); - if (!versioned) { - it->seek(0); - } - return versioned; -} - -} // anonymous namespace - -Worker::Worker(Replayer &replayer) - : m_replayer(replayer), - m_buffer(100), - m_done(false) { -} - -void Worker::start() { - m_thread = boost::shared_ptr(new boost::thread(boost::bind(&Worker::run, this))); -} - -// Should only be called by StopThreadAction -void Worker::stop() { - m_done = true; -} - -void Worker::join() { - m_thread->join(); -} - -void Worker::send(Action::ptr action) { - assert(action); - m_buffer.push_front(action); -} - -void Worker::add_pending(PendingIO::ptr io) { - assert(io); - boost::mutex::scoped_lock lock(m_pending_ios_mutex); - assertf(m_pending_ios.count(io->id()) == 0, "id = %d", io->id()); - m_pending_ios[io->id()] = io; -} - -void Worker::run() { - dout(THREAD_LEVEL) << "Worker thread started" << dendl; - while (!m_done) { - Action::ptr action; - m_buffer.pop_back(&action); - m_replayer.wait_for_actions(action->predecessors()); - action->perform(*this); - m_replayer.set_action_complete(action->id()); - } - { - boost::mutex::scoped_lock lock(m_pending_ios_mutex); - bool first_time = true; - while (!m_pending_ios.empty()) { - if (!first_time) { - dout(THREAD_LEVEL) << "Worker thread trying to stop, still waiting for " << m_pending_ios.size() << " pending IOs to complete:" << dendl; - pair p; - BOOST_FOREACH(p, m_pending_ios) { - dout(THREAD_LEVEL) << "> " << p.first << dendl; - } - } - m_pending_ios_empty.timed_wait(lock, boost::posix_time::seconds(1)); - first_time = false; - } - } - dout(THREAD_LEVEL) << "Worker thread stopped" << dendl; -} - - -void Worker::remove_pending(PendingIO::ptr io) { - assert(io); - m_replayer.set_action_complete(io->id()); - boost::mutex::scoped_lock lock(m_pending_ios_mutex); - size_t num_erased = m_pending_ios.erase(io->id()); - assertf(num_erased == 1, "id = %d", io->id()); - if (m_pending_ios.empty()) { - m_pending_ios_empty.notify_all(); - } -} - - -librbd::Image* Worker::get_image(imagectx_id_t imagectx_id) { - return m_replayer.get_image(imagectx_id); -} - - -void Worker::put_image(imagectx_id_t imagectx_id, librbd::Image* image) { - assert(image); - m_replayer.put_image(imagectx_id, image); -} - - -void Worker::erase_image(imagectx_id_t imagectx_id) { - m_replayer.erase_image(imagectx_id); -} - - -librbd::RBD* Worker::rbd() { - return m_replayer.get_rbd(); -} - - -librados::IoCtx* Worker::ioctx() { - return m_replayer.get_ioctx(); -} - -void Worker::set_action_complete(action_id_t id) { - m_replayer.set_action_complete(id); -} - -bool Worker::readonly() const { - return m_replayer.readonly(); -} - -rbd_loc Worker::map_image_name(string image_name, string snap_name) const { - return m_replayer.image_name_map().map(rbd_loc("", image_name, snap_name)); -} - - -Replayer::Replayer(int num_action_trackers) - : m_rbd(NULL), m_ioctx(0), - m_latency_multiplier(1.0), - m_readonly(false), m_dump_perf_counters(false), - m_num_action_trackers(num_action_trackers), - m_action_trackers(new action_tracker_d[m_num_action_trackers]) { - assertf(num_action_trackers > 0, "num_action_trackers = %d", num_action_trackers); -} - -Replayer::~Replayer() { - delete[] m_action_trackers; -} - -Replayer::action_tracker_d &Replayer::tracker_for(action_id_t id) { - return m_action_trackers[id % m_num_action_trackers]; -} - -void Replayer::run(const std::string& replay_file) { - { - librados::Rados rados; - rados.init(NULL); - int r = rados.init_with_context(g_ceph_context); - if (r) { - cerr << "Failed to initialize RADOS: " << cpp_strerror(r) << std::endl; - goto out; - } - r = rados.connect(); - if (r) { - cerr << "Failed to connect to cluster: " << cpp_strerror(r) << std::endl; - goto out; - } - - if (m_pool_name.empty()) { - r = rados.conf_get("rbd_default_pool", m_pool_name); - if (r < 0) { - cerr << "Failed to retrieve default pool: " << cpp_strerror(r) - << std::endl; - goto out; - } - } - - m_ioctx = new librados::IoCtx(); - { - r = rados.ioctx_create(m_pool_name.c_str(), *m_ioctx); - if (r) { - cerr << "Failed to open pool " << m_pool_name << ": " - << cpp_strerror(r) << std::endl; - goto out2; - } - m_rbd = new librbd::RBD(); - map workers; - - int fd = open(replay_file.c_str(), O_RDONLY); - if (fd < 0) { - std::cerr << "Failed to open " << replay_file << ": " - << cpp_strerror(errno) << std::endl; - exit(1); - } - BOOST_SCOPE_EXIT( (fd) ) { - close(fd); - } BOOST_SCOPE_EXIT_END; - - BufferReader buffer_reader(fd); - bool versioned = is_versioned_replay(buffer_reader); - while (true) { - action::ActionEntry action_entry; - try { - bufferlist::iterator *it; - int r = buffer_reader.fetch(&it); - if (r < 0) { - std::cerr << "Failed to read from trace file: " << cpp_strerror(r) - << std::endl; - exit(-r); - } - if (it->get_remaining() == 0) { - break; - } - - if (versioned) { - action_entry.decode(*it); - } else { - action_entry.decode_unversioned(*it); - } - } catch (const buffer::error &err) { - std::cerr << "Failed to decode trace action: " << err.what() << std::endl; - exit(1); - } - - Action::ptr action = Action::construct(action_entry); - if (!action) { - // unknown / unsupported action - continue; - } - - if (action->is_start_thread()) { - Worker *worker = new Worker(*this); - workers[action->thread_id()] = worker; - worker->start(); - } else { - workers[action->thread_id()]->send(action); - } - } - - dout(THREAD_LEVEL) << "Waiting for workers to die" << dendl; - pair w; - BOOST_FOREACH(w, workers) { - w.second->join(); - delete w.second; - } - clear_images(); - delete m_rbd; - m_rbd = NULL; - } - out2: - delete m_ioctx; - m_ioctx = NULL; - } - out: - ; -} - - -librbd::Image* Replayer::get_image(imagectx_id_t imagectx_id) { - boost::shared_lock lock(m_images_mutex); - return m_images[imagectx_id]; -} - -void Replayer::put_image(imagectx_id_t imagectx_id, librbd::Image *image) { - assert(image); - boost::unique_lock lock(m_images_mutex); - assert(m_images.count(imagectx_id) == 0); - m_images[imagectx_id] = image; -} - -void Replayer::erase_image(imagectx_id_t imagectx_id) { - boost::unique_lock lock(m_images_mutex); - librbd::Image* image = m_images[imagectx_id]; - if (m_dump_perf_counters) { - string command = "perf dump"; - cmdmap_t cmdmap; - string format = "json-pretty"; - bufferlist out; - g_ceph_context->do_command(command, cmdmap, format, &out); - out.write_stream(cout); - cout << std::endl; - cout.flush(); - } - delete image; - m_images.erase(imagectx_id); -} - -void Replayer::set_action_complete(action_id_t id) { - dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl; - boost::system_time now(boost::get_system_time()); - action_tracker_d &tracker = tracker_for(id); - boost::unique_lock lock(tracker.mutex); - assert(tracker.actions.count(id) == 0); - tracker.actions[id] = now; - tracker.condition.notify_all(); -} - -bool Replayer::is_action_complete(action_id_t id) { - action_tracker_d &tracker = tracker_for(id); - boost::shared_lock lock(tracker.mutex); - return tracker.actions.count(id) > 0; -} - -void Replayer::wait_for_actions(const action::Dependencies &deps) { - boost::posix_time::ptime release_time(boost::posix_time::neg_infin); - BOOST_FOREACH(const action::Dependency &dep, deps) { - dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl; - boost::system_time start_time(boost::get_system_time()); - action_tracker_d &tracker = tracker_for(dep.id); - boost::shared_lock lock(tracker.mutex); - bool first_time = true; - while (tracker.actions.count(dep.id) == 0) { - if (!first_time) { - dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl; - } - tracker.condition.timed_wait(lock, boost::posix_time::seconds(1)); - first_time = false; - } - boost::system_time action_completed_time(tracker.actions[dep.id]); - lock.unlock(); - boost::system_time end_time(boost::get_system_time()); - long long micros = (end_time - start_time).total_microseconds(); - dout(DEPGRAPH_LEVEL) << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << dendl; - // Apparently the nanoseconds constructor is optional: - // http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options - boost::system_time sub_release_time(action_completed_time + boost::posix_time::microseconds(dep.time_delta * m_latency_multiplier / 1000)); - if (sub_release_time > release_time) { - release_time = sub_release_time; - } - } - if (release_time > boost::get_system_time()) { - dout(SLEEP_LEVEL) << "Sleeping for " << (release_time - boost::get_system_time()).total_microseconds() << " microseconds" << dendl; - boost::this_thread::sleep(release_time); - } -} - -void Replayer::clear_images() { - boost::unique_lock lock(m_images_mutex); - if (m_dump_perf_counters && !m_images.empty()) { - string command = "perf dump"; - cmdmap_t cmdmap; - string format = "json-pretty"; - bufferlist out; - g_ceph_context->do_command(command, cmdmap, format, &out); - out.write_stream(cout); - cout << std::endl; - cout.flush(); - } - pair p; - BOOST_FOREACH(p, m_images) { - delete p.second; - } - m_images.clear(); -} - -void Replayer::set_latency_multiplier(float f) { - assertf(f >= 0, "f = %f", f); - m_latency_multiplier = f; -} - -bool Replayer::readonly() const { - return m_readonly; -} - -void Replayer::set_readonly(bool readonly) { - m_readonly = readonly; -} - -string Replayer::pool_name() const { - return m_pool_name; -} - -void Replayer::set_pool_name(string pool_name) { - m_pool_name = pool_name; -}