Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd / action / Journal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "tools/rbd/ArgumentTypes.h"
5 #include "tools/rbd/Shell.h"
6 #include "tools/rbd/Utils.h"
7 #include "common/Cond.h"
8 #include "common/Formatter.h"
9 #include "common/ceph_json.h"
10 #include "common/errno.h"
11 #include "common/safe_io.h"
12 #include "include/stringify.h"
13 #include <fstream>
14 #include <sstream>
15 #include <boost/program_options.hpp>
16 #include "cls/rbd/cls_rbd_client.h"
17 #include "cls/journal/cls_journal_types.h"
18 #include "cls/journal/cls_journal_client.h"
19
20 #include "journal/Journaler.h"
21 #include "journal/ReplayEntry.h"
22 #include "journal/ReplayHandler.h"
23 #include "journal/Settings.h"
24 #include "librbd/journal/Types.h"
25
26 namespace rbd {
27 namespace action {
28 namespace journal {
29
30 namespace at = argument_types;
31 namespace po = boost::program_options;
32
33 static int do_show_journal_info(librados::Rados& rados, librados::IoCtx& io_ctx,
34                                 const std::string& journal_id, Formatter *f)
35 {
36   int r;
37   C_SaferCond cond;
38
39   std::string header_oid = ::journal::Journaler::header_oid(journal_id);
40   std::string object_oid_prefix = ::journal::Journaler::object_oid_prefix(
41     io_ctx.get_id(), journal_id);
42   uint8_t order;
43   uint8_t splay_width;
44   int64_t pool_id;
45
46   cls::journal::client::get_immutable_metadata(io_ctx, header_oid, &order,
47                                                &splay_width, &pool_id, &cond);
48   r = cond.wait();
49   if (r < 0) {
50     std::cerr << "failed to get journal metadata: "  << cpp_strerror(r)
51               << std::endl;
52     return r;
53   }
54
55   std::string object_pool_name;
56   if (pool_id >= 0) {
57     r = rados.pool_reverse_lookup(pool_id, &object_pool_name);
58     if (r < 0) {
59       std::cerr << "error looking up pool name for pool_id=" << pool_id << ": "
60                 << cpp_strerror(r) << std::endl;
61     }
62   }
63
64   if (f) {
65     f->open_object_section("journal");
66     f->dump_string("journal_id", journal_id);
67     f->dump_string("header_oid", header_oid);
68     f->dump_string("object_oid_prefix", object_oid_prefix);
69     f->dump_int("order", order);
70     f->dump_int("splay_width", splay_width);
71     if (!object_pool_name.empty()) {
72       f->dump_string("object_pool", object_pool_name);
73     }
74     f->close_section();
75     f->flush(std::cout);
76   } else {
77     std::cout << "rbd journal '" << journal_id << "':" << std::endl;
78     std::cout << "\theader_oid: " << header_oid << std::endl;
79     std::cout << "\tobject_oid_prefix: " << object_oid_prefix << std::endl;
80     std::cout << "\torder: " << static_cast<int>(order) << " ("
81               << prettybyte_t(1ull << order) << " objects)"<< std::endl;
82     std::cout << "\tsplay_width: " << static_cast<int>(splay_width) << std::endl;
83     if (!object_pool_name.empty()) {
84       std::cout << "\tobject_pool: " << object_pool_name << std::endl;
85     }
86   }
87   return 0;
88 }
89
90 static int do_show_journal_status(librados::IoCtx& io_ctx,
91                                   const std::string& journal_id, Formatter *f)
92 {
93   int r;
94
95   C_SaferCond cond;
96   uint64_t minimum_set;
97   uint64_t active_set;
98   std::set<cls::journal::Client> registered_clients;
99   std::string oid = ::journal::Journaler::header_oid(journal_id);
100
101   cls::journal::client::get_mutable_metadata(io_ctx, oid, &minimum_set,
102                                             &active_set, &registered_clients,
103                                             &cond);
104   r = cond.wait();
105   if (r < 0) {
106     std::cerr << "warning: failed to get journal metadata" << std::endl;
107     return r;
108   }
109
110   if (f) {
111     f->open_object_section("status");
112     f->dump_unsigned("minimum_set", minimum_set);
113     f->dump_unsigned("active_set", active_set);
114     f->open_array_section("registered_clients");
115     for (std::set<cls::journal::Client>::iterator c =
116           registered_clients.begin(); c != registered_clients.end(); ++c) {
117       f->open_object_section("client");
118       c->dump(f);
119       f->close_section();
120     }
121     f->close_section();
122     f->close_section();
123     f->flush(std::cout);
124   } else {
125     std::cout << "minimum_set: " << minimum_set << std::endl;
126     std::cout << "active_set: " << active_set << std::endl;
127     std::cout << "registered clients: " << std::endl;
128     for (std::set<cls::journal::Client>::iterator c =
129           registered_clients.begin(); c != registered_clients.end(); ++c) {
130       std::cout << "\t" << *c << std::endl;
131     }
132   }
133   return 0;
134 }
135
136 static int do_reset_journal(librados::IoCtx& io_ctx,
137                             const std::string& journal_id)
138 {
139   // disable/re-enable journaling to delete/re-create the journal
140   // to properly handle mirroring constraints
141   std::string image_name;
142   int r = librbd::cls_client::dir_get_name(&io_ctx, RBD_DIRECTORY, journal_id,
143                                            &image_name);
144   if (r < 0) {
145     std::cerr << "failed to locate journal's image: " << cpp_strerror(r)
146               << std::endl;
147     return r;
148   }
149
150   librbd::Image image;
151   r = utils::open_image(io_ctx, image_name, false, &image);
152   if (r < 0) {
153     std::cerr << "failed to open image: " << cpp_strerror(r) << std::endl;
154     return r;
155   }
156
157   r = image.update_features(RBD_FEATURE_JOURNALING, false);
158   if (r < 0) {
159     std::cerr << "failed to disable image journaling: " << cpp_strerror(r)
160               << std::endl;
161     return r;
162   }
163
164   r = image.update_features(RBD_FEATURE_JOURNALING, true);
165   if (r < 0) {
166     std::cerr << "failed to re-enable image journaling: " << cpp_strerror(r)
167               << std::endl;
168     return r;
169   }
170   return 0;
171 }
172
173 static int do_disconnect_journal_client(librados::IoCtx& io_ctx,
174                                         const std::string& journal_id,
175                                         const std::string& client_id)
176 {
177   int r;
178
179   C_SaferCond cond;
180   uint64_t minimum_set;
181   uint64_t active_set;
182   std::set<cls::journal::Client> registered_clients;
183   std::string oid = ::journal::Journaler::header_oid(journal_id);
184
185   cls::journal::client::get_mutable_metadata(io_ctx, oid, &minimum_set,
186                                             &active_set, &registered_clients,
187                                             &cond);
188   r = cond.wait();
189   if (r < 0) {
190     std::cerr << "warning: failed to get journal metadata" << std::endl;
191     return r;
192   }
193
194   static const std::string IMAGE_CLIENT_ID("");
195
196   bool found = false;
197   for (auto &c : registered_clients) {
198     if (c.id == IMAGE_CLIENT_ID || (!client_id.empty() && client_id != c.id)) {
199       continue;
200     }
201     r = cls::journal::client::client_update_state(io_ctx, oid, c.id,
202                                   cls::journal::CLIENT_STATE_DISCONNECTED);
203     if (r < 0) {
204       std::cerr << "warning: failed to disconnect client " << c.id << ": "
205                 << cpp_strerror(r) << std::endl;
206       return r;
207     }
208     std::cout << "client " << c.id << " disconnected" << std::endl;
209     found = true;
210   }
211
212   if (!found) {
213     if (!client_id.empty()) {
214       std::cerr << "warning: client " << client_id << " is not registered"
215                 << std::endl;
216     } else {
217       std::cerr << "no registered clients to disconnect" << std::endl;
218     }
219     return -ENOENT;
220   }
221
222   bufferlist bl;
223   r = io_ctx.notify2(oid, bl, 5000, NULL);
224   if (r < 0) {
225     std::cerr << "warning: failed to notify state change:" << ": "
226               << cpp_strerror(r) << std::endl;
227     return r;
228   }
229
230   return 0;
231 }
232
233 class Journaler : public ::journal::Journaler {
234 public:
235   Journaler(librados::IoCtx& io_ctx, const std::string& journal_id,
236             const std::string &client_id) :
237     ::journal::Journaler(io_ctx, journal_id, client_id, {}) {
238   }
239
240   int init() {
241     int r;
242
243     // TODO register with librbd payload
244     r = register_client(bufferlist());
245     if (r < 0) {
246       std::cerr << "failed to register client: " << cpp_strerror(r)
247                 << std::endl;
248       return r;
249     }
250
251     C_SaferCond cond;
252
253     ::journal::Journaler::init(&cond);
254     r = cond.wait();
255     if (r < 0) {
256       std::cerr << "failed to initialize journal: " << cpp_strerror(r)
257                 << std::endl;
258       (void) unregister_client();
259       return r;
260     }
261
262     return 0;
263   }
264
265   int shut_down() {
266     int r = unregister_client();
267     if (r < 0) {
268       std::cerr << "rbd: failed to unregister journal client: "
269                 << cpp_strerror(r) << std::endl;
270     }
271     ::journal::Journaler::shut_down();
272
273     return r;
274   }
275 };
276
277 class JournalPlayer {
278 public:
279   JournalPlayer(librados::IoCtx& io_ctx, const std::string& journal_id,
280                 const std::string &client_id) :
281     m_journaler(io_ctx, journal_id, client_id),
282     m_cond(),
283     m_r(0) {
284   }
285
286   virtual ~JournalPlayer() {}
287
288   virtual int exec() {
289     int r;
290
291     r = m_journaler.init();
292     if (r < 0) {
293       return r;
294     }
295
296     ReplayHandler replay_handler(this);
297
298     m_journaler.start_replay(&replay_handler);
299
300     r = m_cond.wait();
301     if (r < 0) {
302       std::cerr << "rbd: failed to process journal: " << cpp_strerror(r)
303                 << std::endl;
304       if (m_r == 0) {
305        m_r = r;
306       }
307     }
308     return m_r;
309   }
310
311   int shut_down() {
312     return m_journaler.shut_down();
313   }
314
315 protected:
316   struct ReplayHandler : public ::journal::ReplayHandler {
317     JournalPlayer *journal;
318     explicit ReplayHandler(JournalPlayer *_journal) : journal(_journal) {}
319
320     void get() override {}
321     void put() override {}
322
323     void handle_entries_available() override {
324       journal->handle_replay_ready();
325     }
326     void handle_complete(int r) override {
327       journal->handle_replay_complete(r);
328     }
329   };
330
331   void handle_replay_ready() {
332     int r = 0;
333     while (true) {
334       ::journal::ReplayEntry replay_entry;
335       uint64_t tag_id;
336       if (!m_journaler.try_pop_front(&replay_entry, &tag_id)) {
337         break;
338       }
339
340       r = process_entry(replay_entry, tag_id);
341       if (r < 0) {
342         break;
343       }
344     }
345   }
346
347   virtual int process_entry(::journal::ReplayEntry replay_entry,
348                             uint64_t tag_id) = 0;
349
350   void handle_replay_complete(int r) {
351     if (m_r == 0 && r < 0) {
352       m_r = r;
353     }
354     m_journaler.stop_replay(&m_cond);
355   }
356
357   Journaler m_journaler;
358   C_SaferCond m_cond;
359   int m_r;
360 };
361
362 static int inspect_entry(bufferlist& data,
363                          librbd::journal::EventEntry& event_entry,
364                          bool verbose) {
365   try {
366     bufferlist::iterator it = data.begin();
367     ::decode(event_entry, it);
368   } catch (const buffer::error &err) {
369     std::cerr << "failed to decode event entry: " << err.what() << std::endl;
370     return -EINVAL;
371   }
372   if (verbose) {
373     JSONFormatter f(true);
374     f.open_object_section("event_entry");
375     event_entry.dump(&f);
376     f.close_section();
377     f.flush(std::cout);
378   }
379   return 0;
380 }
381
382 class JournalInspector : public JournalPlayer {
383 public:
384   JournalInspector(librados::IoCtx& io_ctx, const std::string& journal_id,
385                    bool verbose) :
386     JournalPlayer(io_ctx, journal_id, "INSPECT"),
387     m_verbose(verbose),
388     m_s() {
389   }
390
391   int exec() override {
392     int r = JournalPlayer::exec();
393     m_s.print();
394     return r;
395   }
396
397 private:
398   struct Stats {
399     Stats() : total(0), error(0) {}
400
401     void print() {
402       std::cout << "Summary:" << std::endl
403                 << "  " << total << " entries inspected, " << error << " errors"
404                 << std::endl;
405     }
406
407     int total;
408     int error;
409   };
410
411   int process_entry(::journal::ReplayEntry replay_entry,
412                     uint64_t tag_id) override {
413     m_s.total++;
414     if (m_verbose) {
415       std::cout << "Entry: tag_id=" << tag_id << ", commit_tid="
416                 << replay_entry.get_commit_tid() << std::endl;
417     }
418     bufferlist data = replay_entry.get_data();
419     librbd::journal::EventEntry event_entry;
420     int r = inspect_entry(data, event_entry, m_verbose);
421     if (r < 0) {
422       m_r = r;
423       m_s.error++;
424     }
425     return 0;
426   }
427
428   bool m_verbose;
429   Stats m_s;
430 };
431
432 static int do_inspect_journal(librados::IoCtx& io_ctx,
433                               const std::string& journal_id,
434                               bool verbose) {
435   JournalInspector inspector(io_ctx, journal_id, verbose);
436   int r = inspector.exec();
437   if (r < 0) {
438     inspector.shut_down();
439     return r;
440   }
441
442   r = inspector.shut_down();
443   if (r < 0) {
444     return r;
445   }
446   return 0;
447 }
448
449 struct ExportEntry {
450   uint64_t tag_id;
451   uint64_t commit_tid;
452   int type;
453   bufferlist entry;
454
455   ExportEntry() : tag_id(0), commit_tid(0), type(0), entry() {}
456
457   ExportEntry(uint64_t tag_id, uint64_t commit_tid, int type,
458               const bufferlist& entry)
459     : tag_id(tag_id), commit_tid(commit_tid), type(type), entry(entry) {
460   }
461
462   void dump(Formatter *f) const {
463     ::encode_json("tag_id", tag_id, f);
464     ::encode_json("commit_tid", commit_tid, f);
465     ::encode_json("type", type, f);
466     ::encode_json("entry", entry, f);
467   }
468
469   void decode_json(JSONObj *obj) {
470     JSONDecoder::decode_json("tag_id", tag_id, obj);
471     JSONDecoder::decode_json("commit_tid", commit_tid, obj);
472     JSONDecoder::decode_json("type", type, obj);
473     JSONDecoder::decode_json("entry", entry, obj);
474   }
475 };
476
477 class JournalExporter : public JournalPlayer {
478 public:
479   JournalExporter(librados::IoCtx& io_ctx, const std::string& journal_id,
480                   int fd, bool no_error, bool verbose) :
481     JournalPlayer(io_ctx, journal_id, "EXPORT"),
482     m_journal_id(journal_id),
483     m_fd(fd),
484     m_no_error(no_error),
485     m_verbose(verbose),
486     m_s() {
487   }
488
489   int exec() override {
490     std::string header("# journal_id: " + m_journal_id + "\n");
491     int r;
492     r = safe_write(m_fd, header.c_str(), header.size());
493     if (r < 0) {
494       std::cerr << "rbd: failed to write to export file: " << cpp_strerror(r)
495                 << std::endl;
496       return r;
497     }
498     r = JournalPlayer::exec();
499     m_s.print();
500     return r;
501   }
502
503 private:
504   struct Stats {
505     Stats() : total(0), error(0) {}
506
507     void print() {
508       std::cout << total << " entries processed, " << error << " errors"
509                 << std::endl;
510     }
511
512     int total;
513     int error;
514   };
515
516   int process_entry(::journal::ReplayEntry replay_entry,
517                     uint64_t tag_id) override {
518     m_s.total++;
519     int type = -1;
520     bufferlist entry = replay_entry.get_data();
521     librbd::journal::EventEntry event_entry;
522     int r = inspect_entry(entry, event_entry, m_verbose);
523     if (r < 0) {
524       m_s.error++;
525       m_r = r;
526       return m_no_error ? 0 : r;
527     } else {
528       type = event_entry.get_event_type();
529     }
530     ExportEntry export_entry(tag_id, replay_entry.get_commit_tid(), type,
531                              entry);
532     JSONFormatter f;
533     ::encode_json("event_entry", export_entry, &f);
534     std::ostringstream oss;
535     f.flush(oss);
536     std::string objstr = oss.str();
537     std::string header = stringify(objstr.size()) + " ";
538     r = safe_write(m_fd, header.c_str(), header.size());
539     if (r == 0) {
540       r = safe_write(m_fd, objstr.c_str(), objstr.size());
541     }
542     if (r == 0) {
543       r = safe_write(m_fd, "\n", 1);
544     }
545     if (r < 0) {
546       std::cerr << "rbd: failed to write to export file: " << cpp_strerror(r)
547                 << std::endl;
548       m_s.error++;
549       return r;
550     }
551     return 0;
552   }
553
554   std::string m_journal_id;
555   int m_fd;
556   bool m_no_error;
557   bool m_verbose;
558   Stats m_s;
559 };
560
561 static int do_export_journal(librados::IoCtx& io_ctx,
562                              const std::string& journal_id,
563                              const std::string& path,
564                              bool no_error, bool verbose) {
565   int r;
566   int fd;
567   bool to_stdout = path == "-";
568   if (to_stdout) {
569     fd = STDOUT_FILENO;
570   } else {
571     fd = open(path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0644);
572     if (fd < 0) {
573       r = -errno;
574       std::cerr << "rbd: error creating " << path << std::endl;
575       return r;
576     }
577 #ifdef HAVE_POSIX_FADVISE
578     posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
579 #endif
580   }
581
582   JournalExporter exporter(io_ctx, journal_id, fd, no_error, verbose);
583   r = exporter.exec();
584
585   if (!to_stdout) {
586     close(fd);
587   }
588
589   int shut_down_r = exporter.shut_down();
590   if (r == 0 && shut_down_r < 0) {
591     r = shut_down_r;
592   }
593
594   return r;
595 }
596
597 class JournalImporter {
598 public:
599   JournalImporter(librados::IoCtx& io_ctx, const std::string& journal_id,
600                   int fd, bool no_error, bool verbose) :
601     m_journaler(io_ctx, journal_id, "IMPORT"),
602     m_fd(fd),
603     m_no_error(no_error),
604     m_verbose(verbose) {
605   }
606
607   bool read_entry(bufferlist& bl, int& r) {
608     // Entries are storead in the file using the following format:
609     //
610     //   # Optional comments
611     //   NNN {json encoded entry}
612     //   ...
613     //
614     // Where NNN is the encoded entry size.
615     bl.clear();
616     char buf[80];
617     // Skip line feed and comments (lines started with #).
618     while ((r = safe_read_exact(m_fd, buf, 1)) == 0) {
619       if (buf[0] == '\n') {
620         continue;
621       } else if (buf[0] == '#') {
622         while ((r = safe_read_exact(m_fd, buf, 1)) == 0) {
623           if (buf[0] == '\n') {
624             break;
625           }
626         }
627       } else {
628         break;
629       }
630     }
631     if (r < 0) {
632       if (r == -EDOM) {
633         r = 0;
634       }
635       return false;
636     }
637     // Read entry size to buf.
638     if (!isdigit(buf[0])) {
639       r = -EINVAL;
640       std::cerr << "rbd: import data invalid format (digit expected)"
641                 << std::endl;
642       return false;
643     }
644     for (size_t i = 1; i < sizeof(buf); i++) {
645       r = safe_read_exact(m_fd, buf + i, 1);
646       if (r < 0) {
647         std::cerr << "rbd: error reading import data" << std::endl;
648         return false;
649       }
650       if (!isdigit(buf[i])) {
651         if (buf[i] != ' ') {
652           r = -EINVAL;
653           std::cerr << "rbd: import data invalid format (space expected)"
654                     << std::endl;
655           return false;
656         }
657         buf[i] = '\0';
658         break;
659       }
660     }
661     int entry_size = atoi(buf);
662     if (entry_size == 0) {
663       r = -EINVAL;
664       std::cerr << "rbd: import data invalid format (zero entry size)"
665                 << std::endl;
666       return false;
667     }
668     assert(entry_size > 0);
669     // Read entry.
670     r = bl.read_fd(m_fd, entry_size);
671     if (r < 0) {
672       std::cerr << "rbd: error reading from stdin: " << cpp_strerror(r)
673                 << std::endl;
674       return false;
675     }
676     if (r != entry_size) {
677       std::cerr << "rbd: error reading from stdin: trucated"
678                 << std::endl;
679       r = -EINVAL;
680       return false;
681     }
682     r = 0;
683     return true;
684   }
685
686   int exec() {
687     int r = m_journaler.init();
688     if (r < 0) {
689       return r;
690     }
691     m_journaler.start_append(0, 0, 0);
692
693     int r1 = 0;
694     bufferlist bl;
695     int n = 0;
696     int error_count = 0;
697     while (read_entry(bl, r)) {
698       n++;
699       error_count++;
700       JSONParser p;
701       if (!p.parse(bl.c_str(), bl.length())) {
702         std::cerr << "rbd: error parsing input (entry " << n << ")"
703                   << std::endl;
704         r = -EINVAL;
705         if (m_no_error) {
706           r1 = r;
707           continue;
708         } else {
709           break;
710         }
711       }
712       ExportEntry e;
713       try {
714         decode_json_obj(e, &p);
715       } catch (JSONDecoder::err& err) {
716         std::cerr << "rbd: error json decoding import data (entry " << n << "):"
717                   << err.message << std::endl;
718         r = -EINVAL;
719         if (m_no_error) {
720           r1 = r;
721           continue;
722         } else {
723           break;
724         }
725       }
726       librbd::journal::EventEntry event_entry;
727       r = inspect_entry(e.entry, event_entry, m_verbose);
728       if (r < 0) {
729         std::cerr << "rbd: corrupted entry " << n << ": tag_tid=" << e.tag_id
730                   << ", commit_tid=" << e.commit_tid << std::endl;
731         if (m_no_error) {
732           r1 = r;
733           continue;
734         } else {
735           break;
736         }
737       }
738       m_journaler.append(e.tag_id, e.entry);
739       error_count--;
740     }
741
742     std::cout << n << " entries processed, " << error_count << " errors"  << std::endl;
743
744     std::cout << "Waiting for journal append to complete..."  << std::endl;
745
746     C_SaferCond cond;
747     m_journaler.stop_append(&cond);
748     r = cond.wait();
749
750     if (r < 0) {
751       std::cerr << "failed to append journal: " << cpp_strerror(r) << std::endl;
752     }
753
754     if (r1 < 0 && r == 0) {
755       r = r1;
756     }
757     return r;
758   }
759
760   int shut_down() {
761     return m_journaler.shut_down();
762   }
763
764 private:
765   Journaler m_journaler;
766   int m_fd;
767   bool m_no_error;
768   bool m_verbose;
769 };
770
771 static int do_import_journal(librados::IoCtx& io_ctx,
772                              const std::string& journal_id,
773                              const std::string& path,
774                              bool no_error, bool verbose) {
775   int r;
776
777   int fd;
778   bool from_stdin = path == "-";
779   if (from_stdin) {
780     fd = STDIN_FILENO;
781   } else {
782     if ((fd = open(path.c_str(), O_RDONLY)) < 0) {
783       r = -errno;
784       std::cerr << "rbd: error opening " << path << std::endl;
785       return r;
786     }
787 #ifdef HAVE_POSIX_FADVISE
788     posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
789 #endif
790   }
791
792   JournalImporter importer(io_ctx, journal_id, fd, no_error, verbose);
793   r = importer.exec();
794
795   if (!from_stdin) {
796     close(fd);
797   }
798
799   int shut_down_r = importer.shut_down();
800   if (r == 0 && shut_down_r < 0) {
801     r = shut_down_r;
802   }
803
804   return r;
805 }
806
807 void get_info_arguments(po::options_description *positional,
808                         po::options_description *options) {
809   at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
810   at::add_format_options(options);
811 }
812
813 int execute_info(const po::variables_map &vm) {
814   size_t arg_index = 0;
815   std::string pool_name;
816   std::string journal_name;
817   int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE,
818                                         &arg_index, &pool_name, &journal_name);
819   if (r < 0) {
820     return r;
821   }
822
823   at::Format::Formatter formatter;
824   r = utils::get_formatter(vm, &formatter);
825   if (r < 0) {
826     return r;
827   }
828
829   librados::Rados rados;
830   librados::IoCtx io_ctx;
831   r = utils::init(pool_name, &rados, &io_ctx);
832   if (r < 0) {
833     return r;
834   }
835
836   r = do_show_journal_info(rados, io_ctx, journal_name, formatter.get());
837   if (r < 0) {
838     std::cerr << "rbd: journal info: " << cpp_strerror(r) << std::endl;
839     return r;
840   }
841   return 0;
842
843 }
844
845 void get_status_arguments(po::options_description *positional,
846                           po::options_description *options) {
847   at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
848   at::add_format_options(options);
849 }
850
851 int execute_status(const po::variables_map &vm) {
852   size_t arg_index = 0;
853   std::string pool_name;
854   std::string journal_name;
855   int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE,
856                                         &arg_index, &pool_name, &journal_name);
857   if (r < 0) {
858     return r;
859   }
860
861   at::Format::Formatter formatter;
862   r = utils::get_formatter(vm, &formatter);
863   if (r < 0) {
864     return r;
865   }
866
867   librados::Rados rados;
868   librados::IoCtx io_ctx;
869   r = utils::init(pool_name, &rados, &io_ctx);
870   if (r < 0) {
871     return r;
872   }
873
874   r = do_show_journal_status(io_ctx, journal_name, formatter.get());
875   if (r < 0) {
876     std::cerr << "rbd: journal status: " << cpp_strerror(r) << std::endl;
877     return r;
878   }
879   return 0;
880 }
881
882 void get_reset_arguments(po::options_description *positional,
883                          po::options_description *options) {
884   at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
885 }
886
887 int execute_reset(const po::variables_map &vm) {
888   size_t arg_index = 0;
889   std::string pool_name;
890   std::string journal_name;
891   int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE,
892                                         &arg_index, &pool_name, &journal_name);
893   if (r < 0) {
894     return r;
895   }
896
897   librados::Rados rados;
898   librados::IoCtx io_ctx;
899   r = utils::init(pool_name, &rados, &io_ctx);
900   if (r < 0) {
901     return r;
902   }
903
904   r = do_reset_journal(io_ctx, journal_name);
905   if (r < 0) {
906     std::cerr << "rbd: journal reset: " << cpp_strerror(r) << std::endl;
907     return r;
908   }
909   return 0;
910 }
911
912 void get_client_disconnect_arguments(po::options_description *positional,
913                                      po::options_description *options) {
914   at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
915   options->add_options()
916     ("client-id", po::value<std::string>(),
917      "client ID (or leave unspecified to disconnect all)");
918 }
919
920 int execute_client_disconnect(const po::variables_map &vm) {
921   size_t arg_index = 0;
922   std::string pool_name;
923   std::string journal_name;
924   int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE,
925                                         &arg_index, &pool_name, &journal_name);
926   if (r < 0) {
927     return r;
928   }
929
930   std::string client_id;
931   if (vm.count("client-id")) {
932     client_id = vm["client-id"].as<std::string>();
933   }
934
935   librados::Rados rados;
936   librados::IoCtx io_ctx;
937   r = utils::init(pool_name, &rados, &io_ctx);
938   if (r < 0) {
939     return r;
940   }
941
942   r = do_disconnect_journal_client(io_ctx, journal_name, client_id);
943   if (r < 0) {
944     std::cerr << "rbd: journal client disconnect: " << cpp_strerror(r)
945               << std::endl;
946     return r;
947   }
948   return 0;
949 }
950
951 void get_inspect_arguments(po::options_description *positional,
952                            po::options_description *options) {
953   at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
954   at::add_verbose_option(options);
955 }
956
957 int execute_inspect(const po::variables_map &vm) {
958   size_t arg_index = 0;
959   std::string pool_name;
960   std::string journal_name;
961   int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE,
962                                         &arg_index, &pool_name, &journal_name);
963   if (r < 0) {
964     return r;
965   }
966
967   librados::Rados rados;
968   librados::IoCtx io_ctx;
969   r = utils::init(pool_name, &rados, &io_ctx);
970   if (r < 0) {
971     return r;
972   }
973
974   r = do_inspect_journal(io_ctx, journal_name, vm[at::VERBOSE].as<bool>());
975   if (r < 0) {
976     std::cerr << "rbd: journal inspect: " << cpp_strerror(r) << std::endl;
977     return r;
978   }
979   return 0;
980 }
981
982 void get_export_arguments(po::options_description *positional,
983                           po::options_description *options) {
984   at::add_journal_spec_options(positional, options,
985                                at::ARGUMENT_MODIFIER_SOURCE);
986   at::add_path_options(positional, options,
987                        "export file (or '-' for stdout)");
988   at::add_verbose_option(options);
989   at::add_no_error_option(options);
990 }
991
992 int execute_export(const po::variables_map &vm) {
993   size_t arg_index = 0;
994   std::string pool_name;
995   std::string journal_name;
996   int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_SOURCE,
997                                         &arg_index, &pool_name, &journal_name);
998   if (r < 0) {
999     return r;
1000   }
1001
1002   std::string path;
1003   r = utils::get_path(vm, utils::get_positional_argument(vm, 1), &path);
1004   if (r < 0) {
1005     return r;
1006   }
1007
1008   librados::Rados rados;
1009   librados::IoCtx io_ctx;
1010   r = utils::init(pool_name, &rados, &io_ctx);
1011   if (r < 0) {
1012     return r;
1013   }
1014
1015   r = do_export_journal(io_ctx, journal_name, path, vm[at::NO_ERROR].as<bool>(),
1016                         vm[at::VERBOSE].as<bool>());
1017   if (r < 0) {
1018     std::cerr << "rbd: journal export: " << cpp_strerror(r) << std::endl;
1019     return r;
1020   }
1021   return 0;
1022 }
1023
1024 void get_import_arguments(po::options_description *positional,
1025                           po::options_description *options) {
1026   at::add_path_options(positional, options,
1027                        "import file (or '-' for stdin)");
1028   at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_DEST);
1029   at::add_verbose_option(options);
1030   at::add_no_error_option(options);
1031 }
1032
1033 int execute_import(const po::variables_map &vm) {
1034   std::string path;
1035   int r = utils::get_path(vm, utils::get_positional_argument(vm, 0), &path);
1036   if (r < 0) {
1037     return r;
1038   }
1039
1040   size_t arg_index = 1;
1041   std::string pool_name;
1042   std::string journal_name;
1043   r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_DEST,
1044                                     &arg_index, &pool_name, &journal_name);
1045   if (r < 0) {
1046     return r;
1047   }
1048
1049   librados::Rados rados;
1050   librados::IoCtx io_ctx;
1051   r = utils::init(pool_name, &rados, &io_ctx);
1052   if (r < 0) {
1053     return r;
1054   }
1055
1056   r = do_import_journal(io_ctx, journal_name, path, vm[at::NO_ERROR].as<bool>(),
1057                         vm[at::VERBOSE].as<bool>());
1058   if (r < 0) {
1059     std::cerr << "rbd: journal export: " << cpp_strerror(r) << std::endl;
1060     return r;
1061   }
1062   return 0;
1063 }
1064
1065 Shell::Action action_info(
1066   {"journal", "info"}, {}, "Show information about image journal.", "",
1067   &get_info_arguments, &execute_info);
1068
1069 Shell::Action action_status(
1070   {"journal", "status"}, {}, "Show status of image journal.", "",
1071   &get_status_arguments, &execute_status);
1072
1073 Shell::Action action_reset(
1074   {"journal", "reset"}, {}, "Reset image journal.", "",
1075   &get_reset_arguments, &execute_reset);
1076
1077 Shell::Action action_inspect(
1078   {"journal", "inspect"}, {}, "Inspect image journal for structural errors.", "",
1079   &get_inspect_arguments, &execute_inspect);
1080
1081 Shell::Action action_export(
1082   {"journal", "export"}, {}, "Export image journal.", "",
1083   &get_export_arguments, &execute_export);
1084
1085 Shell::Action action_import(
1086   {"journal", "import"}, {}, "Import image journal.", "",
1087   &get_import_arguments, &execute_import);
1088
1089 Shell::Action action_disconnect(
1090   {"journal", "client", "disconnect"}, {},
1091   "Flag image journal client as disconnected.", "",
1092   &get_client_disconnect_arguments, &execute_client_disconnect);
1093
1094 } // namespace journal
1095 } // namespace action
1096 } // namespace rbd