Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / obj_bencher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2009 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  * Series of functions to test your rados installation. Notice
14  * that this code is not terribly robust -- for instance, if you
15  * try and bench on a pool you don't have permission to access
16  * it will just loop forever.
17  */
18 #include "include/compat.h"
19 #include <pthread.h>
20 #include "common/Cond.h"
21 #include "obj_bencher.h"
22
23 const std::string BENCH_LASTRUN_METADATA = "benchmark_last_metadata";
24 const std::string BENCH_PREFIX = "benchmark_data";
25 static char cached_hostname[30] = {0};
26 int cached_pid = 0;
27
28 static std::string generate_object_prefix_nopid() {
29   if (cached_hostname[0] == 0) {
30     gethostname(cached_hostname, sizeof(cached_hostname)-1);
31     cached_hostname[sizeof(cached_hostname)-1] = 0;
32   }
33
34   std::ostringstream oss;
35   oss << BENCH_PREFIX << "_" << cached_hostname;
36   return oss.str();
37 }
38
39 static std::string generate_object_prefix(int pid = 0) {
40   if (pid)
41     cached_pid = pid;
42   else if (!cached_pid)
43     cached_pid = getpid();
44
45   std::ostringstream oss;
46   oss << generate_object_prefix_nopid() << "_" << cached_pid;
47   return oss.str();
48 }
49
50 static std::string generate_object_name(int objnum, int pid = 0)
51 {
52   std::ostringstream oss;
53   oss << generate_object_prefix(pid) << "_object" << objnum;
54   return oss.str();
55 }
56
57 static void sanitize_object_contents (bench_data *data, size_t length) {
58   memset(data->object_contents, 'z', length);
59 }
60
61 ostream& ObjBencher::out(ostream& os, utime_t& t)
62 {
63   if (show_time)
64     return t.localtime(os) << " ";
65   else
66     return os;
67 }
68
69 ostream& ObjBencher::out(ostream& os)
70 {
71   utime_t cur_time = ceph_clock_now();
72   return out(os, cur_time);
73 }
74
75 void *ObjBencher::status_printer(void *_bencher) {
76   ObjBencher *bencher = static_cast<ObjBencher *>(_bencher);
77   bench_data& data = bencher->data;
78   Formatter *formatter = bencher->formatter;
79   ostream *outstream = bencher->outstream;
80   Cond cond;
81   int i = 0;
82   int previous_writes = 0;
83   int cycleSinceChange = 0;
84   double bandwidth;
85   int iops;
86   utime_t ONE_SECOND;
87   ONE_SECOND.set_from_double(1.0);
88   bencher->lock.Lock();
89   if (formatter)
90     formatter->open_array_section("datas");
91   while(!data.done) {
92     utime_t cur_time = ceph_clock_now();
93
94     if (i % 20 == 0 && !formatter) {
95       if (i > 0)
96         cur_time.localtime(cout) << " min lat: " << data.min_latency
97           << " max lat: " << data.max_latency
98           << " avg lat: " << data.avg_latency << std::endl;
99       //I'm naughty and don't reset the fill
100       bencher->out(cout, cur_time) << setfill(' ')
101           << setw(5) << "sec"
102           << setw(8) << "Cur ops"
103           << setw(10) << "started"
104           << setw(10) << "finished"
105           << setw(10) << "avg MB/s"
106           << setw(10) << "cur MB/s"
107           << setw(12) << "last lat(s)"
108           << setw(12) << "avg lat(s)" << std::endl;
109     }
110     if (cycleSinceChange)
111       bandwidth = (double)(data.finished - previous_writes)
112         * (data.op_size)
113         / (1024*1024)
114         / cycleSinceChange;
115     else
116       bandwidth = -1;
117
118     if (!std::isnan(bandwidth) && bandwidth > -1) {
119       if (bandwidth > data.idata.max_bandwidth)
120         data.idata.max_bandwidth = bandwidth;
121       if (bandwidth < data.idata.min_bandwidth)
122         data.idata.min_bandwidth = bandwidth;
123
124       data.history.bandwidth.push_back(bandwidth);
125     }
126
127     if (cycleSinceChange)
128       iops = (double)(data.finished - previous_writes)
129         / cycleSinceChange;
130     else
131       iops = -1;
132
133     if (!std::isnan(iops) && iops > -1) {
134       if (iops > data.idata.max_iops)
135         data.idata.max_iops = iops;
136       if (iops < data.idata.min_iops)
137         data.idata.min_iops = iops;
138
139       data.history.iops.push_back(iops);
140     }
141     
142     if (formatter)
143       formatter->open_object_section("data");
144
145     double avg_bandwidth = (double) (data.op_size) * (data.finished)
146       / (double)(cur_time - data.start_time) / (1024*1024);
147     if (previous_writes != data.finished) {
148       previous_writes = data.finished;
149       cycleSinceChange = 0;
150       if (!formatter) {
151         bencher->out(cout, cur_time)
152           << setfill(' ')
153           << setw(5) << i
154           << ' ' << setw(7) << data.in_flight
155           << ' ' << setw(9) << data.started
156           << ' ' << setw(9) << data.finished
157           << ' ' << setw(9) << avg_bandwidth
158           << ' ' << setw(9) << bandwidth
159           << ' ' << setw(11) << (double)data.cur_latency
160           << ' ' << setw(11) << data.avg_latency << std::endl;
161       } else {
162         formatter->dump_format("sec", "%d", i);
163         formatter->dump_format("cur_ops", "%d", data.in_flight);
164         formatter->dump_format("started", "%d", data.started);
165         formatter->dump_format("finished", "%d", data.finished);
166         formatter->dump_format("avg_bw", "%f", avg_bandwidth);
167         formatter->dump_format("cur_bw", "%f", bandwidth);
168         formatter->dump_format("last_lat", "%f", (double)data.cur_latency);
169         formatter->dump_format("avg_lat", "%f", data.avg_latency);
170       }
171     }
172     else {
173       if (!formatter) {
174         bencher->out(cout, cur_time)
175           << setfill(' ')
176           << setw(5) << i
177           << ' ' << setw(7) << data.in_flight
178           << ' ' << setw(9) << data.started
179           << ' ' << setw(9) << data.finished
180           << ' ' << setw(9) << avg_bandwidth
181           << ' ' << setw(9) << '0'
182           << ' ' << setw(11) << '-'
183           << ' '<< setw(11) << data.avg_latency << std::endl;
184       } else {
185         formatter->dump_format("sec", "%d", i);
186         formatter->dump_format("cur_ops", "%d", data.in_flight);
187         formatter->dump_format("started", "%d", data.started);
188         formatter->dump_format("finished", "%d", data.finished);
189         formatter->dump_format("avg_bw", "%f", avg_bandwidth);
190         formatter->dump_format("cur_bw", "%f", 0);
191         formatter->dump_format("last_lat", "%f", 0);
192         formatter->dump_format("avg_lat", "%f", data.avg_latency);
193       }
194     }
195     if (formatter) {
196       formatter->close_section(); // data
197       formatter->flush(*outstream);
198     }
199     ++i;
200     ++cycleSinceChange;
201     cond.WaitInterval(bencher->lock, ONE_SECOND);
202   }
203   if (formatter)
204     formatter->close_section(); //datas
205   bencher->lock.Unlock();
206   return NULL;
207 }
208
209 int ObjBencher::aio_bench(
210   int operation, int secondsToRun,
211   int concurrentios,
212   uint64_t op_size, uint64_t object_size,
213   unsigned max_objects,
214   bool cleanup, bool hints,
215   const std::string& run_name, bool no_verify) {
216
217   if (concurrentios <= 0)
218     return -EINVAL;
219
220   int num_objects = 0;
221   int r = 0;
222   int prevPid = 0;
223   utime_t runtime;
224
225   // default metadata object is used if user does not specify one
226   const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
227
228   //get data from previous write run, if available
229   if (operation != OP_WRITE) {
230     uint64_t prev_op_size, prev_object_size;
231     r = fetch_bench_metadata(run_name_meta, &prev_op_size, &prev_object_size,
232                              &num_objects, &prevPid);
233     if (r < 0) {
234       if (r == -ENOENT)
235         cerr << "Must write data before running a read benchmark!" << std::endl;
236       return r;
237     }
238     object_size = prev_object_size;   
239     op_size = prev_op_size;           
240   }
241
242   char* contentsChars = new char[op_size];
243   lock.Lock();
244   data.done = false;
245   data.hints = hints;
246   data.object_size = object_size;
247   data.op_size = op_size;
248   data.in_flight = 0;
249   data.started = 0;
250   data.finished = 0;
251   data.min_latency = 9999.0; // this better be higher than initial latency!
252   data.max_latency = 0;
253   data.avg_latency = 0;
254   data.object_contents = contentsChars;
255   lock.Unlock();
256
257   //fill in contentsChars deterministically so we can check returns
258   sanitize_object_contents(&data, data.op_size);
259
260   if (formatter)
261     formatter->open_object_section("bench");
262
263   if (OP_WRITE == operation) {
264     r = write_bench(secondsToRun, concurrentios, run_name_meta, max_objects);
265     if (r != 0) goto out;
266   }
267   else if (OP_SEQ_READ == operation) {
268     r = seq_read_bench(secondsToRun, num_objects, concurrentios, prevPid, no_verify);
269     if (r != 0) goto out;
270   }
271   else if (OP_RAND_READ == operation) {
272     r = rand_read_bench(secondsToRun, num_objects, concurrentios, prevPid, no_verify);
273     if (r != 0) goto out;
274   }
275
276   if (OP_WRITE == operation && cleanup) {
277     r = fetch_bench_metadata(run_name_meta, &op_size, &object_size,
278                              &num_objects, &prevPid);
279     if (r < 0) {
280       if (r == -ENOENT)
281         cerr << "Should never happen: bench metadata missing for current run!" << std::endl;
282       goto out;
283     }
284
285     data.start_time = ceph_clock_now();
286     out(cout) << "Cleaning up (deleting benchmark objects)" << std::endl;
287
288     r = clean_up(num_objects, prevPid, concurrentios);
289     if (r != 0) goto out;
290
291     runtime = ceph_clock_now() - data.start_time;
292     out(cout) << "Clean up completed and total clean up time :" << runtime << std::endl;
293
294     // lastrun file
295     r = sync_remove(run_name_meta);
296     if (r != 0) goto out;
297   }
298
299  out:
300   if (formatter) {
301     formatter->close_section(); // bench
302     formatter->flush(*outstream);
303     *outstream << std::endl;
304   }
305   delete[] contentsChars;
306   return r;
307 }
308
309 struct lock_cond {
310   explicit lock_cond(Mutex *_lock) : lock(_lock) {}
311   Mutex *lock;
312   Cond cond;
313 };
314
315 void _aio_cb(void *cb, void *arg) {
316   struct lock_cond *lc = (struct lock_cond *)arg;
317   lc->lock->Lock();
318   lc->cond.Signal();
319   lc->lock->Unlock();
320 }
321
322 template<class T>
323 static T vec_stddev(vector<T>& v)
324 {
325   T mean = 0;
326
327   if (v.size() < 2)
328     return 0;
329
330   typename vector<T>::iterator iter;
331   for (iter = v.begin(); iter != v.end(); ++iter) {
332     mean += *iter;
333   }
334
335   mean /= v.size();
336
337   T stddev = 0;
338   for (iter = v.begin(); iter != v.end(); ++iter) {
339     T dev = *iter - mean;
340     dev *= dev;
341     stddev += dev;
342   }
343   stddev /= (v.size() - 1);
344   return sqrt(stddev);
345 }
346
347 int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
348                                      uint64_t *op_size, uint64_t* object_size,
349                                      int* num_objects, int* prevPid) {
350   int r = 0;
351   bufferlist object_data;
352
353   r = sync_read(metadata_file, object_data,
354                 sizeof(int) * 2 + sizeof(size_t) * 2);
355   if (r <= 0) {
356     // treat an empty file as a file that does not exist
357     if (r == 0) {
358       r = -ENOENT;
359     }
360     return r;
361   }
362   bufferlist::iterator p = object_data.begin();
363   ::decode(*object_size, p);
364   ::decode(*num_objects, p);
365   ::decode(*prevPid, p);
366   if (!p.end()) {
367     ::decode(*op_size, p);
368   } else {
369     *op_size = *object_size;
370   }
371
372   return 0;
373 }
374
375 int ObjBencher::write_bench(int secondsToRun,
376                             int concurrentios, const string& run_name_meta,
377                             unsigned max_objects) {
378   if (concurrentios <= 0) 
379     return -EINVAL;
380   
381   if (!formatter) {
382     out(cout) << "Maintaining " << concurrentios << " concurrent writes of "
383               << data.op_size << " bytes to objects of size "
384               << data.object_size << " for up to "
385               << secondsToRun << " seconds or "
386               << max_objects << " objects"
387               << std::endl;
388   } else {
389     formatter->dump_format("concurrent_ios", "%d", concurrentios);
390     formatter->dump_format("object_size", "%d", data.object_size);
391     formatter->dump_format("op_size", "%d", data.op_size);
392     formatter->dump_format("seconds_to_run", "%d", secondsToRun);
393     formatter->dump_format("max_objects", "%d", max_objects);
394   }
395   bufferlist* newContents = 0;
396
397   std::string prefix = generate_object_prefix();
398   if (!formatter)
399     out(cout) << "Object prefix: " << prefix << std::endl;
400   else
401     formatter->dump_string("object_prefix", prefix);
402
403   std::vector<string> name(concurrentios);
404   std::string newName;
405   bufferlist* contents[concurrentios];
406   double total_latency = 0;
407   std::vector<utime_t> start_times(concurrentios);
408   utime_t stopTime;
409   int r = 0;
410   bufferlist b_write;
411   lock_cond lc(&lock);
412   utime_t runtime;
413   utime_t timePassed;
414
415   unsigned writes_per_object = 1;
416   if (data.op_size)
417     writes_per_object = data.object_size / data.op_size;
418
419   r = completions_init(concurrentios);
420
421   //set up writes so I can start them together
422   for (int i = 0; i<concurrentios; ++i) {
423     name[i] = generate_object_name(i / writes_per_object);
424     contents[i] = new bufferlist();
425     snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", i);
426     contents[i]->append(data.object_contents, data.op_size);
427   }
428
429   pthread_t print_thread;
430
431   pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
432   ceph_pthread_setname(print_thread, "write_stat");
433   lock.Lock();
434   data.finished = 0;
435   data.start_time = ceph_clock_now();
436   lock.Unlock();
437   for (int i = 0; i<concurrentios; ++i) {
438     start_times[i] = ceph_clock_now();
439     r = create_completion(i, _aio_cb, (void *)&lc);
440     if (r < 0)
441       goto ERR;
442     r = aio_write(name[i], i, *contents[i], data.op_size,
443                   data.op_size * (i % writes_per_object));
444     if (r < 0) { //naughty, doesn't clean up heap
445       goto ERR;
446     }
447     lock.Lock();
448     ++data.started;
449     ++data.in_flight;
450     lock.Unlock();
451   }
452
453   //keep on adding new writes as old ones complete until we've passed minimum time
454   int slot;
455   int num_objects;
456
457   //don't need locking for reads because other thread doesn't write
458
459   runtime.set_from_double(secondsToRun);
460   stopTime = data.start_time + runtime;
461   slot = 0;
462   lock.Lock();
463   while (!secondsToRun || ceph_clock_now() < stopTime) {
464     bool found = false;
465     while (1) {
466       int old_slot = slot;
467       do {
468         if (completion_is_done(slot)) {
469             found = true;
470             break;
471         }
472         slot++;
473         if (slot == concurrentios) {
474           slot = 0;
475         }
476       } while (slot != old_slot);
477       if (found)
478         break;
479       lc.cond.Wait(lock);
480     }
481     lock.Unlock();
482     //create new contents and name on the heap, and fill them
483     newName = generate_object_name(data.started / writes_per_object);
484     newContents = contents[slot];
485     snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
486     // we wrote to buffer, going around internal crc cache, so invalidate it now.
487     newContents->invalidate_crc();
488
489     completion_wait(slot);
490     lock.Lock();
491     r = completion_ret(slot);
492     if (r != 0) {
493       lock.Unlock();
494       goto ERR;
495     }
496     data.cur_latency = ceph_clock_now() - start_times[slot];
497     data.history.latency.push_back(data.cur_latency);
498     total_latency += data.cur_latency;
499     if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
500     if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
501     ++data.finished;
502     data.avg_latency = total_latency / data.finished;
503     --data.in_flight;
504     lock.Unlock();
505     release_completion(slot);
506     timePassed = ceph_clock_now() - data.start_time;
507
508     //write new stuff to backend
509     start_times[slot] = ceph_clock_now();
510     r = create_completion(slot, _aio_cb, &lc);
511     if (r < 0)
512       goto ERR;
513     r = aio_write(newName, slot, *newContents, data.op_size,
514                   data.op_size * (data.started % writes_per_object));
515     if (r < 0) {//naughty; doesn't clean up heap space.
516       goto ERR;
517     }
518     name[slot] = newName;
519     lock.Lock();
520     ++data.started;
521     ++data.in_flight;
522     if (max_objects &&
523         data.started >= (int)((data.object_size * max_objects + data.op_size - 1) /
524                              data.op_size))
525       break;
526   }
527   lock.Unlock();
528
529   while (data.finished < data.started) {
530     slot = data.finished % concurrentios;
531     completion_wait(slot);
532     lock.Lock();
533     r = completion_ret(slot);
534     if (r != 0) {
535       lock.Unlock();
536       goto ERR;
537     }
538     data.cur_latency = ceph_clock_now() - start_times[slot];
539     data.history.latency.push_back(data.cur_latency);
540     total_latency += data.cur_latency;
541     if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
542     if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
543     ++data.finished;
544     data.avg_latency = total_latency / data.finished;
545     --data.in_flight;
546     lock.Unlock();
547     release_completion(slot);
548     delete contents[slot];
549     contents[slot] = 0;
550   }
551
552   timePassed = ceph_clock_now() - data.start_time;
553   lock.Lock();
554   data.done = true;
555   lock.Unlock();
556
557   pthread_join(print_thread, NULL);
558
559   double bandwidth;
560   bandwidth = ((double)data.finished)*((double)data.op_size)/(double)timePassed;
561   bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
562
563   if (!formatter) {
564     out(cout) << "Total time run:         " << timePassed << std::endl
565        << "Total writes made:      " << data.finished << std::endl
566        << "Write size:             " << data.op_size << std::endl
567        << "Object size:            " << data.object_size << std::endl      
568        << "Bandwidth (MB/sec):     " << setprecision(6) << bandwidth << std::endl
569        << "Stddev Bandwidth:       " << vec_stddev(data.history.bandwidth) << std::endl
570        << "Max bandwidth (MB/sec): " << data.idata.max_bandwidth << std::endl
571        << "Min bandwidth (MB/sec): " << data.idata.min_bandwidth << std::endl
572        << "Average IOPS:           " << (int)(data.finished/timePassed) << std::endl
573        << "Stddev IOPS:            " << vec_stddev(data.history.iops) << std::endl
574        << "Max IOPS:               " << data.idata.max_iops << std::endl
575        << "Min IOPS:               " << data.idata.min_iops << std::endl
576        << "Average Latency(s):     " << data.avg_latency << std::endl
577        << "Stddev Latency(s):      " << vec_stddev(data.history.latency) << std::endl
578        << "Max latency(s):         " << data.max_latency << std::endl
579        << "Min latency(s):         " << data.min_latency << std::endl;
580   } else {
581     formatter->dump_format("total_time_run", "%f", (double)timePassed);
582     formatter->dump_format("total_writes_made", "%d", data.finished);
583     formatter->dump_format("write_size", "%d", data.op_size);
584     formatter->dump_format("object_size", "%d", data.object_size);
585     formatter->dump_format("bandwidth", "%f", bandwidth);
586     formatter->dump_format("stddev_bandwidth", "%f", vec_stddev(data.history.bandwidth));
587     formatter->dump_format("max_bandwidth", "%f", data.idata.max_bandwidth);
588     formatter->dump_format("min_bandwidth", "%f", data.idata.min_bandwidth);
589     formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed));
590     formatter->dump_format("stddev_iops", "%d", vec_stddev(data.history.iops));
591     formatter->dump_format("max_iops", "%d", data.idata.max_iops);
592     formatter->dump_format("min_iops", "%d", data.idata.min_iops);
593     formatter->dump_format("average_latency", "%f", data.avg_latency);
594     formatter->dump_format("stddev_latency", "%f", vec_stddev(data.history.latency));
595     formatter->dump_format("max_latency:", "%f", data.max_latency);
596     formatter->dump_format("min_latency", "%f", data.min_latency);
597   }
598   //write object size/number data for read benchmarks
599   ::encode(data.object_size, b_write);
600   num_objects = (data.finished + writes_per_object - 1) / writes_per_object;
601   ::encode(num_objects, b_write);
602   ::encode(getpid(), b_write);
603   ::encode(data.op_size, b_write);
604
605   // persist meta-data for further cleanup or read
606   sync_write(run_name_meta, b_write, sizeof(int)*3);
607
608   completions_done();
609   for (int i = 0; i < concurrentios; i++)
610       if (contents[i])
611           delete contents[i];
612
613   return 0;
614
615  ERR:
616   lock.Lock();
617   data.done = 1;
618   lock.Unlock();
619   pthread_join(print_thread, NULL);
620   for (int i = 0; i < concurrentios; i++)
621       if (contents[i])
622           delete contents[i];
623   return r;
624 }
625
626 int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify) {
627   lock_cond lc(&lock);
628
629   if (concurrentios <= 0) 
630     return -EINVAL;
631
632   std::vector<string> name(concurrentios);
633   std::string newName;
634   bufferlist* contents[concurrentios];
635   int index[concurrentios];
636   int errors = 0;
637   utime_t start_time;
638   std::vector<utime_t> start_times(concurrentios);
639   utime_t time_to_run;
640   time_to_run.set_from_double(seconds_to_run);
641   double total_latency = 0;
642   int r = 0;
643   utime_t runtime;
644   sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
645   //changes will be safe because string length should remain the same
646
647   unsigned writes_per_object = 1;
648   if (data.op_size)
649     writes_per_object = data.object_size / data.op_size;
650
651   r = completions_init(concurrentios);
652   if (r < 0)
653     return r;
654
655   //set up initial reads
656   for (int i = 0; i < concurrentios; ++i) {
657     name[i] = generate_object_name(i / writes_per_object, pid);
658     contents[i] = new bufferlist();
659   }
660
661   lock.Lock();
662   data.finished = 0;
663   data.start_time = ceph_clock_now();
664   lock.Unlock();
665
666   pthread_t print_thread;
667   pthread_create(&print_thread, NULL, status_printer, (void *)this);
668   ceph_pthread_setname(print_thread, "seq_read_stat");
669
670   utime_t finish_time = data.start_time + time_to_run;
671   //start initial reads
672   for (int i = 0; i < concurrentios; ++i) {
673     index[i] = i;
674     start_times[i] = ceph_clock_now();
675     create_completion(i, _aio_cb, (void *)&lc);
676     r = aio_read(name[i], i, contents[i], data.op_size,
677                  data.op_size * (i % writes_per_object));
678     if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
679       cerr << "r = " << r << std::endl;
680       goto ERR;
681     }
682     lock.Lock();
683     ++data.started;
684     ++data.in_flight;
685     lock.Unlock();
686   }
687
688   //keep on adding new reads as old ones complete
689   int slot;
690   bufferlist *cur_contents;
691
692   slot = 0;
693   while ((!seconds_to_run || ceph_clock_now() < finish_time) &&
694          num_objects > data.started) {
695     lock.Lock();
696     int old_slot = slot;
697     bool found = false;
698     while (1) {
699       do {
700         if (completion_is_done(slot)) {
701           found = true;
702           break;
703         }
704         slot++;
705         if (slot == concurrentios) {
706           slot = 0;
707         }
708       } while (slot != old_slot);
709       if (found) {
710         break;
711       }
712       lc.cond.Wait(lock);
713     }
714
715     // calculate latency here, so memcmp doesn't inflate it
716     data.cur_latency = ceph_clock_now() - start_times[slot];
717
718     cur_contents = contents[slot];
719     int current_index = index[slot];
720     
721     // invalidate internal crc cache
722     cur_contents->invalidate_crc();
723   
724     if (!no_verify) {
725       snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
726       if ( (cur_contents->length() != data.op_size) || 
727            (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0) ) {
728         cerr << name[slot] << " is not correct!" << std::endl;
729         ++errors;
730       }
731     }
732
733     newName = generate_object_name(data.started / writes_per_object, pid);
734     index[slot] = data.started;
735     lock.Unlock();
736     completion_wait(slot);
737     lock.Lock();
738     r = completion_ret(slot);
739     if (r < 0) {
740       cerr << "read got " << r << std::endl;
741       lock.Unlock();
742       goto ERR;
743     }
744     total_latency += data.cur_latency;
745     if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
746     if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
747     ++data.finished;
748     data.avg_latency = total_latency / data.finished;
749     --data.in_flight;
750     lock.Unlock();
751     release_completion(slot);
752
753     //start new read and check data if requested
754     start_times[slot] = ceph_clock_now();
755     create_completion(slot, _aio_cb, (void *)&lc);
756     r = aio_read(newName, slot, contents[slot], data.op_size,
757                  data.op_size * (data.started % writes_per_object));
758     if (r < 0) {
759       goto ERR;
760     }
761     lock.Lock();
762     ++data.started;
763     ++data.in_flight;
764     lock.Unlock();
765     name[slot] = newName;
766   }
767
768   //wait for final reads to complete
769   while (data.finished < data.started) {
770     slot = data.finished % concurrentios;
771     completion_wait(slot);
772     lock.Lock();
773     r = completion_ret(slot);
774     if (r < 0) {
775       cerr << "read got " << r << std::endl;
776       lock.Unlock();
777       goto ERR;
778     }
779     data.cur_latency = ceph_clock_now() - start_times[slot];
780     total_latency += data.cur_latency;
781     if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
782     if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
783     ++data.finished;
784     data.avg_latency = total_latency / data.finished;
785     --data.in_flight;
786     release_completion(slot);
787     if (!no_verify) {
788       snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
789       lock.Unlock();
790       if ((contents[slot]->length() != data.op_size) || 
791          (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
792         cerr << name[slot] << " is not correct!" << std::endl;
793         ++errors;
794       }
795     } else {
796         lock.Unlock();
797     }
798     delete contents[slot];
799   }
800
801   runtime = ceph_clock_now() - data.start_time;
802   lock.Lock();
803   data.done = true;
804   lock.Unlock();
805
806   pthread_join(print_thread, NULL);
807
808   double bandwidth;
809   bandwidth = ((double)data.finished)*((double)data.op_size)/(double)runtime;
810   bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
811
812   if (!formatter) {
813     out(cout) << "Total time run:       " << runtime << std::endl
814        << "Total reads made:     " << data.finished << std::endl
815        << "Read size:            " << data.op_size << std::endl
816        << "Object size:          " << data.object_size << std::endl
817        << "Bandwidth (MB/sec):   " << setprecision(6) << bandwidth << std::endl
818        << "Average IOPS:         " << (int)(data.finished/runtime) << std::endl
819        << "Stddev IOPS:          " << vec_stddev(data.history.iops) << std::endl
820        << "Max IOPS:             " << data.idata.max_iops << std::endl
821        << "Min IOPS:             " << data.idata.min_iops << std::endl
822        << "Average Latency(s):   " << data.avg_latency << std::endl
823        << "Max latency(s):       " << data.max_latency << std::endl
824        << "Min latency(s):       " << data.min_latency << std::endl;
825   } else {
826     formatter->dump_format("total_time_run", "%f", (double)runtime);
827     formatter->dump_format("total_reads_made", "%d", data.finished);
828     formatter->dump_format("read_size", "%d", data.op_size);
829     formatter->dump_format("object_size", "%d", data.object_size);
830     formatter->dump_format("bandwidth", "%f", bandwidth);
831     formatter->dump_format("average_iops", "%d", (int)(data.finished/runtime));
832     formatter->dump_format("stddev_iops", "%d", vec_stddev(data.history.iops));
833     formatter->dump_format("max_iops", "%d", data.idata.max_iops);
834     formatter->dump_format("min_iops", "%d", data.idata.min_iops);
835     formatter->dump_format("average_latency", "%f", data.avg_latency);
836     formatter->dump_format("max_latency", "%f", data.max_latency);
837     formatter->dump_format("min_latency", "%f", data.min_latency);
838   }
839
840   completions_done();
841
842   return (errors > 0 ? -EIO : 0);
843
844  ERR:
845   lock.Lock();
846   data.done = 1;
847   lock.Unlock();
848   pthread_join(print_thread, NULL);
849   return r;
850 }
851
852 int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify)
853 {
854   lock_cond lc(&lock);
855
856   if (concurrentios <= 0)
857     return -EINVAL;
858
859   std::vector<string> name(concurrentios);
860   std::string newName;
861   bufferlist* contents[concurrentios];
862   int index[concurrentios];
863   int errors = 0;
864   utime_t start_time;
865   std::vector<utime_t> start_times(concurrentios);
866   utime_t time_to_run;
867   time_to_run.set_from_double(seconds_to_run);
868   double total_latency = 0;
869   int r = 0;
870   utime_t runtime;
871   sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
872   //changes will be safe because string length should remain the same
873
874   unsigned writes_per_object = 1;
875   if (data.op_size)
876     writes_per_object = data.object_size / data.op_size;
877
878   srand (time(NULL));
879
880   r = completions_init(concurrentios);
881   if (r < 0)
882     return r;
883
884   //set up initial reads
885   for (int i = 0; i < concurrentios; ++i) {
886     name[i] = generate_object_name(i / writes_per_object, pid);
887     contents[i] = new bufferlist();
888   }
889
890   lock.Lock();
891   data.finished = 0;
892   data.start_time = ceph_clock_now();
893   lock.Unlock();
894
895   pthread_t print_thread;
896   pthread_create(&print_thread, NULL, status_printer, (void *)this);
897   ceph_pthread_setname(print_thread, "rand_read_stat");
898
899   utime_t finish_time = data.start_time + time_to_run;
900   //start initial reads
901   for (int i = 0; i < concurrentios; ++i) {
902     index[i] = i;
903     start_times[i] = ceph_clock_now();
904     create_completion(i, _aio_cb, (void *)&lc);
905     r = aio_read(name[i], i, contents[i], data.op_size,
906                  data.op_size * (i % writes_per_object));
907     if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
908       cerr << "r = " << r << std::endl;
909       goto ERR;
910     }
911     lock.Lock();
912     ++data.started;
913     ++data.in_flight;
914     lock.Unlock();
915   }
916
917   //keep on adding new reads as old ones complete
918   int slot;
919   bufferlist *cur_contents;
920   int rand_id;
921
922   slot = 0;
923   while ((!seconds_to_run || ceph_clock_now() < finish_time)) {
924     lock.Lock();
925     int old_slot = slot;
926     bool found = false;
927     while (1) {
928       do {
929         if (completion_is_done(slot)) {
930           found = true;
931           break;
932         }
933         slot++;
934         if (slot == concurrentios) {
935           slot = 0;
936         }
937       } while (slot != old_slot);
938       if (found) {
939         break;
940       }
941       lc.cond.Wait(lock);
942     }
943
944     // calculate latency here, so memcmp doesn't inflate it
945     data.cur_latency = ceph_clock_now() - start_times[slot];
946
947     lock.Unlock();
948
949     int current_index = index[slot];
950     cur_contents = contents[slot];
951     completion_wait(slot);
952     lock.Lock();
953     r = completion_ret(slot);
954     if (r < 0) {
955       cerr << "read got " << r << std::endl;
956       lock.Unlock();
957       goto ERR;
958     }
959
960     total_latency += data.cur_latency;
961     if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
962     if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
963     ++data.finished;
964     data.avg_latency = total_latency / data.finished;
965     --data.in_flight;
966     lock.Unlock();
967     
968     if (!no_verify) {
969       snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
970       if ((cur_contents->length() != data.op_size) || 
971           (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0)) {
972         cerr << name[slot] << " is not correct!" << std::endl;
973         ++errors;
974       }
975     } 
976
977     rand_id = rand() % num_objects;
978     newName = generate_object_name(rand_id / writes_per_object, pid);
979     index[slot] = rand_id;
980     release_completion(slot);
981
982     // invalidate internal crc cache
983     cur_contents->invalidate_crc();
984
985     //start new read and check data if requested
986     start_times[slot] = ceph_clock_now();
987     create_completion(slot, _aio_cb, (void *)&lc);
988     r = aio_read(newName, slot, contents[slot], data.op_size,
989                  data.op_size * (rand_id % writes_per_object));
990     if (r < 0) {
991       goto ERR;
992     }
993     lock.Lock();
994     ++data.started;
995     ++data.in_flight;
996     lock.Unlock();
997     name[slot] = newName;
998   }
999
1000
1001   //wait for final reads to complete
1002   while (data.finished < data.started) {
1003     slot = data.finished % concurrentios;
1004     completion_wait(slot);
1005     lock.Lock();
1006     r = completion_ret(slot);
1007     if (r < 0) {
1008       cerr << "read got " << r << std::endl;
1009       lock.Unlock();
1010       goto ERR;
1011     }
1012     data.cur_latency = ceph_clock_now() - start_times[slot];
1013     total_latency += data.cur_latency;
1014     if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
1015     if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
1016     ++data.finished;
1017     data.avg_latency = total_latency / data.finished;
1018     --data.in_flight;
1019     release_completion(slot);
1020     if (!no_verify) {
1021       snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
1022       lock.Unlock();
1023       if ((contents[slot]->length() != data.op_size) || 
1024           (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
1025         cerr << name[slot] << " is not correct!" << std::endl;
1026         ++errors;
1027       }
1028     } else {
1029         lock.Unlock();
1030     }
1031     delete contents[slot];
1032   }
1033
1034   runtime = ceph_clock_now() - data.start_time;
1035   lock.Lock();
1036   data.done = true;
1037   lock.Unlock();
1038
1039   pthread_join(print_thread, NULL);
1040
1041   double bandwidth;
1042   bandwidth = ((double)data.finished)*((double)data.op_size)/(double)runtime;
1043   bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
1044
1045   if (!formatter) {
1046     out(cout) << "Total time run:       " << runtime << std::endl
1047        << "Total reads made:     " << data.finished << std::endl
1048        << "Read size:            " << data.op_size << std::endl
1049        << "Object size:          " << data.object_size << std::endl
1050        << "Bandwidth (MB/sec):   " << setprecision(6) << bandwidth << std::endl
1051        << "Average IOPS:         " << (int)(data.finished/runtime) << std::endl
1052        << "Stddev IOPS:          " << vec_stddev(data.history.iops) << std::endl
1053        << "Max IOPS:             " << data.idata.max_iops << std::endl
1054        << "Min IOPS:             " << data.idata.min_iops << std::endl
1055        << "Average Latency(s):   " << data.avg_latency << std::endl
1056        << "Max latency(s):       " << data.max_latency << std::endl
1057        << "Min latency(s):       " << data.min_latency << std::endl;
1058   } else {
1059     formatter->dump_format("total_time_run", "%f", (double)runtime);
1060     formatter->dump_format("total_reads_made", "%d", data.finished);
1061     formatter->dump_format("read_size", "%d", data.op_size);
1062     formatter->dump_format("object_size", "%d", data.object_size);
1063     formatter->dump_format("bandwidth", "%f", bandwidth);
1064     formatter->dump_format("average_iops", "%d", (int)(data.finished/runtime));
1065     formatter->dump_format("stddev_iops", "%d", vec_stddev(data.history.iops));
1066     formatter->dump_format("max_iops", "%d", data.idata.max_iops);
1067     formatter->dump_format("min_iops", "%d", data.idata.min_iops);
1068     formatter->dump_format("average_latency", "%f", data.avg_latency);
1069     formatter->dump_format("max_latency", "%f", data.max_latency);
1070     formatter->dump_format("min_latency", "%f", data.min_latency);
1071   }
1072   completions_done();
1073
1074   return (errors > 0 ? -EIO : 0);
1075
1076  ERR:
1077   lock.Lock();
1078   data.done = 1;
1079   lock.Unlock();
1080   pthread_join(print_thread, NULL);
1081   return r;
1082 }
1083
1084 int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, const std::string& run_name) {
1085   int r = 0;
1086   uint64_t op_size, object_size;
1087   int num_objects;
1088   int prevPid;
1089
1090   // default meta object if user does not specify one
1091   const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
1092   const std::string prefix = (orig_prefix.empty() ? generate_object_prefix_nopid() : orig_prefix);
1093
1094   if (prefix.substr(0, BENCH_PREFIX.length()) != BENCH_PREFIX) {
1095     cerr << "Specified --prefix invalid, it must begin with \"" << BENCH_PREFIX << "\"" << std::endl;
1096     return -EINVAL;
1097   }
1098
1099   std::list<Object> unfiltered_objects;
1100   std::set<std::string> meta_namespaces, all_namespaces;
1101
1102   // If caller set all_nspaces this will be searching
1103   // across multiple namespaces.
1104   while (true) {
1105     bool objects_remain = get_objects(&unfiltered_objects, 20);
1106     if (!objects_remain)
1107       break;
1108
1109     std::list<Object>::const_iterator i = unfiltered_objects.begin();
1110     for ( ; i != unfiltered_objects.end(); ++i) {
1111       if (i->first == run_name_meta) {
1112         meta_namespaces.insert(i->second);
1113       }
1114       if (i->first.substr(0, prefix.length()) == prefix) {
1115         all_namespaces.insert(i->second);
1116       }
1117     }
1118   }
1119
1120   std::set<std::string>::const_iterator i = all_namespaces.begin();
1121   for ( ; i != all_namespaces.end(); ++i) {
1122     set_namespace(*i);
1123
1124     // if no metadata file found we should try to do a linear search on the prefix
1125     if (meta_namespaces.find(*i) == meta_namespaces.end()) {
1126       int r = clean_up_slow(prefix, concurrentios);
1127       if (r < 0) {
1128         cerr << "clean_up_slow error r= " << r << std::endl;
1129         return r;
1130       }
1131       continue;
1132     }
1133
1134     r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, &num_objects, &prevPid);
1135     if (r < 0) {
1136       return r;
1137     }
1138
1139     r = clean_up(num_objects, prevPid, concurrentios);
1140     if (r != 0) return r;
1141
1142     r = sync_remove(run_name_meta);
1143     if (r != 0) return r;
1144   }
1145
1146   return 0;
1147 }
1148
1149 int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
1150   lock_cond lc(&lock);
1151   
1152   if (concurrentios <= 0) 
1153     return -EINVAL;
1154
1155   std::vector<string> name(concurrentios);
1156   std::string newName;
1157   int r = 0;
1158   utime_t runtime;
1159   int slot = 0;
1160
1161   lock.Lock();
1162   data.done = false;
1163   data.in_flight = 0;
1164   data.started = 0;
1165   data.finished = 0;
1166   lock.Unlock();
1167
1168   // don't start more completions than files
1169   if (num_objects == 0) {
1170     return 0;
1171   } else if (num_objects < concurrentios) {
1172     concurrentios = num_objects;
1173   }
1174
1175   r = completions_init(concurrentios);
1176   if (r < 0)
1177     return r;
1178
1179   //set up initial removes
1180   for (int i = 0; i < concurrentios; ++i) {
1181     name[i] = generate_object_name(i, prevPid);
1182   }
1183
1184   //start initial removes
1185   for (int i = 0; i < concurrentios; ++i) {
1186     create_completion(i, _aio_cb, (void *)&lc);
1187     r = aio_remove(name[i], i);
1188     if (r < 0) { //naughty, doesn't clean up heap
1189       cerr << "r = " << r << std::endl;
1190       goto ERR;
1191     }
1192     lock.Lock();
1193     ++data.started;
1194     ++data.in_flight;
1195     lock.Unlock();
1196   }
1197
1198   //keep on adding new removes as old ones complete
1199   while (data.started < num_objects) {
1200     lock.Lock();
1201     int old_slot = slot;
1202     bool found = false;
1203     while (1) {
1204       do {
1205         if (completion_is_done(slot)) {
1206           found = true;
1207           break;
1208         }
1209         slot++;
1210         if (slot == concurrentios) {
1211           slot = 0;
1212         }
1213       } while (slot != old_slot);
1214       if (found) {
1215         break;
1216       }
1217       lc.cond.Wait(lock);
1218     }
1219     lock.Unlock();
1220     newName = generate_object_name(data.started, prevPid);
1221     completion_wait(slot);
1222     lock.Lock();
1223     r = completion_ret(slot);
1224     if (r != 0 && r != -ENOENT) { // file does not exist
1225       cerr << "remove got " << r << std::endl;
1226       lock.Unlock();
1227       goto ERR;
1228     }
1229     ++data.finished;
1230     --data.in_flight;
1231     lock.Unlock();
1232     release_completion(slot);
1233
1234     //start new remove and check data if requested
1235     create_completion(slot, _aio_cb, (void *)&lc);
1236     r = aio_remove(newName, slot);
1237     if (r < 0) {
1238       goto ERR;
1239     }
1240     lock.Lock();
1241     ++data.started;
1242     ++data.in_flight;
1243     lock.Unlock();
1244     name[slot] = newName;
1245   }
1246
1247   //wait for final removes to complete
1248   while (data.finished < data.started) {
1249     slot = data.finished % concurrentios;
1250     completion_wait(slot);
1251     lock.Lock();
1252     r = completion_ret(slot);
1253     if (r != 0 && r != -ENOENT) { // file does not exist
1254       cerr << "remove got " << r << std::endl;
1255       lock.Unlock();
1256       goto ERR;
1257     }
1258     ++data.finished;
1259     --data.in_flight;
1260     release_completion(slot);
1261     lock.Unlock();
1262   }
1263
1264   lock.Lock();
1265   data.done = true;
1266   lock.Unlock();
1267
1268   completions_done();
1269
1270   out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1271
1272   return 0;
1273
1274  ERR:
1275   lock.Lock();
1276   data.done = 1;
1277   lock.Unlock();
1278   return r;
1279 }
1280
1281 /**
1282  * Return objects from the datastore which match a prefix.
1283  *
1284  * Clears the list and populates it with any objects which match the
1285  * prefix. The list is guaranteed to have at least one item when the
1286  * function returns true.
1287  *
1288  * @param prefix the prefix to match against
1289  * @param objects [out] return list of objects
1290  * @returns true if there are any objects in the store which match
1291  * the prefix, false if there are no more
1292  */
1293 bool ObjBencher::more_objects_matching_prefix(const std::string& prefix, std::list<Object>* objects) {
1294   std::list<Object> unfiltered_objects;
1295
1296   objects->clear();
1297
1298   while (objects->empty()) {
1299     bool objects_remain = get_objects(&unfiltered_objects, 20);
1300     if (!objects_remain)
1301       return false;
1302
1303     std::list<Object>::const_iterator i = unfiltered_objects.begin();
1304     for ( ; i != unfiltered_objects.end(); ++i) {
1305       if (i->first.substr(0, prefix.length()) == prefix) {
1306         objects->push_back(*i);
1307       }
1308     }
1309   }
1310
1311   return true;
1312 }
1313
1314 int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
1315   lock_cond lc(&lock);
1316
1317   if (concurrentios <= 0) 
1318     return -EINVAL;
1319
1320   std::vector<Object> name(concurrentios);
1321   Object newName;
1322   int r = 0;
1323   utime_t runtime;
1324   int slot = 0;
1325   std::list<Object> objects;
1326   bool objects_remain = true;
1327
1328   lock.Lock();
1329   data.done = false;
1330   data.in_flight = 0;
1331   data.started = 0;
1332   data.finished = 0;
1333   lock.Unlock();
1334
1335   out(cout) << "Warning: using slow linear search" << std::endl;
1336
1337   r = completions_init(concurrentios);
1338   if (r < 0)
1339     return r;
1340
1341   //set up initial removes
1342   for (int i = 0; i < concurrentios; ++i) {
1343     if (objects.empty()) {
1344       // if there are fewer objects than concurrent ios, don't generate extras
1345       bool objects_found = more_objects_matching_prefix(prefix, &objects);
1346       if (!objects_found) {
1347         concurrentios = i;
1348         objects_remain = false;
1349         break;
1350       }
1351     }
1352
1353     name[i] = objects.front();
1354     objects.pop_front();
1355   }
1356
1357   //start initial removes
1358   for (int i = 0; i < concurrentios; ++i) {
1359     create_completion(i, _aio_cb, (void *)&lc);
1360     set_namespace(name[i].second);
1361     r = aio_remove(name[i].first, i);
1362     if (r < 0) { //naughty, doesn't clean up heap
1363       cerr << "r = " << r << std::endl;
1364       goto ERR;
1365     }
1366     lock.Lock();
1367     ++data.started;
1368     ++data.in_flight;
1369     lock.Unlock();
1370   }
1371
1372   //keep on adding new removes as old ones complete
1373   while (objects_remain) {
1374     lock.Lock();
1375     int old_slot = slot;
1376     bool found = false;
1377     while (1) {
1378       do {
1379         if (completion_is_done(slot)) {
1380           found = true;
1381           break;
1382         }
1383         slot++;
1384         if (slot == concurrentios) {
1385           slot = 0;
1386         }
1387       } while (slot != old_slot);
1388       if (found) {
1389         break;
1390       }
1391       lc.cond.Wait(lock);
1392     }
1393     lock.Unlock();
1394
1395     // get more objects if necessary
1396     if (objects.empty()) {
1397       objects_remain = more_objects_matching_prefix(prefix, &objects);
1398       // quit if there are no more
1399       if (!objects_remain) {
1400         break;
1401       }
1402     }
1403
1404     // get the next object
1405     newName = objects.front();
1406     objects.pop_front();
1407
1408     completion_wait(slot);
1409     lock.Lock();
1410     r = completion_ret(slot);
1411     if (r != 0 && r != -ENOENT) { // file does not exist
1412       cerr << "remove got " << r << std::endl;
1413       lock.Unlock();
1414       goto ERR;
1415     }
1416     ++data.finished;
1417     --data.in_flight;
1418     lock.Unlock();
1419     release_completion(slot);
1420
1421     //start new remove and check data if requested
1422     create_completion(slot, _aio_cb, (void *)&lc);
1423     set_namespace(newName.second);
1424     r = aio_remove(newName.first, slot);
1425     if (r < 0) {
1426       goto ERR;
1427     }
1428     lock.Lock();
1429     ++data.started;
1430     ++data.in_flight;
1431     lock.Unlock();
1432     name[slot] = newName;
1433   }
1434
1435   //wait for final removes to complete
1436   while (data.finished < data.started) {
1437     slot = data.finished % concurrentios;
1438     completion_wait(slot);
1439     lock.Lock();
1440     r = completion_ret(slot);
1441     if (r != 0 && r != -ENOENT) { // file does not exist
1442       cerr << "remove got " << r << std::endl;
1443       lock.Unlock();
1444       goto ERR;
1445     }
1446     ++data.finished;
1447     --data.in_flight;
1448     release_completion(slot);
1449     lock.Unlock();
1450   }
1451
1452   lock.Lock();
1453   data.done = true;
1454   lock.Unlock();
1455
1456   completions_done();
1457
1458   out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1459
1460   return 0;
1461
1462  ERR:
1463   lock.Lock();
1464   data.done = 1;
1465   lock.Unlock();
1466   return -EIO;
1467 }