Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rbd_replay / Replayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "Replayer.hpp"
16 #include "common/errno.h"
17 #include "rbd_replay/ActionTypes.h"
18 #include "rbd_replay/BufferReader.h"
19 #include <boost/foreach.hpp>
20 #include <boost/thread/thread.hpp>
21 #include <boost/scope_exit.hpp>
22 #include <fstream>
23 #include "global/global_context.h"
24 #include "rbd_replay_debug.hpp"
25
26 #define dout_context g_ceph_context
27
28 using namespace std;
29 using namespace rbd_replay;
30
31 namespace {
32
33 bool is_versioned_replay(BufferReader &buffer_reader) {
34   bufferlist::iterator *it;
35   int r = buffer_reader.fetch(&it);
36   if (r < 0) {
37     return false;
38   }
39
40   if (it->get_remaining() < action::BANNER.size()) {
41     return false;
42   }
43
44   std::string banner;
45   it->copy(action::BANNER.size(), banner);
46   bool versioned = (banner == action::BANNER);
47   if (!versioned) {
48     it->seek(0);
49   }
50   return versioned;
51 }
52
53 } // anonymous namespace
54
55 Worker::Worker(Replayer &replayer)
56   : m_replayer(replayer),
57     m_buffer(100),
58     m_done(false) {
59 }
60
61 void Worker::start() {
62   m_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&Worker::run, this)));
63 }
64
65 // Should only be called by StopThreadAction
66 void Worker::stop() {
67   m_done = true;
68 }
69
70 void Worker::join() {
71   m_thread->join();
72 }
73
74 void Worker::send(Action::ptr action) {
75   assert(action);
76   m_buffer.push_front(action);
77 }
78
79 void Worker::add_pending(PendingIO::ptr io) {
80   assert(io);
81   boost::mutex::scoped_lock lock(m_pending_ios_mutex);
82   assertf(m_pending_ios.count(io->id()) == 0, "id = %d", io->id());
83   m_pending_ios[io->id()] = io;
84 }
85
86 void Worker::run() {
87   dout(THREAD_LEVEL) << "Worker thread started" << dendl;
88   while (!m_done) {
89     Action::ptr action;
90     m_buffer.pop_back(&action);
91     m_replayer.wait_for_actions(action->predecessors());
92     action->perform(*this);
93     m_replayer.set_action_complete(action->id());
94   }
95   {
96     boost::mutex::scoped_lock lock(m_pending_ios_mutex);
97     bool first_time = true;
98     while (!m_pending_ios.empty()) {
99       if (!first_time) {
100         dout(THREAD_LEVEL) << "Worker thread trying to stop, still waiting for " << m_pending_ios.size() << " pending IOs to complete:" << dendl;
101         pair<action_id_t, PendingIO::ptr> p;
102         BOOST_FOREACH(p, m_pending_ios) {
103           dout(THREAD_LEVEL) << "> " << p.first << dendl;
104         }
105       }
106       m_pending_ios_empty.timed_wait(lock, boost::posix_time::seconds(1));
107       first_time = false;
108     }
109   }
110   dout(THREAD_LEVEL) << "Worker thread stopped" << dendl;
111 }
112
113
114 void Worker::remove_pending(PendingIO::ptr io) {
115   assert(io);
116   m_replayer.set_action_complete(io->id());
117   boost::mutex::scoped_lock lock(m_pending_ios_mutex);
118   size_t num_erased = m_pending_ios.erase(io->id());
119   assertf(num_erased == 1, "id = %d", io->id());
120   if (m_pending_ios.empty()) {
121     m_pending_ios_empty.notify_all();
122   }
123 }
124
125
126 librbd::Image* Worker::get_image(imagectx_id_t imagectx_id) {
127   return m_replayer.get_image(imagectx_id);
128 }
129
130
131 void Worker::put_image(imagectx_id_t imagectx_id, librbd::Image* image) {
132   assert(image);
133   m_replayer.put_image(imagectx_id, image);
134 }
135
136
137 void Worker::erase_image(imagectx_id_t imagectx_id) {
138   m_replayer.erase_image(imagectx_id);
139 }
140
141
142 librbd::RBD* Worker::rbd() {
143   return m_replayer.get_rbd();
144 }
145
146
147 librados::IoCtx* Worker::ioctx() {
148   return m_replayer.get_ioctx();
149 }
150
151 void Worker::set_action_complete(action_id_t id) {
152   m_replayer.set_action_complete(id);
153 }
154
155 bool Worker::readonly() const {
156   return m_replayer.readonly();
157 }
158
159 rbd_loc Worker::map_image_name(string image_name, string snap_name) const {
160   return m_replayer.image_name_map().map(rbd_loc("", image_name, snap_name));
161 }
162
163
164 Replayer::Replayer(int num_action_trackers)
165   : m_rbd(NULL), m_ioctx(0),
166     m_latency_multiplier(1.0),
167     m_readonly(false), m_dump_perf_counters(false),
168     m_num_action_trackers(num_action_trackers),
169     m_action_trackers(new action_tracker_d[m_num_action_trackers]) {
170   assertf(num_action_trackers > 0, "num_action_trackers = %d", num_action_trackers);
171 }
172
173 Replayer::~Replayer() {
174   delete[] m_action_trackers;
175 }
176
177 Replayer::action_tracker_d &Replayer::tracker_for(action_id_t id) {
178   return m_action_trackers[id % m_num_action_trackers];
179 }
180
181 void Replayer::run(const std::string& replay_file) {
182   {
183     librados::Rados rados;
184     rados.init(NULL);
185     int r = rados.init_with_context(g_ceph_context);
186     if (r) {
187       cerr << "Failed to initialize RADOS: " << cpp_strerror(r) << std::endl;
188       goto out;
189     }
190     r = rados.connect();
191     if (r) {
192       cerr << "Failed to connect to cluster: " << cpp_strerror(r) << std::endl;
193       goto out;
194     }
195
196     if (m_pool_name.empty()) {
197       r = rados.conf_get("rbd_default_pool", m_pool_name);
198       if (r < 0) {
199         cerr << "Failed to retrieve default pool: " << cpp_strerror(r)
200              << std::endl;
201         goto out;
202       }
203     }
204
205     m_ioctx = new librados::IoCtx();
206     {
207       r = rados.ioctx_create(m_pool_name.c_str(), *m_ioctx);
208       if (r) {
209         cerr << "Failed to open pool " << m_pool_name << ": "
210              << cpp_strerror(r) << std::endl;
211         goto out2;
212       }
213       m_rbd = new librbd::RBD();
214       map<thread_id_t, Worker*> workers;
215
216       int fd = open(replay_file.c_str(), O_RDONLY);
217       if (fd < 0) {
218         std::cerr << "Failed to open " << replay_file << ": "
219                   << cpp_strerror(errno) << std::endl;
220         exit(1);
221       }
222       BOOST_SCOPE_EXIT( (fd) ) {
223         close(fd);
224       } BOOST_SCOPE_EXIT_END;
225
226       BufferReader buffer_reader(fd);
227       bool versioned = is_versioned_replay(buffer_reader);
228       while (true) {
229         action::ActionEntry action_entry;
230         try {
231           bufferlist::iterator *it;
232           int r = buffer_reader.fetch(&it);
233           if (r < 0) {
234             std::cerr << "Failed to read from trace file: " << cpp_strerror(r)
235                       << std::endl;
236             exit(-r);
237           }
238           if (it->get_remaining() == 0) {
239             break;
240           }
241
242           if (versioned) {
243             action_entry.decode(*it);
244           } else {
245             action_entry.decode_unversioned(*it);
246           }
247         } catch (const buffer::error &err) {
248           std::cerr << "Failed to decode trace action: " << err.what() << std::endl;
249           exit(1);
250         }
251
252         Action::ptr action = Action::construct(action_entry);
253         if (!action) {
254           // unknown / unsupported action
255           continue;
256         }
257
258         if (action->is_start_thread()) {
259           Worker *worker = new Worker(*this);
260           workers[action->thread_id()] = worker;
261           worker->start();
262         } else {
263           workers[action->thread_id()]->send(action);
264         }
265       }
266
267       dout(THREAD_LEVEL) << "Waiting for workers to die" << dendl;
268       pair<thread_id_t, Worker*> w;
269       BOOST_FOREACH(w, workers) {
270         w.second->join();
271         delete w.second;
272       }
273       clear_images();
274       delete m_rbd;
275       m_rbd = NULL;
276     }
277   out2:
278     delete m_ioctx;
279     m_ioctx = NULL;
280   }
281  out:
282   ;
283 }
284
285
286 librbd::Image* Replayer::get_image(imagectx_id_t imagectx_id) {
287   boost::shared_lock<boost::shared_mutex> lock(m_images_mutex);
288   return m_images[imagectx_id];
289 }
290
291 void Replayer::put_image(imagectx_id_t imagectx_id, librbd::Image *image) {
292   assert(image);
293   boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
294   assert(m_images.count(imagectx_id) == 0);
295   m_images[imagectx_id] = image;
296 }
297
298 void Replayer::erase_image(imagectx_id_t imagectx_id) {
299   boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
300   librbd::Image* image = m_images[imagectx_id];
301   if (m_dump_perf_counters) {
302     string command = "perf dump";
303     cmdmap_t cmdmap;
304     string format = "json-pretty";
305     bufferlist out;
306     g_ceph_context->do_command(command, cmdmap, format, &out);
307     out.write_stream(cout);
308     cout << std::endl;
309     cout.flush();
310   }
311   delete image;
312   m_images.erase(imagectx_id);
313 }
314
315 void Replayer::set_action_complete(action_id_t id) {
316   dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl;
317   boost::system_time now(boost::get_system_time());
318   action_tracker_d &tracker = tracker_for(id);
319   boost::unique_lock<boost::shared_mutex> lock(tracker.mutex);
320   assert(tracker.actions.count(id) == 0);
321   tracker.actions[id] = now;
322   tracker.condition.notify_all();
323 }
324
325 bool Replayer::is_action_complete(action_id_t id) {
326   action_tracker_d &tracker = tracker_for(id);
327   boost::shared_lock<boost::shared_mutex> lock(tracker.mutex);
328   return tracker.actions.count(id) > 0;
329 }
330
331 void Replayer::wait_for_actions(const action::Dependencies &deps) {
332   boost::posix_time::ptime release_time(boost::posix_time::neg_infin);
333   BOOST_FOREACH(const action::Dependency &dep, deps) {
334     dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl;
335     boost::system_time start_time(boost::get_system_time());
336     action_tracker_d &tracker = tracker_for(dep.id);
337     boost::shared_lock<boost::shared_mutex> lock(tracker.mutex);
338     bool first_time = true;
339     while (tracker.actions.count(dep.id) == 0) {
340       if (!first_time) {
341         dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl;
342       }
343       tracker.condition.timed_wait(lock, boost::posix_time::seconds(1));
344       first_time = false;
345     }
346     boost::system_time action_completed_time(tracker.actions[dep.id]);
347     lock.unlock();
348     boost::system_time end_time(boost::get_system_time());
349     long long micros = (end_time - start_time).total_microseconds();
350     dout(DEPGRAPH_LEVEL) << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << dendl;
351     // Apparently the nanoseconds constructor is optional:
352     // http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options
353     boost::system_time sub_release_time(action_completed_time + boost::posix_time::microseconds(dep.time_delta * m_latency_multiplier / 1000));
354     if (sub_release_time > release_time) {
355       release_time = sub_release_time;
356     }
357   }
358   if (release_time > boost::get_system_time()) {
359     dout(SLEEP_LEVEL) << "Sleeping for " << (release_time - boost::get_system_time()).total_microseconds() << " microseconds" << dendl;
360     boost::this_thread::sleep(release_time);
361   }
362 }
363
364 void Replayer::clear_images() {
365   boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
366   if (m_dump_perf_counters && !m_images.empty()) {
367     string command = "perf dump";
368     cmdmap_t cmdmap;
369     string format = "json-pretty";
370     bufferlist out;
371     g_ceph_context->do_command(command, cmdmap, format, &out);
372     out.write_stream(cout);
373     cout << std::endl;
374     cout.flush();
375   }
376   pair<imagectx_id_t, librbd::Image*> p;
377   BOOST_FOREACH(p, m_images) {
378     delete p.second;
379   }
380   m_images.clear();
381 }
382
383 void Replayer::set_latency_multiplier(float f) {
384   assertf(f >= 0, "f = %f", f);
385   m_latency_multiplier = f;
386 }
387
388 bool Replayer::readonly() const {
389   return m_readonly;
390 }
391
392 void Replayer::set_readonly(bool readonly) {
393   m_readonly = readonly;
394 }
395
396 string Replayer::pool_name() const {
397   return m_pool_name;
398 }
399
400 void Replayer::set_pool_name(string pool_name) {
401   m_pool_name = pool_name;
402 }