Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rbd_replay / rbd-replay-prep.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 // This code assumes that IO IDs and timestamps are related monotonically.
16 // In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
17
18 #include "common/errno.h"
19 #include "rbd_replay/ActionTypes.h"
20 #include <babeltrace/babeltrace.h>
21 #include <babeltrace/ctf/events.h>
22 #include <babeltrace/ctf/iterator.h>
23 #include <sys/types.h>
24 #include <fcntl.h>
25 #include <cstdlib>
26 #include <string>
27 #include <assert.h>
28 #include <fstream>
29 #include <set>
30 #include <boost/thread/thread.hpp>
31 #include <boost/scope_exit.hpp>
32 #include "ios.hpp"
33
34 using namespace std;
35 using namespace rbd_replay;
36
37 #define ASSERT_EXIT(check, str)    \
38   if (!(check)) {                  \
39     std::cerr << str << std::endl; \
40     exit(1);                       \
41   }
42
43 class Thread {
44 public:
45   typedef boost::shared_ptr<Thread> ptr;
46
47   Thread(thread_id_t id,
48          uint64_t window)
49     : m_id(id),
50       m_window(window),
51       m_latest_io(IO::ptr()),
52       m_max_ts(0) {
53   }
54
55   void insert_ts(uint64_t ts) {
56     if (m_max_ts == 0 || ts > m_max_ts) {
57       m_max_ts = ts;
58     }
59   }
60
61   uint64_t max_ts() const {
62     return m_max_ts;
63   }
64
65   void issued_io(IO::ptr io, std::set<IO::ptr> *latest_ios) {
66     assert(io);
67     if (m_latest_io.get() != NULL) {
68       latest_ios->erase(m_latest_io);
69     }
70     m_latest_io = io;
71     latest_ios->insert(io);
72   }
73
74   thread_id_t id() const {
75     return m_id;
76   }
77
78   IO::ptr latest_io() {
79     return m_latest_io;
80   }
81
82 private:
83   thread_id_t m_id;
84   uint64_t m_window;
85   IO::ptr m_latest_io;
86   uint64_t m_max_ts;
87 };
88
89 class AnonymizedImage {
90 public:
91   void init(string image_name, int index) {
92     assert(m_image_name == "");
93     m_image_name = image_name;
94     ostringstream oss;
95     oss << "image" << index;
96     m_anonymized_image_name = oss.str();
97   }
98
99   string image_name() const {
100     return m_image_name;
101   }
102
103   pair<string, string> anonymize(string snap_name) {
104     if (snap_name == "") {
105       return pair<string, string>(m_anonymized_image_name, "");
106     }
107     string& anonymized_snap_name(m_snaps[snap_name]);
108     if (anonymized_snap_name == "") {
109       ostringstream oss;
110       oss << "snap" << m_snaps.size();
111       anonymized_snap_name = oss.str();
112     }
113     return pair<string, string>(m_anonymized_image_name, anonymized_snap_name);
114   }
115
116 private:
117   string m_image_name;
118   string m_anonymized_image_name;
119   map<string, string> m_snaps;
120 };
121
122 static void usage(string prog) {
123   std::stringstream str;
124   str << "Usage: " << prog << " ";
125   std::cout << str.str() << "[ --window <seconds> ] [ --anonymize ] [ --verbose ]" << std::endl
126             << std::string(str.str().size(), ' ') << "<trace-input> <replay-output>" << endl;
127 }
128
129 __attribute__((noreturn)) static void usage_exit(string prog, string msg) {
130   cerr << msg << endl;
131   usage(prog);
132   exit(1);
133 }
134
135 class Processor {
136 public:
137   Processor()
138     : m_window(1000000000ULL), // 1 billion nanoseconds, i.e., one second
139       m_io_count(0),
140       m_anonymize(false),
141       m_verbose(false) {
142   }
143
144   void run(vector<string> args) {
145     string input_file_name;
146     string output_file_name;
147     bool got_input = false;
148     bool got_output = false;
149     for (int i = 1, nargs = args.size(); i < nargs; i++) {
150       const string& arg(args[i]);
151       if (arg == "--window") {
152         if (i == nargs - 1) {
153           usage_exit(args[0], "--window requires an argument");
154         }
155         m_window = (uint64_t)(1e9 * atof(args[++i].c_str()));
156       } else if (arg.compare(0, 9, "--window=") == 0) {
157         m_window = (uint64_t)(1e9 * atof(arg.c_str() + sizeof("--window=")));
158       } else if (arg == "--anonymize") {
159         m_anonymize = true;
160       } else if (arg == "--verbose") {
161         m_verbose = true;
162       } else if (arg == "-h" || arg == "--help") {
163         usage(args[0]);
164         exit(0);
165       } else if (arg.compare(0, 1, "-") == 0) {
166         usage_exit(args[0], "Unrecognized argument: " + arg);
167       } else if (!got_input) {
168         input_file_name = arg;
169         got_input = true;
170       } else if (!got_output) {
171         output_file_name = arg;
172         got_output = true;
173       } else {
174         usage_exit(args[0], "Too many arguments");
175       }
176     }
177     if (!got_output) {
178       usage_exit(args[0], "Not enough arguments");
179     }
180
181     struct bt_context *ctx = bt_context_create();
182     int trace_handle = bt_context_add_trace(ctx,
183                                             input_file_name.c_str(), // path
184                                             "ctf", // format
185                                             NULL, // packet_seek
186                                             NULL, // stream_list
187                                             NULL); // metadata
188     ASSERT_EXIT(trace_handle >= 0, "Error loading trace file");
189
190     uint64_t start_time_ns = bt_trace_handle_get_timestamp_begin(ctx, trace_handle, BT_CLOCK_REAL);
191     ASSERT_EXIT(start_time_ns != -1ULL,
192                 "Error extracting creation time from trace");
193
194     struct bt_ctf_iter *itr = bt_ctf_iter_create(ctx,
195                                                  NULL, // begin_pos
196                                                  NULL); // end_pos
197     assert(itr);
198
199     struct bt_iter *bt_itr = bt_ctf_get_iter(itr);
200
201     int fd = open(output_file_name.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0644);
202     ASSERT_EXIT(fd >= 0, "Error opening output file " << output_file_name <<
203                          ": " << cpp_strerror(errno));
204     BOOST_SCOPE_EXIT( (fd) ) {
205       close(fd);
206     } BOOST_SCOPE_EXIT_END;
207
208     write_banner(fd);
209
210     uint64_t trace_start = 0;
211     bool first = true;
212     while(true) {
213       struct bt_ctf_event *evt = bt_ctf_iter_read_event(itr);
214       if(!evt) {
215         break;
216       }
217       uint64_t ts = bt_ctf_get_timestamp(evt);
218       ASSERT_EXIT(ts != -1ULL, "Error extracting event timestamp");
219
220       if (first) {
221         trace_start = ts;
222         first = false;
223       }
224       ts -= trace_start;
225       ts += 4; // This is so we have room to insert two events (thread start and open image) at unique timestamps before whatever the first event is.
226
227       IO::ptrs ptrs;
228       process_event(ts, evt, &ptrs);
229       serialize_events(fd, ptrs);
230
231       int r = bt_iter_next(bt_itr);
232       ASSERT_EXIT(r == 0, "Error advancing event iterator");
233     }
234
235     bt_ctf_iter_destroy(itr);
236
237     insert_thread_stops(fd);
238   }
239
240 private:
241   void write_banner(int fd) {
242     bufferlist bl;
243     bl.append(rbd_replay::action::BANNER);
244     int r = bl.write_fd(fd);
245     ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
246   }
247
248   void serialize_events(int fd, const IO::ptrs &ptrs) {
249     for (IO::ptrs::const_iterator it = ptrs.begin(); it != ptrs.end(); ++it) {
250       IO::ptr io(*it);
251
252       bufferlist bl;
253       io->encode(bl);
254
255       int r = bl.write_fd(fd);
256       ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
257
258       if (m_verbose) {
259         io->write_debug(std::cout);
260         std::cout << std::endl;
261       }
262     }
263   }
264
265   void insert_thread_stops(int fd) {
266     IO::ptrs ios;
267     for (map<thread_id_t, Thread::ptr>::const_iterator itr = m_threads.begin(),
268          end = m_threads.end(); itr != end; ++itr) {
269       Thread::ptr thread(itr->second);
270       ios.push_back(IO::ptr(new StopThreadIO(next_id(), thread->max_ts(),
271                                              thread->id(),
272                                              m_recent_completions)));
273     }
274     serialize_events(fd, ios);
275   }
276
277   void process_event(uint64_t ts, struct bt_ctf_event *evt,
278                      IO::ptrs *ios) {
279     const char *event_name = bt_ctf_event_name(evt);
280     const struct bt_definition *scope_context = bt_ctf_get_top_level_scope(evt,
281                                                                            BT_STREAM_EVENT_CONTEXT);
282     ASSERT_EXIT(scope_context != NULL, "Error retrieving event context");
283
284     const struct bt_definition *scope_fields = bt_ctf_get_top_level_scope(evt,
285                                                                           BT_EVENT_FIELDS);
286     ASSERT_EXIT(scope_fields != NULL, "Error retrieving event fields");
287
288     const struct bt_definition *pthread_id_field = bt_ctf_get_field(evt, scope_context, "pthread_id");
289     ASSERT_EXIT(pthread_id_field != NULL, "Error retrieving thread id");
290
291     thread_id_t threadID = bt_ctf_get_uint64(pthread_id_field);
292     Thread::ptr &thread(m_threads[threadID]);
293     if (!thread) {
294       thread.reset(new Thread(threadID, m_window));
295       IO::ptr io(new StartThreadIO(next_id(), ts - 4, threadID));
296       ios->push_back(io);
297     }
298     thread->insert_ts(ts);
299
300     class FieldLookup {
301     public:
302       FieldLookup(struct bt_ctf_event *evt,
303                   const struct bt_definition *scope)
304         : m_evt(evt),
305           m_scope(scope) {
306       }
307
308       const char* string(const char* name) {
309         const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
310         ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'");
311
312         const char* c = bt_ctf_get_string(field);
313         int err = bt_ctf_field_get_error();
314         ASSERT_EXIT(c && err == 0, "Error retrieving field value '" << name <<
315                                    "': error=" << err);
316         return c;
317       }
318
319       int64_t int64(const char* name) {
320         const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
321         ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'");
322
323         int64_t val = bt_ctf_get_int64(field);
324         int err = bt_ctf_field_get_error();
325         ASSERT_EXIT(err == 0, "Error retrieving field value '" << name <<
326                               "': error=" << err);
327         return val;
328       }
329
330       uint64_t uint64(const char* name) {
331         const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
332         ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'");
333
334         uint64_t val = bt_ctf_get_uint64(field);
335         int err = bt_ctf_field_get_error();
336         ASSERT_EXIT(err == 0, "Error retrieving field value '" << name <<
337                               "': error=" << err);
338         return val;
339       }
340
341     private:
342       struct bt_ctf_event *m_evt;
343       const struct bt_definition *m_scope;
344     } fields(evt, scope_fields);
345
346     if (strcmp(event_name, "librbd:open_image_enter") == 0) {
347       string name(fields.string("name"));
348       string snap_name(fields.string("snap_name"));
349       bool readonly = fields.uint64("read_only");
350       imagectx_id_t imagectx = fields.uint64("imagectx");
351       action_id_t ionum = next_id();
352       pair<string, string> aname(map_image_snap(name, snap_name));
353       IO::ptr io(new OpenImageIO(ionum, ts, threadID, m_recent_completions,
354                                  imagectx, aname.first, aname.second,
355                                  readonly));
356       thread->issued_io(io, &m_latest_ios);
357       ios->push_back(io);
358     } else if (strcmp(event_name, "librbd:open_image_exit") == 0) {
359       completed(thread->latest_io());
360       boost::shared_ptr<OpenImageIO> io(boost::dynamic_pointer_cast<OpenImageIO>(thread->latest_io()));
361       assert(io);
362       m_open_images.insert(io->imagectx());
363     } else if (strcmp(event_name, "librbd:close_image_enter") == 0) {
364       imagectx_id_t imagectx = fields.uint64("imagectx");
365       action_id_t ionum = next_id();
366       IO::ptr io(new CloseImageIO(ionum, ts, threadID, m_recent_completions,
367                                   imagectx));
368       thread->issued_io(io, &m_latest_ios);
369       ios->push_back(thread->latest_io());
370     } else if (strcmp(event_name, "librbd:close_image_exit") == 0) {
371       completed(thread->latest_io());
372       boost::shared_ptr<CloseImageIO> io(boost::dynamic_pointer_cast<CloseImageIO>(thread->latest_io()));
373       assert(io);
374       m_open_images.erase(io->imagectx());
375     } else if (strcmp(event_name, "librbd:aio_open_image_enter") == 0) {
376       string name(fields.string("name"));
377       string snap_name(fields.string("snap_name"));
378       bool readonly = fields.uint64("read_only");
379       imagectx_id_t imagectx = fields.uint64("imagectx");
380       uint64_t completion = fields.uint64("completion");
381       action_id_t ionum = next_id();
382       pair<string, string> aname(map_image_snap(name, snap_name));
383       IO::ptr io(new AioOpenImageIO(ionum, ts, threadID, m_recent_completions,
384                                     imagectx, aname.first, aname.second,
385                                     readonly));
386       thread->issued_io(io, &m_latest_ios);
387       ios->push_back(io);
388       m_pending_ios[completion] = io;
389     } else if (strcmp(event_name, "librbd:aio_close_image_enter") == 0) {
390       imagectx_id_t imagectx = fields.uint64("imagectx");
391       uint64_t completion = fields.uint64("completion");
392       action_id_t ionum = next_id();
393       IO::ptr io(new AioCloseImageIO(ionum, ts, threadID, m_recent_completions,
394                                      imagectx));
395       thread->issued_io(io, &m_latest_ios);
396       ios->push_back(thread->latest_io());
397       m_pending_ios[completion] = io;
398     } else if (strcmp(event_name, "librbd:read_enter") == 0 ||
399                strcmp(event_name, "librbd:read2_enter") == 0) {
400       string name(fields.string("name"));
401       string snap_name(fields.string("snap_name"));
402       bool readonly = fields.int64("read_only");
403       imagectx_id_t imagectx = fields.uint64("imagectx");
404       uint64_t offset = fields.uint64("offset");
405       uint64_t length = fields.uint64("length");
406       require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
407       action_id_t ionum = next_id();
408       IO::ptr io(new ReadIO(ionum, ts, threadID, m_recent_completions, imagectx,
409                             offset, length));
410       thread->issued_io(io, &m_latest_ios);
411       ios->push_back(io);
412     } else if (strcmp(event_name, "librbd:read_exit") == 0) {
413       completed(thread->latest_io());
414     } else if (strcmp(event_name, "librbd:write_enter") == 0 ||
415                strcmp(event_name, "librbd:write2_enter") == 0) {
416       string name(fields.string("name"));
417       string snap_name(fields.string("snap_name"));
418       bool readonly = fields.int64("read_only");
419       uint64_t offset = fields.uint64("off");
420       uint64_t length = fields.uint64("buf_len");
421       imagectx_id_t imagectx = fields.uint64("imagectx");
422       require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
423       action_id_t ionum = next_id();
424       IO::ptr io(new WriteIO(ionum, ts, threadID, m_recent_completions,
425                              imagectx, offset, length));
426       thread->issued_io(io, &m_latest_ios);
427       ios->push_back(io);
428     } else if (strcmp(event_name, "librbd:write_exit") == 0) {
429       completed(thread->latest_io());
430     } else if (strcmp(event_name, "librbd:discard_enter") == 0) {
431       string name(fields.string("name"));
432       string snap_name(fields.string("snap_name"));
433       bool readonly = fields.int64("read_only");
434       uint64_t offset = fields.uint64("off");
435       uint64_t length = fields.uint64("len");
436       imagectx_id_t imagectx = fields.uint64("imagectx");
437       require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
438       action_id_t ionum = next_id();
439       IO::ptr io(new DiscardIO(ionum, ts, threadID, m_recent_completions,
440                                 imagectx, offset, length));
441       thread->issued_io(io, &m_latest_ios);
442       ios->push_back(io);
443     } else if (strcmp(event_name, "librbd:discard_exit") == 0) {
444       completed(thread->latest_io());
445     } else if (strcmp(event_name, "librbd:aio_read_enter") == 0 ||
446                strcmp(event_name, "librbd:aio_read2_enter") == 0) {
447       string name(fields.string("name"));
448       string snap_name(fields.string("snap_name"));
449       bool readonly = fields.int64("read_only");
450       uint64_t completion = fields.uint64("completion");
451       imagectx_id_t imagectx = fields.uint64("imagectx");
452       uint64_t offset = fields.uint64("offset");
453       uint64_t length = fields.uint64("length");
454       require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
455       action_id_t ionum = next_id();
456       IO::ptr io(new AioReadIO(ionum, ts, threadID, m_recent_completions,
457                                imagectx, offset, length));
458       ios->push_back(io);
459       thread->issued_io(io, &m_latest_ios);
460       m_pending_ios[completion] = io;
461     } else if (strcmp(event_name, "librbd:aio_write_enter") == 0 ||
462                strcmp(event_name, "librbd:aio_write2_enter") == 0) {
463       string name(fields.string("name"));
464       string snap_name(fields.string("snap_name"));
465       bool readonly = fields.int64("read_only");
466       uint64_t offset = fields.uint64("off");
467       uint64_t length = fields.uint64("len");
468       uint64_t completion = fields.uint64("completion");
469       imagectx_id_t imagectx = fields.uint64("imagectx");
470       require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
471       action_id_t ionum = next_id();
472       IO::ptr io(new AioWriteIO(ionum, ts, threadID, m_recent_completions,
473                                 imagectx, offset, length));
474       thread->issued_io(io, &m_latest_ios);
475       ios->push_back(io);
476       m_pending_ios[completion] = io;
477     } else if (strcmp(event_name, "librbd:aio_discard_enter") == 0) {
478       string name(fields.string("name"));
479       string snap_name(fields.string("snap_name"));
480       bool readonly = fields.int64("read_only");
481       uint64_t offset = fields.uint64("off");
482       uint64_t length = fields.uint64("len");
483       uint64_t completion = fields.uint64("completion");
484       imagectx_id_t imagectx = fields.uint64("imagectx");
485       require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
486       action_id_t ionum = next_id();
487       IO::ptr io(new AioDiscardIO(ionum, ts, threadID, m_recent_completions,
488                                 imagectx, offset, length));
489       thread->issued_io(io, &m_latest_ios);
490       ios->push_back(io);
491       m_pending_ios[completion] = io;
492     } else if (strcmp(event_name, "librbd:aio_complete_enter") == 0) {
493       uint64_t completion = fields.uint64("completion");
494       map<uint64_t, IO::ptr>::iterator itr = m_pending_ios.find(completion);
495       if (itr != m_pending_ios.end()) {
496         IO::ptr completedIO(itr->second);
497         m_pending_ios.erase(itr);
498         completed(completedIO);
499       }
500     }
501   }
502
503   action_id_t next_id() {
504     action_id_t id = m_io_count;
505     m_io_count += 2;
506     return id;
507   }
508
509   void completed(IO::ptr io) {
510     uint64_t limit = (io->start_time() < m_window ?
511       0 : io->start_time() - m_window);
512     for (io_set_t::iterator itr = m_recent_completions.begin();
513          itr != m_recent_completions.end(); ) {
514       IO::ptr recent_comp(*itr);
515       if ((recent_comp->start_time() < limit ||
516            io->dependencies().count(recent_comp) != 0) &&
517           m_latest_ios.count(recent_comp) == 0) {
518         m_recent_completions.erase(itr++);
519       } else {
520         ++itr;
521       }
522     }
523     m_recent_completions.insert(io);
524   }
525
526   pair<string, string> map_image_snap(string image_name, string snap_name) {
527     if (!m_anonymize) {
528       return pair<string, string>(image_name, snap_name);
529     }
530     AnonymizedImage& m(m_anonymized_images[image_name]);
531     if (m.image_name() == "") {
532       m.init(image_name, m_anonymized_images.size());
533     }
534     return m.anonymize(snap_name);
535   }
536
537   void require_image(uint64_t ts,
538                      Thread::ptr thread,
539                      imagectx_id_t imagectx,
540                      const string& name,
541                      const string& snap_name,
542                      bool readonly,
543                      IO::ptrs *ios) {
544     assert(thread);
545     if (m_open_images.count(imagectx) > 0) {
546       return;
547     }
548     action_id_t ionum = next_id();
549     pair<string, string> aname(map_image_snap(name, snap_name));
550     IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(),
551                                m_recent_completions, imagectx, aname.first,
552                                aname.second, readonly));
553     thread->issued_io(io, &m_latest_ios);
554     ios->push_back(io);
555     completed(io);
556     m_open_images.insert(imagectx);
557   }
558
559   uint64_t m_window;
560   map<thread_id_t, Thread::ptr> m_threads;
561   uint32_t m_io_count;
562   io_set_t m_recent_completions;
563   set<imagectx_id_t> m_open_images;
564
565   // keyed by completion
566   map<uint64_t, IO::ptr> m_pending_ios;
567   std::set<IO::ptr> m_latest_ios;
568
569   bool m_anonymize;
570   map<string, AnonymizedImage> m_anonymized_images;
571
572   bool m_verbose;
573 };
574
575 int main(int argc, char** argv) {
576   vector<string> args;
577   for (int i = 0; i < argc; i++) {
578     args.push_back(string(argv[i]));
579   }
580
581   Processor p;
582   p.run(args);
583 }