1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "tools/rbd/ArgumentTypes.h"
5 #include "tools/rbd/Shell.h"
6 #include "tools/rbd/Utils.h"
7 #include "common/Cond.h"
8 #include "common/Formatter.h"
9 #include "common/ceph_json.h"
10 #include "common/errno.h"
11 #include "common/safe_io.h"
12 #include "include/stringify.h"
15 #include <boost/program_options.hpp>
16 #include "cls/rbd/cls_rbd_client.h"
17 #include "cls/journal/cls_journal_types.h"
18 #include "cls/journal/cls_journal_client.h"
20 #include "journal/Journaler.h"
21 #include "journal/ReplayEntry.h"
22 #include "journal/ReplayHandler.h"
23 #include "journal/Settings.h"
24 #include "librbd/journal/Types.h"
30 namespace at = argument_types;
31 namespace po = boost::program_options;
33 static int do_show_journal_info(librados::Rados& rados, librados::IoCtx& io_ctx,
34 const std::string& journal_id, Formatter *f)
39 std::string header_oid = ::journal::Journaler::header_oid(journal_id);
40 std::string object_oid_prefix = ::journal::Journaler::object_oid_prefix(
41 io_ctx.get_id(), journal_id);
46 cls::journal::client::get_immutable_metadata(io_ctx, header_oid, &order,
47 &splay_width, &pool_id, &cond);
50 std::cerr << "failed to get journal metadata: " << cpp_strerror(r)
55 std::string object_pool_name;
57 r = rados.pool_reverse_lookup(pool_id, &object_pool_name);
59 std::cerr << "error looking up pool name for pool_id=" << pool_id << ": "
60 << cpp_strerror(r) << std::endl;
65 f->open_object_section("journal");
66 f->dump_string("journal_id", journal_id);
67 f->dump_string("header_oid", header_oid);
68 f->dump_string("object_oid_prefix", object_oid_prefix);
69 f->dump_int("order", order);
70 f->dump_int("splay_width", splay_width);
71 if (!object_pool_name.empty()) {
72 f->dump_string("object_pool", object_pool_name);
77 std::cout << "rbd journal '" << journal_id << "':" << std::endl;
78 std::cout << "\theader_oid: " << header_oid << std::endl;
79 std::cout << "\tobject_oid_prefix: " << object_oid_prefix << std::endl;
80 std::cout << "\torder: " << static_cast<int>(order) << " ("
81 << prettybyte_t(1ull << order) << " objects)"<< std::endl;
82 std::cout << "\tsplay_width: " << static_cast<int>(splay_width) << std::endl;
83 if (!object_pool_name.empty()) {
84 std::cout << "\tobject_pool: " << object_pool_name << std::endl;
90 static int do_show_journal_status(librados::IoCtx& io_ctx,
91 const std::string& journal_id, Formatter *f)
98 std::set<cls::journal::Client> registered_clients;
99 std::string oid = ::journal::Journaler::header_oid(journal_id);
101 cls::journal::client::get_mutable_metadata(io_ctx, oid, &minimum_set,
102 &active_set, ®istered_clients,
106 std::cerr << "warning: failed to get journal metadata" << std::endl;
111 f->open_object_section("status");
112 f->dump_unsigned("minimum_set", minimum_set);
113 f->dump_unsigned("active_set", active_set);
114 f->open_array_section("registered_clients");
115 for (std::set<cls::journal::Client>::iterator c =
116 registered_clients.begin(); c != registered_clients.end(); ++c) {
117 f->open_object_section("client");
125 std::cout << "minimum_set: " << minimum_set << std::endl;
126 std::cout << "active_set: " << active_set << std::endl;
127 std::cout << "registered clients: " << std::endl;
128 for (std::set<cls::journal::Client>::iterator c =
129 registered_clients.begin(); c != registered_clients.end(); ++c) {
130 std::cout << "\t" << *c << std::endl;
136 static int do_reset_journal(librados::IoCtx& io_ctx,
137 const std::string& journal_id)
139 // disable/re-enable journaling to delete/re-create the journal
140 // to properly handle mirroring constraints
141 std::string image_name;
142 int r = librbd::cls_client::dir_get_name(&io_ctx, RBD_DIRECTORY, journal_id,
145 std::cerr << "failed to locate journal's image: " << cpp_strerror(r)
151 r = utils::open_image(io_ctx, image_name, false, &image);
153 std::cerr << "failed to open image: " << cpp_strerror(r) << std::endl;
157 r = image.update_features(RBD_FEATURE_JOURNALING, false);
159 std::cerr << "failed to disable image journaling: " << cpp_strerror(r)
164 r = image.update_features(RBD_FEATURE_JOURNALING, true);
166 std::cerr << "failed to re-enable image journaling: " << cpp_strerror(r)
173 static int do_disconnect_journal_client(librados::IoCtx& io_ctx,
174 const std::string& journal_id,
175 const std::string& client_id)
180 uint64_t minimum_set;
182 std::set<cls::journal::Client> registered_clients;
183 std::string oid = ::journal::Journaler::header_oid(journal_id);
185 cls::journal::client::get_mutable_metadata(io_ctx, oid, &minimum_set,
186 &active_set, ®istered_clients,
190 std::cerr << "warning: failed to get journal metadata" << std::endl;
194 static const std::string IMAGE_CLIENT_ID("");
197 for (auto &c : registered_clients) {
198 if (c.id == IMAGE_CLIENT_ID || (!client_id.empty() && client_id != c.id)) {
201 r = cls::journal::client::client_update_state(io_ctx, oid, c.id,
202 cls::journal::CLIENT_STATE_DISCONNECTED);
204 std::cerr << "warning: failed to disconnect client " << c.id << ": "
205 << cpp_strerror(r) << std::endl;
208 std::cout << "client " << c.id << " disconnected" << std::endl;
213 if (!client_id.empty()) {
214 std::cerr << "warning: client " << client_id << " is not registered"
217 std::cerr << "no registered clients to disconnect" << std::endl;
223 r = io_ctx.notify2(oid, bl, 5000, NULL);
225 std::cerr << "warning: failed to notify state change:" << ": "
226 << cpp_strerror(r) << std::endl;
233 class Journaler : public ::journal::Journaler {
235 Journaler(librados::IoCtx& io_ctx, const std::string& journal_id,
236 const std::string &client_id) :
237 ::journal::Journaler(io_ctx, journal_id, client_id, {}) {
243 // TODO register with librbd payload
244 r = register_client(bufferlist());
246 std::cerr << "failed to register client: " << cpp_strerror(r)
253 ::journal::Journaler::init(&cond);
256 std::cerr << "failed to initialize journal: " << cpp_strerror(r)
258 (void) unregister_client();
266 int r = unregister_client();
268 std::cerr << "rbd: failed to unregister journal client: "
269 << cpp_strerror(r) << std::endl;
271 ::journal::Journaler::shut_down();
277 class JournalPlayer {
279 JournalPlayer(librados::IoCtx& io_ctx, const std::string& journal_id,
280 const std::string &client_id) :
281 m_journaler(io_ctx, journal_id, client_id),
286 virtual ~JournalPlayer() {}
291 r = m_journaler.init();
296 ReplayHandler replay_handler(this);
298 m_journaler.start_replay(&replay_handler);
302 std::cerr << "rbd: failed to process journal: " << cpp_strerror(r)
312 return m_journaler.shut_down();
316 struct ReplayHandler : public ::journal::ReplayHandler {
317 JournalPlayer *journal;
318 explicit ReplayHandler(JournalPlayer *_journal) : journal(_journal) {}
320 void get() override {}
321 void put() override {}
323 void handle_entries_available() override {
324 journal->handle_replay_ready();
326 void handle_complete(int r) override {
327 journal->handle_replay_complete(r);
331 void handle_replay_ready() {
334 ::journal::ReplayEntry replay_entry;
336 if (!m_journaler.try_pop_front(&replay_entry, &tag_id)) {
340 r = process_entry(replay_entry, tag_id);
347 virtual int process_entry(::journal::ReplayEntry replay_entry,
348 uint64_t tag_id) = 0;
350 void handle_replay_complete(int r) {
351 if (m_r == 0 && r < 0) {
354 m_journaler.stop_replay(&m_cond);
357 Journaler m_journaler;
362 static int inspect_entry(bufferlist& data,
363 librbd::journal::EventEntry& event_entry,
366 bufferlist::iterator it = data.begin();
367 ::decode(event_entry, it);
368 } catch (const buffer::error &err) {
369 std::cerr << "failed to decode event entry: " << err.what() << std::endl;
373 JSONFormatter f(true);
374 f.open_object_section("event_entry");
375 event_entry.dump(&f);
382 class JournalInspector : public JournalPlayer {
384 JournalInspector(librados::IoCtx& io_ctx, const std::string& journal_id,
386 JournalPlayer(io_ctx, journal_id, "INSPECT"),
391 int exec() override {
392 int r = JournalPlayer::exec();
399 Stats() : total(0), error(0) {}
402 std::cout << "Summary:" << std::endl
403 << " " << total << " entries inspected, " << error << " errors"
411 int process_entry(::journal::ReplayEntry replay_entry,
412 uint64_t tag_id) override {
415 std::cout << "Entry: tag_id=" << tag_id << ", commit_tid="
416 << replay_entry.get_commit_tid() << std::endl;
418 bufferlist data = replay_entry.get_data();
419 librbd::journal::EventEntry event_entry;
420 int r = inspect_entry(data, event_entry, m_verbose);
432 static int do_inspect_journal(librados::IoCtx& io_ctx,
433 const std::string& journal_id,
435 JournalInspector inspector(io_ctx, journal_id, verbose);
436 int r = inspector.exec();
438 inspector.shut_down();
442 r = inspector.shut_down();
455 ExportEntry() : tag_id(0), commit_tid(0), type(0), entry() {}
457 ExportEntry(uint64_t tag_id, uint64_t commit_tid, int type,
458 const bufferlist& entry)
459 : tag_id(tag_id), commit_tid(commit_tid), type(type), entry(entry) {
462 void dump(Formatter *f) const {
463 ::encode_json("tag_id", tag_id, f);
464 ::encode_json("commit_tid", commit_tid, f);
465 ::encode_json("type", type, f);
466 ::encode_json("entry", entry, f);
469 void decode_json(JSONObj *obj) {
470 JSONDecoder::decode_json("tag_id", tag_id, obj);
471 JSONDecoder::decode_json("commit_tid", commit_tid, obj);
472 JSONDecoder::decode_json("type", type, obj);
473 JSONDecoder::decode_json("entry", entry, obj);
477 class JournalExporter : public JournalPlayer {
479 JournalExporter(librados::IoCtx& io_ctx, const std::string& journal_id,
480 int fd, bool no_error, bool verbose) :
481 JournalPlayer(io_ctx, journal_id, "EXPORT"),
482 m_journal_id(journal_id),
484 m_no_error(no_error),
489 int exec() override {
490 std::string header("# journal_id: " + m_journal_id + "\n");
492 r = safe_write(m_fd, header.c_str(), header.size());
494 std::cerr << "rbd: failed to write to export file: " << cpp_strerror(r)
498 r = JournalPlayer::exec();
505 Stats() : total(0), error(0) {}
508 std::cout << total << " entries processed, " << error << " errors"
516 int process_entry(::journal::ReplayEntry replay_entry,
517 uint64_t tag_id) override {
520 bufferlist entry = replay_entry.get_data();
521 librbd::journal::EventEntry event_entry;
522 int r = inspect_entry(entry, event_entry, m_verbose);
526 return m_no_error ? 0 : r;
528 type = event_entry.get_event_type();
530 ExportEntry export_entry(tag_id, replay_entry.get_commit_tid(), type,
533 ::encode_json("event_entry", export_entry, &f);
534 std::ostringstream oss;
536 std::string objstr = oss.str();
537 std::string header = stringify(objstr.size()) + " ";
538 r = safe_write(m_fd, header.c_str(), header.size());
540 r = safe_write(m_fd, objstr.c_str(), objstr.size());
543 r = safe_write(m_fd, "\n", 1);
546 std::cerr << "rbd: failed to write to export file: " << cpp_strerror(r)
554 std::string m_journal_id;
561 static int do_export_journal(librados::IoCtx& io_ctx,
562 const std::string& journal_id,
563 const std::string& path,
564 bool no_error, bool verbose) {
567 bool to_stdout = path == "-";
571 fd = open(path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0644);
574 std::cerr << "rbd: error creating " << path << std::endl;
577 #ifdef HAVE_POSIX_FADVISE
578 posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
582 JournalExporter exporter(io_ctx, journal_id, fd, no_error, verbose);
589 int shut_down_r = exporter.shut_down();
590 if (r == 0 && shut_down_r < 0) {
597 class JournalImporter {
599 JournalImporter(librados::IoCtx& io_ctx, const std::string& journal_id,
600 int fd, bool no_error, bool verbose) :
601 m_journaler(io_ctx, journal_id, "IMPORT"),
603 m_no_error(no_error),
607 bool read_entry(bufferlist& bl, int& r) {
608 // Entries are storead in the file using the following format:
610 // # Optional comments
611 // NNN {json encoded entry}
614 // Where NNN is the encoded entry size.
617 // Skip line feed and comments (lines started with #).
618 while ((r = safe_read_exact(m_fd, buf, 1)) == 0) {
619 if (buf[0] == '\n') {
621 } else if (buf[0] == '#') {
622 while ((r = safe_read_exact(m_fd, buf, 1)) == 0) {
623 if (buf[0] == '\n') {
637 // Read entry size to buf.
638 if (!isdigit(buf[0])) {
640 std::cerr << "rbd: import data invalid format (digit expected)"
644 for (size_t i = 1; i < sizeof(buf); i++) {
645 r = safe_read_exact(m_fd, buf + i, 1);
647 std::cerr << "rbd: error reading import data" << std::endl;
650 if (!isdigit(buf[i])) {
653 std::cerr << "rbd: import data invalid format (space expected)"
661 int entry_size = atoi(buf);
662 if (entry_size == 0) {
664 std::cerr << "rbd: import data invalid format (zero entry size)"
668 assert(entry_size > 0);
670 r = bl.read_fd(m_fd, entry_size);
672 std::cerr << "rbd: error reading from stdin: " << cpp_strerror(r)
676 if (r != entry_size) {
677 std::cerr << "rbd: error reading from stdin: trucated"
687 int r = m_journaler.init();
691 m_journaler.start_append(0, 0, 0);
697 while (read_entry(bl, r)) {
701 if (!p.parse(bl.c_str(), bl.length())) {
702 std::cerr << "rbd: error parsing input (entry " << n << ")"
714 decode_json_obj(e, &p);
715 } catch (JSONDecoder::err& err) {
716 std::cerr << "rbd: error json decoding import data (entry " << n << "):"
717 << err.message << std::endl;
726 librbd::journal::EventEntry event_entry;
727 r = inspect_entry(e.entry, event_entry, m_verbose);
729 std::cerr << "rbd: corrupted entry " << n << ": tag_tid=" << e.tag_id
730 << ", commit_tid=" << e.commit_tid << std::endl;
738 m_journaler.append(e.tag_id, e.entry);
742 std::cout << n << " entries processed, " << error_count << " errors" << std::endl;
744 std::cout << "Waiting for journal append to complete..." << std::endl;
747 m_journaler.stop_append(&cond);
751 std::cerr << "failed to append journal: " << cpp_strerror(r) << std::endl;
754 if (r1 < 0 && r == 0) {
761 return m_journaler.shut_down();
765 Journaler m_journaler;
771 static int do_import_journal(librados::IoCtx& io_ctx,
772 const std::string& journal_id,
773 const std::string& path,
774 bool no_error, bool verbose) {
778 bool from_stdin = path == "-";
782 if ((fd = open(path.c_str(), O_RDONLY)) < 0) {
784 std::cerr << "rbd: error opening " << path << std::endl;
787 #ifdef HAVE_POSIX_FADVISE
788 posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
792 JournalImporter importer(io_ctx, journal_id, fd, no_error, verbose);
799 int shut_down_r = importer.shut_down();
800 if (r == 0 && shut_down_r < 0) {
807 void get_info_arguments(po::options_description *positional,
808 po::options_description *options) {
809 at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
810 at::add_format_options(options);
813 int execute_info(const po::variables_map &vm) {
814 size_t arg_index = 0;
815 std::string pool_name;
816 std::string journal_name;
817 int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE,
818 &arg_index, &pool_name, &journal_name);
823 at::Format::Formatter formatter;
824 r = utils::get_formatter(vm, &formatter);
829 librados::Rados rados;
830 librados::IoCtx io_ctx;
831 r = utils::init(pool_name, &rados, &io_ctx);
836 r = do_show_journal_info(rados, io_ctx, journal_name, formatter.get());
838 std::cerr << "rbd: journal info: " << cpp_strerror(r) << std::endl;
845 void get_status_arguments(po::options_description *positional,
846 po::options_description *options) {
847 at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
848 at::add_format_options(options);
851 int execute_status(const po::variables_map &vm) {
852 size_t arg_index = 0;
853 std::string pool_name;
854 std::string journal_name;
855 int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE,
856 &arg_index, &pool_name, &journal_name);
861 at::Format::Formatter formatter;
862 r = utils::get_formatter(vm, &formatter);
867 librados::Rados rados;
868 librados::IoCtx io_ctx;
869 r = utils::init(pool_name, &rados, &io_ctx);
874 r = do_show_journal_status(io_ctx, journal_name, formatter.get());
876 std::cerr << "rbd: journal status: " << cpp_strerror(r) << std::endl;
882 void get_reset_arguments(po::options_description *positional,
883 po::options_description *options) {
884 at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
887 int execute_reset(const po::variables_map &vm) {
888 size_t arg_index = 0;
889 std::string pool_name;
890 std::string journal_name;
891 int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE,
892 &arg_index, &pool_name, &journal_name);
897 librados::Rados rados;
898 librados::IoCtx io_ctx;
899 r = utils::init(pool_name, &rados, &io_ctx);
904 r = do_reset_journal(io_ctx, journal_name);
906 std::cerr << "rbd: journal reset: " << cpp_strerror(r) << std::endl;
912 void get_client_disconnect_arguments(po::options_description *positional,
913 po::options_description *options) {
914 at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
915 options->add_options()
916 ("client-id", po::value<std::string>(),
917 "client ID (or leave unspecified to disconnect all)");
920 int execute_client_disconnect(const po::variables_map &vm) {
921 size_t arg_index = 0;
922 std::string pool_name;
923 std::string journal_name;
924 int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE,
925 &arg_index, &pool_name, &journal_name);
930 std::string client_id;
931 if (vm.count("client-id")) {
932 client_id = vm["client-id"].as<std::string>();
935 librados::Rados rados;
936 librados::IoCtx io_ctx;
937 r = utils::init(pool_name, &rados, &io_ctx);
942 r = do_disconnect_journal_client(io_ctx, journal_name, client_id);
944 std::cerr << "rbd: journal client disconnect: " << cpp_strerror(r)
951 void get_inspect_arguments(po::options_description *positional,
952 po::options_description *options) {
953 at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
954 at::add_verbose_option(options);
957 int execute_inspect(const po::variables_map &vm) {
958 size_t arg_index = 0;
959 std::string pool_name;
960 std::string journal_name;
961 int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE,
962 &arg_index, &pool_name, &journal_name);
967 librados::Rados rados;
968 librados::IoCtx io_ctx;
969 r = utils::init(pool_name, &rados, &io_ctx);
974 r = do_inspect_journal(io_ctx, journal_name, vm[at::VERBOSE].as<bool>());
976 std::cerr << "rbd: journal inspect: " << cpp_strerror(r) << std::endl;
982 void get_export_arguments(po::options_description *positional,
983 po::options_description *options) {
984 at::add_journal_spec_options(positional, options,
985 at::ARGUMENT_MODIFIER_SOURCE);
986 at::add_path_options(positional, options,
987 "export file (or '-' for stdout)");
988 at::add_verbose_option(options);
989 at::add_no_error_option(options);
992 int execute_export(const po::variables_map &vm) {
993 size_t arg_index = 0;
994 std::string pool_name;
995 std::string journal_name;
996 int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_SOURCE,
997 &arg_index, &pool_name, &journal_name);
1003 r = utils::get_path(vm, utils::get_positional_argument(vm, 1), &path);
1008 librados::Rados rados;
1009 librados::IoCtx io_ctx;
1010 r = utils::init(pool_name, &rados, &io_ctx);
1015 r = do_export_journal(io_ctx, journal_name, path, vm[at::NO_ERROR].as<bool>(),
1016 vm[at::VERBOSE].as<bool>());
1018 std::cerr << "rbd: journal export: " << cpp_strerror(r) << std::endl;
1024 void get_import_arguments(po::options_description *positional,
1025 po::options_description *options) {
1026 at::add_path_options(positional, options,
1027 "import file (or '-' for stdin)");
1028 at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_DEST);
1029 at::add_verbose_option(options);
1030 at::add_no_error_option(options);
1033 int execute_import(const po::variables_map &vm) {
1035 int r = utils::get_path(vm, utils::get_positional_argument(vm, 0), &path);
1040 size_t arg_index = 1;
1041 std::string pool_name;
1042 std::string journal_name;
1043 r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_DEST,
1044 &arg_index, &pool_name, &journal_name);
1049 librados::Rados rados;
1050 librados::IoCtx io_ctx;
1051 r = utils::init(pool_name, &rados, &io_ctx);
1056 r = do_import_journal(io_ctx, journal_name, path, vm[at::NO_ERROR].as<bool>(),
1057 vm[at::VERBOSE].as<bool>());
1059 std::cerr << "rbd: journal export: " << cpp_strerror(r) << std::endl;
1065 Shell::Action action_info(
1066 {"journal", "info"}, {}, "Show information about image journal.", "",
1067 &get_info_arguments, &execute_info);
1069 Shell::Action action_status(
1070 {"journal", "status"}, {}, "Show status of image journal.", "",
1071 &get_status_arguments, &execute_status);
1073 Shell::Action action_reset(
1074 {"journal", "reset"}, {}, "Reset image journal.", "",
1075 &get_reset_arguments, &execute_reset);
1077 Shell::Action action_inspect(
1078 {"journal", "inspect"}, {}, "Inspect image journal for structural errors.", "",
1079 &get_inspect_arguments, &execute_inspect);
1081 Shell::Action action_export(
1082 {"journal", "export"}, {}, "Export image journal.", "",
1083 &get_export_arguments, &execute_export);
1085 Shell::Action action_import(
1086 {"journal", "import"}, {}, "Import image journal.", "",
1087 &get_import_arguments, &execute_import);
1089 Shell::Action action_disconnect(
1090 {"journal", "client", "disconnect"}, {},
1091 "Flag image journal client as disconnected.", "",
1092 &get_client_disconnect_arguments, &execute_client_disconnect);
1094 } // namespace journal
1095 } // namespace action