Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / erasure-code / lrc / ErasureCodeLrc.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 Cloudwatt <libre.licensing@cloudwatt.com>
7  * Copyright (C) 2014 Red Hat <contact@redhat.com>
8  *
9  * Author: Loic Dachary <loic@dachary.org>
10  *
11  *  This library is free software; you can redistribute it and/or
12  *  modify it under the terms of the GNU Lesser General Public
13  *  License as published by the Free Software Foundation; either
14  *  version 2.1 of the License, or (at your option) any later version.
15  *
16  */
17
18 #include <errno.h>
19 #include <algorithm>
20
21 #include "include/str_map.h"
22 #include "common/debug.h"
23 #include "crush/CrushWrapper.h"
24 #include "osd/osd_types.h"
25 #include "include/stringify.h"
26 #include "erasure-code/ErasureCodePlugin.h"
27 #include "json_spirit/json_spirit_writer.h"
28
29 #include "ErasureCodeLrc.h"
30
31 // re-include our assert to clobber boost's
32 #include "include/assert.h"
33
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_osd
36 #undef dout_prefix
37 #define dout_prefix _prefix(_dout)
38
39 using namespace std;
40
41 static ostream& _prefix(std::ostream* _dout)
42 {
43   return *_dout << "ErasureCodeLrc: ";
44 }
45
46 int ErasureCodeLrc::create_rule(const string &name,
47                                    CrushWrapper &crush,
48                                    ostream *ss) const
49 {
50   if (crush.rule_exists(name)) {
51     *ss << "rule " << name << " exists";
52     return -EEXIST;
53   }
54   if (!crush.name_exists(rule_root)) {
55     *ss << "root item " << rule_root << " does not exist";
56     return -ENOENT;
57   }
58   int root = crush.get_item_id(rule_root);
59   if (rule_device_class.size()) {
60     if (!crush.class_exists(rule_device_class)) {
61       *ss << "device class " << rule_device_class << " does not exist";
62       return -ENOENT;
63     }
64     int c = crush.get_class_id(rule_device_class);
65     if (crush.class_bucket.count(root) == 0 ||
66         crush.class_bucket[root].count(c) == 0) {
67       *ss << "root item " << rule_root << " has no devices with class "
68           << rule_device_class;
69       return -EINVAL;
70     }
71     root = crush.class_bucket[root][c];
72   }
73
74   int rno = 0;
75   for (rno = 0; rno < crush.get_max_rules(); rno++) {
76     if (!crush.rule_exists(rno) && !crush.ruleset_exists(rno))
77        break;
78   }
79
80   int steps = 4 + rule_steps.size();
81   int min_rep = 3;
82   int max_rep = get_chunk_count();
83   int ret;
84   ret = crush.add_rule(rno, steps, pg_pool_t::TYPE_ERASURE,
85                        min_rep, max_rep);
86   assert(ret == rno);
87   int step = 0;
88
89   ret = crush.set_rule_step(rno, step++, CRUSH_RULE_SET_CHOOSELEAF_TRIES, 5, 0);
90   assert(ret == 0);
91   ret = crush.set_rule_step(rno, step++, CRUSH_RULE_SET_CHOOSE_TRIES, 100, 0);
92   assert(ret == 0);
93   ret = crush.set_rule_step(rno, step++, CRUSH_RULE_TAKE, root, 0);
94   assert(ret == 0);
95   // [ [ "choose", "rack", 2 ],
96   //   [ "chooseleaf", "host", 5 ] ]
97   for (vector<Step>::const_iterator i = rule_steps.begin();
98        i != rule_steps.end();
99        ++i) {
100     int op = i->op == "chooseleaf" ?
101       CRUSH_RULE_CHOOSELEAF_INDEP : CRUSH_RULE_CHOOSE_INDEP;
102     int type = crush.get_type_id(i->type);
103     if (type < 0) {
104       *ss << "unknown crush type " << i->type;
105       return -EINVAL;
106     }
107     ret = crush.set_rule_step(rno, step++, op, i->n, type);
108     assert(ret == 0);
109   }
110   ret = crush.set_rule_step(rno, step++, CRUSH_RULE_EMIT, 0, 0);
111   assert(ret == 0);
112   crush.set_rule_name(rno, name);
113   return rno;
114 }
115
116 int ErasureCodeLrc::layers_description(const ErasureCodeProfile &profile,
117                                        json_spirit::mArray *description,
118                                        ostream *ss) const
119 {
120   if (profile.count("layers") == 0) {
121     *ss << "could not find 'layers' in " << profile << std::endl;
122     return ERROR_LRC_DESCRIPTION;
123   }
124   string str = profile.find("layers")->second;
125   try {
126     json_spirit::mValue json;
127     json_spirit::read_or_throw(str, json);
128
129     if (json.type() != json_spirit::array_type) {
130       *ss << "layers='" << str
131           << "' must be a JSON array but is of type "
132           << json.type() << " instead" << std::endl;
133       return ERROR_LRC_ARRAY;
134     }
135     *description = json.get_array();
136   } catch (json_spirit::Error_position &e) {
137     *ss << "failed to parse layers='" << str << "'"
138         << " at line " << e.line_ << ", column " << e.column_
139         << " : " << e.reason_ << std::endl;
140     return ERROR_LRC_PARSE_JSON;
141   }
142   return 0;
143 }
144
145 int ErasureCodeLrc::layers_parse(string description_string,
146                                  json_spirit::mArray description,
147                                  ostream *ss)
148 {
149   int position = 0;
150   for (vector<json_spirit::mValue>::iterator i = description.begin();
151        i != description.end();
152        ++i, position++) {
153     if (i->type() != json_spirit::array_type) {
154       stringstream json_string;
155       json_spirit::write(*i, json_string);
156       *ss << "each element of the array "
157           << description_string << " must be a JSON array but "
158           << json_string.str() << " at position " << position
159           << " is of type " << i->type() << " instead" << std::endl;
160       return ERROR_LRC_ARRAY;
161     }
162     json_spirit::mArray layer_json = i->get_array();
163     ErasureCodeProfile profile;
164     int index = 0;
165     for (vector<json_spirit::mValue>::iterator j = layer_json.begin();
166          j != layer_json.end();
167          ++j, ++index) {
168       if (index == 0) {
169         if (j->type() != json_spirit::str_type) {
170           stringstream element;
171           json_spirit::write(*j, element);
172           *ss << "the first element of the entry "
173               << element.str() << " (first is zero) "
174               << position << " in " << description_string
175               << " is of type " << (*j).type() << " instead of string" << std::endl;
176           return ERROR_LRC_STR;
177         }
178         layers.push_back(Layer(j->get_str()));
179         Layer &layer = layers.back();
180         layer.chunks_map = j->get_str();
181       } else if(index == 1) {
182         Layer &layer = layers.back();
183         if (j->type() != json_spirit::str_type &&
184             j->type() != json_spirit::obj_type) {
185           stringstream element;
186           json_spirit::write(*j, element);
187           *ss << "the second element of the entry "
188               << element.str() << " (first is zero) "
189               << position << " in " << description_string
190               << " is of type " << (*j).type() << " instead of string or object"
191               << std::endl;
192           return ERROR_LRC_CONFIG_OPTIONS;
193         }
194         if (j->type() == json_spirit::str_type) {
195           int err = get_json_str_map(j->get_str(), *ss, &layer.profile);
196           if (err)
197             return err;
198         } else if (j->type() == json_spirit::obj_type) {
199           json_spirit::mObject o = j->get_obj();
200
201           for (map<string, json_spirit::mValue>::iterator i = o.begin();
202                i != o.end();
203                ++i) {
204             layer.profile[i->first] = i->second.get_str();
205           }
206         }
207       } else {
208           // ignore trailing elements
209       }
210     }
211   }
212   return 0;
213 }
214
215 int ErasureCodeLrc::layers_init(ostream *ss)
216 {
217   ErasureCodePluginRegistry &registry = ErasureCodePluginRegistry::instance();
218   for (unsigned int i = 0; i < layers.size(); i++) {
219     Layer &layer = layers[i];
220     int position = 0;
221     for(std::string::iterator it = layer.chunks_map.begin();
222         it != layer.chunks_map.end();
223         ++it) {
224       if (*it == 'D')
225         layer.data.push_back(position);
226       if (*it == 'c')
227         layer.coding.push_back(position);
228       if (*it == 'c' || *it == 'D')
229         layer.chunks_as_set.insert(position);
230       position++;
231     }
232     layer.chunks = layer.data;
233     layer.chunks.insert(layer.chunks.end(),
234                         layer.coding.begin(), layer.coding.end());
235     if (layer.profile.find("k") == layer.profile.end())
236       layer.profile["k"] = stringify(layer.data.size());
237     if (layer.profile.find("m") == layer.profile.end())
238       layer.profile["m"] = stringify(layer.coding.size());
239     if (layer.profile.find("plugin") == layer.profile.end())
240       layer.profile["plugin"] = "jerasure";
241     if (layer.profile.find("technique") == layer.profile.end())
242       layer.profile["technique"] = "reed_sol_van";
243     int err = registry.factory(layer.profile["plugin"],
244                                directory,
245                                layer.profile,
246                                &layer.erasure_code,
247                                ss);
248     if (err)
249       return err;
250   }
251   return 0;
252 }
253
254 int ErasureCodeLrc::layers_sanity_checks(string description_string,
255                                          ostream *ss) const
256 {
257   int position = 0;
258
259   if (layers.size() < 1) {
260     *ss << "layers parameter has " << layers.size()
261         << " which is less than the minimum of one. "
262         << description_string << std::endl;
263     return ERROR_LRC_LAYERS_COUNT;
264   }
265   for (vector<Layer>::const_iterator layer = layers.begin();
266        layer != layers.end();
267        ++layer) {
268     if (chunk_count != layer->chunks_map.length()) {
269       *ss << "the first element of the array at position "
270           << position << " (starting from zero) "
271           << " is the string '" << layer->chunks_map
272           << " found in the layers parameter "
273           << description_string << ". It is expected to be "
274           << chunk_count << " characters long but is "
275           << layer->chunks_map.length() << " characters long instead "
276           << std::endl;
277       return ERROR_LRC_MAPPING_SIZE;
278     }
279   }
280   return 0;
281 }
282
283 int ErasureCodeLrc::parse(ErasureCodeProfile &profile,
284                           ostream *ss)
285 {
286   int r = ErasureCode::parse(profile, ss);
287   if (r)
288     return r;
289
290   return parse_rule(profile, ss);
291 }
292
293 const string ErasureCodeLrc::DEFAULT_KML("-1");
294
295 int ErasureCodeLrc::parse_kml(ErasureCodeProfile &profile,
296                               ostream *ss)
297 {
298   int err = ErasureCode::parse(profile, ss);
299   const int DEFAULT_INT = -1;
300   int k, m, l;
301   err |= to_int("k", profile, &k, DEFAULT_KML, ss);
302   err |= to_int("m", profile, &m, DEFAULT_KML, ss);
303   err |= to_int("l", profile, &l, DEFAULT_KML, ss);
304
305   if (k == DEFAULT_INT && m == DEFAULT_INT && l == DEFAULT_INT)
306     return err;
307
308   if ((k != DEFAULT_INT || m != DEFAULT_INT || l != DEFAULT_INT) &&
309       (k == DEFAULT_INT || m == DEFAULT_INT || l == DEFAULT_INT)) {
310     *ss << "All of k, m, l must be set or none of them in "
311         << profile << std::endl;
312     return ERROR_LRC_ALL_OR_NOTHING;
313   }
314
315   const char *generated[] = { "mapping",
316                               "layers",
317                               "crush-steps" };
318
319   for (int i = 0; i < 3; i++) {
320     if (profile.count(generated[i])) {
321       *ss << "The " << generated[i] << " parameter cannot be set "
322           << "when k, m, l are set in " << profile << std::endl;
323       return ERROR_LRC_GENERATED;
324     }
325   }
326
327   if ((k + m) % l) {
328     *ss << "k + m must be a multiple of l in "
329         << profile << std::endl;
330     return ERROR_LRC_K_M_MODULO;
331   }
332
333   int local_group_count = (k + m) / l;
334
335   if (k % local_group_count) {
336     *ss << "k must be a multiple of (k + m) / l in "
337         << profile << std::endl;
338     return ERROR_LRC_K_MODULO;
339   }
340
341   if (m % local_group_count) {
342     *ss << "m must be a multiple of (k + m) / l in "
343         << profile << std::endl;
344     return ERROR_LRC_M_MODULO;
345   }
346
347   string mapping;
348   for (int i = 0; i < local_group_count; i++) {
349     mapping += string(k / local_group_count, 'D') +
350       string(m / local_group_count, '_') + "_";
351   }
352   profile["mapping"] = mapping;
353
354   string layers = "[ ";
355
356   // global layer
357   layers += " [ \"";
358   for (int i = 0; i < local_group_count; i++) {
359     layers += string(k / local_group_count, 'D') +
360       string(m / local_group_count, 'c') + "_";
361   }
362   layers += "\", \"\" ],";
363
364   // local layers
365   for (int i = 0; i < local_group_count; i++) {
366     layers += " [ \"";
367     for (int j = 0; j < local_group_count; j++) {
368       if (i == j)
369         layers += string(l, 'D') + "c";
370       else
371         layers += string(l + 1, '_');
372     }
373     layers += "\", \"\" ],";
374   }
375   profile["layers"] = layers + "]";
376
377   ErasureCodeProfile::const_iterator parameter;
378   string rule_locality;
379   parameter = profile.find("crush-locality");
380   if (parameter != profile.end())
381     rule_locality = parameter->second;
382   string rule_failure_domain = "host";
383   parameter = profile.find("crush-failure-domain");
384   if (parameter != profile.end())
385     rule_failure_domain = parameter->second;
386
387   if (rule_locality != "") {
388     rule_steps.clear();
389     rule_steps.push_back(Step("choose", rule_locality,
390                                  local_group_count));
391     rule_steps.push_back(Step("chooseleaf", rule_failure_domain,
392                                  l + 1));
393   } else if (rule_failure_domain != "") {
394     rule_steps.clear();
395     rule_steps.push_back(Step("chooseleaf", rule_failure_domain, 0));
396   }
397
398   return err;
399 }
400
401 int ErasureCodeLrc::parse_rule(ErasureCodeProfile &profile,
402                                   ostream *ss)
403 {
404   int err = 0;
405   err |= to_string("crush-root", profile,
406                    &rule_root,
407                    "default", ss);
408   err |= to_string("crush-device-class", profile,
409                    &rule_device_class,
410                    "", ss);
411
412   if (profile.count("crush-steps") != 0) {
413     rule_steps.clear();
414     string str = profile.find("crush-steps")->second;
415     json_spirit::mArray description;
416     try {
417       json_spirit::mValue json;
418       json_spirit::read_or_throw(str, json);
419
420       if (json.type() != json_spirit::array_type) {
421         *ss << "crush-steps='" << str
422             << "' must be a JSON array but is of type "
423             << json.type() << " instead" << std::endl;
424         return ERROR_LRC_ARRAY;
425       }
426       description = json.get_array();
427     } catch (json_spirit::Error_position &e) {
428       *ss << "failed to parse crush-steps='" << str << "'"
429           << " at line " << e.line_ << ", column " << e.column_
430           << " : " << e.reason_ << std::endl;
431       return ERROR_LRC_PARSE_JSON;
432     }
433
434     int position = 0;
435     for (vector<json_spirit::mValue>::iterator i = description.begin();
436          i != description.end();
437          ++i, position++) {
438       if (i->type() != json_spirit::array_type) {
439         stringstream json_string;
440         json_spirit::write(*i, json_string);
441         *ss << "element of the array "
442             << str << " must be a JSON array but "
443             << json_string.str() << " at position " << position
444             << " is of type " << i->type() << " instead" << std::endl;
445         return ERROR_LRC_ARRAY;
446       }
447       int r = parse_rule_step(str, i->get_array(), ss);
448       if (r)
449         return r;
450     }
451   }
452   return 0;
453 }
454
455 int ErasureCodeLrc::parse_rule_step(string description_string,
456                                        json_spirit::mArray description,
457                                        ostream *ss)
458 {
459   stringstream json_string;
460   json_spirit::write(description, json_string);
461   string op;
462   string type;
463   int n = 0;
464   int position = 0;
465   for (vector<json_spirit::mValue>::iterator i = description.begin();
466        i != description.end();
467        ++i, position++) {
468     if ((position == 0 || position == 1) &&
469         i->type() != json_spirit::str_type) {
470       *ss << "element " << position << " of the array "
471           << json_string.str() << " found in " << description_string
472           << " must be a JSON string but is of type "
473           << i->type() << " instead" << std::endl;
474       return position == 0 ? ERROR_LRC_RULE_OP : ERROR_LRC_RULE_TYPE;
475     }
476     if (position == 2 && i->type() != json_spirit::int_type) {
477       *ss << "element " << position << " of the array "
478           << json_string.str() << " found in " << description_string
479           << " must be a JSON int but is of type "
480           << i->type() << " instead" << std::endl;
481       return ERROR_LRC_RULE_N;
482     }
483
484     if (position == 0)
485       op = i->get_str();
486     if (position == 1)
487       type = i->get_str();
488     if (position == 2)
489       n = i->get_int();
490   }
491   rule_steps.push_back(Step(op, type, n));
492   return 0;
493 }
494
495 int ErasureCodeLrc::init(ErasureCodeProfile &profile,
496                          ostream *ss)
497 {
498   int r;
499
500   r = parse_kml(profile, ss);
501   if (r)
502     return r;
503
504   r = parse(profile, ss);
505   if (r)
506     return r;
507
508   json_spirit::mArray description;
509   r = layers_description(profile, &description, ss);
510   if (r)
511     return r;
512
513   string description_string = profile.find("layers")->second;
514
515   dout(10) << "init(" << description_string << ")" << dendl;
516
517   r = layers_parse(description_string, description, ss);
518   if (r)
519     return r;
520
521   r = layers_init(ss);
522   if (r)
523     return r;
524
525   if (profile.count("mapping") == 0) {
526     *ss << "the 'mapping' profile is missing from " << profile;
527     return ERROR_LRC_MAPPING;
528   }
529   string mapping = profile.find("mapping")->second;
530   data_chunk_count = 0;
531   for(std::string::iterator it = mapping.begin(); it != mapping.end(); ++it) {
532     if (*it == 'D')
533       data_chunk_count++;
534   }
535   chunk_count = mapping.length();
536
537   r = layers_sanity_checks(description_string, ss);
538   if (r)
539     return r;
540
541   //
542   // When initialized with kml, the profile parameters
543   // that were generated should not be stored because
544   // they would otherwise be exposed to the caller.
545   //
546   if (profile.find("l") != profile.end() &&
547       profile.find("l")->second != DEFAULT_KML) {
548     profile.erase("mapping");
549     profile.erase("layers");
550   }
551   ErasureCode::init(profile, ss);
552   return 0;
553 }
554
555 set<int> ErasureCodeLrc::get_erasures(const set<int> &want,
556                                       const set<int> &available) const
557 {
558   set<int> result;
559   set_difference(want.begin(), want.end(),
560                  available.begin(), available.end(),
561                  inserter(result, result.end()));
562   return result;
563 }
564
565 unsigned int ErasureCodeLrc::get_chunk_size(unsigned int object_size) const
566 {
567   return layers.front().erasure_code->get_chunk_size(object_size);
568 }
569
570 void p(const set<int> &s) { cerr << s; } // for gdb
571
572 int ErasureCodeLrc::minimum_to_decode(const set<int> &want_to_read,
573                                       const set<int> &available_chunks,
574                                       set<int> *minimum)
575 {
576   dout(20) << __func__ << " want_to_read " << want_to_read
577            << " available_chunks " << available_chunks << dendl;
578   {
579     set<int> erasures_total;
580     set<int> erasures_not_recovered;
581     set<int> erasures_want;
582     for (unsigned int i = 0; i < get_chunk_count(); ++i) {
583       if (available_chunks.count(i) == 0) {
584         erasures_total.insert(i);
585         erasures_not_recovered.insert(i);
586         if (want_to_read.count(i) != 0)
587           erasures_want.insert(i);
588       }
589     }
590
591     //
592     // Case 1:
593     //
594     // When no chunk is missing there is no need to read more than what
595     // is wanted.
596     //
597     if (erasures_want.empty()) {
598       *minimum = want_to_read;
599       dout(20) << __func__ << " minimum == want_to_read == "
600                << want_to_read << dendl;
601       return 0;
602     }
603
604     //
605     // Case 2:
606     //
607     // Try to recover erasures with as few chunks as possible.
608     //
609     for (vector<Layer>::reverse_iterator i = layers.rbegin();
610          i != layers.rend();
611          ++i) {
612       //
613       // If this layer has no chunk that we want, skip it.
614       //
615       set<int> layer_want;
616       set_intersection(want_to_read.begin(), want_to_read.end(),
617                        i->chunks_as_set.begin(), i->chunks_as_set.end(),
618                        inserter(layer_want, layer_want.end()));
619       if (layer_want.empty())
620         continue;
621       //
622       // Are some of the chunks we want missing ?
623       //
624       set<int> layer_erasures;
625       set_intersection(layer_want.begin(), layer_want.end(),
626                        erasures_want.begin(), erasures_want.end(),
627                        inserter(layer_erasures, layer_erasures.end()));
628       set<int> layer_minimum;
629       if (layer_erasures.empty()) {
630         //
631         // The chunks we want are available, this is the minimum we need
632         // to read.
633         //
634         layer_minimum = layer_want;
635       } else {
636         set<int> erasures;
637         set_intersection(i->chunks_as_set.begin(), i->chunks_as_set.end(),
638                          erasures_not_recovered.begin(), erasures_not_recovered.end(),
639                          inserter(erasures, erasures.end()));
640
641         if (erasures.size() > i->erasure_code->get_coding_chunk_count()) {
642           //
643           // There are too many erasures for this layer to recover: skip
644           // it and hope that an upper layer will be do better.
645           //
646           continue;
647         } else {
648           //
649           // Get all available chunks in that layer to recover the
650           // missing one(s).
651           //
652           set_difference(i->chunks_as_set.begin(), i->chunks_as_set.end(),
653                          erasures_not_recovered.begin(), erasures_not_recovered.end(),
654                          inserter(layer_minimum, layer_minimum.end()));
655           //
656           // Chunks recovered by this layer are removed from the list of
657           // erasures so that upper levels do not attempt to recover
658           // them.
659           //
660           for (set<int>::const_iterator j = erasures.begin();
661                j != erasures.end();
662                ++j) {
663             erasures_not_recovered.erase(*j);
664             if (erasures_want.count(*j))
665               erasures_want.erase(*j);
666           }
667         }
668       }
669       minimum->insert(layer_minimum.begin(), layer_minimum.end());
670     }
671     if (erasures_want.empty()) {
672       minimum->insert(want_to_read.begin(), want_to_read.end());
673       for (set<int>::const_iterator i = erasures_total.begin();
674            i != erasures_total.end();
675            ++i) {
676         if (minimum->count(*i))
677           minimum->erase(*i);
678       }
679       dout(20) << __func__ << " minimum = " << *minimum << dendl;
680       return 0;
681     }
682   }
683
684   {
685     //
686     // Case 3:
687     //
688     // The previous strategy failed to recover from all erasures.
689     //
690     // Try to recover as many chunks as possible, even from layers
691     // that do not contain chunks that we want, in the hope that it
692     // will help the upper layers.
693     //
694     set<int> erasures_total;
695     for (unsigned int i = 0; i < get_chunk_count(); ++i) {
696       if (available_chunks.count(i) == 0)
697         erasures_total.insert(i);
698     }
699
700     for (vector<Layer>::reverse_iterator i = layers.rbegin();
701          i != layers.rend();
702          ++i) {
703       set<int> layer_erasures;
704       set_intersection(i->chunks_as_set.begin(), i->chunks_as_set.end(),
705                        erasures_total.begin(), erasures_total.end(),
706                        inserter(layer_erasures, layer_erasures.end()));
707       //
708       // If this layer has no erasure, skip it
709       //
710       if (layer_erasures.empty())
711         continue;
712
713       if (layer_erasures.size() > 0 &&
714           layer_erasures.size() <= i->erasure_code->get_coding_chunk_count()) {
715         //
716         // chunks recovered by this layer are removed from the list of
717         // erasures so that upper levels know they can rely on their
718         // availability
719         //
720         for (set<int>::const_iterator j = layer_erasures.begin();
721              j != layer_erasures.end();
722              ++j) {
723           erasures_total.erase(*j);
724         }
725       }
726     }
727     if (erasures_total.empty()) {
728       //
729       // Do not try to be smart about what chunks are necessary to
730       // recover, use all available chunks.
731       //
732       *minimum = available_chunks;
733       dout(20) << __func__ << " minimum == available_chunks == "
734                << available_chunks << dendl;
735       return 0;
736     }
737   }
738
739   derr << __func__ << " not enough chunks in " << available_chunks
740        << " to read " << want_to_read << dendl;
741   return -EIO;
742 }
743
744 int ErasureCodeLrc::encode_chunks(const set<int> &want_to_encode,
745                                   map<int, bufferlist> *encoded)
746 {
747   unsigned int top = layers.size();
748   for (vector<Layer>::reverse_iterator i = layers.rbegin();
749        i != layers.rend();
750        ++i) {
751     --top;
752     if (includes(i->chunks_as_set.begin(), i->chunks_as_set.end(),
753                  want_to_encode.begin(), want_to_encode.end()))
754       break;
755   }
756
757   for (unsigned int i = top; i < layers.size(); ++i) {
758     const Layer &layer = layers[i];
759     set<int> layer_want_to_encode;
760     map<int, bufferlist> layer_encoded;
761     int j = 0;
762     for (vector<int>::const_iterator c = layer.chunks.begin();
763          c != layer.chunks.end();
764          ++c) {
765       layer_encoded[j] = (*encoded)[*c];
766       if (want_to_encode.find(*c) != want_to_encode.end())
767         layer_want_to_encode.insert(j);
768       j++;
769     }
770     int err = layer.erasure_code->encode_chunks(layer_want_to_encode,
771                                                 &layer_encoded);
772     if (err) {
773       derr << __func__ << " layer " << layer.chunks_map
774            << " failed with " << err << " trying to encode "
775            << layer_want_to_encode << dendl;
776       return err;
777     }
778   }
779   return 0;
780 }
781
782 int ErasureCodeLrc::decode_chunks(const set<int> &want_to_read,
783                                   const map<int, bufferlist> &chunks,
784                                   map<int, bufferlist> *decoded)
785 {
786   set<int> available_chunks;
787   set<int> erasures;
788   for (unsigned int i = 0; i < get_chunk_count(); ++i) {
789     if (chunks.count(i) != 0)
790       available_chunks.insert(i);
791     else
792       erasures.insert(i);
793   }
794
795   set<int> want_to_read_erasures;
796
797   for (vector<Layer>::reverse_iterator layer = layers.rbegin();
798        layer != layers.rend();
799        ++layer) {
800     set<int> layer_erasures;
801     set_intersection(layer->chunks_as_set.begin(), layer->chunks_as_set.end(),
802                      erasures.begin(), erasures.end(),
803                      inserter(layer_erasures, layer_erasures.end()));
804
805     if (layer_erasures.size() >
806         layer->erasure_code->get_coding_chunk_count()) {
807       // skip because there are too many erasures for this layer to recover
808     } else if(layer_erasures.size() == 0) {
809       // skip because all chunks are already available
810     } else {
811       set<int> layer_want_to_read;
812       map<int, bufferlist> layer_chunks;
813       map<int, bufferlist> layer_decoded;
814       int j = 0;
815       for (vector<int>::const_iterator c = layer->chunks.begin();
816            c != layer->chunks.end();
817            ++c) {
818         //
819         // Pick chunks from *decoded* instead of *chunks* to re-use
820         // chunks recovered by previous layers. In other words
821         // *chunks* does not change but *decoded* gradually improves
822         // as more layers recover from erasures.
823         //
824         if (erasures.count(*c) == 0)
825           layer_chunks[j] = (*decoded)[*c];
826         if (want_to_read.count(*c) != 0)
827           layer_want_to_read.insert(j);
828         layer_decoded[j] = (*decoded)[*c];
829         ++j;
830       }
831       int err = layer->erasure_code->decode_chunks(layer_want_to_read,
832                                                    layer_chunks,
833                                                    &layer_decoded);
834       if (err) {
835         derr << __func__ << " layer " << layer->chunks_map
836              << " failed with " << err << " trying to decode "
837              << layer_want_to_read << " with " << available_chunks << dendl;
838         return err;
839       }
840       j = 0;
841       for (vector<int>::const_iterator c = layer->chunks.begin();
842            c != layer->chunks.end();
843            ++c) {
844         (*decoded)[*c] = layer_decoded[j];
845         ++j;
846         if (erasures.count(*c) != 0)
847           erasures.erase(*c);
848       }
849       want_to_read_erasures.clear();
850       set_intersection(erasures.begin(), erasures.end(),
851                        want_to_read.begin(), want_to_read.end(),
852                        inserter(want_to_read_erasures, want_to_read_erasures.end()));
853       if (want_to_read_erasures.size() == 0)
854         break;
855     }
856   }
857
858   if (want_to_read_erasures.size() > 0) {
859     derr << __func__ << " want to read " << want_to_read
860          << " with available_chunks = " << available_chunks
861          << " end up being unable to read " << want_to_read_erasures << dendl;
862     return -EIO;
863   } else {
864     return 0;
865   }
866 }