Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / crush / CrushTester.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/stringify.h"
5 #include "CrushTester.h"
6 #include "CrushTreeDumper.h"
7 #include "include/ceph_features.h"
8
9 #include <algorithm>
10 #include <stdlib.h>
11 #include <boost/lexical_cast.hpp>
12 // to workaround https://svn.boost.org/trac/boost/ticket/9501
13 #ifdef _LIBCPP_VERSION
14 #include <boost/version.hpp>
15 #if BOOST_VERSION < 105600
16 #define ICL_USE_BOOST_MOVE_IMPLEMENTATION
17 #endif
18 #endif
19 #include <boost/icl/interval_map.hpp>
20 #include <boost/algorithm/string/join.hpp>
21 #include "common/SubProcess.h"
22 #include "common/fork_function.h"
23
24 void CrushTester::set_device_weight(int dev, float f)
25 {
26   int w = (int)(f * 0x10000);
27   if (w < 0)
28     w = 0;
29   if (w > 0x10000)
30     w = 0x10000;
31   device_weight[dev] = w;
32 }
33
34 int CrushTester::get_maximum_affected_by_rule(int ruleno)
35 {
36   // get the number of steps in RULENO
37   int rule_size = crush.get_rule_len(ruleno);
38   vector<int> affected_types;
39   map<int,int> replications_by_type;
40
41   for (int i = 0; i < rule_size; i++){
42     // get what operation is done by the current step
43     int rule_operation = crush.get_rule_op(ruleno, i);
44
45     // if the operation specifies choosing a device type, store it
46     if (rule_operation >= 2 && rule_operation != 4){
47       int desired_replication = crush.get_rule_arg1(ruleno,i);
48       int affected_type = crush.get_rule_arg2(ruleno,i);
49       affected_types.push_back(affected_type);
50       replications_by_type[affected_type] = desired_replication;
51     }
52   }
53
54   /*
55    * now for each of the affected bucket types, see what is the
56    * maximum we are (a) requesting or (b) have
57    */
58
59   map<int,int> max_devices_of_type;
60
61   // loop through the vector of affected types
62   for (vector<int>::iterator it = affected_types.begin(); it != affected_types.end(); ++it){
63     // loop through the number of buckets looking for affected types
64     for (map<int,string>::iterator p = crush.name_map.begin(); p != crush.name_map.end(); ++p){
65       int bucket_type = crush.get_bucket_type(p->first);
66       if ( bucket_type == *it)
67         max_devices_of_type[*it]++;
68     }
69   }
70
71   for(std::vector<int>::iterator it = affected_types.begin(); it != affected_types.end(); ++it){
72     if ( replications_by_type[*it] > 0 && replications_by_type[*it] < max_devices_of_type[*it] )
73       max_devices_of_type[*it] = replications_by_type[*it];
74   }
75
76   /*
77    * get the smallest number of buckets available of any type as this is our upper bound on
78    * the number of replicas we can place
79   */
80   int max_affected = max( crush.get_max_buckets(), crush.get_max_devices() );
81
82   for(std::vector<int>::iterator it = affected_types.begin(); it != affected_types.end(); ++it){
83     if (max_devices_of_type[*it] > 0 && max_devices_of_type[*it] < max_affected )
84       max_affected = max_devices_of_type[*it];
85   }
86
87   return max_affected;
88 }
89
90
91 map<int,int> CrushTester::get_collapsed_mapping()
92 {
93   int num_to_check = crush.get_max_devices();
94   int next_id = 0;
95   map<int, int> collapse_mask;
96
97   for (int i = 0; i < num_to_check; i++){
98     if (crush.check_item_present(i)){
99       collapse_mask[i] = next_id;
100       next_id++;
101     }
102   }
103   
104   return collapse_mask;
105 }
106
107 void CrushTester::adjust_weights(vector<__u32>& weight)
108 {
109
110   if (mark_down_device_ratio > 0) {
111     // active buckets
112     vector<int> bucket_ids;
113     for (int i = 0; i < crush.get_max_buckets(); i++) {
114       int id = -1 - i;
115       if (crush.get_bucket_weight(id) > 0) {
116         bucket_ids.push_back(id);
117       }
118     }
119
120     // get buckets that are one level above a device
121     vector<int> buckets_above_devices;
122     for (unsigned i = 0; i < bucket_ids.size(); i++) {
123       // grab the first child object of a bucket and check if it's ID is less than 0
124       int id = bucket_ids[i];
125       if (crush.get_bucket_size(id) == 0)
126         continue;
127       int first_child = crush.get_bucket_item(id, 0); // returns the ID of the bucket or device
128       if (first_child >= 0) {
129         buckets_above_devices.push_back(id);
130       }
131     }
132
133     // permute bucket list
134     for (unsigned i = 0; i < buckets_above_devices.size(); i++) {
135       unsigned j = lrand48() % (buckets_above_devices.size() - 1);
136       std::swap(buckets_above_devices[i], buckets_above_devices[j]);
137     }
138
139     // calculate how many buckets and devices we need to reap...
140     int num_buckets_to_visit = (int) (mark_down_bucket_ratio * buckets_above_devices.size());
141
142     for (int i = 0; i < num_buckets_to_visit; i++) {
143       int id = buckets_above_devices[i];
144       int size = crush.get_bucket_size(id);
145       vector<int> items;
146       for (int o = 0; o < size; o++)
147         items.push_back(crush.get_bucket_item(id, o));
148
149       // permute items
150       for (int o = 0; o < size; o++) {
151         int j = lrand48() % (crush.get_bucket_size(id) - 1);
152         std::swap(items[o], items[j]);
153       }
154
155       int local_devices_to_visit = (int) (mark_down_device_ratio*size);
156       for (int o = 0; o < local_devices_to_visit; o++){
157         int item = crush.get_bucket_item(id, o);
158         weight[item] = 0;
159       }
160     }
161   }
162 }
163
164 bool CrushTester::check_valid_placement(int ruleno, vector<int> in, const vector<__u32>& weight)
165 {
166
167   bool valid_placement = true;
168   vector<int> included_devices;
169   map<string,string> seen_devices;
170
171   // first do the easy check that all devices are "up"
172   for (vector<int>::iterator it = in.begin(); it != in.end(); ++it) {
173     if (weight[(*it)] == 0) {
174       valid_placement = false;
175       break;
176     } else if (weight[(*it)] > 0) {
177       included_devices.push_back( (*it) );
178     }
179   }
180
181   /*
182    * now do the harder test of checking that the CRUSH rule r is not violated
183    * we could test that none of the devices mentioned in out are unique,
184    * but this is a special case of this test
185    */
186
187   // get the number of steps in RULENO
188   int rule_size = crush.get_rule_len(ruleno);
189   vector<string> affected_types;
190
191   // get the smallest type id, and name
192   int min_map_type = crush.get_num_type_names();
193   for (map<int,string>::iterator it = crush.type_map.begin(); it != crush.type_map.end(); ++it ) {
194     if ( (*it).first < min_map_type ) {
195       min_map_type = (*it).first;
196     }
197   }
198
199   string min_map_type_name = crush.type_map[min_map_type];
200
201   // get the types of devices affected by RULENO
202   for (int i = 0; i < rule_size; i++) {
203     // get what operation is done by the current step
204     int rule_operation = crush.get_rule_op(ruleno, i);
205
206     // if the operation specifies choosing a device type, store it
207     if (rule_operation >= 2 && rule_operation != 4) {
208       int affected_type = crush.get_rule_arg2(ruleno,i);
209       affected_types.push_back( crush.get_type_name(affected_type));
210     }
211   }
212
213   // find in if we are only dealing with osd's
214   bool only_osd_affected = false;
215   if (affected_types.size() == 1) {
216     if ((affected_types.back() == min_map_type_name) && (min_map_type_name == "osd")) {
217       only_osd_affected = true;
218     }
219   }
220
221   // check that we don't have any duplicate id's
222   for (vector<int>::iterator it = included_devices.begin(); it != included_devices.end(); ++it) {
223     int num_copies = std::count(included_devices.begin(), included_devices.end(), (*it) );
224     if (num_copies > 1) {
225       valid_placement = false;
226     }
227   }
228
229   // if we have more than just osd's affected we need to do a lot more work
230   if (!only_osd_affected) {
231     // loop through the devices that are "in/up"
232     for (vector<int>::iterator it = included_devices.begin(); it != included_devices.end(); ++it) {
233       if (valid_placement == false)
234         break;
235
236       // create a temporary map of the form (device type, device name in map)
237       map<string,string> device_location_hierarchy = crush.get_full_location(*it);
238
239       // loop over the types affected by RULENO looking for duplicate bucket assignments
240       for (vector<string>::iterator t = affected_types.begin(); t != affected_types.end(); ++t) {
241         if (seen_devices.count( device_location_hierarchy[*t])) {
242           valid_placement = false;
243           break;
244         } else {
245           // store the devices we have seen in the form of (device name, device type)
246           seen_devices[ device_location_hierarchy[*t] ] = *t;
247         }
248       }
249     }
250   }
251
252   return valid_placement;
253 }
254
255 int CrushTester::random_placement(int ruleno, vector<int>& out, int maxout, vector<__u32>& weight)
256 {
257   // get the total weight of the system
258   int total_weight = 0;
259   for (unsigned i = 0; i < weight.size(); i++)
260     total_weight += weight[i];
261
262   if (total_weight == 0 ||
263       crush.get_max_devices() == 0)
264     return -EINVAL;
265
266   // determine the real maximum number of devices to return
267   int devices_requested = min(maxout, get_maximum_affected_by_rule(ruleno));
268   bool accept_placement = false;
269
270   vector<int> trial_placement(devices_requested);
271   int attempted_tries = 0;
272   int max_tries = 100;
273   do {
274     // create a vector to hold our trial mappings
275     int temp_array[devices_requested];
276     for (int i = 0; i < devices_requested; i++){
277       temp_array[i] = lrand48() % (crush.get_max_devices());
278     }
279
280     trial_placement.assign(temp_array, temp_array + devices_requested);
281     accept_placement = check_valid_placement(ruleno, trial_placement, weight);
282     attempted_tries++;
283   } while (accept_placement == false && attempted_tries < max_tries);
284
285   // save our random placement to the out vector
286   if (accept_placement)
287     out.assign(trial_placement.begin(), trial_placement.end());
288
289   // or don't....
290   else if (attempted_tries == max_tries)
291     return -EINVAL;
292
293   return 0;
294 }
295
296 void CrushTester::write_integer_indexed_vector_data_string(vector<string> &dst, int index, vector<int> vector_data)
297 {
298   stringstream data_buffer (stringstream::in | stringstream::out);
299   unsigned input_size = vector_data.size();
300
301   // pass the indexing variable to the data buffer
302   data_buffer << index;
303
304   // pass the rest of the input data to the buffer
305   for (unsigned i = 0; i < input_size; i++) {
306     data_buffer << ',' << vector_data[i];
307   }
308
309   data_buffer << std::endl;
310
311   // write the data buffer to the destination
312   dst.push_back( data_buffer.str() );
313 }
314
315 void CrushTester::write_integer_indexed_vector_data_string(vector<string> &dst, int index, vector<float> vector_data)
316 {
317   stringstream data_buffer (stringstream::in | stringstream::out);
318   unsigned input_size = vector_data.size();
319
320   // pass the indexing variable to the data buffer
321   data_buffer << index;
322
323   // pass the rest of the input data to the buffer
324   for (unsigned i = 0; i < input_size; i++) {
325     data_buffer << ',' << vector_data[i];
326   }
327
328   data_buffer << std::endl;
329
330   // write the data buffer to the destination
331   dst.push_back( data_buffer.str() );
332 }
333
334 void CrushTester::write_integer_indexed_scalar_data_string(vector<string> &dst, int index, int scalar_data)
335 {
336   stringstream data_buffer (stringstream::in | stringstream::out);
337
338   // pass the indexing variable to the data buffer
339   data_buffer << index;
340
341   // pass the input data to the buffer
342   data_buffer << ',' << scalar_data;
343   data_buffer << std::endl;
344
345   // write the data buffer to the destination
346   dst.push_back( data_buffer.str() );
347 }
348 void CrushTester::write_integer_indexed_scalar_data_string(vector<string> &dst, int index, float scalar_data)
349 {
350   stringstream data_buffer (stringstream::in | stringstream::out);
351
352   // pass the indexing variable to the data buffer
353   data_buffer << index;
354
355   // pass the input data to the buffer
356   data_buffer << ',' << scalar_data;
357   data_buffer << std::endl;
358
359   // write the data buffer to the destination
360   dst.push_back( data_buffer.str() );
361 }
362
363 int CrushTester::test_with_fork(int timeout)
364 {
365   ostringstream sink;
366   int r = fork_function(timeout, sink, [&]() {
367       return test();
368     });
369   if (r == -ETIMEDOUT) {
370     err << "timed out during smoke test (" << timeout << " seconds)";
371   }
372   return r;
373 }
374
375 namespace {
376   class BadCrushMap : public std::runtime_error {
377   public:
378     int item;
379     BadCrushMap(const char* msg, int id)
380       : std::runtime_error(msg), item(id) {}
381   };
382   // throws if any node in the crush fail to print
383   class CrushWalker : public CrushTreeDumper::Dumper<void> {
384     typedef void DumbFormatter;
385     typedef CrushTreeDumper::Dumper<DumbFormatter> Parent;
386     int max_id;
387   public:
388     CrushWalker(const CrushWrapper *crush, unsigned max_id)
389       : Parent(crush, CrushTreeDumper::name_map_t()), max_id(max_id) {}
390     void dump_item(const CrushTreeDumper::Item &qi, DumbFormatter *) override {
391       int type = -1;
392       if (qi.is_bucket()) {
393         if (!crush->get_item_name(qi.id)) {
394           throw BadCrushMap("unknown item name", qi.id);
395         }
396         type = crush->get_bucket_type(qi.id);
397       } else {
398         if (max_id > 0 && qi.id >= max_id) {
399           throw BadCrushMap("item id too large", qi.id);
400         }
401         type = 0;
402       }
403       if (!crush->get_type_name(type)) {
404         throw BadCrushMap("unknown type name", qi.id);
405       }
406     }
407   };
408 }
409
410 bool CrushTester::check_name_maps(unsigned max_id) const
411 {
412   CrushWalker crush_walker(&crush, max_id);
413   try {
414     // walk through the crush, to see if its self-contained
415     crush_walker.dump(NULL);
416     // and see if the maps is also able to handle straying OSDs, whose id >= 0.
417     // "ceph osd tree" will try to print them, even they are not listed in the
418     // crush map.
419     crush_walker.dump_item(CrushTreeDumper::Item(0, 0, 0, 0), NULL);
420   } catch (const BadCrushMap& e) {
421     err << e.what() << ": item#" << e.item << std::endl;
422     return false;
423   }
424   return true;
425 }
426
427 static string get_rule_name(CrushWrapper& crush, int rule)
428 {
429   if (crush.get_rule_name(rule))
430     return crush.get_rule_name(rule);
431   else
432     return string("rule") + std::to_string(rule);
433 }
434
435 void CrushTester::check_overlapped_rules() const
436 {
437   namespace icl = boost::icl;
438   typedef std::set<string> RuleNames;
439   typedef icl::interval_map<int, RuleNames> Rules;
440   // <ruleset, type> => interval_map<size, {names}>
441   typedef std::map<std::pair<int, int>, Rules> RuleSets;
442   using interval = icl::interval<int>;
443
444   // mimic the logic of crush_find_rule(), but it only return the first matched
445   // one, but I am collecting all of them by the overlapped sizes.
446   RuleSets rulesets;
447   for (int rule = 0; rule < crush.get_max_rules(); rule++) {
448     if (!crush.rule_exists(rule)) {
449       continue;
450     }
451     Rules& rules = rulesets[{crush.get_rule_mask_ruleset(rule),
452                              crush.get_rule_mask_type(rule)}];
453     rules += make_pair(interval::closed(crush.get_rule_mask_min_size(rule),
454                                         crush.get_rule_mask_max_size(rule)),
455                        RuleNames{get_rule_name(crush, rule)});
456   }
457   for (auto i : rulesets) {
458     auto ruleset_type = i.first;
459     const Rules& rules = i.second;
460     for (auto r : rules) {
461       const RuleNames& names = r.second;
462       // if there are more than one rules covering the same size range,
463       // print them out.
464       if (names.size() > 1) {
465         err << "overlapped rules in ruleset " << ruleset_type.first << ": "
466             << boost::join(names, ", ") << "\n";
467       }
468     }
469   }
470 }
471
472 int CrushTester::test()
473 {
474   if (min_rule < 0 || max_rule < 0) {
475     min_rule = 0;
476     max_rule = crush.get_max_rules() - 1;
477   }
478   if (min_x < 0 || max_x < 0) {
479     min_x = 0;
480     max_x = 1023;
481   }
482
483   // initial osd weights
484   vector<__u32> weight;
485
486   /*
487    * note device weight is set by crushtool
488    * (likely due to a given a command line option)
489    */
490   for (int o = 0; o < crush.get_max_devices(); o++) {
491     if (device_weight.count(o)) {
492       weight.push_back(device_weight[o]);
493     } else if (crush.check_item_present(o)) {
494       weight.push_back(0x10000);
495     } else {
496       weight.push_back(0);
497     }
498   }
499
500   if (output_utilization_all)
501     err << "devices weights (hex): " << hex << weight << dec << std::endl;
502
503   // make adjustments
504   adjust_weights(weight);
505
506
507   int num_devices_active = 0;
508   for (vector<__u32>::iterator p = weight.begin(); p != weight.end(); ++p)
509     if (*p > 0)
510       num_devices_active++;
511
512   if (output_choose_tries)
513     crush.start_choose_profile();
514   
515   for (int r = min_rule; r < crush.get_max_rules() && r <= max_rule; r++) {
516     if (!crush.rule_exists(r)) {
517       if (output_statistics)
518         err << "rule " << r << " dne" << std::endl;
519       continue;
520     }
521     if (ruleset >= 0 &&
522         crush.get_rule_mask_ruleset(r) != ruleset) {
523       continue;
524     }
525     int minr = min_rep, maxr = max_rep;
526     if (min_rep < 0 || max_rep < 0) {
527       minr = crush.get_rule_mask_min_size(r);
528       maxr = crush.get_rule_mask_max_size(r);
529     }
530     
531     if (output_statistics)
532       err << "rule " << r << " (" << crush.get_rule_name(r)
533       << "), x = " << min_x << ".." << max_x
534       << ", numrep = " << minr << ".." << maxr
535       << std::endl;
536
537     for (int nr = minr; nr <= maxr; nr++) {
538       vector<int> per(crush.get_max_devices());
539       map<int,int> sizes;
540
541       int num_objects = ((max_x - min_x) + 1);
542       float num_devices = (float) per.size(); // get the total number of devices, better to cast as a float here 
543
544       // create a structure to hold data for post-processing
545       tester_data_set tester_data;
546       vector<float> vector_data_buffer_f;
547
548       // create a map to hold batch-level placement information
549       map<int, vector<int> > batch_per;
550       int objects_per_batch = num_objects / num_batches;
551       int batch_min = min_x;
552       int batch_max = min_x + objects_per_batch - 1;
553
554       // get the total weight of the system
555       int total_weight = 0;
556       for (unsigned i = 0; i < per.size(); i++)
557         total_weight += weight[i];
558
559       if (total_weight == 0)
560         continue;
561
562       // compute the expected number of objects stored per device in the absence of weighting
563       float expected_objects = min(nr, get_maximum_affected_by_rule(r)) * num_objects;
564
565       // compute each device's proportional weight
566       vector<float> proportional_weights( per.size() );
567
568       for (unsigned i = 0; i < per.size(); i++)
569         proportional_weights[i] = (float) weight[i] / (float) total_weight;
570
571       if (output_data_file) {
572         // stage the absolute weight information for post-processing
573         for (unsigned i = 0; i < per.size(); i++) {
574           tester_data.absolute_weights[i] = (float) weight[i] / (float)0x10000;
575         }
576
577         // stage the proportional weight information for post-processing
578         for (unsigned i = 0; i < per.size(); i++) {
579           if (proportional_weights[i] > 0 )
580             tester_data.proportional_weights[i] = proportional_weights[i];
581
582           tester_data.proportional_weights_all[i] = proportional_weights[i];
583         }
584
585       }
586       // compute the expected number of objects stored per device when a device's weight is considered
587       vector<float> num_objects_expected(num_devices);
588
589       for (unsigned i = 0; i < num_devices; i++)
590         num_objects_expected[i] = (proportional_weights[i]*expected_objects);
591
592       for (int current_batch = 0; current_batch < num_batches; current_batch++) {
593         if (current_batch == (num_batches - 1)) {
594           batch_max = max_x;
595           objects_per_batch = (batch_max - batch_min + 1);
596         }
597
598         float batch_expected_objects = min(nr, get_maximum_affected_by_rule(r)) * objects_per_batch;
599         vector<float> batch_num_objects_expected( per.size() );
600
601         for (unsigned i = 0; i < per.size() ; i++)
602           batch_num_objects_expected[i] = (proportional_weights[i]*batch_expected_objects);
603
604         // create a vector to hold placement results temporarily 
605         vector<int> temporary_per ( per.size() );
606
607         for (int x = batch_min; x <= batch_max; x++) {
608           // create a vector to hold the results of a CRUSH placement or RNG simulation
609           vector<int> out;
610
611           if (use_crush) {
612             if (output_mappings)
613               err << "CRUSH"; // prepend CRUSH to placement output
614             uint32_t real_x = x;
615             if (pool_id != -1) {
616               real_x = crush_hash32_2(CRUSH_HASH_RJENKINS1, x, (uint32_t)pool_id);
617             }
618             crush.do_rule(r, real_x, out, nr, weight, 0);
619           } else {
620             if (output_mappings)
621               err << "RNG"; // prepend RNG to placement output to denote simulation
622             // test our new monte carlo placement generator
623             random_placement(r, out, nr, weight);
624           }
625
626           if (output_mappings)
627             err << " rule " << r << " x " << x << " " << out << std::endl;
628
629           if (output_data_file)
630             write_integer_indexed_vector_data_string(tester_data.placement_information, x, out);
631
632           bool has_item_none = false;
633           for (unsigned i = 0; i < out.size(); i++) {
634             if (out[i] != CRUSH_ITEM_NONE) {
635               per[out[i]]++;
636               temporary_per[out[i]]++;
637             } else {
638               has_item_none = true;
639             }
640           }
641
642           batch_per[current_batch] = temporary_per;
643           sizes[out.size()]++;
644           if (output_bad_mappings && 
645               (out.size() != (unsigned)nr ||
646                has_item_none)) {
647             err << "bad mapping rule " << r << " x " << x << " num_rep " << nr << " result " << out << std::endl;
648           }
649         }
650
651         batch_min = batch_max + 1;
652         batch_max = batch_min + objects_per_batch - 1;
653       }
654
655       for (unsigned i = 0; i < per.size(); i++)
656         if (output_utilization && !output_statistics)
657           err << "  device " << i
658           << ":\t" << per[i] << std::endl;
659
660       for (map<int,int>::iterator p = sizes.begin(); p != sizes.end(); ++p)
661         if (output_statistics)
662           err << "rule " << r << " (" << crush.get_rule_name(r) << ") num_rep " << nr
663           << " result size == " << p->first << ":\t"
664           << p->second << "/" << (max_x-min_x+1) << std::endl;
665
666       if (output_statistics)
667         for (unsigned i = 0; i < per.size(); i++) {
668           if (output_utilization) {
669             if (num_objects_expected[i] > 0 && per[i] > 0) {
670               err << "  device " << i << ":\t"
671                   << "\t" << " stored " << ": " << per[i]
672                   << "\t" << " expected " << ": " << num_objects_expected[i]
673                   << std::endl;
674             }
675           } else if (output_utilization_all) {
676             err << "  device " << i << ":\t"
677                 << "\t" << " stored " << ": " << per[i]
678                 << "\t" << " expected " << ": " << num_objects_expected[i]
679                 << std::endl;
680           }
681         }
682
683       if (output_data_file)
684         for (unsigned i = 0; i < per.size(); i++) {
685           vector_data_buffer_f.clear();
686           vector_data_buffer_f.push_back( (float) per[i]);
687           vector_data_buffer_f.push_back( (float) num_objects_expected[i]);
688
689           write_integer_indexed_vector_data_string(tester_data.device_utilization_all, i, vector_data_buffer_f);
690
691           if (num_objects_expected[i] > 0 && per[i] > 0)
692             write_integer_indexed_vector_data_string(tester_data.device_utilization, i, vector_data_buffer_f);
693         }
694
695       if (output_data_file && num_batches > 1) {
696         // stage batch utilization information for post-processing
697         for (int i = 0; i < num_batches; i++) {
698           write_integer_indexed_vector_data_string(tester_data.batch_device_utilization_all, i, batch_per[i]);
699           write_integer_indexed_vector_data_string(tester_data.batch_device_expected_utilization_all, i, batch_per[i]);
700         }
701       }
702
703       string rule_tag = crush.get_rule_name(r);
704
705       if (output_csv)
706         write_data_set_to_csv(output_data_file_name+rule_tag,tester_data);
707     }
708   }
709
710   if (output_choose_tries) {
711     __u32 *v = 0;
712     int n = crush.get_choose_profile(&v);
713     for (int i=0; i<n; i++) {
714       cout.setf(std::ios::right);
715       cout << std::setw(2)
716       << i << ": " << std::setw(9) << v[i];
717       cout.unsetf(std::ios::right);
718       cout << std::endl;
719     }
720
721     crush.stop_choose_profile();
722   }
723
724   return 0;
725 }