Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rados / rados.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "include/types.h"
16
17 #include "include/rados/librados.hpp"
18 #include "include/rados/rados_types.hpp"
19 #include "include/radosstriper/libradosstriper.hpp"
20 using namespace libradosstriper;
21
22 #include "common/config.h"
23 #include "common/ceph_argparse.h"
24 #include "global/global_init.h"
25 #include "common/Cond.h"
26 #include "common/debug.h"
27 #include "common/errno.h"
28 #include "common/Formatter.h"
29 #include "common/obj_bencher.h"
30 #include "common/TextTable.h"
31 #include "include/stringify.h"
32 #include "mds/inode_backtrace.h"
33 #include "auth/Crypto.h"
34 #include <iostream>
35 #include <fstream>
36
37 #include <stdlib.h>
38 #include <time.h>
39 #include <sstream>
40 #include <errno.h>
41 #include <dirent.h>
42 #include <stdexcept>
43 #include <climits>
44 #include <locale>
45 #include <memory>
46
47 #include "cls/lock/cls_lock_client.h"
48 #include "include/compat.h"
49 #include "include/util.h"
50 #include "common/hobject.h"
51
52 #include "PoolDump.h"
53 #include "RadosImport.h"
54
55 using namespace librados;
56
57 // two steps seem to be necessary to do this right
58 #define STR(x) _STR(x)
59 #define _STR(x) #x
60
61 void usage(ostream& out)
62 {
63   out <<                                        \
64 "usage: rados [options] [commands]\n"
65 "POOL COMMANDS\n"
66 "   lspools                          list pools\n"
67 "   mkpool <pool-name> [123[ 4]]     create pool <pool-name>'\n"
68 "                                    [with auid 123[and using crush rule 4]]\n"
69 "   cppool <pool-name> <dest-pool>   copy content of a pool\n"
70 "   rmpool <pool-name> [<pool-name> --yes-i-really-really-mean-it]\n"
71 "                                    remove pool <pool-name>'\n"
72 "   purge <pool-name> --yes-i-really-really-mean-it\n"
73 "                                    remove all objects from pool <pool-name> without removing it\n"
74 "   df                               show per-pool and total usage\n"
75 "   ls                               list objects in pool\n\n"
76 "   chown 123                        change the pool owner to auid 123\n"
77 "\n"
78 "POOL SNAP COMMANDS\n"
79 "   lssnap                           list snaps\n"
80 "   mksnap <snap-name>               create snap <snap-name>\n"
81 "   rmsnap <snap-name>               remove snap <snap-name>\n"
82 "\n"
83 "OBJECT COMMANDS\n"
84 "   get <obj-name> [outfile]         fetch object\n"
85 "   put <obj-name> [infile] [--offset offset]\n"
86 "                                    write object with start offset (default:0)\n"
87 "   append <obj-name> [infile]       append object\n"
88 "   truncate <obj-name> length       truncate object\n"
89 "   create <obj-name>                create object\n"
90 "   rm <obj-name> ...[--force-full]  [force no matter full or not]remove object(s)\n"
91 "   cp <obj-name> [target-obj]       copy object\n"
92 "   listxattr <obj-name>\n"
93 "   getxattr <obj-name> attr\n"
94 "   setxattr <obj-name> attr val\n"
95 "   rmxattr <obj-name> attr\n"
96 "   stat <obj-name>                  stat the named object\n"
97 "   mapext <obj-name>\n"
98 "   rollback <obj-name> <snap-name>  roll back object to snap <snap-name>\n"
99 "\n"
100 "   listsnaps <obj-name>             list the snapshots of this object\n"
101 "   bench <seconds> write|seq|rand [-t concurrent_operations] [--no-cleanup] [--run-name run_name] [--no-hints]\n"
102 "                                    default is 16 concurrent IOs and 4 MB ops\n"
103 "                                    default is to clean up after write benchmark\n"
104 "                                    default run-name is 'benchmark_last_metadata'\n"
105 "   cleanup [--run-name run_name] [--prefix prefix]\n"
106 "                                    clean up a previous benchmark operation\n"
107 "                                    default run-name is 'benchmark_last_metadata'\n"
108 "   load-gen [options]               generate load on the cluster\n"
109 "   listomapkeys <obj-name>          list the keys in the object map\n"
110 "   listomapvals <obj-name>          list the keys and vals in the object map \n"
111 "   getomapval <obj-name> <key> [file] show the value for the specified key\n"
112 "                                    in the object's object map\n"
113 "   setomapval <obj-name> <key> <val>\n"
114 "   rmomapkey <obj-name> <key>\n"
115 "   getomapheader <obj-name> [file]\n"
116 "   setomapheader <obj-name> <val>\n"
117 "   tmap-to-omap <obj-name>          convert tmap keys/values to omap\n"
118 "   watch <obj-name>                 add watcher on this object\n"
119 "   notify <obj-name> <message>      notify watcher of this object with message\n"
120 "   listwatchers <obj-name>          list the watchers of this object\n"
121 "   set-alloc-hint <obj-name> <expected-object-size> <expected-write-size>\n"
122 "                                    set allocation hint for an object\n"
123 "\n"
124 "IMPORT AND EXPORT\n"
125 "   export [filename]\n"
126 "       Serialize pool contents to a file or standard out.\n"
127 "   import [--dry-run] [--no-overwrite] < filename | - >\n"
128 "       Load pool contents from a file or standard in\n"
129 "\n"
130 "ADVISORY LOCKS\n"
131 "   lock list <obj-name>\n"
132 "       List all advisory locks on an object\n"
133 "   lock get <obj-name> <lock-name>\n"
134 "       Try to acquire a lock\n"
135 "   lock break <obj-name> <lock-name> <locker-name>\n"
136 "       Try to break a lock acquired by another client\n"
137 "   lock info <obj-name> <lock-name>\n"
138 "       Show lock information\n"
139 "   options:\n"
140 "       --lock-tag                   Lock tag, all locks operation should use\n"
141 "                                    the same tag\n"
142 "       --lock-cookie                Locker cookie\n"
143 "       --lock-description           Description of lock\n"
144 "       --lock-duration              Lock duration (in seconds)\n"
145 "       --lock-type                  Lock type (shared, exclusive)\n"
146 "\n"
147 "SCRUB AND REPAIR:\n"
148 "   list-inconsistent-pg <pool>      list inconsistent PGs in given pool\n"
149 "   list-inconsistent-obj <pgid>     list inconsistent objects in given pg\n"
150 "   list-inconsistent-snapset <pgid> list inconsistent snapsets in the given pg\n"
151 "\n"
152 "CACHE POOLS: (for testing/development only)\n"
153 "   cache-flush <obj-name>           flush cache pool object (blocking)\n"
154 "   cache-try-flush <obj-name>       flush cache pool object (non-blocking)\n"
155 "   cache-evict <obj-name>           evict cache pool object\n"
156 "   cache-flush-evict-all            flush+evict all objects\n"
157 "   cache-try-flush-evict-all        try-flush+evict all objects\n"
158 "\n"
159 "GLOBAL OPTIONS:\n"
160 "   --object_locator object_locator\n"
161 "        set object_locator for operation\n"
162 "   -p pool\n"
163 "   --pool=pool\n"
164 "        select given pool by name\n"
165 "   --target-pool=pool\n"
166 "        select target pool by name\n"
167 "   -b op_size\n"
168 "        set the block size for put/get ops and for write benchmarking\n"
169 "   -o object_size\n"
170 "        set the object size for put/get ops and for write benchmarking\n"
171 "   --max-objects\n"
172 "        set the max number of objects for write benchmarking\n"
173 "   -s name\n"
174 "   --snap name\n"
175 "        select given snap name for (read) IO\n"
176 "   -i infile\n"
177 "   --create\n"
178 "        create the pool or directory that was specified\n"
179 "   -N namespace\n"
180 "   --namespace=namespace\n"
181 "        specify the namespace to use for the object\n"
182 "   --all\n"
183 "        Use with ls to list objects in all namespaces\n"
184 "        Put in CEPH_ARGS environment variable to make this the default\n"
185 "   --default\n"
186 "        Use with ls to list objects in default namespace\n"
187 "        Takes precedence over --all in case --all is in environment\n"
188 "   --target-locator\n"
189 "        Use with cp to specify the locator of the new object\n"
190 "   --target-nspace\n"
191 "        Use with cp to specify the namespace of the new object\n"
192 "   --striper\n"
193 "        Use radostriper interface rather than pure rados\n"
194 "        Available for stat, get, put, truncate, rm, ls and \n"
195 "        all xattr related operations\n"
196 "\n"
197 "BENCH OPTIONS:\n"
198 "   -t N\n"
199 "   --concurrent-ios=N\n"
200 "        Set number of concurrent I/O operations\n"
201 "   --show-time\n"
202 "        prefix output with date/time\n"
203 "   --no-verify\n"
204 "        do not verify contents of read objects\n"
205 "   --write-object\n"
206 "        write contents to the objects\n"
207 "   --write-omap\n"
208 "        write contents to the omap\n"
209 "   --write-xattr\n"
210 "        write contents to the extended attributes\n"
211 "\n"
212 "LOAD GEN OPTIONS:\n"
213 "   --num-objects                    total number of objects\n"
214 "   --min-object-size                min object size\n"
215 "   --max-object-size                max object size\n"
216 "   --min-op-len                     min io size of operations\n"
217 "   --max-op-len                     max io size of operations\n"
218 "   --max-ops                        max number of operations\n"
219 "   --max-backlog                    max backlog size\n"
220 "   --read-percent                   percent of operations that are read\n"
221 "   --target-throughput              target throughput (in bytes)\n"
222 "   --run-length                     total time (in seconds)\n"
223 "CACHE POOLS OPTIONS:\n"
224 "   --with-clones                    include clones when doing flush or evict\n"
225 "OMAP OPTIONS:\n"
226 "    --omap-key-file file            read the omap key from a file\n";
227 }
228
229 unsigned default_op_size = 1 << 22;
230
231 static void usage_exit()
232 {
233   usage(cerr);
234   exit(1);
235 }
236
237
238 template <typename I, typename T>
239 static int rados_sistrtoll(I &i, T *val) {
240   std::string err;
241   *val = strict_sistrtoll(i->second.c_str(), &err);
242   if (err != "") {
243     cerr << "Invalid value for " << i->first << ": " << err << std::endl;
244     return -EINVAL;
245   } else {
246     return 0;
247   }
248 }
249
250
251 static int dump_data(std::string const &filename, bufferlist const &data)
252 {
253   int fd;
254   if (filename == "-") {
255     fd = STDOUT_FILENO;
256   } else {
257     fd = TEMP_FAILURE_RETRY(::open(filename.c_str(), O_WRONLY|O_CREAT|O_TRUNC, 0644));
258     if (fd < 0) {
259       int err = errno;
260       cerr << "failed to open file: " << cpp_strerror(err) << std::endl;
261       return -err;
262     }
263   }
264
265   int r = data.write_fd(fd);
266
267   if (fd != 1) {
268     VOID_TEMP_FAILURE_RETRY(::close(fd));
269   }
270
271   return r;
272 }
273
274
275 static int do_get(IoCtx& io_ctx, RadosStriper& striper,
276                   const char *objname, const char *outfile, unsigned op_size,
277                   bool use_striper)
278 {
279   string oid(objname);
280
281   int fd;
282   if (strcmp(outfile, "-") == 0) {
283     fd = STDOUT_FILENO;
284   } else {
285     fd = TEMP_FAILURE_RETRY(::open(outfile, O_WRONLY|O_CREAT|O_TRUNC, 0644));
286     if (fd < 0) {
287       int err = errno;
288       cerr << "failed to open file: " << cpp_strerror(err) << std::endl;
289       return -err;
290     }
291   }
292
293   uint64_t offset = 0;
294   int ret;
295   while (true) {
296     bufferlist outdata;
297     if (use_striper) {
298       ret = striper.read(oid, &outdata, op_size, offset);
299     } else {
300       ret = io_ctx.read(oid, outdata, op_size, offset);
301     }
302     if (ret <= 0) {
303       goto out;
304     }
305     ret = outdata.write_fd(fd);
306     if (ret < 0) {
307       cerr << "error writing to file: " << cpp_strerror(ret) << std::endl;
308       goto out;
309     }
310     if (outdata.length() < op_size)
311       break;
312     offset += outdata.length();
313   }
314   ret = 0;
315
316  out:
317   if (fd != 1)
318     VOID_TEMP_FAILURE_RETRY(::close(fd));
319   return ret;
320 }
321
322 static int do_copy(IoCtx& io_ctx, const char *objname,
323                    IoCtx& target_ctx, const char *target_obj)
324 {
325   __le32 src_fadvise_flags = LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL | LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
326   __le32 dest_fadvise_flags = LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL | LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
327   ObjectWriteOperation op;
328   op.copy_from2(objname, io_ctx, 0, src_fadvise_flags);
329   op.set_op_flags2(dest_fadvise_flags);
330
331   return target_ctx.operate(target_obj, &op);
332 }
333
334 static int do_copy_pool(Rados& rados, const char *src_pool, const char *target_pool)
335 {
336   IoCtx src_ctx, target_ctx;
337   int ret = rados.ioctx_create(src_pool, src_ctx);
338   if (ret < 0) {
339     cerr << "cannot open source pool: " << src_pool << std::endl;
340     return ret;
341   }
342   ret = rados.ioctx_create(target_pool, target_ctx);
343   if (ret < 0) {
344     cerr << "cannot open target pool: " << target_pool << std::endl;
345     return ret;
346   }
347   src_ctx.set_namespace(all_nspaces);
348   librados::NObjectIterator i = src_ctx.nobjects_begin();
349   librados::NObjectIterator i_end = src_ctx.nobjects_end();
350   for (; i != i_end; ++i) {
351     string nspace = i->get_nspace();
352     string oid = i->get_oid();
353     string locator = i->get_locator();
354
355     string target_name = (nspace.size() ? nspace + "/" : "") + oid;
356     string src_name = target_name;
357     if (locator.size())
358         src_name += "(@" + locator + ")";
359     cout << src_pool << ":" << src_name  << " => "
360          << target_pool << ":" << target_name << std::endl;
361
362     src_ctx.locator_set_key(locator);
363     src_ctx.set_namespace(nspace);
364     target_ctx.set_namespace(nspace);
365     ret = do_copy(src_ctx, oid.c_str(), target_ctx, oid.c_str());
366     if (ret < 0) {
367       cerr << "error copying object: " << cpp_strerror(errno) << std::endl;
368       return ret;
369     }
370   }
371
372   return 0;
373 }
374
375 static int do_put(IoCtx& io_ctx, RadosStriper& striper,
376                   const char *objname, const char *infile, int op_size,
377                   uint64_t obj_offset, bool use_striper)
378 {
379   string oid(objname);
380   bool stdio = (strcmp(infile, "-") == 0);
381   int ret = 0;
382   int fd = STDIN_FILENO;
383   if (!stdio)
384     fd = open(infile, O_RDONLY);
385   if (fd < 0) {
386     cerr << "error reading input file " << infile << ": " << cpp_strerror(errno) << std::endl;
387     return 1;
388   }
389   int count = op_size;
390   uint64_t offset = obj_offset;
391   while (count != 0) {
392     bufferlist indata;
393     count = indata.read_fd(fd, op_size);
394     if (count < 0) {
395       ret = -errno;
396       cerr << "error reading input file " << infile << ": " << cpp_strerror(ret) << std::endl;
397       goto out;
398     }
399  
400     if (count == 0) {
401      if (offset == obj_offset) { // in case we have to create an empty object & if obj_offset > 0 do a hole
402         if (use_striper) {
403           ret = striper.write_full(oid, indata); // indata is empty
404         } else {
405           ret = io_ctx.write_full(oid, indata); // indata is empty
406         }
407         if (ret < 0) {
408           goto out;
409         }
410         if (offset) {
411           if (use_striper) {
412             ret = striper.trunc(oid, offset); // before truncate, object must be existed.
413           } else {
414             ret = io_ctx.trunc(oid, offset); // before truncate, object must be existed.
415           }
416
417           if (ret < 0) {
418             goto out;
419           }
420         }
421       }
422       continue;
423     }
424     if (use_striper) {
425       if (offset == 0)
426         ret = striper.write_full(oid, indata);
427       else
428         ret = striper.write(oid, indata, count, offset);
429     } else {
430       if (offset == 0)
431         ret = io_ctx.write_full(oid, indata);
432       else
433         ret = io_ctx.write(oid, indata, count, offset);
434     }
435
436     if (ret < 0) {
437       goto out;
438     }
439     offset += count;
440   }
441   ret = 0;
442  out:
443   if (fd != STDOUT_FILENO)
444     VOID_TEMP_FAILURE_RETRY(close(fd));
445   return ret;
446 }
447
448 static int do_append(IoCtx& io_ctx, RadosStriper& striper,
449                   const char *objname, const char *infile, int op_size,
450                   bool use_striper)
451 {
452   string oid(objname);
453   bool stdio = (strcmp(infile, "-") == 0);
454   int ret = 0;
455   int fd = STDIN_FILENO;
456   if (!stdio)
457     fd = open(infile, O_RDONLY);
458   if (fd < 0) {
459     cerr << "error reading input file " << infile << ": " << cpp_strerror(errno) << std::endl;
460     return 1;
461   }
462   int count = op_size;
463   while (count != 0) {
464     bufferlist indata;
465     count = indata.read_fd(fd, op_size);
466     if (count < 0) {
467       ret = -errno;
468       cerr << "error reading input file " << infile << ": " << cpp_strerror(ret) << std::endl;
469       goto out;
470     }
471     if (use_striper) {
472       ret = striper.append(oid, indata, count);
473     } else {
474       ret = io_ctx.append(oid, indata, count);
475     }
476
477     if (ret < 0) {
478       goto out;
479     }
480   }
481   ret = 0;
482 out:
483   if (fd != STDOUT_FILENO)
484     VOID_TEMP_FAILURE_RETRY(close(fd));
485   return ret;
486 }
487
488 class RadosWatchCtx : public librados::WatchCtx2 {
489   IoCtx& ioctx;
490   string name;
491 public:
492   RadosWatchCtx(IoCtx& io, const char *imgname) : ioctx(io), name(imgname) {}
493   ~RadosWatchCtx() override {}
494   void handle_notify(uint64_t notify_id,
495                      uint64_t cookie,
496                      uint64_t notifier_id,
497                      bufferlist& bl) override {
498     cout << "NOTIFY"
499          << " cookie " << cookie
500          << " notify_id " << notify_id
501          << " from " << notifier_id
502          << std::endl;
503     bl.hexdump(cout);
504     ioctx.notify_ack(name, notify_id, cookie, bl);
505   }
506   void handle_error(uint64_t cookie, int err) override {
507     cout << "ERROR"
508          << " cookie " << cookie
509          << " err " << cpp_strerror(err)
510          << std::endl;
511   }
512 };
513
514 static const char alphanum_table[]="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
515
516 int gen_rand_alphanumeric(char *dest, int size) /* size should be the required string size + 1 */
517 {
518   int ret = get_random_bytes(dest, size);
519   if (ret < 0) {
520     cerr << "cannot get random bytes: " << cpp_strerror(ret) << std::endl;
521     return -1;
522   }
523
524   int i;
525   for (i=0; i<size - 1; i++) {
526     int pos = (unsigned)dest[i];
527     dest[i] = alphanum_table[pos & 63];
528   }
529   dest[i] = '\0';
530
531   return 0;
532 }
533
534 struct obj_info {
535   string name;
536   size_t len;
537 };
538
539 class LoadGen {
540   size_t total_sent;
541   size_t total_completed;
542
543   IoCtx io_ctx;
544   Rados *rados;
545
546   map<int, obj_info> objs;
547
548   utime_t start_time;
549
550   bool going_down;
551
552 public:
553   int read_percent;
554   int num_objs;
555   size_t min_obj_len;
556   uint64_t max_obj_len;
557   size_t min_op_len;
558   size_t max_op_len;
559   size_t max_ops;
560   size_t max_backlog;
561   size_t target_throughput;
562   int run_length;
563
564   enum {
565     OP_READ,
566     OP_WRITE,
567   };
568
569   struct LoadGenOp {
570     int id;
571     int type;
572     string oid;
573     size_t off;
574     size_t len;
575     bufferlist bl;
576     LoadGen *lg;
577     librados::AioCompletion *completion;
578
579     LoadGenOp() : id(0), type(0), off(0), len(0), lg(NULL), completion(NULL) {}
580     explicit LoadGenOp(LoadGen *_lg) : id(0), type(0), off(0), len(0), lg(_lg), completion(NULL) {}
581   };
582
583   int max_op;
584
585   map<int, LoadGenOp *> pending_ops;
586
587   void gen_op(LoadGenOp *op);
588   uint64_t gen_next_op();
589   void run_op(LoadGenOp *op);
590
591   uint64_t cur_sent_rate() {
592     return total_sent / time_passed();
593   }
594
595   uint64_t cur_completed_rate() {
596     return total_completed / time_passed();
597   }
598
599   uint64_t total_expected() {
600     return target_throughput * time_passed();
601   }
602
603   float time_passed() {
604     utime_t now = ceph_clock_now();
605     now -= start_time;
606     uint64_t ns = now.nsec();
607     float total = (float) ns / 1000000000.0;
608     total += now.sec();
609     return total;
610   }
611
612   Mutex lock;
613   Cond cond;
614
615   explicit LoadGen(Rados *_rados) : rados(_rados), going_down(false), lock("LoadGen") {
616     read_percent = 80;
617     min_obj_len = 1024;
618     max_obj_len = 5ull * 1024ull * 1024ull * 1024ull;
619     min_op_len = 1024;
620     target_throughput = 5 * 1024 * 1024; // B/sec
621     max_op_len = 2 * 1024 * 1024;
622     max_ops = 16; 
623     max_backlog = target_throughput * 2;
624     run_length = 60;
625
626     total_sent = 0;
627     total_completed = 0;
628     num_objs = 200;
629     max_op = 0;
630   }
631   int bootstrap(const char *pool);
632   int run();
633   void cleanup();
634
635   void io_cb(completion_t c, LoadGenOp *op) {
636     Mutex::Locker l(lock);
637
638     total_completed += op->len;
639
640     double rate = (double)cur_completed_rate() / (1024 * 1024);
641     std::streamsize original_precision = cout.precision();
642     cout.precision(3);
643     cout << "op " << op->id << " completed, throughput=" << rate  << "MB/sec" << std::endl;
644     cout.precision(original_precision);
645
646     map<int, LoadGenOp *>::iterator iter = pending_ops.find(op->id);
647     if (iter != pending_ops.end())
648       pending_ops.erase(iter);
649
650     if (!going_down)
651       op->completion->release();
652
653     delete op;
654
655     cond.Signal();
656   }
657 };
658
659 static void _load_gen_cb(completion_t c, void *param)
660 {
661   LoadGen::LoadGenOp *op = (LoadGen::LoadGenOp *)param;
662   op->lg->io_cb(c, op);
663 }
664
665 int LoadGen::bootstrap(const char *pool)
666 {
667   char buf[128];
668   int i;
669
670   if (!pool) {
671     cerr << "ERROR: pool name was not specified" << std::endl;
672     return -EINVAL;
673   }
674
675   int ret = rados->ioctx_create(pool, io_ctx);
676   if (ret < 0) {
677     cerr << "error opening pool " << pool << ": " << cpp_strerror(ret) << std::endl;
678     return ret;
679   }
680
681   int buf_len = 1;
682   bufferptr p = buffer::create(buf_len);
683   bufferlist bl;
684   memset(p.c_str(), 0, buf_len);
685   bl.push_back(p);
686
687   list<librados::AioCompletion *> completions;
688   for (i = 0; i < num_objs; i++) {
689     obj_info info;
690     gen_rand_alphanumeric(buf, 16);
691     info.name = "obj-";
692     info.name.append(buf);
693     info.len = get_random(min_obj_len, max_obj_len);
694
695     // throttle...
696     while (completions.size() > max_ops) {
697       AioCompletion *c = completions.front();
698       c->wait_for_complete();
699       ret = c->get_return_value();
700       c->release();
701       completions.pop_front();
702       if (ret < 0) {
703         cerr << "aio_write failed" << std::endl;
704         return ret;
705       }
706     }
707
708     librados::AioCompletion *c = rados->aio_create_completion(NULL, NULL, NULL);
709     completions.push_back(c);
710     // generate object
711     ret = io_ctx.aio_write(info.name, c, bl, buf_len, info.len - buf_len);
712     if (ret < 0) {
713       cerr << "couldn't write obj: " << info.name << " ret=" << ret << std::endl;
714       return ret;
715     }
716     objs[i] = info;
717   }
718
719   list<librados::AioCompletion *>::iterator iter;
720   for (iter = completions.begin(); iter != completions.end(); ++iter) {
721     AioCompletion *c = *iter;
722     c->wait_for_complete();
723     ret = c->get_return_value();
724     c->release();
725     if (ret < 0) { // yes, we leak.
726       cerr << "aio_write failed" << std::endl;
727       return ret;
728     }
729   }
730   return 0;
731 }
732
733 void LoadGen::run_op(LoadGenOp *op)
734 {
735   op->completion = rados->aio_create_completion(op, _load_gen_cb, NULL);
736
737   switch (op->type) {
738   case OP_READ:
739     io_ctx.aio_read(op->oid, op->completion, &op->bl, op->len, op->off);
740     break;
741   case OP_WRITE:
742     bufferptr p = buffer::create(op->len);
743     memset(p.c_str(), 0, op->len);
744     op->bl.push_back(p);
745     
746     io_ctx.aio_write(op->oid, op->completion, op->bl, op->len, op->off);
747     break;
748   }
749
750   total_sent += op->len;
751 }
752
753 void LoadGen::gen_op(LoadGenOp *op)
754 {
755   int i = get_random(0, objs.size() - 1);
756   obj_info& info = objs[i];
757   op->oid = info.name;
758
759   size_t len = get_random(min_op_len, max_op_len);
760   if (len > info.len)
761     len = info.len;
762   size_t off = get_random(0, info.len);
763
764   if (off + len > info.len)
765     off = info.len - len;
766
767   op->off = off;
768   op->len = len;
769
770   i = get_random(1, 100);
771   if (i > read_percent)
772     op->type = OP_WRITE;
773   else
774     op->type = OP_READ;
775
776   cout << (op->type == OP_READ ? "READ" : "WRITE") << " : oid=" << op->oid << " off=" << op->off << " len=" << op->len << std::endl;
777 }
778
779 uint64_t LoadGen::gen_next_op()
780 {
781   lock.Lock();
782
783   LoadGenOp *op = new LoadGenOp(this);
784   gen_op(op);
785   op->id = max_op++;
786   pending_ops[op->id] = op;
787
788   lock.Unlock();
789
790   run_op(op);
791
792   return op->len;
793 }
794
795 int LoadGen::run()
796 {
797   start_time = ceph_clock_now();
798   utime_t end_time = start_time;
799   end_time += run_length;
800   utime_t stamp_time = start_time;
801   uint32_t total_sec = 0;
802
803   while (1) {
804     lock.Lock();
805     utime_t one_second(1, 0);
806     cond.WaitInterval(lock, one_second);
807     lock.Unlock();
808     utime_t now = ceph_clock_now();
809
810     if (now > end_time)
811       break;
812
813     uint64_t expected = total_expected();
814     lock.Lock();
815     uint64_t sent = total_sent;
816     uint64_t completed = total_completed;
817     lock.Unlock();
818
819     if (now - stamp_time >= utime_t(1, 0)) {
820       double rate = (double)cur_completed_rate() / (1024 * 1024);
821       ++total_sec;
822       std::streamsize original_precision = cout.precision();
823       cout.precision(3);
824       cout << setw(5) << total_sec << ": throughput=" << rate  << "MB/sec" << " pending data=" << sent - completed << std::endl;
825       cout.precision(original_precision);
826       stamp_time = now; 
827     }
828
829     while (sent < expected &&
830            sent - completed < max_backlog &&
831            pending_ops.size() < max_ops) {
832       sent += gen_next_op();
833     }
834   }
835
836   // get a reference to all pending requests
837   vector<librados::AioCompletion *> completions;
838   lock.Lock();
839   going_down = true;
840   map<int, LoadGenOp *>::iterator iter;
841   for (iter = pending_ops.begin(); iter != pending_ops.end(); ++iter) {
842     LoadGenOp *op = iter->second;
843     completions.push_back(op->completion);
844   }
845   lock.Unlock();
846
847   cout << "waiting for all operations to complete" << std::endl;
848
849   // now wait on all the pending requests
850   for (vector<librados::AioCompletion *>::iterator citer = completions.begin(); citer != completions.end(); ++citer) {
851     librados::AioCompletion *c = *citer;
852     c->wait_for_complete();
853     c->release();
854   }
855
856   return 0;
857 }
858
859 void LoadGen::cleanup()
860 {
861   cout << "cleaning up objects" << std::endl;
862   map<int, obj_info>::iterator iter;
863   for (iter = objs.begin(); iter != objs.end(); ++iter) {
864     obj_info& info = iter->second;
865     int ret = io_ctx.remove(info.name);
866     if (ret < 0)
867       cerr << "couldn't remove obj: " << info.name << " ret=" << ret << std::endl;
868   }
869 }
870
871 enum OpWriteDest {
872   OP_WRITE_DEST_OBJ = 2 << 0,
873   OP_WRITE_DEST_OMAP = 2 << 1,
874   OP_WRITE_DEST_XATTR = 2 << 2,
875 };
876
877 class RadosBencher : public ObjBencher {
878   librados::AioCompletion **completions;
879   librados::Rados& rados;
880   librados::IoCtx& io_ctx;
881   librados::NObjectIterator oi;
882   bool iterator_valid;
883   OpWriteDest write_destination;
884
885 protected:
886   int completions_init(int concurrentios) override {
887     completions = new librados::AioCompletion *[concurrentios];
888     return 0;
889   }
890   void completions_done() override {
891     delete[] completions;
892     completions = NULL;
893   }
894   int create_completion(int slot, void (*cb)(void *, void*), void *arg) override {
895     completions[slot] = rados.aio_create_completion((void *) arg, 0, cb);
896
897     if (!completions[slot])
898       return -EINVAL;
899
900     return 0;
901   }
902   void release_completion(int slot) override {
903     completions[slot]->release();
904     completions[slot] = 0;
905   }
906
907   int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len,
908                size_t offset) override {
909     return io_ctx.aio_read(oid, completions[slot], pbl, len, 0);
910   }
911
912   int aio_write(const std::string& oid, int slot, bufferlist& bl, size_t len,
913                 size_t offset) override {
914     librados::ObjectWriteOperation op;
915
916     if (write_destination & OP_WRITE_DEST_OBJ) {
917       if (data.hints)
918         op.set_alloc_hint2(data.object_size, data.op_size,
919                            ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
920                            ALLOC_HINT_FLAG_SEQUENTIAL_READ |
921                            ALLOC_HINT_FLAG_APPEND_ONLY |
922                            ALLOC_HINT_FLAG_IMMUTABLE);
923       op.write(offset, bl);
924     }
925
926     if (write_destination & OP_WRITE_DEST_OMAP) {
927       std::map<std::string, librados::bufferlist> omap;
928       omap[string("bench-omap-key-") + stringify(offset)] = bl;
929       op.omap_set(omap);
930     }
931
932     if (write_destination & OP_WRITE_DEST_XATTR) {
933       char key[80];
934       snprintf(key, sizeof(key), "bench-xattr-key-%d", (int)offset);
935       op.setxattr(key, bl);
936     }
937
938     return io_ctx.aio_operate(oid, completions[slot], &op);
939   }
940
941   int aio_remove(const std::string& oid, int slot) override {
942     return io_ctx.aio_remove(oid, completions[slot]);
943   }
944
945   int sync_read(const std::string& oid, bufferlist& bl, size_t len) override {
946     return io_ctx.read(oid, bl, len, 0);
947   }
948   int sync_write(const std::string& oid, bufferlist& bl, size_t len) override {
949     return io_ctx.write_full(oid, bl);
950   }
951
952   int sync_remove(const std::string& oid) override {
953     return io_ctx.remove(oid);
954   }
955
956   bool completion_is_done(int slot) override {
957     return completions[slot]->is_safe();
958   }
959
960   int completion_wait(int slot) override {
961     return completions[slot]->wait_for_safe_and_cb();
962   }
963   int completion_ret(int slot) override {
964     return completions[slot]->get_return_value();
965   }
966
967   bool get_objects(std::list<Object>* objects, int num) override {
968     int count = 0;
969
970     if (!iterator_valid) {
971       oi = io_ctx.nobjects_begin();
972       iterator_valid = true;
973     }
974
975     librados::NObjectIterator ei = io_ctx.nobjects_end();
976
977     if (oi == ei) {
978       iterator_valid = false;
979       return false;
980     }
981
982     objects->clear();
983     for ( ; oi != ei && count < num; ++oi) {
984       Object obj(oi->get_oid(), oi->get_nspace());
985       objects->push_back(obj);
986       ++count;
987     }
988
989     return true;
990   }
991
992   void set_namespace( const std::string& ns) override {
993     io_ctx.set_namespace(ns);
994   }
995
996 public:
997   RadosBencher(CephContext *cct_, librados::Rados& _r, librados::IoCtx& _i)
998     : ObjBencher(cct_), completions(NULL), rados(_r), io_ctx(_i), iterator_valid(false), write_destination(OP_WRITE_DEST_OBJ) {}
999   ~RadosBencher() override { }
1000
1001   void set_write_destination(OpWriteDest dest) {
1002     write_destination = dest;
1003   }
1004 };
1005
1006 static int do_lock_cmd(std::vector<const char*> &nargs,
1007                        const std::map < std::string, std::string > &opts,
1008                        IoCtx *ioctx,
1009                        Formatter *formatter)
1010 {
1011   if (nargs.size() < 3)
1012     usage_exit();
1013
1014   string cmd(nargs[1]);
1015   string oid(nargs[2]);
1016
1017   string lock_tag;
1018   string lock_cookie;
1019   string lock_description;
1020   int lock_duration = 0;
1021   ClsLockType lock_type = LOCK_EXCLUSIVE;
1022
1023   map<string, string>::const_iterator i;
1024   i = opts.find("lock-tag");
1025   if (i != opts.end()) {
1026     lock_tag = i->second;
1027   }
1028   i = opts.find("lock-cookie");
1029   if (i != opts.end()) {
1030     lock_cookie = i->second;
1031   }
1032   i = opts.find("lock-description");
1033   if (i != opts.end()) {
1034     lock_description = i->second;
1035   }
1036   i = opts.find("lock-duration");
1037   if (i != opts.end()) {
1038     if (rados_sistrtoll(i, &lock_duration)) {
1039       return -EINVAL;
1040     }
1041   }
1042   i = opts.find("lock-type");
1043   if (i != opts.end()) {
1044     const string& type_str = i->second;
1045     if (type_str.compare("exclusive") == 0) {
1046       lock_type = LOCK_EXCLUSIVE;
1047     } else if (type_str.compare("shared") == 0) {
1048       lock_type = LOCK_SHARED;
1049     } else {
1050       cerr << "unknown lock type was specified, aborting" << std::endl;
1051       return -EINVAL;
1052     }
1053   }
1054
1055   if (cmd.compare("list") == 0) {
1056     list<string> locks;
1057     int ret = rados::cls::lock::list_locks(ioctx, oid, &locks);
1058     if (ret < 0) {
1059       cerr << "ERROR: rados_list_locks(): " << cpp_strerror(ret) << std::endl;
1060       return ret;
1061     }
1062
1063     formatter->open_object_section("object");
1064     formatter->dump_string("objname", oid);
1065     formatter->open_array_section("locks");
1066     list<string>::iterator iter;
1067     for (iter = locks.begin(); iter != locks.end(); ++iter) {
1068       formatter->open_object_section("lock");
1069       formatter->dump_string("name", *iter);
1070       formatter->close_section();
1071     }
1072     formatter->close_section();
1073     formatter->close_section();
1074     formatter->flush(cout);
1075     return 0;
1076   }
1077
1078   if (nargs.size() < 4)
1079     usage_exit();
1080
1081   string lock_name(nargs[3]);
1082
1083   if (cmd.compare("info") == 0) {
1084     map<rados::cls::lock::locker_id_t, rados::cls::lock::locker_info_t> lockers;
1085     ClsLockType type = LOCK_NONE;
1086     string tag;
1087     int ret = rados::cls::lock::get_lock_info(ioctx, oid, lock_name, &lockers, &type, &tag);
1088     if (ret < 0) {
1089       cerr << "ERROR: rados_lock_get_lock_info(): " << cpp_strerror(ret) << std::endl;
1090       return ret;
1091     }
1092
1093     formatter->open_object_section("lock");
1094     formatter->dump_string("name", lock_name);
1095     formatter->dump_string("type", cls_lock_type_str(type));
1096     formatter->dump_string("tag", tag);
1097     formatter->open_array_section("lockers");
1098     map<rados::cls::lock::locker_id_t, rados::cls::lock::locker_info_t>::iterator iter;
1099     for (iter = lockers.begin(); iter != lockers.end(); ++iter) {
1100       const rados::cls::lock::locker_id_t& id = iter->first;
1101       const rados::cls::lock::locker_info_t& info = iter->second;
1102       formatter->open_object_section("locker");
1103       formatter->dump_stream("name") << id.locker;
1104       formatter->dump_string("cookie", id.cookie);
1105       formatter->dump_string("description", info.description);
1106       formatter->dump_stream("expiration") << info.expiration;
1107       formatter->dump_stream("addr") << info.addr;
1108       formatter->close_section();
1109     }
1110     formatter->close_section();
1111     formatter->close_section();
1112     formatter->flush(cout);
1113     
1114     return ret;
1115   } else if (cmd.compare("get") == 0) {
1116     rados::cls::lock::Lock l(lock_name);
1117     l.set_cookie(lock_cookie);
1118     l.set_tag(lock_tag);
1119     l.set_duration(utime_t(lock_duration, 0));
1120     l.set_description(lock_description);
1121     int ret;
1122     switch (lock_type) {
1123     case LOCK_SHARED:
1124       ret = l.lock_shared(ioctx, oid);
1125       break;
1126     default:
1127       ret = l.lock_exclusive(ioctx, oid);
1128     }
1129     if (ret < 0) {
1130       cerr << "ERROR: failed locking: " << cpp_strerror(ret) << std::endl;
1131       return ret;
1132     }
1133
1134     return ret;
1135   }
1136
1137   if (nargs.size() < 5)
1138     usage_exit();
1139
1140   if (cmd.compare("break") == 0) {
1141     string locker(nargs[4]);
1142     rados::cls::lock::Lock l(lock_name);
1143     l.set_cookie(lock_cookie);
1144     l.set_tag(lock_tag);
1145     entity_name_t name;
1146     if (!name.parse(locker)) {
1147       cerr << "ERROR: failed to parse locker name (" << locker << ")" << std::endl;
1148       return -EINVAL;
1149     }
1150     int ret = l.break_lock(ioctx, oid, name);
1151     if (ret < 0) {
1152       cerr << "ERROR: failed breaking lock: " << cpp_strerror(ret) << std::endl;
1153       return ret;
1154     }
1155   } else {
1156     usage_exit();
1157   }
1158
1159   return 0;
1160 }
1161
1162 static int do_cache_flush(IoCtx& io_ctx, string oid)
1163 {
1164   ObjectReadOperation op;
1165   op.cache_flush();
1166   librados::AioCompletion *completion =
1167     librados::Rados::aio_create_completion();
1168   io_ctx.aio_operate(oid.c_str(), completion, &op,
1169                      librados::OPERATION_IGNORE_CACHE |
1170                      librados::OPERATION_IGNORE_OVERLAY,
1171                      NULL);
1172   completion->wait_for_safe();
1173   int r = completion->get_return_value();
1174   completion->release();
1175   return r;
1176 }
1177
1178 static int do_cache_try_flush(IoCtx& io_ctx, string oid)
1179 {
1180   ObjectReadOperation op;
1181   op.cache_try_flush();
1182   librados::AioCompletion *completion =
1183     librados::Rados::aio_create_completion();
1184   io_ctx.aio_operate(oid.c_str(), completion, &op,
1185                      librados::OPERATION_IGNORE_CACHE |
1186                      librados::OPERATION_IGNORE_OVERLAY |
1187                      librados::OPERATION_SKIPRWLOCKS,
1188                      NULL);
1189   completion->wait_for_safe();
1190   int r = completion->get_return_value();
1191   completion->release();
1192   return r;
1193 }
1194
1195 static int do_cache_evict(IoCtx& io_ctx, string oid)
1196 {
1197   ObjectReadOperation op;
1198   op.cache_evict();
1199   librados::AioCompletion *completion =
1200     librados::Rados::aio_create_completion();
1201   io_ctx.aio_operate(oid.c_str(), completion, &op,
1202                      librados::OPERATION_IGNORE_CACHE |
1203                      librados::OPERATION_IGNORE_OVERLAY |
1204                      librados::OPERATION_SKIPRWLOCKS,
1205                      NULL);
1206   completion->wait_for_safe();
1207   int r = completion->get_return_value();
1208   completion->release();
1209   return r;
1210 }
1211
1212 static int do_cache_flush_evict_all(IoCtx& io_ctx, bool blocking)
1213 {
1214   int errors = 0;
1215   io_ctx.set_namespace(all_nspaces);
1216   try {
1217     librados::NObjectIterator i = io_ctx.nobjects_begin();
1218     librados::NObjectIterator i_end = io_ctx.nobjects_end();
1219     for (; i != i_end; ++i) {
1220       int r;
1221       cout << i->get_nspace() << "\t" << i->get_oid() << "\t" << i->get_locator() << std::endl;
1222       if (i->get_locator().size()) {
1223         io_ctx.locator_set_key(i->get_locator());
1224       } else {
1225         io_ctx.locator_set_key(string());
1226       }
1227       io_ctx.set_namespace(i->get_nspace());
1228       snap_set_t ls;
1229       io_ctx.snap_set_read(LIBRADOS_SNAP_DIR);
1230       r = io_ctx.list_snaps(i->get_oid(), &ls);
1231       if (r < 0) {
1232         cerr << "error listing snap shots " << i->get_nspace() << "/" << i->get_oid() << ": "
1233              << cpp_strerror(r) << std::endl;
1234         ++errors;
1235         continue;
1236       }
1237       std::vector<clone_info_t>::iterator ci = ls.clones.begin();
1238       // no snapshots
1239       if (ci == ls.clones.end()) {
1240         io_ctx.snap_set_read(CEPH_NOSNAP);
1241         if (blocking)
1242           r = do_cache_flush(io_ctx, i->get_oid());
1243         else
1244           r = do_cache_try_flush(io_ctx, i->get_oid());
1245         if (r < 0) {
1246           cerr << "failed to flush " << i->get_nspace() << "/" << i->get_oid() << ": "
1247                << cpp_strerror(r) << std::endl;
1248           ++errors;
1249           continue;
1250         }
1251         r = do_cache_evict(io_ctx, i->get_oid());
1252         if (r < 0) {
1253           cerr << "failed to evict " << i->get_nspace() << "/" << i->get_oid() << ": "
1254                << cpp_strerror(r) << std::endl;
1255           ++errors;
1256           continue;
1257         }
1258       } else {
1259       // has snapshots
1260         for (std::vector<clone_info_t>::iterator ci = ls.clones.begin();
1261              ci != ls.clones.end(); ++ci) {
1262           io_ctx.snap_set_read(ci->cloneid);
1263           if (blocking)
1264             r = do_cache_flush(io_ctx, i->get_oid());
1265           else
1266             r = do_cache_try_flush(io_ctx, i->get_oid());
1267           if (r < 0) {
1268             cerr << "failed to flush " << i->get_nspace() << "/" << i->get_oid() << ": "
1269                  << cpp_strerror(r) << std::endl;
1270             ++errors;
1271             break;
1272           }
1273           r = do_cache_evict(io_ctx, i->get_oid());
1274           if (r < 0) {
1275             cerr << "failed to evict " << i->get_nspace() << "/" << i->get_oid() << ": "
1276                  << cpp_strerror(r) << std::endl;
1277             ++errors;
1278             break;
1279           }
1280         }
1281       }
1282     }
1283   }
1284   catch (const std::runtime_error& e) {
1285     cerr << e.what() << std::endl;
1286     return -1;
1287   }
1288   return errors ? -1 : 0;
1289 }
1290
1291 static int do_get_inconsistent_pg_cmd(const std::vector<const char*> &nargs,
1292                                       Rados& rados,
1293                                       Formatter& formatter)
1294 {
1295   if (nargs.size() < 2) {
1296     usage_exit();
1297   }
1298   int64_t pool_id = rados.pool_lookup(nargs[1]);
1299   if (pool_id < 0) {
1300     cerr << "pool \"" << nargs[1] << "\" not found" << std::endl;
1301     return (int)pool_id;
1302   }
1303   std::vector<PlacementGroup> pgs;
1304   int ret = rados.get_inconsistent_pgs(pool_id, &pgs);
1305   if (ret) {
1306     return ret;
1307   }
1308   formatter.open_array_section("pgs");
1309   for (auto& pg : pgs) {
1310     formatter.dump_stream("pg") << pg;
1311   }
1312   formatter.close_section();
1313   formatter.flush(cout);
1314   cout << std::endl;
1315   return 0;
1316 }
1317
1318 static void dump_errors(const err_t &err, Formatter &f, const char *name)
1319 {
1320   f.open_array_section(name);
1321   if (err.has_shard_missing())
1322     f.dump_string("error", "missing");
1323   if (err.has_stat_error())
1324     f.dump_string("error", "stat_error");
1325   if (err.has_read_error())
1326     f.dump_string("error", "read_error");
1327   if (err.has_data_digest_mismatch_oi())
1328     f.dump_string("error", "data_digest_mismatch_oi");
1329   if (err.has_omap_digest_mismatch_oi())
1330     f.dump_string("error", "omap_digest_mismatch_oi");
1331   if (err.has_size_mismatch_oi())
1332     f.dump_string("error", "size_mismatch_oi");
1333   if (err.has_ec_hash_error())
1334     f.dump_string("error", "ec_hash_error");
1335   if (err.has_ec_size_error())
1336     f.dump_string("error", "ec_size_error");
1337   if (err.has_oi_attr_missing())
1338     f.dump_string("error", "oi_attr_missing");
1339   if (err.has_oi_attr_corrupted())
1340     f.dump_string("error", "oi_attr_corrupted");
1341   if (err.has_obj_size_oi_mismatch())
1342     f.dump_string("error", "obj_size_oi_mismatch");
1343   if (err.has_ss_attr_missing())
1344     f.dump_string("error", "ss_attr_missing");
1345   if (err.has_ss_attr_corrupted())
1346     f.dump_string("error", "ss_attr_corrupted");
1347   f.close_section();
1348 }
1349
1350 static void dump_shard(const shard_info_t& shard,
1351                        const inconsistent_obj_t& inc,
1352                        Formatter &f)
1353 {
1354   dump_errors(shard, f, "errors");
1355
1356   if (shard.has_shard_missing())
1357     return;
1358
1359   if (!shard.has_stat_error())
1360     f.dump_unsigned("size", shard.size);
1361   if (shard.omap_digest_present) {
1362     f.dump_format("omap_digest", "0x%08x", shard.omap_digest);
1363   }
1364   if (shard.data_digest_present) {
1365     f.dump_format("data_digest", "0x%08x", shard.data_digest);
1366   }
1367
1368   if (!shard.has_oi_attr_missing() && !shard.has_oi_attr_corrupted() &&
1369       inc.has_object_info_inconsistency()) {
1370     object_info_t oi;
1371     bufferlist bl;
1372     map<std::string, ceph::bufferlist>::iterator k = (const_cast<shard_info_t&>(shard)).attrs.find(OI_ATTR);
1373     assert(k != shard.attrs.end()); // Can't be missing
1374     bufferlist::iterator bliter = k->second.begin();
1375     ::decode(oi, bliter);  // Can't be corrupted
1376     f.dump_stream("object_info") << oi;
1377   }
1378   if (inc.has_attr_name_mismatch() || inc.has_attr_value_mismatch()
1379      || inc.union_shards.has_oi_attr_missing()
1380      || inc.union_shards.has_oi_attr_corrupted()
1381      || inc.union_shards.has_ss_attr_missing()
1382      || inc.union_shards.has_ss_attr_corrupted()) {
1383     f.open_array_section("attrs");
1384     for (auto kv : shard.attrs) {
1385       f.open_object_section("attr");
1386       f.dump_string("name", kv.first);
1387       bool b64;
1388       f.dump_string("value", cleanbin(kv.second, b64));
1389       f.dump_bool("Base64", b64);
1390       f.close_section();
1391     }
1392     f.close_section();
1393   }
1394 }
1395
1396 static void dump_obj_errors(const obj_err_t &err, Formatter &f)
1397 {
1398   f.open_array_section("errors");
1399   if (err.has_object_info_inconsistency())
1400     f.dump_string("error", "object_info_inconsistency");
1401   if (err.has_data_digest_mismatch())
1402     f.dump_string("error", "data_digest_mismatch");
1403   if (err.has_omap_digest_mismatch())
1404     f.dump_string("error", "omap_digest_mismatch");
1405   if (err.has_size_mismatch())
1406     f.dump_string("error", "size_mismatch");
1407   if (err.has_attr_value_mismatch())
1408     f.dump_string("error", "attr_value_mismatch");
1409   if (err.has_attr_name_mismatch())
1410     f.dump_string("error", "attr_name_mismatch");
1411   f.close_section();
1412 }
1413
1414 static void dump_object_id(const object_id_t& object,
1415                         Formatter &f)
1416 {
1417   f.dump_string("name", object.name);
1418   f.dump_string("nspace", object.nspace);
1419   f.dump_string("locator", object.locator);
1420   switch (object.snap) {
1421   case CEPH_NOSNAP:
1422     f.dump_string("snap", "head");
1423     break;
1424   case CEPH_SNAPDIR:
1425     f.dump_string("snap", "snapdir");
1426     break;
1427   default:
1428     f.dump_unsigned("snap", object.snap);
1429     break;
1430   }
1431 }
1432
1433 static void dump_inconsistent(const inconsistent_obj_t& inc,
1434                               Formatter &f)
1435 {
1436   f.open_object_section("object");
1437   dump_object_id(inc.object, f);
1438   f.dump_unsigned("version", inc.version);
1439   f.close_section();
1440
1441   dump_obj_errors(inc, f);
1442   dump_errors(inc.union_shards, f, "union_shard_errors");
1443   for (const auto& shard_info : inc.shards) {
1444     shard_info_t shard = const_cast<shard_info_t&>(shard_info.second);
1445     if (shard.selected_oi) {
1446       object_info_t oi;
1447       bufferlist bl;
1448       auto k = shard.attrs.find(OI_ATTR);
1449       assert(k != shard.attrs.end()); // Can't be missing
1450       bufferlist::iterator bliter = k->second.begin();
1451       ::decode(oi, bliter);  // Can't be corrupted
1452       f.dump_stream("selected_object_info") << oi;
1453       break;
1454     }
1455   }
1456   f.open_array_section("shards");
1457   for (const auto& shard_info : inc.shards) {
1458     f.open_object_section("shard");
1459     auto& osd_shard = shard_info.first;
1460     f.dump_int("osd", osd_shard.osd);
1461     f.dump_bool("primary", shard_info.second.primary);
1462     auto shard = osd_shard.shard;
1463     if (shard != shard_id_t::NO_SHARD)
1464       f.dump_unsigned("shard", shard);
1465     dump_shard(shard_info.second, inc, f);
1466     f.close_section();
1467   }
1468   f.close_section();
1469 }
1470
1471 static void dump_inconsistent(const inconsistent_snapset_t& inc,
1472                               Formatter &f)
1473 {
1474   dump_object_id(inc.object, f);
1475
1476   f.open_array_section("errors");
1477   if (inc.ss_attr_missing())
1478     f.dump_string("error", "ss_attr_missing");
1479   if (inc.ss_attr_corrupted())
1480     f.dump_string("error", "ss_attr_corrupted");
1481   if (inc.oi_attr_missing())
1482     f.dump_string("error", "oi_attr_missing");
1483   if (inc.oi_attr_corrupted())
1484     f.dump_string("error", "oi_attr_corrupted");
1485   if (inc.snapset_mismatch())
1486     f.dump_string("error", "snapset_mismatch");
1487   if (inc.head_mismatch())
1488     f.dump_string("error", "head_mismatch");
1489   if (inc.headless())
1490     f.dump_string("error", "headless");
1491   if (inc.size_mismatch())
1492     f.dump_string("error", "size_mismatch");
1493   if (inc.extra_clones())
1494     f.dump_string("error", "extra_clones");
1495   if (inc.clone_missing())
1496     f.dump_string("error", "clone_missing");
1497   f.close_section();
1498
1499   if (inc.extra_clones()) {
1500     f.open_array_section("extra clones");
1501     for (auto snap : inc.clones) {
1502       f.dump_unsigned("snap", snap);
1503     }
1504     f.close_section();
1505   }
1506
1507   if (inc.clone_missing()) {
1508     f.open_array_section("missing");
1509     for (auto snap : inc.missing) {
1510       f.dump_unsigned("snap", snap);
1511     }
1512     f.close_section();
1513   }
1514 }
1515
1516 // dispatch the call by type
1517 static int do_get_inconsistent(Rados& rados,
1518                                const PlacementGroup& pg,
1519                                const librados::object_id_t &start,
1520                                unsigned max_return,
1521                                AioCompletion *c,
1522                                std::vector<inconsistent_obj_t>* objs,
1523                                uint32_t* interval)
1524 {
1525   return rados.get_inconsistent_objects(pg, start, max_return, c,
1526                                         objs, interval);
1527 }
1528
1529 static int do_get_inconsistent(Rados& rados,
1530                                const PlacementGroup& pg,
1531                                const librados::object_id_t &start,
1532                                unsigned max_return,
1533                                AioCompletion *c,
1534                                std::vector<inconsistent_snapset_t>* snapsets,
1535                                uint32_t* interval)
1536 {
1537   return rados.get_inconsistent_snapsets(pg, start, max_return, c,
1538                                          snapsets, interval);
1539 }
1540
1541 template <typename T>
1542 static int do_get_inconsistent_cmd(const std::vector<const char*> &nargs,
1543                                    Rados& rados,
1544                                    Formatter& formatter)
1545 {
1546   if (nargs.size() < 2) {
1547     usage_exit();
1548   }
1549   PlacementGroup pg;
1550   int ret = 0;
1551   ret = pg.parse(nargs[1]);
1552   if (!ret) {
1553     cerr << "bad pg: " << nargs[1] << std::endl;
1554     return ret;
1555   }
1556   uint32_t interval = 0, first_interval = 0;
1557   const unsigned max_item_num = 32;
1558   bool opened = false;
1559   for (librados::object_id_t start;;) {
1560     std::vector<T> items;
1561     auto completion = librados::Rados::aio_create_completion();
1562     ret = do_get_inconsistent(rados, pg, start, max_item_num, completion,
1563                               &items, &interval);
1564     completion->wait_for_safe();
1565     ret = completion->get_return_value();
1566     completion->release();
1567     if (ret < 0) {
1568       if (ret == -EAGAIN)
1569         cerr << "interval#" << interval << " expired." << std::endl;
1570       else if (ret == -ENOENT)
1571         cerr << "No scrub information available for pg " << pg << std::endl;
1572       else
1573         cerr << "Unknown error " << cpp_strerror(ret) << std::endl;
1574       break;
1575     }
1576     // It must be the same interval every time.  EAGAIN would
1577     // occur if interval changes.
1578     assert(start.name.empty() || first_interval == interval);
1579     if (start.name.empty()) {
1580       first_interval = interval;
1581       formatter.open_object_section("info");
1582       formatter.dump_int("epoch", interval);
1583       formatter.open_array_section("inconsistents");
1584       opened = true;
1585     }
1586     for (auto& inc : items) {
1587       formatter.open_object_section("inconsistent");
1588       dump_inconsistent(inc, formatter);
1589       formatter.close_section();
1590     }
1591     if (items.size() < max_item_num) {
1592       formatter.close_section();
1593       break;
1594     }
1595     if (!items.empty()) {
1596       start = items.back().object;
1597     }
1598     items.clear();
1599   }
1600   if (opened) {
1601     formatter.close_section();
1602     formatter.flush(cout);
1603   }
1604   return ret;
1605 }
1606
1607 /**********************************************
1608
1609 **********************************************/
1610 static int rados_tool_common(const std::map < std::string, std::string > &opts,
1611                              std::vector<const char*> &nargs)
1612 {
1613   int ret;
1614   bool create_pool = false;
1615   const char *pool_name = NULL;
1616   const char *target_pool_name = NULL;
1617   string oloc, target_oloc, nspace, target_nspace;
1618   int concurrent_ios = 16;
1619   unsigned op_size = default_op_size;
1620   unsigned object_size = 0;
1621   unsigned max_objects = 0;
1622   uint64_t obj_offset = 0;
1623   bool block_size_specified = false;
1624   int bench_write_dest = 0;
1625   bool cleanup = true;
1626   bool hints = true; // for rados bench
1627   bool no_verify = false;
1628   bool use_striper = false;
1629   bool with_clones = false;
1630   const char *snapname = NULL;
1631   snap_t snapid = CEPH_NOSNAP;
1632   std::map<std::string, std::string>::const_iterator i;
1633
1634   uint64_t min_obj_len = 0;
1635   uint64_t max_obj_len = 0;
1636   uint64_t min_op_len = 0;
1637   uint64_t max_op_len = 0;
1638   uint64_t max_ops = 0;
1639   uint64_t max_backlog = 0;
1640   uint64_t target_throughput = 0;
1641   int64_t read_percent = -1;
1642   uint64_t num_objs = 0;
1643   int run_length = 0;
1644
1645   bool show_time = false;
1646   bool wildcard = false;
1647
1648   std::string run_name;
1649   std::string prefix;
1650   bool forcefull = false;
1651   Formatter *formatter = NULL;
1652   bool pretty_format = false;
1653   const char *output = NULL;
1654   bool omap_key_valid = false;
1655   std::string omap_key;
1656   std::string omap_key_pretty;
1657
1658   Rados rados;
1659   IoCtx io_ctx;
1660   RadosStriper striper;
1661
1662   i = opts.find("create");
1663   if (i != opts.end()) {
1664     create_pool = true;
1665   }
1666   i = opts.find("pool");
1667   if (i != opts.end()) {
1668     pool_name = i->second.c_str();
1669   }
1670   i = opts.find("target_pool");
1671   if (i != opts.end()) {
1672     target_pool_name = i->second.c_str();
1673   }
1674   i = opts.find("object_locator");
1675   if (i != opts.end()) {
1676     oloc = i->second;
1677   }
1678   i = opts.find("target_locator");
1679   if (i != opts.end()) {
1680     target_oloc = i->second;
1681   }
1682   i = opts.find("target_nspace");
1683   if (i != opts.end()) {
1684     target_nspace = i->second;
1685   }
1686   i = opts.find("concurrent-ios");
1687   if (i != opts.end()) {
1688     if (rados_sistrtoll(i, &concurrent_ios)) {
1689       return -EINVAL;
1690     }
1691   }
1692   i = opts.find("run-name");
1693   if (i != opts.end()) {
1694     run_name = i->second;
1695   }
1696
1697   i = opts.find("force-full");
1698   if (i != opts.end()) {
1699     forcefull = true;
1700   }
1701   i = opts.find("prefix");
1702   if (i != opts.end()) {
1703     prefix = i->second;
1704   }
1705   i = opts.find("block-size");
1706   if (i != opts.end()) {
1707     if (rados_sistrtoll(i, &op_size)) {
1708       return -EINVAL;
1709     }
1710     block_size_specified = true;
1711   }
1712   i = opts.find("object-size");
1713   if (i != opts.end()) {
1714     if (rados_sistrtoll(i, &object_size)) {
1715       return -EINVAL;
1716     }
1717     block_size_specified = true;
1718   }
1719   i = opts.find("max-objects");
1720   if (i != opts.end()) {
1721     if (rados_sistrtoll(i, &max_objects)) {
1722       return -EINVAL;
1723     }
1724   }
1725   i = opts.find("offset");
1726   if (i != opts.end()) {
1727     if (rados_sistrtoll(i, &obj_offset)) {
1728       return -EINVAL;
1729     }
1730   }
1731   i = opts.find("snap");
1732   if (i != opts.end()) {
1733     snapname = i->second.c_str();
1734   }
1735   i = opts.find("snapid");
1736   if (i != opts.end()) {
1737     if (rados_sistrtoll(i, &snapid)) {
1738       return -EINVAL;
1739     }
1740   }
1741   i = opts.find("min-object-size");
1742   if (i != opts.end()) {
1743     if (rados_sistrtoll(i, &min_obj_len)) {
1744       return -EINVAL;
1745     }
1746   }
1747   i = opts.find("max-object-size");
1748   if (i != opts.end()) {
1749     if (rados_sistrtoll(i, &max_obj_len)) {
1750       return -EINVAL;
1751     }
1752   }
1753   i = opts.find("min-op-len");
1754   if (i != opts.end()) {
1755     if (rados_sistrtoll(i, &min_op_len)) {
1756       return -EINVAL;
1757     }
1758   }
1759   i = opts.find("max-op-len");
1760   if (i != opts.end()) {
1761     if (rados_sistrtoll(i, &max_op_len)) {
1762       return -EINVAL;
1763     }
1764   }
1765   i = opts.find("max-ops");
1766   if (i != opts.end()) {
1767     if (rados_sistrtoll(i, &max_ops)) {
1768       return -EINVAL;
1769     }
1770   }
1771   i = opts.find("max-backlog");
1772   if (i != opts.end()) {
1773     if (rados_sistrtoll(i, &max_backlog)) {
1774       return -EINVAL;
1775     }
1776   }
1777   i = opts.find("target-throughput");
1778   if (i != opts.end()) {
1779     if (rados_sistrtoll(i, &target_throughput)) {
1780       return -EINVAL;
1781     }
1782   }
1783   i = opts.find("read-percent");
1784   if (i != opts.end()) {
1785     if (rados_sistrtoll(i, &read_percent)) {
1786       return -EINVAL;
1787     }
1788   }
1789   i = opts.find("num-objects");
1790   if (i != opts.end()) {
1791     if (rados_sistrtoll(i, &num_objs)) {
1792       return -EINVAL;
1793     }
1794   }
1795   i = opts.find("run-length");
1796   if (i != opts.end()) {
1797     if (rados_sistrtoll(i, &run_length)) {
1798       return -EINVAL;
1799     }
1800   }
1801   i = opts.find("show-time");
1802   if (i != opts.end()) {
1803     show_time = true;
1804   }
1805   i = opts.find("no-cleanup");
1806   if (i != opts.end()) {
1807     cleanup = false;
1808   }
1809   i = opts.find("no-hints");
1810   if (i != opts.end()) {
1811     hints = false;
1812   }
1813   i = opts.find("pretty-format");
1814   if (i != opts.end()) {
1815     pretty_format = true;
1816   }
1817   i = opts.find("format");
1818   if (i != opts.end()) {
1819     const char *format = i->second.c_str();
1820     formatter = Formatter::create(format);
1821     if (!formatter) {
1822       cerr << "unrecognized format: " << format << std::endl;
1823       return -EINVAL;
1824     }
1825   }
1826   i = opts.find("namespace");
1827   if (i != opts.end()) {
1828     nspace = i->second;
1829   }
1830   i = opts.find("no-verify");
1831   if (i != opts.end()) {
1832     no_verify = true;
1833   }
1834   i = opts.find("output");
1835   if (i != opts.end()) {
1836     output = i->second.c_str();
1837   }
1838   i = opts.find("write-dest-obj");
1839   if (i != opts.end()) {
1840     bench_write_dest |= static_cast<int>(OP_WRITE_DEST_OBJ);
1841   }
1842   i = opts.find("write-dest-omap");
1843   if (i != opts.end()) {
1844     bench_write_dest |= static_cast<int>(OP_WRITE_DEST_OMAP);
1845   }
1846   i = opts.find("write-dest-xattr");
1847   if (i != opts.end()) {
1848     bench_write_dest |= static_cast<int>(OP_WRITE_DEST_XATTR);
1849   }
1850   i = opts.find("with-clones");
1851   if (i != opts.end()) {
1852     with_clones = true;
1853   }
1854   i = opts.find("omap-key-file");
1855   if (i != opts.end()) {
1856     string err;
1857     bufferlist indata;
1858     ret = indata.read_file(i->second.c_str(), &err);
1859     if (ret < 0) {
1860       cerr << err << std::endl;
1861       return 1;
1862     }
1863
1864     omap_key_valid = true;
1865     omap_key = std::string(indata.c_str(), indata.length());
1866     omap_key_pretty = omap_key;
1867     if (std::find_if_not(omap_key.begin(), omap_key.end(),
1868                          (int (*)(int))isprint) != omap_key.end()) {
1869         omap_key_pretty = "(binary key)";
1870     }
1871   }
1872
1873   // open rados
1874   ret = rados.init_with_context(g_ceph_context);
1875   if (ret < 0) {
1876      cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl;
1877      goto out;
1878   }
1879
1880   ret = rados.connect();
1881   if (ret) {
1882      cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl;
1883      ret = -1;
1884      goto out;
1885   }
1886
1887   if (create_pool && !pool_name) {
1888     cerr << "--create-pool requested but pool_name was not specified!" << std::endl;
1889     usage_exit();
1890   }
1891
1892   if (create_pool) {
1893     ret = rados.pool_create(pool_name, 0, 0);
1894     if (ret < 0) {
1895       cerr << "error creating pool " << pool_name << ": "
1896            << cpp_strerror(ret) << std::endl;
1897       goto out;
1898     }
1899   }
1900
1901   // open io context.
1902   if (pool_name) {
1903     ret = rados.ioctx_create(pool_name, io_ctx);
1904     if (ret < 0) {
1905       cerr << "error opening pool " << pool_name << ": "
1906            << cpp_strerror(ret) << std::endl;
1907       goto out;
1908     }
1909
1910    // align op_size
1911    {
1912       bool requires;
1913       ret = io_ctx.pool_requires_alignment2(&requires);
1914       if (ret < 0) {
1915         cerr << "error checking pool alignment requirement"
1916           << cpp_strerror(ret) << std::endl;
1917         goto out;       
1918       }
1919
1920       if (requires) {
1921         uint64_t align = 0;
1922         ret = io_ctx.pool_required_alignment2(&align);
1923         if (ret < 0) {
1924           cerr << "error getting pool alignment"
1925             << cpp_strerror(ret) << std::endl;
1926           goto out;     
1927         }
1928
1929         const uint64_t prev_op_size = op_size;
1930         op_size = uint64_t((op_size + align - 1) / align) * align;
1931         // Warn: if user specified and it was rounded
1932         if (prev_op_size != default_op_size && prev_op_size != op_size)
1933           cerr << "INFO: op_size has been rounded to " << op_size << std::endl;
1934       }
1935     }
1936
1937     // create striper interface
1938     if (opts.find("striper") != opts.end()) {
1939       ret = RadosStriper::striper_create(io_ctx, &striper);
1940       if (0 != ret) {
1941         cerr << "error opening pool " << pool_name << " with striper interface: "
1942              << cpp_strerror(ret) << std::endl;
1943         goto out;
1944       }
1945       use_striper = true;
1946     }
1947   }
1948
1949   // snapname?
1950   if (snapname) {
1951     if (!pool_name) {
1952       cerr << "pool name must be specified with --snap" << std::endl;
1953       ret = -1;
1954       goto out;
1955     }
1956     ret = io_ctx.snap_lookup(snapname, &snapid);
1957     if (ret < 0) {
1958       cerr << "error looking up snap '" << snapname << "': " << cpp_strerror(ret) << std::endl;
1959       goto out;
1960     }
1961   }
1962   if (oloc.size()) {
1963     if (!pool_name) {
1964       cerr << "pool name must be specified with --object_locator" << std::endl;
1965       ret = -1;
1966       goto out;
1967     }
1968     io_ctx.locator_set_key(oloc);
1969   }
1970   // Use namespace from command line if specified
1971   if (opts.find("namespace") != opts.end()) {
1972     if (!pool_name) {
1973       cerr << "pool name must be specified with --namespace" << std::endl;
1974       ret = -1;
1975       goto out;
1976     }
1977     io_ctx.set_namespace(nspace);
1978   // Use wildcard if --all specified and --default NOT specified
1979   } else if (opts.find("all") != opts.end() && opts.find("default") == opts.end()) {
1980     // Only the ls should ever set namespace to special value
1981     wildcard = true;
1982   }
1983   if (snapid != CEPH_NOSNAP) {
1984     if (!pool_name) {
1985       cerr << "pool name must be specified with --snapid" << std::endl;
1986       ret = -1;
1987       goto out;
1988     }
1989     string name;
1990     ret = io_ctx.snap_get_name(snapid, &name);
1991     if (ret < 0) {
1992       cerr << "snapid " << snapid << " doesn't exist in pool "
1993            << io_ctx.get_pool_name() << std::endl;
1994       goto out;
1995     }
1996     io_ctx.snap_set_read(snapid);
1997     cout << "selected snap " << snapid << " '" << name << "'" << std::endl;
1998   }
1999
2000   assert(!nargs.empty());
2001
2002   // list pools?
2003   if (strcmp(nargs[0], "lspools") == 0) {
2004     list<string> vec;
2005     ret = rados.pool_list(vec);
2006     if (ret < 0) {
2007       cerr << "error listing pools: " << cpp_strerror(ret) << std::endl;
2008       goto out;
2009     }
2010     for (list<string>::iterator i = vec.begin(); i != vec.end(); ++i)
2011       cout << *i << std::endl;
2012   }
2013   else if (strcmp(nargs[0], "df") == 0) {
2014     // pools
2015     list<string> vec;
2016
2017     if (!pool_name) {
2018       ret = rados.pool_list(vec);
2019       if (ret < 0) {
2020         cerr << "error listing pools: " << cpp_strerror(ret) << std::endl;
2021         goto out;
2022       }
2023     } else {
2024       vec.push_back(pool_name);
2025     }
2026
2027     map<string,librados::pool_stat_t> stats;
2028     ret = rados.get_pool_stats(vec, stats);
2029     if (ret < 0) {
2030       cerr << "error fetching pool stats: " << cpp_strerror(ret) << std::endl;
2031       goto out;
2032     }
2033
2034     TextTable tab;
2035
2036     if (!formatter) {
2037       tab.define_column("POOL_NAME", TextTable::LEFT, TextTable::LEFT);
2038       tab.define_column("USED", TextTable::LEFT, TextTable::RIGHT);
2039       tab.define_column("OBJECTS", TextTable::LEFT, TextTable::RIGHT);
2040       tab.define_column("CLONES", TextTable::LEFT, TextTable::RIGHT);
2041       tab.define_column("COPIES", TextTable::LEFT, TextTable::RIGHT);
2042       tab.define_column("MISSING_ON_PRIMARY", TextTable::LEFT, TextTable::RIGHT);
2043       tab.define_column("UNFOUND", TextTable::LEFT, TextTable::RIGHT);
2044       tab.define_column("DEGRADED", TextTable::LEFT, TextTable::RIGHT);
2045       tab.define_column("RD_OPS", TextTable::LEFT, TextTable::RIGHT);
2046       tab.define_column("RD", TextTable::LEFT, TextTable::RIGHT);
2047       tab.define_column("WR_OPS", TextTable::LEFT, TextTable::RIGHT);
2048       tab.define_column("WR", TextTable::LEFT, TextTable::RIGHT);
2049     } else {
2050       formatter->open_object_section("stats");
2051       formatter->open_array_section("pools");
2052     }
2053     for (map<string,librados::pool_stat_t>::iterator i = stats.begin();
2054          i != stats.end();
2055          ++i) {
2056       const char *pool_name = i->first.c_str();
2057       librados::pool_stat_t& s = i->second;
2058       if (!formatter) {
2059         tab << pool_name
2060             << si_t(s.num_bytes)
2061             << s.num_objects
2062             << s.num_object_clones
2063             << s.num_object_copies
2064             << s.num_objects_missing_on_primary
2065             << s.num_objects_unfound
2066             << s.num_objects_degraded
2067             << s.num_rd
2068             << si_t(s.num_rd_kb << 10)
2069             << s.num_wr
2070             << si_t(s.num_wr_kb << 10)
2071             << TextTable::endrow;
2072       } else {
2073         formatter->open_object_section("pool");
2074         int64_t pool_id = rados.pool_lookup(pool_name);
2075         formatter->dump_string("name", pool_name);
2076         if (pool_id >= 0)
2077           formatter->dump_int("id", pool_id);
2078         else
2079           cerr << "ERROR: lookup_pg_pool_name for name=" << pool_name
2080                << " returned " << pool_id << std::endl;
2081         formatter->dump_int("size_bytes",s.num_bytes);
2082         formatter->dump_int("size_kb", s.num_kb);
2083         formatter->dump_int("num_objects", s.num_objects);
2084         formatter->dump_int("num_object_clones", s.num_object_clones);
2085         formatter->dump_int("num_object_copies", s.num_object_copies);
2086         formatter->dump_int("num_objects_missing_on_primary", s.num_objects_missing_on_primary);
2087         formatter->dump_int("num_objects_unfound", s.num_objects_unfound);
2088         formatter->dump_int("num_objects_degraded", s.num_objects_degraded);
2089         formatter->dump_int("read_ops", s.num_rd);
2090         formatter->dump_int("read_bytes", s.num_rd_kb * 1024ull);
2091         formatter->dump_int("write_ops", s.num_wr);
2092         formatter->dump_int("write_bytes", s.num_wr_kb * 1024ull);
2093         formatter->close_section();
2094       }
2095     }
2096
2097     if (!formatter) {
2098       cout << tab;
2099     }
2100
2101     // total
2102     cluster_stat_t tstats;
2103     ret = rados.cluster_stat(tstats);
2104     if (ret < 0) {
2105       cerr << "error getting total cluster usage: " << cpp_strerror(ret) << std::endl;
2106       goto out;
2107     }
2108     if (!formatter) {
2109       cout << std::endl;
2110       cout << "total_objects    " << tstats.num_objects
2111            << std::endl;
2112       cout << "total_used       " << si_t(tstats.kb_used << 10)
2113            << std::endl;
2114       cout << "total_avail      " << si_t(tstats.kb_avail << 10)
2115            << std::endl;
2116       cout << "total_space      " << si_t(tstats.kb << 10)
2117            << std::endl;
2118     } else {
2119       formatter->close_section();
2120       formatter->dump_int("total_objects", tstats.num_objects);
2121       formatter->dump_int("total_used", tstats.kb_used);
2122       formatter->dump_int("total_avail", tstats.kb_avail);
2123       formatter->dump_int("total_space", tstats.kb);
2124       formatter->close_section();
2125       formatter->flush(cout);
2126     }
2127   }
2128
2129   else if (strcmp(nargs[0], "ls") == 0) {
2130     if (!pool_name) {
2131       cerr << "pool name was not specified" << std::endl;
2132       ret = -1;
2133       goto out;
2134     }
2135
2136     if (wildcard)
2137       io_ctx.set_namespace(all_nspaces);
2138     bool use_stdout = (nargs.size() < 2) || (strcmp(nargs[1], "-") == 0);
2139     ostream *outstream;
2140     if(use_stdout)
2141       outstream = &cout;
2142     else
2143       outstream = new ofstream(nargs[1]);
2144
2145     {
2146       if (formatter)
2147         formatter->open_array_section("objects");
2148       try {
2149         librados::NObjectIterator i = io_ctx.nobjects_begin();
2150         librados::NObjectIterator i_end = io_ctx.nobjects_end();
2151         for (; i != i_end; ++i) {
2152           if (use_striper) {
2153             // in case of --striper option, we only list striped
2154             // objects, so we only display the first object of
2155             // each, without its suffix '.000...000'
2156             size_t l = i->get_oid().length();
2157             if (l <= 17 ||
2158                 (0 != i->get_oid().compare(l-17, 17,".0000000000000000"))) continue;
2159           }
2160           if (!formatter) {
2161             // Only include namespace in output when wildcard specified
2162             if (wildcard)
2163               *outstream << i->get_nspace() << "\t";
2164             if (use_striper) {
2165               *outstream << i->get_oid().substr(0, i->get_oid().length()-17);
2166             } else {
2167               *outstream << i->get_oid();
2168             }
2169             if (i->get_locator().size())
2170               *outstream << "\t" << i->get_locator();
2171             *outstream << std::endl;
2172           } else {
2173             formatter->open_object_section("object");
2174             formatter->dump_string("namespace", i->get_nspace());
2175             if (use_striper) {
2176               formatter->dump_string("name", i->get_oid().substr(0, i->get_oid().length()-17));
2177             } else {
2178               formatter->dump_string("name", i->get_oid());
2179             }
2180             if (i->get_locator().size())
2181               formatter->dump_string("locator", i->get_locator());
2182             formatter->close_section(); //object
2183           }
2184         }
2185       }
2186       catch (const std::runtime_error& e) {
2187         cerr << e.what() << std::endl;
2188         ret = -1;
2189         goto out;
2190       }
2191     }
2192     if (formatter) {
2193       formatter->close_section(); //objects
2194       formatter->flush(*outstream);
2195       if (pretty_format)
2196         *outstream << std::endl;
2197       formatter->flush(*outstream);
2198     }
2199     if (!stdout)
2200       delete outstream;
2201   }
2202   else if (strcmp(nargs[0], "chown") == 0) {
2203     if (!pool_name || nargs.size() < 2)
2204       usage_exit();
2205
2206     char* endptr = NULL;
2207     uint64_t new_auid = strtol(nargs[1], &endptr, 10);
2208     if (*endptr) {
2209       cerr << "Invalid value for new-auid: '" << nargs[1] << "'" << std::endl;
2210       ret = -1;
2211       goto out;
2212     }
2213     ret = io_ctx.set_auid(new_auid);
2214     if (ret < 0) {
2215       cerr << "error changing auid on pool " << io_ctx.get_pool_name() << ':'
2216            << cpp_strerror(ret) << std::endl;
2217     } else cerr << "changed auid on pool " << io_ctx.get_pool_name()
2218                 << " to " << new_auid << std::endl;
2219   }
2220   else if (strcmp(nargs[0], "mapext") == 0) {
2221     if (!pool_name || nargs.size() < 2)
2222       usage_exit();
2223     string oid(nargs[1]);
2224     std::map<uint64_t,uint64_t> m;
2225     ret = io_ctx.mapext(oid, 0, -1, m);
2226     if (ret < 0) {
2227       cerr << "mapext error on " << pool_name << "/" << oid << ": " << cpp_strerror(ret) << std::endl;
2228       goto out;
2229     }
2230     std::map<uint64_t,uint64_t>::iterator iter;
2231     for (iter = m.begin(); iter != m.end(); ++iter) {
2232       cout << hex << iter->first << "\t" << iter->second << dec << std::endl;
2233     }
2234   }
2235   else if (strcmp(nargs[0], "stat") == 0) {
2236     if (!pool_name || nargs.size() < 2)
2237       usage_exit();
2238     string oid(nargs[1]);
2239     uint64_t size;
2240     time_t mtime;
2241     if (use_striper) {
2242       ret = striper.stat(oid, &size, &mtime);
2243     } else {
2244       ret = io_ctx.stat(oid, &size, &mtime);
2245     }
2246     if (ret < 0) {
2247       cerr << " error stat-ing " << pool_name << "/" << oid << ": "
2248            << cpp_strerror(ret) << std::endl;
2249       goto out;
2250     } else {
2251       utime_t t(mtime, 0);
2252       cout << pool_name << "/" << oid
2253            << " mtime " << t << ", size " << size << std::endl;
2254     }
2255   }
2256   else if (strcmp(nargs[0], "get") == 0) {
2257     if (!pool_name || nargs.size() < 3)
2258       usage_exit();
2259     ret = do_get(io_ctx, striper, nargs[1], nargs[2], op_size, use_striper);
2260     if (ret < 0) {
2261       cerr << "error getting " << pool_name << "/" << nargs[1] << ": " << cpp_strerror(ret) << std::endl;
2262       goto out;
2263     }
2264   }
2265   else if (strcmp(nargs[0], "put") == 0) {
2266     if (!pool_name || nargs.size() < 3)
2267       usage_exit();
2268     ret = do_put(io_ctx, striper, nargs[1], nargs[2], op_size, obj_offset, use_striper);
2269     if (ret < 0) {
2270       cerr << "error putting " << pool_name << "/" << nargs[1] << ": " << cpp_strerror(ret) << std::endl;
2271       goto out;
2272     }
2273   }
2274   else if (strcmp(nargs[0], "append") == 0) {
2275     if (!pool_name || nargs.size() < 3)
2276       usage_exit();
2277     ret = do_append(io_ctx, striper, nargs[1], nargs[2], op_size, use_striper);
2278     if (ret < 0) {
2279       cerr << "error appending " << pool_name << "/" << nargs[1] << ": " << cpp_strerror(ret) << std::endl;
2280       goto out;
2281     }
2282   }
2283   else if (strcmp(nargs[0], "truncate") == 0) {
2284     if (!pool_name || nargs.size() < 3)
2285       usage_exit();
2286
2287     string oid(nargs[1]);
2288     char* endptr = NULL;
2289     long size = strtoll(nargs[2], &endptr, 10);
2290     if (*endptr) {
2291       cerr << "Invalid value for size: '" << nargs[2] << "'" << std::endl;
2292       ret = -EINVAL;
2293       goto out;
2294     }
2295     if (size < 0) {
2296       cerr << "error, cannot truncate to negative value" << std::endl;
2297       usage_exit();
2298     }
2299     if (use_striper) {
2300       ret = striper.trunc(oid, size);
2301     } else {
2302       ret = io_ctx.trunc(oid, size);
2303     }
2304     if (ret < 0) {
2305       cerr << "error truncating oid "
2306            << oid << " to " << size << ": "
2307            << cpp_strerror(ret) << std::endl;
2308     } else {
2309       ret = 0;
2310     }
2311   }
2312   else if (strcmp(nargs[0], "setxattr") == 0) {
2313     if (!pool_name || nargs.size() < 3 || nargs.size() > 4)
2314       usage_exit();
2315
2316     string oid(nargs[1]);
2317     string attr_name(nargs[2]);
2318     bufferlist bl;
2319     if (nargs.size() == 4) {
2320       string attr_val(nargs[3]);
2321       bl.append(attr_val.c_str(), attr_val.length());
2322     } else {
2323       do {
2324         ret = bl.read_fd(STDIN_FILENO, 1024); // from stdin
2325         if (ret < 0)
2326           goto out;
2327       } while (ret > 0);
2328     }
2329
2330     if (use_striper) {
2331       ret = striper.setxattr(oid, attr_name.c_str(), bl);
2332     } else {
2333       ret = io_ctx.setxattr(oid, attr_name.c_str(), bl);
2334     }
2335     if (ret < 0) {
2336       cerr << "error setting xattr " << pool_name << "/" << oid << "/" << attr_name << ": " << cpp_strerror(ret) << std::endl;
2337       goto out;
2338     }
2339     else
2340       ret = 0;
2341   }
2342   else if (strcmp(nargs[0], "getxattr") == 0) {
2343     if (!pool_name || nargs.size() < 3)
2344       usage_exit();
2345
2346     string oid(nargs[1]);
2347     string attr_name(nargs[2]);
2348
2349     bufferlist bl;
2350     if (use_striper) {
2351       ret = striper.getxattr(oid, attr_name.c_str(), bl);
2352     } else {
2353       ret = io_ctx.getxattr(oid, attr_name.c_str(), bl);
2354     }
2355     if (ret < 0) {
2356       cerr << "error getting xattr " << pool_name << "/" << oid << "/" << attr_name << ": " << cpp_strerror(ret) << std::endl;
2357       goto out;
2358     }
2359     else
2360       ret = 0;
2361     string s(bl.c_str(), bl.length());
2362     cout << s;
2363   } else if (strcmp(nargs[0], "rmxattr") == 0) {
2364     if (!pool_name || nargs.size() < 3)
2365       usage_exit();
2366
2367     string oid(nargs[1]);
2368     string attr_name(nargs[2]);
2369
2370     if (use_striper) {
2371       ret = striper.rmxattr(oid, attr_name.c_str());
2372     } else {
2373       ret = io_ctx.rmxattr(oid, attr_name.c_str());
2374     }
2375     if (ret < 0) {
2376       cerr << "error removing xattr " << pool_name << "/" << oid << "/" << attr_name << ": " << cpp_strerror(ret) << std::endl;
2377       goto out;
2378     }
2379   } else if (strcmp(nargs[0], "listxattr") == 0) {
2380     if (!pool_name || nargs.size() < 2)
2381       usage_exit();
2382
2383     string oid(nargs[1]);
2384     map<std::string, bufferlist> attrset;
2385     bufferlist bl;
2386     if (use_striper) {
2387       ret = striper.getxattrs(oid, attrset);
2388     } else {
2389       ret = io_ctx.getxattrs(oid, attrset);
2390     }
2391     if (ret < 0) {
2392       cerr << "error getting xattr set " << pool_name << "/" << oid << ": " << cpp_strerror(ret) << std::endl;
2393       goto out;
2394     }
2395
2396     for (map<std::string, bufferlist>::iterator iter = attrset.begin();
2397          iter != attrset.end(); ++iter) {
2398       cout << iter->first << std::endl;
2399     }
2400   } else if (strcmp(nargs[0], "getomapheader") == 0) {
2401     if (!pool_name || nargs.size() < 2)
2402       usage_exit();
2403
2404     string oid(nargs[1]);
2405     string outfile;
2406     if (nargs.size() >= 3) {
2407       outfile = nargs[2];
2408     }
2409
2410     bufferlist header;
2411     ret = io_ctx.omap_get_header(oid, &header);
2412     if (ret < 0) {
2413       cerr << "error getting omap header " << pool_name << "/" << oid
2414            << ": " << cpp_strerror(ret) << std::endl;
2415       goto out;
2416     } else {
2417       if (!outfile.empty()) {
2418         cerr << "Writing to " << outfile << std::endl;
2419         dump_data(outfile, header);
2420       } else {
2421         cout << "header (" << header.length() << " bytes) :\n";
2422         header.hexdump(cout);
2423         cout << std::endl;
2424       }
2425       ret = 0;
2426     }
2427   } else if (strcmp(nargs[0], "setomapheader") == 0) {
2428     if (!pool_name || nargs.size() < 3)
2429       usage_exit();
2430
2431     string oid(nargs[1]);
2432     string val(nargs[2]);
2433
2434     bufferlist bl;
2435     bl.append(val);
2436
2437     ret = io_ctx.omap_set_header(oid, bl);
2438     if (ret < 0) {
2439       cerr << "error setting omap value " << pool_name << "/" << oid
2440            << ": " << cpp_strerror(ret) << std::endl;
2441       goto out;
2442     } else {
2443       ret = 0;
2444     }
2445   } else if (strcmp(nargs[0], "setomapval") == 0) {
2446     uint32_t min_args = (omap_key_valid ? 2 : 3);
2447     if (!pool_name || nargs.size() < min_args || nargs.size() > min_args + 1) {
2448       usage_exit();
2449     }
2450
2451     string oid(nargs[1]);
2452     if (!omap_key_valid) {
2453       omap_key = nargs[2];
2454       omap_key_pretty = omap_key;
2455     }
2456
2457     bufferlist bl;
2458     if (nargs.size() > min_args) {
2459       string val(nargs[min_args]);
2460       bl.append(val);
2461     } else {
2462       do {
2463         ret = bl.read_fd(STDIN_FILENO, 1024); // from stdin
2464         if (ret < 0) {
2465           goto out;
2466         }
2467       } while (ret > 0);
2468     }
2469
2470     map<string, bufferlist> values;
2471     values[omap_key] = bl;
2472
2473     ret = io_ctx.omap_set(oid, values);
2474     if (ret < 0) {
2475       cerr << "error setting omap value " << pool_name << "/" << oid << "/"
2476            << omap_key_pretty << ": " << cpp_strerror(ret) << std::endl;
2477       goto out;
2478     } else {
2479       ret = 0;
2480     }
2481   } else if (strcmp(nargs[0], "getomapval") == 0) {
2482     uint32_t min_args = (omap_key_valid ? 2 : 3);
2483     if (!pool_name || nargs.size() < min_args || nargs.size() > min_args + 1) {
2484       usage_exit();
2485     }
2486
2487     string oid(nargs[1]);
2488     if (!omap_key_valid) {
2489       omap_key = nargs[2];
2490       omap_key_pretty = omap_key;
2491     }
2492
2493     set<string> keys;
2494     keys.insert(omap_key);
2495
2496     std::string outfile;
2497     if (nargs.size() > min_args) {
2498       outfile = nargs[min_args];
2499     }
2500
2501     map<string, bufferlist> values;
2502     ret = io_ctx.omap_get_vals_by_keys(oid, keys, &values);
2503     if (ret < 0) {
2504       cerr << "error getting omap value " << pool_name << "/" << oid << "/"
2505            << omap_key_pretty << ": " << cpp_strerror(ret) << std::endl;
2506       goto out;
2507     } else {
2508       ret = 0;
2509     }
2510
2511     if (values.size() && values.begin()->first == omap_key) {
2512       if (!outfile.empty()) {
2513         cerr << "Writing to " << outfile << std::endl;
2514         dump_data(outfile, values.begin()->second);
2515       } else {
2516         cout << "value (" << values.begin()->second.length() << " bytes) :\n";
2517         values.begin()->second.hexdump(cout);
2518         cout << std::endl;
2519       }
2520       ret = 0;
2521     } else {
2522       cout << "No such key: " << pool_name << "/" << oid << "/"
2523            << omap_key_pretty << std::endl;
2524       ret = -1;
2525       goto out;
2526     }
2527   } else if (strcmp(nargs[0], "rmomapkey") == 0) {
2528     uint32_t num_args = (omap_key_valid ? 2 : 3);
2529     if (!pool_name || nargs.size() != num_args) {
2530       usage_exit();
2531     }
2532
2533     string oid(nargs[1]);
2534     if (!omap_key_valid) {
2535       omap_key = nargs[2];
2536       omap_key_pretty = omap_key;
2537     }
2538     set<string> keys;
2539     keys.insert(omap_key);
2540
2541     ret = io_ctx.omap_rm_keys(oid, keys);
2542     if (ret < 0) {
2543       cerr << "error removing omap key " << pool_name << "/" << oid << "/"
2544            << omap_key_pretty << ": " << cpp_strerror(ret) << std::endl;
2545       goto out;
2546     } else {
2547       ret = 0;
2548     }
2549   } else if (strcmp(nargs[0], "listomapvals") == 0) {
2550     if (!pool_name || nargs.size() < 2)
2551       usage_exit();
2552
2553     string oid(nargs[1]);
2554     string last_read = "";
2555     int MAX_READ = 512;
2556     do {
2557       map<string, bufferlist> values;
2558       ret = io_ctx.omap_get_vals(oid, last_read, MAX_READ, &values);
2559       if (ret < 0) {
2560         cerr << "error getting omap keys " << pool_name << "/" << oid << ": "
2561              << cpp_strerror(ret) << std::endl;
2562         return 1;
2563       }
2564       ret = values.size();
2565       for (map<string, bufferlist>::const_iterator it = values.begin();
2566            it != values.end(); ++it) {
2567         last_read = it->first;
2568         // dump key in hex if it contains nonprintable characters
2569         if (std::count_if(it->first.begin(), it->first.end(),
2570             (int (*)(int))isprint) < (int)it->first.length()) {
2571           cout << "key (" << it->first.length() << " bytes):\n";
2572           bufferlist keybl;
2573           keybl.append(it->first);
2574           keybl.hexdump(cout);
2575         } else {
2576           cout << it->first;
2577         }
2578         cout << std::endl;
2579         cout << "value (" << it->second.length() << " bytes) :\n";
2580         it->second.hexdump(cout);
2581         cout << std::endl;
2582       }
2583     } while (ret == MAX_READ);
2584     ret = 0;
2585   }
2586   else if (strcmp(nargs[0], "cp") == 0) {
2587     if (!pool_name)
2588       usage_exit();
2589
2590     if (nargs.size() < 2 || nargs.size() > 3)
2591       usage_exit();
2592
2593     const char *target = target_pool_name;
2594     if (!target)
2595       target = pool_name;
2596
2597     const char *target_obj;
2598     if (nargs.size() < 3) {
2599       if (strcmp(target, pool_name) == 0) {
2600         cerr << "cannot copy object into itself" << std::endl;
2601         ret = -1;
2602         goto out;
2603       }
2604       target_obj = nargs[1];
2605     } else {
2606       target_obj = nargs[2];
2607     }
2608
2609     // open io context.
2610     IoCtx target_ctx;
2611     ret = rados.ioctx_create(target, target_ctx);
2612     if (ret < 0) {
2613       cerr << "error opening target pool " << target << ": "
2614            << cpp_strerror(ret) << std::endl;
2615       goto out;
2616     }
2617     if (target_oloc.size()) {
2618       target_ctx.locator_set_key(target_oloc);
2619     }
2620     if (target_nspace.size()) {
2621       target_ctx.set_namespace(target_nspace);
2622     }
2623
2624     ret = do_copy(io_ctx, nargs[1], target_ctx, target_obj);
2625     if (ret < 0) {
2626       cerr << "error copying " << pool_name << "/" << nargs[1] << " => " << target << "/" << target_obj << ": " << cpp_strerror(ret) << std::endl;
2627       goto out;
2628     }
2629   } else if (strcmp(nargs[0], "rm") == 0) {
2630     if (!pool_name || nargs.size() < 2)
2631       usage_exit();
2632     vector<const char *>::iterator iter = nargs.begin();
2633     ++iter;
2634     for (; iter != nargs.end(); ++iter) {
2635       const string & oid = *iter;
2636       if (use_striper) {
2637         if (forcefull) {
2638           ret = striper.remove(oid, CEPH_OSD_FLAG_FULL_FORCE);
2639         } else {
2640           ret = striper.remove(oid);
2641         }
2642       } else {
2643         if (forcefull) {
2644           ret = io_ctx.remove(oid, CEPH_OSD_FLAG_FULL_FORCE);
2645         } else {
2646           ret = io_ctx.remove(oid);
2647         }
2648       }
2649       if (ret < 0) {
2650         string name = (nspace.size() ? nspace + "/" : "" ) + oid;
2651         cerr << "error removing " << pool_name << ">" << name << ": " << cpp_strerror(ret) << std::endl;
2652         goto out;
2653       }
2654     }
2655   }
2656   else if (strcmp(nargs[0], "create") == 0) {
2657     if (!pool_name || nargs.size() < 2)
2658       usage_exit();
2659     string oid(nargs[1]);
2660     ret = io_ctx.create(oid, true);
2661     if (ret < 0) {
2662       cerr << "error creating " << pool_name << "/" << oid << ": " << cpp_strerror(ret) << std::endl;
2663       goto out;
2664     }
2665   }
2666
2667   else if (strcmp(nargs[0], "tmap") == 0) {
2668     if (nargs.size() < 3)
2669       usage_exit();
2670     if (strcmp(nargs[1], "dump") == 0) {
2671       bufferlist outdata;
2672       string oid(nargs[2]);
2673       ret = io_ctx.read(oid, outdata, 0, 0);
2674       if (ret < 0) {
2675         cerr << "error reading " << pool_name << "/" << oid << ": " << cpp_strerror(ret) << std::endl;
2676         goto out;
2677       }
2678       bufferlist::iterator p = outdata.begin();
2679       bufferlist header;
2680       map<string, bufferlist> kv;
2681       try {
2682         ::decode(header, p);
2683         ::decode(kv, p);
2684       }
2685       catch (buffer::error& e) {
2686         cerr << "error decoding tmap " << pool_name << "/" << oid << std::endl;
2687         ret = -EINVAL;
2688         goto out;
2689       }
2690       cout << "header (" << header.length() << " bytes):\n";
2691       header.hexdump(cout);
2692       cout << "\n";
2693       cout << kv.size() << " keys\n";
2694       for (map<string,bufferlist>::iterator q = kv.begin(); q != kv.end(); ++q) {
2695         cout << "key '" << q->first << "' (" << q->second.length() << " bytes):\n";
2696         q->second.hexdump(cout);
2697         cout << "\n";
2698       }
2699     }
2700     else if (strcmp(nargs[1], "set") == 0 ||
2701              strcmp(nargs[1], "create") == 0) {
2702       if (nargs.size() < 5)
2703         usage_exit();
2704       string oid(nargs[2]);
2705       string k(nargs[3]);
2706       string v(nargs[4]);
2707       bufferlist bl;
2708       char c = (strcmp(nargs[1], "set") == 0) ? CEPH_OSD_TMAP_SET : CEPH_OSD_TMAP_CREATE;
2709       ::encode(c, bl);
2710       ::encode(k, bl);
2711       ::encode(v, bl);
2712       ret = io_ctx.tmap_update(oid, bl);
2713     }
2714   }
2715
2716   else if (strcmp(nargs[0], "tmap-to-omap") == 0) {
2717     if (!pool_name || nargs.size() < 2)
2718       usage_exit();
2719     string oid(nargs[1]);
2720
2721     bufferlist bl;
2722     int r = io_ctx.tmap_get(oid, bl);
2723     if (r < 0) {
2724       ret = r;
2725       cerr << "error reading tmap " << pool_name << "/" << oid
2726            << ": " << cpp_strerror(ret) << std::endl;
2727       goto out;
2728     }
2729     bufferlist hdr;
2730     map<string, bufferlist> kv;
2731     bufferlist::iterator p = bl.begin();
2732     try {
2733       ::decode(hdr, p);
2734       ::decode(kv, p);
2735     }
2736     catch (buffer::error& e) {
2737       cerr << "error decoding tmap " << pool_name << "/" << oid << std::endl;
2738       ret = -EINVAL;
2739       goto out;
2740     }
2741     if (!p.end()) {
2742       cerr << "error decoding tmap (stray trailing data) in " << pool_name << "/" << oid << std::endl;
2743       ret = -EINVAL;
2744       goto out;
2745     }
2746     librados::ObjectWriteOperation wr;
2747     wr.omap_set_header(hdr);
2748     wr.omap_set(kv);
2749     wr.truncate(0);  // delete the old tmap data
2750     r = io_ctx.operate(oid, &wr);
2751     if (r < 0) {
2752       ret = r;
2753       cerr << "error writing tmap data as omap on " << pool_name << "/" << oid
2754            << ": " << cpp_strerror(ret) << std::endl;
2755       goto out;
2756     }
2757     ret = 0;
2758   }
2759
2760   else if (strcmp(nargs[0], "mkpool") == 0) {
2761     int auid = 0;
2762     __u8 crush_rule = 0;
2763     if (nargs.size() < 2)
2764       usage_exit();
2765     if (nargs.size() > 2) {
2766       char* endptr = NULL;
2767       auid = strtol(nargs[2], &endptr, 10);
2768       if (*endptr) {
2769         cerr << "Invalid value for auid: '" << nargs[2] << "'" << std::endl;
2770         ret = -EINVAL;
2771         goto out;
2772       }
2773       cerr << "setting auid:" << auid << std::endl;
2774       if (nargs.size() > 3) {
2775         crush_rule = (__u8)strtol(nargs[3], &endptr, 10);
2776         if (*endptr) {
2777           cerr << "Invalid value for crush-rule: '" << nargs[3] << "'" << std::endl;
2778           ret = -EINVAL;
2779           goto out;
2780         }
2781         cerr << "using crush rule " << (int)crush_rule << std::endl;
2782       }
2783     }
2784     ret = rados.pool_create(nargs[1], auid, crush_rule);
2785     if (ret < 0) {
2786       cerr << "error creating pool " << nargs[1] << ": "
2787            << cpp_strerror(ret) << std::endl;
2788       goto out;
2789     }
2790     cout << "successfully created pool " << nargs[1] << std::endl;
2791   }
2792   else if (strcmp(nargs[0], "cppool") == 0) {
2793     bool force = nargs.size() == 4 && !strcmp(nargs[3], "--yes-i-really-mean-it");
2794     if (nargs.size() != 3 && !(nargs.size() == 4 && force))
2795       usage_exit();
2796     const char *src_pool = nargs[1];
2797     const char *target_pool = nargs[2];
2798
2799     if (strcmp(src_pool, target_pool) == 0) {
2800       cerr << "cannot copy pool into itself" << std::endl;
2801       ret = -1;
2802       goto out;
2803     }
2804
2805     cerr << "WARNING: pool copy does not preserve user_version, which some "
2806          << "    apps may rely on." << std::endl;
2807
2808     if (rados.get_pool_is_selfmanaged_snaps_mode(src_pool)) {
2809       cerr << "WARNING: pool " << src_pool << " has selfmanaged snaps, which are not preserved\n"
2810            << "    by the cppool operation.  This will break any snapshot user."
2811            << std::endl;
2812       if (!force) {
2813         cerr << "    If you insist on making a broken copy, you can pass\n"
2814              << "    --yes-i-really-mean-it to proceed anyway."
2815              << std::endl;
2816         exit(1);
2817       }
2818     }
2819
2820     ret = do_copy_pool(rados, src_pool, target_pool);
2821     if (ret < 0) {
2822       cerr << "error copying pool " << src_pool << " => " << target_pool << ": "
2823            << cpp_strerror(ret) << std::endl;
2824       goto out;
2825     }
2826     cout << "successfully copied pool " << nargs[1] << std::endl;
2827   }
2828   else if (strcmp(nargs[0], "rmpool") == 0) {
2829     if (nargs.size() < 2)
2830       usage_exit();
2831     if (nargs.size() < 4 ||
2832         strcmp(nargs[1], nargs[2]) != 0 ||
2833         strcmp(nargs[3], "--yes-i-really-really-mean-it") != 0) {
2834       cerr << "WARNING:\n"
2835            << "  This will PERMANENTLY DESTROY an entire pool of objects with no way back.\n"
2836            << "  To confirm, pass the pool to remove twice, followed by\n"
2837            << "  --yes-i-really-really-mean-it" << std::endl;
2838       ret = -1;
2839       goto out;
2840     }
2841     ret = rados.pool_delete(nargs[1]);
2842     if (ret >= 0) {
2843       cout << "successfully deleted pool " << nargs[1] << std::endl;
2844     } else { //error
2845       cerr << "pool " << nargs[1] << " could not be removed" << std::endl;
2846       cerr << "Check your monitor configuration - `mon allow pool delete` is set to false by default,"
2847      << " change it to true to allow deletion of pools" << std::endl;
2848     }
2849   }
2850   else if (strcmp(nargs[0], "purge") == 0) {
2851     if (nargs.size() < 2)
2852       usage_exit();
2853     if (nargs.size() < 3 ||
2854         strcmp(nargs[2], "--yes-i-really-really-mean-it") != 0) {
2855       cerr << "WARNING:\n"
2856            << "  This will PERMANENTLY DESTROY all objects from a pool with no way back.\n"
2857            << "  To confirm, follow pool with --yes-i-really-really-mean-it" << std::endl;
2858       ret = -1;
2859       goto out;
2860     }
2861     ret = rados.ioctx_create(nargs[1], io_ctx);
2862     if (ret < 0) {
2863       cerr << "error pool " << nargs[1] << ": "
2864            << cpp_strerror(ret) << std::endl;
2865       goto out;
2866     }
2867     io_ctx.set_namespace(all_nspaces);
2868     io_ctx.set_osdmap_full_try();
2869     RadosBencher bencher(g_ceph_context, rados, io_ctx);
2870     ret = bencher.clean_up_slow("", concurrent_ios);
2871     if (ret >= 0) {
2872       cout << "successfully purged pool " << nargs[1] << std::endl;
2873     } else { //error
2874       cerr << "pool " << nargs[1] << " could not be purged" << std::endl;
2875       cerr << "Check your monitor configuration - `mon allow pool delete` is set to false by default,"
2876      << " change it to true to allow deletion of pools" << std::endl;
2877     }
2878   }
2879   else if (strcmp(nargs[0], "lssnap") == 0) {
2880     if (!pool_name || nargs.size() != 1)
2881       usage_exit();
2882
2883     vector<snap_t> snaps;
2884     io_ctx.snap_list(&snaps);
2885     for (vector<snap_t>::iterator i = snaps.begin();
2886          i != snaps.end();
2887          ++i) {
2888       string s;
2889       time_t t;
2890       if (io_ctx.snap_get_name(*i, &s) < 0)
2891         continue;
2892       if (io_ctx.snap_get_stamp(*i, &t) < 0)
2893         continue;
2894       struct tm bdt;
2895       localtime_r(&t, &bdt);
2896       cout << *i << "\t" << s << "\t";
2897
2898       std::ios_base::fmtflags original_flags = cout.flags();
2899       cout.setf(std::ios::right);
2900       cout.fill('0');
2901       cout << std::setw(4) << (bdt.tm_year+1900)
2902            << '.' << std::setw(2) << (bdt.tm_mon+1)
2903            << '.' << std::setw(2) << bdt.tm_mday
2904            << ' '
2905            << std::setw(2) << bdt.tm_hour
2906            << ':' << std::setw(2) << bdt.tm_min
2907            << ':' << std::setw(2) << bdt.tm_sec
2908            << std::endl;
2909       cout.flags(original_flags);
2910     }
2911     cout << snaps.size() << " snaps" << std::endl;
2912   }
2913
2914   else if (strcmp(nargs[0], "mksnap") == 0) {
2915     if (!pool_name || nargs.size() < 2)
2916       usage_exit();
2917
2918     ret = io_ctx.snap_create(nargs[1]);
2919     if (ret < 0) {
2920       cerr << "error creating pool " << pool_name << " snapshot " << nargs[1]
2921            << ": " << cpp_strerror(ret) << std::endl;
2922       goto out;
2923     }
2924     cout << "created pool " << pool_name << " snap " << nargs[1] << std::endl;
2925   }
2926
2927   else if (strcmp(nargs[0], "rmsnap") == 0) {
2928     if (!pool_name || nargs.size() < 2)
2929       usage_exit();
2930
2931     ret = io_ctx.snap_remove(nargs[1]);
2932     if (ret < 0) {
2933       cerr << "error removing pool " << pool_name << " snapshot " << nargs[1]
2934            << ": " << cpp_strerror(ret) << std::endl;
2935       goto out;
2936     }
2937     cout << "removed pool " << pool_name << " snap " << nargs[1] << std::endl;
2938   }
2939
2940   else if (strcmp(nargs[0], "rollback") == 0) {
2941     if (!pool_name || nargs.size() < 3)
2942       usage_exit();
2943
2944     ret = io_ctx.snap_rollback(nargs[1], nargs[2]);
2945     if (ret < 0) {
2946       cerr << "error rolling back pool " << pool_name << " to snapshot " << nargs[1]
2947            << cpp_strerror(ret) << std::endl;
2948       goto out;
2949     }
2950     cout << "rolled back pool " << pool_name
2951          << " to snapshot " << nargs[2] << std::endl;
2952   }
2953   else if (strcmp(nargs[0], "bench") == 0) {
2954     if (!pool_name || nargs.size() < 3)
2955       usage_exit();
2956     char* endptr = NULL;
2957     int seconds = strtol(nargs[1], &endptr, 10);
2958     if (*endptr) {
2959       cerr << "Invalid value for seconds: '" << nargs[1] << "'" << std::endl;
2960       ret = -EINVAL;
2961       goto out;
2962     }
2963     int operation = 0;
2964     if (strcmp(nargs[2], "write") == 0)
2965       operation = OP_WRITE;
2966     else if (strcmp(nargs[2], "seq") == 0)
2967       operation = OP_SEQ_READ;
2968     else if (strcmp(nargs[2], "rand") == 0)
2969       operation = OP_RAND_READ;
2970     else
2971       usage_exit();
2972     if (operation != OP_WRITE) {
2973       if (block_size_specified) {
2974         cerr << "-b|--block_size option can be used only with 'write' bench test"
2975              << std::endl;
2976         ret = -EINVAL;
2977         goto out;
2978       }
2979       if (bench_write_dest != 0) {
2980         cerr << "--write-object, --write-omap and --write-xattr options can "
2981                 "only be used with the 'write' bench test"
2982              << std::endl;
2983         ret = -EINVAL;
2984         goto out;
2985       }
2986     }
2987     else if (bench_write_dest == 0) {
2988       bench_write_dest = OP_WRITE_DEST_OBJ;
2989     }
2990
2991     if (!formatter && output) {
2992       cerr << "-o|--output option can only be used with '--format' option"
2993            << std::endl;
2994       ret = -EINVAL;
2995       goto out;
2996     }
2997     RadosBencher bencher(g_ceph_context, rados, io_ctx);
2998     bencher.set_show_time(show_time);
2999     bencher.set_write_destination(static_cast<OpWriteDest>(bench_write_dest));
3000
3001     ostream *outstream = NULL;
3002     if (formatter) {
3003       bencher.set_formatter(formatter);
3004       if (output)
3005         outstream = new ofstream(output);
3006       else
3007         outstream = &cout;
3008       bencher.set_outstream(*outstream);
3009     }
3010     if (!object_size)
3011       object_size = op_size;
3012     else if (object_size < op_size)
3013       op_size = object_size;
3014     cout << "hints = " << (int)hints << std::endl;
3015     ret = bencher.aio_bench(operation, seconds,
3016                             concurrent_ios, op_size, object_size,
3017                             max_objects, cleanup, hints, run_name, no_verify);
3018     if (ret != 0)
3019       cerr << "error during benchmark: " << cpp_strerror(ret) << std::endl;
3020     if (formatter && output)
3021       delete outstream;
3022   }
3023   else if (strcmp(nargs[0], "cleanup") == 0) {
3024     if (!pool_name)
3025       usage_exit();
3026     if (wildcard)
3027       io_ctx.set_namespace(all_nspaces);
3028     RadosBencher bencher(g_ceph_context, rados, io_ctx);
3029     ret = bencher.clean_up(prefix, concurrent_ios, run_name);
3030     if (ret != 0)
3031       cerr << "error during cleanup: " << cpp_strerror(ret) << std::endl;
3032   }
3033   else if (strcmp(nargs[0], "watch") == 0) {
3034     if (!pool_name || nargs.size() < 2)
3035       usage_exit();
3036     string oid(nargs[1]);
3037     RadosWatchCtx ctx(io_ctx, oid.c_str());
3038     uint64_t cookie;
3039     ret = io_ctx.watch2(oid, &cookie, &ctx);
3040     if (ret != 0)
3041       cerr << "error calling watch: " << cpp_strerror(ret) << std::endl;
3042     else {
3043       cout << "press enter to exit..." << std::endl;
3044       getchar();
3045       io_ctx.unwatch2(cookie);
3046       rados.watch_flush();
3047     }
3048   }
3049   else if (strcmp(nargs[0], "notify") == 0) {
3050     if (!pool_name || nargs.size() < 3)
3051       usage_exit();
3052     string oid(nargs[1]);
3053     string msg(nargs[2]);
3054     bufferlist bl, replybl;
3055     ::encode(msg, bl);
3056     ret = io_ctx.notify2(oid, bl, 10000, &replybl);
3057     if (ret != 0)
3058       cerr << "error calling notify: " << cpp_strerror(ret) << std::endl;
3059     if (replybl.length()) {
3060       map<pair<uint64_t,uint64_t>,bufferlist> rm;
3061       set<pair<uint64_t,uint64_t> > missed;
3062       bufferlist::iterator p = replybl.begin();
3063       ::decode(rm, p);
3064       ::decode(missed, p);
3065       for (map<pair<uint64_t,uint64_t>,bufferlist>::iterator p = rm.begin();
3066            p != rm.end();
3067            ++p) {
3068         cout << "reply client." << p->first.first
3069              << " cookie " << p->first.second
3070              << " : " << p->second.length() << " bytes" << std::endl;
3071         if (p->second.length())
3072           p->second.hexdump(cout);
3073       }
3074       for (multiset<pair<uint64_t,uint64_t> >::iterator p = missed.begin();
3075            p != missed.end(); ++p) {
3076         cout << "timeout client." << p->first
3077              << " cookie " << p->second << std::endl;
3078       }
3079     }
3080   } else if (strcmp(nargs[0], "set-alloc-hint") == 0) {
3081     if (!pool_name || nargs.size() < 4)
3082       usage_exit();
3083     string err;
3084     string oid(nargs[1]);
3085     uint64_t expected_object_size = strict_strtoll(nargs[2], 10, &err);
3086     if (!err.empty()) {
3087       cerr << "couldn't parse expected_object_size: " << err << std::endl;
3088       usage_exit();
3089     }
3090     uint64_t expected_write_size = strict_strtoll(nargs[3], 10, &err);
3091     if (!err.empty()) {
3092       cerr << "couldn't parse expected_write_size: " << err << std::endl;
3093       usage_exit();
3094     }
3095     ret = io_ctx.set_alloc_hint(oid, expected_object_size, expected_write_size);
3096     if (ret < 0) {
3097       cerr << "error setting alloc-hint " << pool_name << "/" << oid << ": "
3098            << cpp_strerror(ret) << std::endl;
3099       goto out;
3100     }
3101   } else if (strcmp(nargs[0], "load-gen") == 0) {
3102     if (!pool_name) {
3103       cerr << "error: must specify pool" << std::endl;
3104       usage_exit();
3105     }
3106     LoadGen lg(&rados);
3107     if (min_obj_len)
3108       lg.min_obj_len = min_obj_len;
3109     if (max_obj_len)
3110       lg.max_obj_len = max_obj_len;
3111     if (min_op_len)
3112       lg.min_op_len = min_op_len;
3113     if (max_op_len)
3114       lg.max_op_len = max_op_len;
3115     if (max_ops)
3116       lg.max_ops = max_ops;
3117     if (max_backlog)
3118       lg.max_backlog = max_backlog;
3119     if (target_throughput)
3120       lg.target_throughput = target_throughput << 20;
3121     if (read_percent >= 0)
3122       lg.read_percent = read_percent;
3123     if (num_objs)
3124       lg.num_objs = num_objs;
3125     if (run_length)
3126       lg.run_length = run_length;
3127
3128     cout << "run length " << run_length << " seconds" << std::endl;
3129     cout << "preparing " << lg.num_objs << " objects" << std::endl;
3130     ret = lg.bootstrap(pool_name);
3131     if (ret < 0) {
3132       cerr << "load-gen bootstrap failed" << std::endl;
3133       exit(1);
3134     }
3135     cout << "load-gen will run " << lg.run_length << " seconds" << std::endl;
3136     lg.run();
3137     lg.cleanup();
3138   } else if (strcmp(nargs[0], "listomapkeys") == 0) {
3139     if (!pool_name || nargs.size() < 2)
3140       usage_exit();
3141
3142     set<string> out_keys;
3143     ret = io_ctx.omap_get_keys(nargs[1], "", LONG_MAX, &out_keys);
3144     if (ret < 0) {
3145       cerr << "error getting omap key set " << pool_name << "/"
3146            << nargs[1] << ": "  << cpp_strerror(ret) << std::endl;
3147       goto out;
3148     }
3149
3150     for (set<string>::iterator iter = out_keys.begin();
3151          iter != out_keys.end(); ++iter) {
3152       cout << *iter << std::endl;
3153     }
3154   } else if (strcmp(nargs[0], "lock") == 0) {
3155     if (!pool_name)
3156       usage_exit();
3157
3158     if (!formatter) {
3159       formatter = new JSONFormatter(pretty_format);
3160     }
3161     ret = do_lock_cmd(nargs, opts, &io_ctx, formatter);
3162   } else if (strcmp(nargs[0], "listwatchers") == 0) {
3163     if (!pool_name || nargs.size() < 2)
3164       usage_exit();
3165
3166     string oid(nargs[1]);
3167     std::list<obj_watch_t> lw;
3168
3169     ret = io_ctx.list_watchers(oid, &lw);
3170     if (ret < 0) {
3171       cerr << "error listing watchers " << pool_name << "/" << oid << ": " << cpp_strerror(ret) << std::endl;
3172       goto out;
3173     }
3174     else
3175       ret = 0;
3176     
3177     for (std::list<obj_watch_t>::iterator i = lw.begin(); i != lw.end(); ++i) {
3178       cout << "watcher=" << i->addr << " client." << i->watcher_id << " cookie=" << i->cookie << std::endl;
3179     }
3180   } else if (strcmp(nargs[0], "listsnaps") == 0) {
3181     if (!pool_name || nargs.size() < 2)
3182       usage_exit();
3183
3184     string oid(nargs[1]);
3185     snap_set_t ls;
3186
3187     io_ctx.snap_set_read(LIBRADOS_SNAP_DIR);
3188     ret = io_ctx.list_snaps(oid, &ls);
3189     if (ret < 0) {
3190       cerr << "error listing snap shots " << pool_name << "/" << oid << ": " << cpp_strerror(ret) << std::endl;
3191       goto out;
3192     }
3193     else
3194       ret = 0;
3195
3196     map<snap_t,string> snamemap;
3197     if (formatter || pretty_format) {
3198       vector<snap_t> snaps;
3199       io_ctx.snap_list(&snaps);
3200       for (vector<snap_t>::iterator i = snaps.begin();
3201           i != snaps.end(); ++i) {
3202         string s;
3203         if (io_ctx.snap_get_name(*i, &s) < 0)
3204           continue;
3205         snamemap.insert(pair<snap_t,string>(*i, s));
3206       }
3207     }
3208
3209     if (formatter) {
3210       formatter->open_object_section("object");
3211       formatter->dump_string("name", oid);
3212       formatter->open_array_section("clones");
3213     } else {
3214       cout << oid << ":" << std::endl;
3215       cout << "cloneid  snaps   size    overlap" << std::endl;
3216     }
3217
3218     for (std::vector<clone_info_t>::iterator ci = ls.clones.begin();
3219           ci != ls.clones.end(); ++ci) {
3220
3221       if (formatter) formatter->open_object_section("clone");
3222
3223       if (ci->cloneid == librados::SNAP_HEAD) {
3224         if (formatter)
3225           formatter->dump_string("id", "head");
3226         else
3227           cout << "head";
3228       } else {
3229         if (formatter)
3230           formatter->dump_unsigned("id", ci->cloneid);
3231         else
3232           cout << ci->cloneid;
3233       }
3234
3235       if (formatter)
3236         formatter->open_array_section("snapshots");
3237       else
3238         cout << "\t";
3239
3240       if (!formatter && ci->snaps.empty()) {
3241         cout << "-";
3242       }
3243       for (std::vector<snap_t>::const_iterator snapindex = ci->snaps.begin();
3244           snapindex != ci->snaps.end(); ++snapindex) {
3245
3246         map<snap_t,string>::iterator si;
3247
3248         if (formatter || pretty_format) si = snamemap.find(*snapindex);
3249
3250         if (formatter) {
3251           formatter->open_object_section("snapshot");
3252           formatter->dump_unsigned("id", *snapindex);
3253           if (si != snamemap.end())
3254             formatter->dump_string("name", si->second);
3255           formatter->close_section(); //snapshot
3256         } else {
3257           if (snapindex != ci->snaps.begin()) cout << ",";
3258           if (!pretty_format || (si == snamemap.end()))
3259             cout << *snapindex;
3260           else
3261             cout << si->second << "(" << *snapindex << ")";
3262         }
3263       }
3264
3265       if (formatter) {
3266         formatter->close_section();     //Snapshots
3267         formatter->dump_unsigned("size", ci->size);
3268       } else {
3269         cout << "\t" << ci->size;
3270       }
3271
3272       if (ci->cloneid != librados::SNAP_HEAD) {
3273         if (formatter)
3274           formatter->open_array_section("overlaps");
3275         else
3276           cout << "\t[";
3277
3278         for (std::vector< std::pair<uint64_t,uint64_t> >::iterator ovi = ci->overlap.begin();
3279             ovi != ci->overlap.end(); ++ovi) {
3280           if (formatter) {
3281             formatter->open_object_section("section");
3282             formatter->dump_unsigned("start", ovi->first);
3283             formatter->dump_unsigned("length", ovi->second);
3284             formatter->close_section(); //section
3285           } else {
3286             if (ovi != ci->overlap.begin()) cout << ",";
3287             cout << ovi->first << "~" << ovi->second;
3288           }
3289         }
3290         if (formatter)
3291           formatter->close_section(); //overlaps
3292         else
3293           cout << "]" << std::endl;
3294       }
3295       if (formatter) formatter->close_section(); //clone
3296     }
3297     if (formatter) {
3298       formatter->close_section(); //clones
3299       formatter->close_section(); //object
3300       formatter->flush(cout);
3301     } else {
3302       cout << std::endl;
3303     }
3304   } else if (strcmp(nargs[0], "list-inconsistent-pg") == 0) {
3305     if (!formatter) {
3306       formatter = new JSONFormatter(pretty_format);
3307     }
3308     ret = do_get_inconsistent_pg_cmd(nargs, rados, *formatter);
3309   } else if (strcmp(nargs[0], "list-inconsistent-obj") == 0) {
3310     if (!formatter) {
3311       formatter = new JSONFormatter(pretty_format);
3312     }
3313     ret = do_get_inconsistent_cmd<inconsistent_obj_t>(nargs, rados, *formatter);
3314   } else if (strcmp(nargs[0], "list-inconsistent-snapset") == 0) {
3315     if (!formatter) {
3316       formatter = new JSONFormatter(pretty_format);
3317     }
3318     ret = do_get_inconsistent_cmd<inconsistent_snapset_t>(nargs, rados, *formatter);
3319   } else if (strcmp(nargs[0], "cache-flush") == 0) {
3320     if (!pool_name || nargs.size() < 2)
3321       usage_exit();
3322     string oid(nargs[1]);
3323     if (with_clones) {
3324       snap_set_t ls;
3325       io_ctx.snap_set_read(LIBRADOS_SNAP_DIR);
3326       ret = io_ctx.list_snaps(oid, &ls);
3327       if (ret < 0) {
3328         cerr << "error listing snapshots " << pool_name << "/" << oid << ": "
3329              << cpp_strerror(ret) << std::endl;
3330         goto out;
3331       }
3332       for (std::vector<clone_info_t>::iterator ci = ls.clones.begin();
3333            ci != ls.clones.end(); ++ci) {
3334         if (snapid != CEPH_NOSNAP && ci->cloneid > snapid)
3335           break;
3336         io_ctx.snap_set_read(ci->cloneid);
3337         ret = do_cache_flush(io_ctx, oid);
3338         if (ret < 0) {
3339           cerr << "error from cache-flush " << oid << ": "
3340                << cpp_strerror(ret) << std::endl;
3341           goto out;
3342         }
3343       }
3344     } else {
3345       ret = do_cache_flush(io_ctx, oid);
3346       if (ret < 0) {
3347         cerr << "error from cache-flush " << oid << ": "
3348              << cpp_strerror(ret) << std::endl;
3349         goto out;
3350       }
3351     }
3352   } else if (strcmp(nargs[0], "cache-try-flush") == 0) {
3353     if (!pool_name || nargs.size() < 2)
3354       usage_exit();
3355     string oid(nargs[1]);
3356     if (with_clones) {
3357       snap_set_t ls;
3358       io_ctx.snap_set_read(LIBRADOS_SNAP_DIR);
3359       ret = io_ctx.list_snaps(oid, &ls);
3360       if (ret < 0) {
3361         cerr << "error listing snapshots " << pool_name << "/" << oid << ": "
3362              << cpp_strerror(ret) << std::endl;
3363         goto out;
3364       }
3365       for (std::vector<clone_info_t>::iterator ci = ls.clones.begin();
3366            ci != ls.clones.end(); ++ci) {
3367         if (snapid != CEPH_NOSNAP && ci->cloneid > snapid)
3368           break;
3369         io_ctx.snap_set_read(ci->cloneid);
3370         ret = do_cache_try_flush(io_ctx, oid);
3371         if (ret < 0) {
3372           cerr << "error from cache-flush " << oid << ": "
3373                << cpp_strerror(ret) << std::endl;
3374           goto out;
3375         }
3376       }
3377     } else {
3378       ret = do_cache_try_flush(io_ctx, oid);
3379       if (ret < 0) {
3380         cerr << "error from cache-flush " << oid << ": "
3381              << cpp_strerror(ret) << std::endl;
3382         goto out;
3383       }
3384     }
3385   } else if (strcmp(nargs[0], "cache-evict") == 0) {
3386     if (!pool_name || nargs.size() < 2)
3387       usage_exit();
3388     string oid(nargs[1]);
3389     if (with_clones) {
3390       snap_set_t ls;
3391       io_ctx.snap_set_read(LIBRADOS_SNAP_DIR);
3392       ret = io_ctx.list_snaps(oid, &ls);
3393       if (ret < 0) {
3394         cerr << "error listing snapshots " << pool_name << "/" << oid << ": "
3395              << cpp_strerror(ret) << std::endl;
3396         goto out;
3397       }
3398       for (std::vector<clone_info_t>::iterator ci = ls.clones.begin();
3399            ci != ls.clones.end(); ++ci) {
3400         if (snapid != CEPH_NOSNAP && ci->cloneid > snapid)
3401           break;
3402         io_ctx.snap_set_read(ci->cloneid);
3403         ret = do_cache_evict(io_ctx, oid);
3404         if (ret < 0) {
3405           cerr << "error from cache-flush " << oid << ": "
3406                << cpp_strerror(ret) << std::endl;
3407           goto out;
3408         }
3409       }
3410     } else {
3411       ret = do_cache_evict(io_ctx, oid);
3412       if (ret < 0) {
3413         cerr << "error from cache-flush " << oid << ": "
3414              << cpp_strerror(ret) << std::endl;
3415         goto out;
3416       }
3417     }
3418   } else if (strcmp(nargs[0], "cache-flush-evict-all") == 0) {
3419     if (!pool_name)
3420       usage_exit();
3421     ret = do_cache_flush_evict_all(io_ctx, true);
3422     if (ret < 0) {
3423       cerr << "error from cache-flush-evict-all: "
3424            << cpp_strerror(ret) << std::endl;
3425       goto out;
3426     }
3427   } else if (strcmp(nargs[0], "cache-try-flush-evict-all") == 0) {
3428     if (!pool_name)
3429       usage_exit();
3430     ret = do_cache_flush_evict_all(io_ctx, false);
3431     if (ret < 0) {
3432       cerr << "error from cache-try-flush-evict-all: "
3433            << cpp_strerror(ret) << std::endl;
3434       goto out;
3435     }
3436   } else if (strcmp(nargs[0], "set-redirect") == 0) {
3437     if (!pool_name)
3438       usage_exit();
3439
3440     const char *target = target_pool_name;
3441     if (!target)
3442       target = pool_name;
3443
3444     const char *target_obj;
3445     if (nargs.size() < 3) {
3446       if (strcmp(target, pool_name) == 0) {
3447         cerr << "cannot copy object into itself" << std::endl;
3448         ret = -1;
3449         goto out;
3450       }
3451       target_obj = nargs[1];
3452     } else {
3453       target_obj = nargs[2];
3454     }
3455
3456     IoCtx target_ctx;
3457     ret = rados.ioctx_create(target, target_ctx);
3458     if (target_oloc.size()) {
3459       target_ctx.locator_set_key(target_oloc);
3460     }
3461     if (target_nspace.size()) {
3462       target_ctx.set_namespace(target_nspace);
3463     }
3464
3465     ObjectWriteOperation op;
3466     op.set_redirect(target_obj, target_ctx, 0);
3467     ret = io_ctx.operate(nargs[1], &op);
3468     if (ret < 0) {
3469       cerr << "error set-redirect " << pool_name << "/" << nargs[1] << " => " << target << "/" << target_obj << ": " << cpp_strerror(ret) << std::endl;
3470       goto out;
3471     }
3472   } else if (strcmp(nargs[0], "export") == 0) {
3473     // export [filename]
3474     if (!pool_name || nargs.size() > 2) {
3475       usage_exit();
3476     }
3477
3478     int file_fd;
3479     if (nargs.size() < 2 || std::string(nargs[1]) == "-") {
3480       file_fd = STDOUT_FILENO;
3481     } else {
3482       file_fd = open(nargs[1], O_WRONLY|O_CREAT|O_TRUNC, 0666);
3483       if (file_fd < 0) {
3484         cerr << "Error opening '" << nargs[1] << "': "
3485           << cpp_strerror(file_fd) << std::endl;
3486         ret = file_fd;
3487         goto out;
3488       }
3489     }
3490
3491     ret = PoolDump(file_fd).dump(&io_ctx);
3492
3493     if (file_fd != STDIN_FILENO) {
3494       VOID_TEMP_FAILURE_RETRY(::close(file_fd));
3495     }
3496
3497     if (ret < 0) {
3498       cerr << "error from export: "
3499            << cpp_strerror(ret) << std::endl;
3500       goto out;
3501     }
3502   } else if (strcmp(nargs[0], "import") == 0) {
3503     // import [--no-overwrite] [--dry-run] <filename | - >
3504     if (!pool_name || nargs.size() > 4 || nargs.size() < 2) {
3505       usage_exit();
3506     }
3507
3508     // Last arg is the filename
3509     std::string const filename = nargs[nargs.size() - 1];
3510
3511     // All other args may be flags
3512     bool dry_run = false;
3513     bool no_overwrite = false;
3514     for (unsigned i = 1; i < nargs.size() - 1; ++i) {
3515       std::string arg(nargs[i]);
3516       
3517       if (arg == std::string("--no-overwrite")) {
3518         no_overwrite = true;
3519       } else if (arg == std::string("--dry-run")) {
3520         dry_run = true;
3521       } else {
3522         std::cerr << "Invalid argument '" << arg << "'" << std::endl;
3523         ret = -EINVAL;
3524         goto out;
3525       }
3526     }
3527
3528     int file_fd;
3529     if (filename == "-") {
3530       file_fd = STDIN_FILENO;
3531     } else {
3532       file_fd = open(filename.c_str(), O_RDONLY);
3533       if (file_fd < 0) {
3534         cerr << "Error opening '" << filename << "': "
3535           << cpp_strerror(file_fd) << std::endl;
3536         ret = file_fd;
3537         goto out;
3538       }
3539     }
3540
3541     ret = RadosImport(file_fd, 0, dry_run).import(io_ctx, no_overwrite);
3542
3543     if (file_fd != STDIN_FILENO) {
3544       VOID_TEMP_FAILURE_RETRY(::close(file_fd));
3545     }
3546
3547     if (ret < 0) {
3548       cerr << "error from import: "
3549            << cpp_strerror(ret) << std::endl;
3550       goto out;
3551     }
3552   } else {
3553     cerr << "unrecognized command " << nargs[0] << "; -h or --help for usage" << std::endl;
3554     ret = -EINVAL;
3555     goto out;
3556   }
3557
3558   if (ret < 0)
3559     cerr << "error " << (-ret) << ": " << cpp_strerror(ret) << std::endl;
3560
3561 out:
3562   delete formatter;
3563   return (ret < 0) ? 1 : 0;
3564 }
3565
3566 int main(int argc, const char **argv)
3567 {
3568   vector<const char*> args;
3569   argv_to_vec(argc, argv, args);
3570   env_to_vec(args);
3571
3572   std::map < std::string, std::string > opts;
3573   std::string val;
3574
3575   // Necessary to support usage of -f for formatting,
3576   // since global_init will remove the -f using ceph
3577   // argparse procedures.
3578   for (auto j = args.begin(); j != args.end(); ++j) {
3579     if (strcmp(*j, "--") == 0) {
3580       break;
3581     } else if ((j+1) == args.end()) {
3582       // This can't be a formatting call (no format arg)
3583       break; 
3584     } else if (strcmp(*j, "-f") == 0) {
3585       val = *(j+1);
3586       unique_ptr<Formatter> formatter(Formatter::create(val.c_str()));
3587       
3588       if (formatter) {
3589         j = args.erase(j);
3590         opts["format"] = val;
3591         
3592         j = args.erase(j);
3593         break;
3594       }
3595     }
3596   }
3597
3598   auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
3599                              CODE_ENVIRONMENT_UTILITY, 0);
3600   common_init_finish(g_ceph_context);
3601
3602   std::vector<const char*>::iterator i;
3603   for (i = args.begin(); i != args.end(); ) {
3604     if (ceph_argparse_double_dash(args, i)) {
3605       break;
3606     } else if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
3607       usage(cout);
3608       exit(0);
3609     } else if (ceph_argparse_flag(args, i, "-f", "--force", (char*)NULL)) {
3610       opts["force"] = "true";
3611     } else if (ceph_argparse_flag(args, i, "--force-full", (char*)NULL)) {
3612       opts["force-full"] = "true";
3613     } else if (ceph_argparse_flag(args, i, "-d", "--delete-after", (char*)NULL)) {
3614       opts["delete-after"] = "true";
3615     } else if (ceph_argparse_flag(args, i, "-C", "--create", "--create-pool",
3616                                   (char*)NULL)) {
3617       opts["create"] = "true";
3618     } else if (ceph_argparse_flag(args, i, "--pretty-format", (char*)NULL)) {
3619       opts["pretty-format"] = "true";
3620     } else if (ceph_argparse_flag(args, i, "--show-time", (char*)NULL)) {
3621       opts["show-time"] = "true";
3622     } else if (ceph_argparse_flag(args, i, "--no-cleanup", (char*)NULL)) {
3623       opts["no-cleanup"] = "true";
3624     } else if (ceph_argparse_flag(args, i, "--no-hints", (char*)NULL)) {
3625       opts["no-hints"] = "true";
3626     } else if (ceph_argparse_flag(args, i, "--no-verify", (char*)NULL)) {
3627       opts["no-verify"] = "true";
3628     } else if (ceph_argparse_witharg(args, i, &val, "--run-name", (char*)NULL)) {
3629       opts["run-name"] = val;
3630     } else if (ceph_argparse_witharg(args, i, &val, "--prefix", (char*)NULL)) {
3631       opts["prefix"] = val;
3632     } else if (ceph_argparse_witharg(args, i, &val, "-p", "--pool", (char*)NULL)) {
3633       opts["pool"] = val;
3634     } else if (ceph_argparse_witharg(args, i, &val, "--target-pool", (char*)NULL)) {
3635       opts["target_pool"] = val;
3636     } else if (ceph_argparse_witharg(args, i, &val, "--object-locator" , (char *)NULL)) {
3637       opts["object_locator"] = val;
3638     } else if (ceph_argparse_witharg(args, i, &val, "--target-locator" , (char *)NULL)) {
3639       opts["target_locator"] = val;
3640     } else if (ceph_argparse_witharg(args, i, &val, "--target-nspace" , (char *)NULL)) {
3641       opts["target_nspace"] = val;
3642     } else if (ceph_argparse_flag(args, i, "--striper" , (char *)NULL)) {
3643       opts["striper"] = "true";
3644     } else if (ceph_argparse_witharg(args, i, &val, "-t", "--concurrent-ios", (char*)NULL)) {
3645       opts["concurrent-ios"] = val;
3646     } else if (ceph_argparse_witharg(args, i, &val, "--block-size", (char*)NULL)) {
3647       opts["block-size"] = val;
3648     } else if (ceph_argparse_witharg(args, i, &val, "-b", (char*)NULL)) {
3649       opts["block-size"] = val;
3650     } else if (ceph_argparse_witharg(args, i, &val, "--object-size", (char*)NULL)) {
3651       opts["object-size"] = val;
3652     } else if (ceph_argparse_witharg(args, i, &val, "--max-objects", (char*)NULL)) {
3653       opts["max-objects"] = val;
3654     } else if (ceph_argparse_witharg(args, i, &val, "--offset", (char*)NULL)) {
3655       opts["offset"] = val;
3656     } else if (ceph_argparse_witharg(args, i, &val, "-o", (char*)NULL)) {
3657       opts["object-size"] = val;
3658     } else if (ceph_argparse_witharg(args, i, &val, "-s", "--snap", (char*)NULL)) {
3659       opts["snap"] = val;
3660     } else if (ceph_argparse_witharg(args, i, &val, "-S", "--snapid", (char*)NULL)) {
3661       opts["snapid"] = val;
3662     } else if (ceph_argparse_witharg(args, i, &val, "--min-object-size", (char*)NULL)) {
3663       opts["min-object-size"] = val;
3664     } else if (ceph_argparse_witharg(args, i, &val, "--max-object-size", (char*)NULL)) {
3665       opts["max-object-size"] = val;
3666     } else if (ceph_argparse_witharg(args, i, &val, "--min-op-len", (char*)NULL)) {
3667       opts["min-op-len"] = val;
3668     } else if (ceph_argparse_witharg(args, i, &val, "--max-op-len", (char*)NULL)) {
3669       opts["max-op-len"] = val;
3670     } else if (ceph_argparse_witharg(args, i, &val, "--max-ops", (char*)NULL)) {
3671       opts["max-ops"] = val;
3672     } else if (ceph_argparse_witharg(args, i, &val, "--max-backlog", (char*)NULL)) {
3673       opts["max-backlog"] = val;
3674     } else if (ceph_argparse_witharg(args, i, &val, "--target-throughput", (char*)NULL)) {
3675       opts["target-throughput"] = val;
3676     } else if (ceph_argparse_witharg(args, i, &val, "--read-percent", (char*)NULL)) {
3677       opts["read-percent"] = val;
3678     } else if (ceph_argparse_witharg(args, i, &val, "--num-objects", (char*)NULL)) {
3679       opts["num-objects"] = val;
3680     } else if (ceph_argparse_witharg(args, i, &val, "--run-length", (char*)NULL)) {
3681       opts["run-length"] = val;
3682     } else if (ceph_argparse_witharg(args, i, &val, "--workers", (char*)NULL)) {
3683       opts["workers"] = val;
3684     } else if (ceph_argparse_witharg(args, i, &val, "--format", (char*)NULL)) {
3685       opts["format"] = val;
3686     } else if (ceph_argparse_witharg(args, i, &val, "--lock-tag", (char*)NULL)) {
3687       opts["lock-tag"] = val;
3688     } else if (ceph_argparse_witharg(args, i, &val, "--lock-cookie", (char*)NULL)) {
3689       opts["lock-cookie"] = val;
3690     } else if (ceph_argparse_witharg(args, i, &val, "--lock-description", (char*)NULL)) {
3691       opts["lock-description"] = val;
3692     } else if (ceph_argparse_witharg(args, i, &val, "--lock-duration", (char*)NULL)) {
3693       opts["lock-duration"] = val;
3694     } else if (ceph_argparse_witharg(args, i, &val, "--lock-type", (char*)NULL)) {
3695       opts["lock-type"] = val;
3696     } else if (ceph_argparse_witharg(args, i, &val, "-N", "--namespace", (char*)NULL)) {
3697       opts["namespace"] = val;
3698     } else if (ceph_argparse_flag(args, i, "--all", (char*)NULL)) {
3699       opts["all"] = "true";
3700     } else if (ceph_argparse_flag(args, i, "--default", (char*)NULL)) {
3701       opts["default"] = "true";
3702     } else if (ceph_argparse_witharg(args, i, &val, "-o", "--output", (char*)NULL)) {
3703       opts["output"] = val;
3704     } else if (ceph_argparse_flag(args, i, "--write-omap", (char*)NULL)) {
3705       opts["write-dest-omap"] = "true";
3706     } else if (ceph_argparse_flag(args, i, "--write-object", (char*)NULL)) {
3707       opts["write-dest-obj"] = "true";
3708     } else if (ceph_argparse_flag(args, i, "--write-xattr", (char*)NULL)) {
3709       opts["write-dest-xattr"] = "true";
3710     } else if (ceph_argparse_flag(args, i, "--with-clones", (char*)NULL)) {
3711       opts["with-clones"] = "true";
3712     } else if (ceph_argparse_witharg(args, i, &val, "--omap-key-file", (char*)NULL)) {
3713       opts["omap-key-file"] = val;
3714     } else {
3715       if (val[0] == '-')
3716         usage_exit();
3717       ++i;
3718     }
3719   }
3720
3721   if (args.empty()) {
3722     cerr << "rados: you must give an action. Try --help" << std::endl;
3723     return 1;
3724   }
3725
3726   return rados_tool_common(opts, args);
3727 }