Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / ECTransaction.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2013 Inktank Storage, Inc.
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <iostream>
16 #include <vector>
17 #include <vector>
18 #include <sstream>
19
20 #include "ECTransaction.h"
21 #include "ECUtil.h"
22 #include "os/ObjectStore.h"
23 #include "common/inline_variant.h"
24
25
26 void encode_and_write(
27   pg_t pgid,
28   const hobject_t &oid,
29   const ECUtil::stripe_info_t &sinfo,
30   ErasureCodeInterfaceRef &ecimpl,
31   const set<int> &want,
32   uint64_t offset,
33   bufferlist bl,
34   uint32_t flags,
35   ECUtil::HashInfoRef hinfo,
36   extent_map &written,
37   map<shard_id_t, ObjectStore::Transaction> *transactions,
38   DoutPrefixProvider *dpp) {
39   const uint64_t before_size = hinfo->get_total_logical_size(sinfo);
40   assert(sinfo.logical_offset_is_stripe_aligned(offset));
41   assert(sinfo.logical_offset_is_stripe_aligned(bl.length()));
42   assert(bl.length());
43
44   map<int, bufferlist> buffers;
45   int r = ECUtil::encode(
46     sinfo, ecimpl, bl, want, &buffers);
47   assert(r == 0);
48
49   written.insert(offset, bl.length(), bl);
50
51   ldpp_dout(dpp, 20) << __func__ << ": " << oid
52                      << " new_size "
53                      << offset + bl.length()
54                      << dendl;
55
56   if (offset >= before_size) {
57     assert(offset == before_size);
58     hinfo->append(
59       sinfo.aligned_logical_offset_to_chunk_offset(offset),
60       buffers);
61   }
62
63   for (auto &&i : *transactions) {
64     assert(buffers.count(i.first));
65     bufferlist &enc_bl = buffers[i.first];
66     if (offset >= before_size) {
67       i.second.set_alloc_hint(
68         coll_t(spg_t(pgid, i.first)),
69         ghobject_t(oid, ghobject_t::NO_GEN, i.first),
70         0, 0,
71         CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
72         CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
73     }
74     i.second.write(
75       coll_t(spg_t(pgid, i.first)),
76       ghobject_t(oid, ghobject_t::NO_GEN, i.first),
77       sinfo.logical_to_prev_chunk_offset(
78         offset),
79       enc_bl.length(),
80       enc_bl,
81       flags);
82   }
83 }
84
85 bool ECTransaction::requires_overwrite(
86   uint64_t prev_size,
87   const PGTransaction::ObjectOperation &op) {
88   // special handling for truncates to 0
89   if (op.truncate && op.truncate->first == 0)
90     return false;
91   return op.is_none() &&
92     ((!op.buffer_updates.empty() &&
93       (op.buffer_updates.begin().get_off() < prev_size)) ||
94      (op.truncate &&
95       (op.truncate->first < prev_size)));
96 }
97
/**
 * Turn every object operation in plan.t into ObjectStore operations on
 * each per-shard transaction in *transactions.
 *
 * For each (oid, op) pair visited by plan.t->safe_create_traverse() this:
 *  - records temp objects that are created / deleted in *temp_added /
 *    *temp_removed
 *  - applies delete-first, create / clone / rename, xattr updates and
 *    alloc hints to every shard transaction
 *  - builds the set of logical extents to write (the object's entry in
 *    partial_extents, op.buffer_updates, and zero fill required by
 *    truncates), then encodes and writes them through encode_and_write(),
 *    which records them in (*written_map)[oid]
 *  - if the object has a pg_log_entry_t in `entries`, fills that entry's
 *    mod_desc (rmobject, create, setattrs, rollback_extents, append,
 *    update_snaps) and, for modify entries with updated snaps, its
 *    encoded snaps
 *  - writes the re-encoded HashInfo to the hinfo xattr on every shard
 *    unless the op is a delete
 *
 * @param plan                plan.t (must be non-null) supplies the ops and
 *                            obc_map; plan.hash_infos must have an entry
 *                            for every object visited (asserted)
 * @param ecimpl              erasure code implementation; its chunk count
 *                            defines the set of shards to encode
 * @param pgid                pg used to build each shard's collection id
 * @param legacy_log_entries  not referenced in this function
 * @param sinfo               stripe geometry used for all offset conversions
 * @param partial_extents     per-object extents used to seed the write set
 * @param entries             log entries; indexed by soid and mutated in
 *                            place through the pointers taken here
 * @param written_map         out: per-object extents that were written
 * @param transactions        in/out: one transaction per shard
 * @param temp_added          out: temp objects created by this plan
 * @param temp_removed        out: temp objects deleted by this plan
 * @param dpp                 debug output prefix provider
 */
void ECTransaction::generate_transactions(
  WritePlan &plan,
  ErasureCodeInterfaceRef &ecimpl,
  pg_t pgid,
  bool legacy_log_entries,
  const ECUtil::stripe_info_t &sinfo,
  const map<hobject_t,extent_map> &partial_extents,
  vector<pg_log_entry_t> &entries,
  map<hobject_t,extent_map> *written_map,
  map<shard_id_t, ObjectStore::Transaction> *transactions,
  set<hobject_t> *temp_added,
  set<hobject_t> *temp_removed,
  DoutPrefixProvider *dpp)
{
  assert(written_map);
  assert(transactions);
  assert(temp_added);
  assert(temp_removed);
  assert(plan.t);
  auto &t = *(plan.t);

  auto &hash_infos = plan.hash_infos;

  // NOTE(review): these three asserts repeat the checks made above.
  assert(transactions);
  assert(temp_added);
  assert(temp_removed);

  // Index the log entries by object.  insert() does not overwrite, so the
  // first entry seen for a given soid is the one that is kept.
  map<hobject_t, pg_log_entry_t*> obj_to_log;
  for (auto &&i: entries) {
    obj_to_log.insert(make_pair(i.soid, &i));
  }

  t.safe_create_traverse(
    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &opair) {
      const hobject_t &oid = opair.first;
      auto &op = opair.second;
      auto &obc_map = t.obc_map;
      // operator[] creates the (initially empty) written extent map for oid.
      auto &written = (*written_map)[oid];

      // entry is nullptr when the object has no log entry.
      auto iter = obj_to_log.find(oid);
      pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr;

      ObjectContextRef obc;
      auto obiter = t.obc_map.find(oid);
      if (obiter != t.obc_map.end()) {
        obc = obiter->second;
      }
      // Invariant: logged objects have an object context; objects without
      // a log entry must be temp objects.
      if (entry) {
        assert(obc);
      } else {
        assert(oid.is_temp());
      }

      ECUtil::HashInfoRef hinfo;
      {
        auto iter = hash_infos.find(oid);
        assert(iter != hash_infos.end());
        hinfo = iter->second;
      }

      if (oid.is_temp()) {
        if (op.is_fresh_object()) {
          temp_added->insert(oid);
        } else if (op.is_delete()) {
          temp_removed->insert(oid);
        }
      }

      // Store the encoded updated snaps in the log entry.
      if (entry &&
          entry->is_modify() &&
          op.updated_snaps) {
        bufferlist bl(op.updated_snaps->second.size() * 8 + 8);
        ::encode(op.updated_snaps->second, bl);
        entry->snaps.swap(bl);
        entry->snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
      }

      ldpp_dout(dpp, 20) << "generate_transactions: "
                         << opair.first
                         << ", current size is "
                         << hinfo->get_total_logical_size(sinfo)
                         << " buffers are "
                         << op.buffer_updates
                         << dendl;
      if (op.truncate) {
        ldpp_dout(dpp, 20) << "generate_transactions: "
                           << " truncate is "
                           << *(op.truncate)
                           << dendl;
      }

      if (entry && op.updated_snaps) {
        entry->mod_desc.update_snaps(op.updated_snaps->first);
      }

      // Rollback values for xattrs: key -> previous value, or boost::none
      // for a key that had no cached value.  The pre-modification hinfo
      // is always saved.
      map<string, boost::optional<bufferlist> > xattr_rollback;
      assert(hinfo);
      bufferlist old_hinfo;
      ::encode(*hinfo, old_hinfo);
      xattr_rollback[ECUtil::get_hinfo_key()] = old_hinfo;

      // A truncate to 0 on an op with no init type is rewritten as
      // "delete the object first, then create it again".
      if (op.is_none() && op.truncate && op.truncate->first == 0) {
        assert(op.truncate->first == 0);
        assert(op.truncate->first ==
               op.truncate->second);
        assert(entry);
        assert(obc);

        // NOTE(review): the assert above requires first == second, so the
        // first branch is only reachable if that assert is compiled out.
        if (op.truncate->first != op.truncate->second) {
          op.truncate->first = op.truncate->second;
        } else {
          op.truncate = boost::none;
        }

        op.delete_first = true;
        op.init_type = PGTransaction::ObjectOperation::Init::Create();

        if (obc) {
          /* We need to reapply all of the cached xattrs.
           * std::map insert fortunately only writes keys
           * which don't already exist, so this should do
           * the right thing. */
          op.attr_updates.insert(
            obc->attr_cache.begin(),
            obc->attr_cache.end());
        }
      }

      if (op.delete_first) {
        /* We also want to remove the boost::none entries since
         * the keys already won't exist */
        for (auto j = op.attr_updates.begin();
             j != op.attr_updates.end();
          ) {
          if (j->second) {
            ++j;
          } else {
            // Post-increment so j is advanced before its element is erased.
            op.attr_updates.erase(j++);
          }
        }
        /* Fill in all current entries for xattr rollback */
        if (obc) {
          xattr_rollback.insert(
            obc->attr_cache.begin(),
            obc->attr_cache.end());
          obc->attr_cache.clear();
        }
        if (entry) {
          // Logged object: rename the current object to the generation
          // entry->version.version instead of removing it, and record
          // that in mod_desc (presumably so the delete can be rolled
          // back -- confirm against the rollback code).
          entry->mod_desc.rmobject(entry->version.version);
          for (auto &&st: *transactions) {
            st.second.collection_move_rename(
              coll_t(spg_t(pgid, st.first)),
              ghobject_t(oid, ghobject_t::NO_GEN, st.first),
              coll_t(spg_t(pgid, st.first)),
              ghobject_t(oid, entry->version.version, st.first));
          }
        } else {
          // No log entry: simply remove the object on every shard.
          for (auto &&st: *transactions) {
            st.second.remove(
              coll_t(spg_t(pgid, st.first)),
              ghobject_t(oid, ghobject_t::NO_GEN, st.first));
          }
        }
        hinfo->clear();
      }

      if (op.is_fresh_object() && entry) {
        entry->mod_desc.create();
      }

      // Apply the init type.  match() (from common/inline_variant.h)
      // invokes the lambda whose parameter type matches the alternative
      // held by op.init_type.  Inside the lambdas the parameter named
      // `op` shadows the outer ObjectOperation.
      match(
        op.init_type,
        [&](const PGTransaction::ObjectOperation::Init::None &) {},
        [&](const PGTransaction::ObjectOperation::Init::Create &op) {
          for (auto &&st: *transactions) {
            st.second.touch(
              coll_t(spg_t(pgid, st.first)),
              ghobject_t(oid, ghobject_t::NO_GEN, st.first));
          }
        },
        [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
          for (auto &&st: *transactions) {
            st.second.clone(
              coll_t(spg_t(pgid, st.first)),
              ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
              ghobject_t(oid, ghobject_t::NO_GEN, st.first));
          }

          // The clone starts with the source's hash info ...
          auto siter = hash_infos.find(op.source);
          assert(siter != hash_infos.end());
          hinfo->update_to(*(siter->second));

          // ... and a copy of the source's cached xattrs.
          if (obc) {
            auto cobciter = obc_map.find(op.source);
            assert(cobciter != obc_map.end());
            obc->attr_cache = cobciter->second->attr_cache;
          }
        },
        [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
          // Only temp objects may be the source of a rename.
          assert(op.source.is_temp());
          for (auto &&st: *transactions) {
            st.second.collection_move_rename(
              coll_t(spg_t(pgid, st.first)),
              ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
              coll_t(spg_t(pgid, st.first)),
              ghobject_t(oid, ghobject_t::NO_GEN, st.first));
          }
          auto siter = hash_infos.find(op.source);
          assert(siter != hash_infos.end());
          hinfo->update_to(*(siter->second));
          if (obc) {
            // The source is expected to have no object context, so there
            // are no cached attrs to carry over.
            auto cobciter = obc_map.find(op.source);
            assert(cobciter == obc_map.end());
            obc->attr_cache.clear();
          }
        });

      // omap not supported (except 0, handled above)
      assert(!(op.clear_omap));
      assert(!(op.omap_header));
      assert(op.omap_updates.empty());

      if (!op.attr_updates.empty()) {
        // Entries with a value are batched into to_set; entries without a
        // value are removals and are issued immediately.
        map<string, bufferlist> to_set;
        for (auto &&j: op.attr_updates) {
          if (j.second) {
            to_set[j.first] = *(j.second);
          } else {
            for (auto &&st : *transactions) {
              st.second.rmattr(
                coll_t(spg_t(pgid, st.first)),
                ghobject_t(oid, ghobject_t::NO_GEN, st.first),
                j.first);
            }
          }
          if (obc) {
            auto citer = obc->attr_cache.find(j.first);
            if (entry) {
              if (citer != obc->attr_cache.end()) {
                // won't overwrite anything we put in earlier
                xattr_rollback.insert(
                  make_pair(
                    j.first,
                    boost::optional<bufferlist>(citer->second)));
              } else {
                // won't overwrite anything we put in earlier
                xattr_rollback.insert(
                  make_pair(
                    j.first,
                    boost::none));
              }
            }
            // Keep the cached attrs in sync with what is being applied.
            if (j.second) {
              obc->attr_cache[j.first] = *(j.second);
            } else if (citer != obc->attr_cache.end()) {
              obc->attr_cache.erase(citer);
            }
          } else {
            assert(!entry);
          }
        }
        for (auto &&st : *transactions) {
          st.second.setattrs(
            coll_t(spg_t(pgid, st.first)),
            ghobject_t(oid, ghobject_t::NO_GEN, st.first),
            to_set);
        }
        // Always holds: the hinfo key was added unconditionally above.
        assert(!xattr_rollback.empty());
      }
      if (entry && !xattr_rollback.empty()) {
        entry->mod_desc.setattrs(xattr_rollback);
      }

      if (op.alloc_hint) {
        /* logical_to_next_chunk_offset() scales down both aligned and
         * unaligned offsets
         *
         * we don't bother to roll this back at this time for two reasons:
         * 1) it's advisory
         * 2) we don't track the old value */
        uint64_t object_size = sinfo.logical_to_next_chunk_offset(
          op.alloc_hint->expected_object_size);
        uint64_t write_size = sinfo.logical_to_next_chunk_offset(
          op.alloc_hint->expected_write_size);

        for (auto &&st : *transactions) {
          st.second.set_alloc_hint(
            coll_t(spg_t(pgid, st.first)),
            ghobject_t(oid, ghobject_t::NO_GEN, st.first),
            object_size,
            write_size,
            op.alloc_hint->flags);
        }
      }

      // Logical extents to be encoded and written, seeded with the
      // object's entry in partial_extents (if any).
      extent_map to_write;
      auto pextiter = partial_extents.find(oid);
      if (pextiter != partial_extents.end()) {
        to_write = pextiter->second;
      }

      // (chunk offset, chunk length) ranges saved for rollback.
      vector<pair<uint64_t, uint64_t> > rollback_extents;
      const uint64_t orig_size = hinfo->get_total_logical_size(sinfo);

      // new_size tracks the logical size as the op is applied.
      // append_after splits to_write: extents below it are overwrites,
      // extents at or above it are appends (see the two intersect() calls
      // further down).
      uint64_t new_size = orig_size;
      uint64_t append_after = new_size;
      ldpp_dout(dpp, 20) << __func__ << ": new_size start " << new_size << dendl;
      if (op.truncate && op.truncate->first < new_size) {
        // Truncate below the current size.
        assert(!op.is_fresh_object());
        new_size = sinfo.logical_to_next_stripe_offset(
          op.truncate->first);
        ldpp_dout(dpp, 20) << __func__ << ": new_size truncate down "
                           << new_size << dendl;
        if (new_size != op.truncate->first) { // 0 the unaligned part
          // Unaligned truncate: zero from the truncate point up to
          // new_size, and move append_after back to
          // logical_to_prev_stripe_offset() of the truncate point.
          bufferlist bl;
          bl.append_zero(new_size - op.truncate->first);
          to_write.insert(
            op.truncate->first,
            bl.length(),
            bl);
          append_after = sinfo.logical_to_prev_stripe_offset(
            op.truncate->first);
        } else {
          append_after = new_size;
        }
        // Drop every pending extent at or beyond the new size.
        to_write.erase(
          new_size,
          std::numeric_limits<uint64_t>::max() - new_size);

        // NOTE(review): !op.is_fresh_object() is already asserted above, so
        // the else branch is taken only when there is no log entry.
        if (entry && !op.is_fresh_object()) {
          // Save the chunk range about to be discarded into the
          // generation object so the truncate can be rolled back.
          uint64_t restore_from = sinfo.logical_to_prev_chunk_offset(
            op.truncate->first);
          uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
            orig_size -
            sinfo.logical_to_prev_stripe_offset(op.truncate->first));
          assert(rollback_extents.empty());

          ldpp_dout(dpp, 20) << __func__ << ": saving extent "
                             << make_pair(restore_from, restore_len)
                             << dendl;
          ldpp_dout(dpp, 20) << __func__ << ": truncating to "
                             << new_size
                             << dendl;
          rollback_extents.emplace_back(
            make_pair(restore_from, restore_len));
          for (auto &&st : *transactions) {
            st.second.touch(
              coll_t(spg_t(pgid, st.first)),
              ghobject_t(oid, entry->version.version, st.first));
            st.second.clone_range(
              coll_t(spg_t(pgid, st.first)),
              ghobject_t(oid, ghobject_t::NO_GEN, st.first),
              ghobject_t(oid, entry->version.version, st.first),
              restore_from,
              restore_len,
              restore_from);

          }
        } else {
          ldpp_dout(dpp, 20) << __func__ << ": not saving extents, fresh object"
                             << dendl;
        }
        for (auto &&st : *transactions) {
          st.second.truncate(
            coll_t(spg_t(pgid, st.first)),
            ghobject_t(oid, ghobject_t::NO_GEN, st.first),
            sinfo.aligned_logical_offset_to_chunk_offset(new_size));
        }
      }

      // Fold each buffer update into to_write, padding with zeroes where
      // needed; fadvise flags of all Write updates are OR-ed together.
      uint32_t fadvise_flags = 0;
      for (auto &&extent: op.buffer_updates) {
        using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
        bufferlist bl;
        match(
          extent.get_val(),
          [&](const BufferUpdate::Write &op) {
            bl = op.buffer;
            fadvise_flags |= op.fadvise_flags;
          },
          [&](const BufferUpdate::Zero &) {
            bl.append_zero(extent.get_len());
          },
          [&](const BufferUpdate::CloneRange &) {
            assert(
              0 ==
              "CloneRange is not allowed, do_op should have returned ENOTSUPP");
          });

        uint64_t off = extent.get_off();
        uint64_t len = extent.get_len();
        uint64_t end = off + len;
        ldpp_dout(dpp, 20) << __func__ << ": adding buffer_update "
                           << make_pair(off, len)
                           << dendl;
        assert(len > 0);
        if (off > new_size) {
          // The update starts past the current end: zero-fill the gap so
          // the extent begins at new_size.
          assert(off > append_after);
          bl.prepend_zero(off - new_size);
          len += off - new_size;
          ldpp_dout(dpp, 20) << __func__ << ": prepending zeroes to align "
                             << off << "->" << new_size
                             << dendl;
          off = new_size;
        }
        if (!sinfo.logical_offset_is_stripe_aligned(end) && (end > append_after)) {
          // The update ends unaligned inside the append region: pad the
          // tail with zeroes up to the next stripe boundary.
          uint64_t aligned_end = sinfo.logical_to_next_stripe_offset(
            end);
          uint64_t tail = aligned_end - end;
          bl.append_zero(tail);
          ldpp_dout(dpp, 20) << __func__ << ": appending zeroes to align end "
                             << end << "->" << end+tail
                             << ", len: " << len << "->" << len+tail
                             << dendl;
          end += tail;
          len += tail;
        }

        to_write.insert(off, len, bl);
        if (end > new_size)
          new_size = end;
      }

      // Truncate up: extend with zeroes to the stripe boundary at or after
      // the requested size.
      if (op.truncate &&
          op.truncate->second > new_size) {
        assert(op.truncate->second > append_after);
        uint64_t truncate_to =
          sinfo.logical_to_next_stripe_offset(
            op.truncate->second);
        uint64_t zeroes = truncate_to - new_size;
        bufferlist bl;
        bl.append_zero(zeroes);
        to_write.insert(
          new_size,
          zeroes,
          bl);
        new_size = truncate_to;
        ldpp_dout(dpp, 20) << __func__ << ": truncating out to "
                           << truncate_to
                           << dendl;
      }

      // Encode for every chunk of the erasure code.
      set<int> want;
      for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
        want.insert(i);
      }
      // Overwrites: extents below append_after.  For logged objects the
      // existing chunk range is first cloned into the generation object
      // so the write can be rolled back.
      auto to_overwrite = to_write.intersect(0, append_after);
      ldpp_dout(dpp, 20) << __func__ << ": to_overwrite: "
                         << to_overwrite
                         << dendl;
      for (auto &&extent: to_overwrite) {
        assert(extent.get_off() + extent.get_len() <= append_after);
        assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
        assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
        if (entry) {
          uint64_t restore_from = sinfo.aligned_logical_offset_to_chunk_offset(
            extent.get_off());
          uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
            extent.get_len());
          ldpp_dout(dpp, 20) << __func__ << ": overwriting "
                             << restore_from << "~" << restore_len
                             << dendl;
          // Touch the generation object only once, before the first range
          // is saved (the truncate path above has already touched it if
          // it saved a range).
          if (rollback_extents.empty()) {
            for (auto &&st : *transactions) {
              st.second.touch(
                coll_t(spg_t(pgid, st.first)),
                ghobject_t(oid, entry->version.version, st.first));
            }
          }
          rollback_extents.emplace_back(make_pair(restore_from, restore_len));
          for (auto &&st : *transactions) {
            st.second.clone_range(
              coll_t(spg_t(pgid, st.first)),
              ghobject_t(oid, ghobject_t::NO_GEN, st.first),
              ghobject_t(oid, entry->version.version, st.first),
              restore_from,
              restore_len,
              restore_from);
          }
        }
        encode_and_write(
          pgid,
          oid,
          sinfo,
          ecimpl,
          want,
          extent.get_off(),
          extent.get_val(),
          fadvise_flags,
          hinfo,
          written,
          transactions,
          dpp);
      }

      // Appends: extents at or beyond append_after.  No data is saved for
      // these; only the append point is recorded in mod_desc further down.
      auto to_append = to_write.intersect(
        append_after,
        std::numeric_limits<uint64_t>::max() - append_after);
      ldpp_dout(dpp, 20) << __func__ << ": to_append: "
                         << to_append
                         << dendl;
      for (auto &&extent: to_append) {
        assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
        assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
        ldpp_dout(dpp, 20) << __func__ << ": appending "
                           << extent.get_off() << "~" << extent.get_len()
                           << dendl;
        encode_and_write(
          pgid,
          oid,
          sinfo,
          ecimpl,
          want,
          extent.get_off(),
          extent.get_val(),
          fadvise_flags,
          hinfo,
          written,
          transactions,
          dpp);
      }

      ldpp_dout(dpp, 20) << __func__ << ": " << oid
                         << " resetting hinfo to logical size "
                         << new_size
                         << dendl;
      // If any range was saved for rollback, existing data was truncated
      // or overwritten: record the ranges and reset hinfo to the new chunk
      // size with its hashes cleared.  Otherwise hinfo must already
      // report new_size.
      if (!rollback_extents.empty() && entry) {
        // NOTE(review): entry is already known to be non-null here.
        if (entry) {
          ldpp_dout(dpp, 20) << __func__ << ": " << oid
                             << " marking rollback extents "
                             << rollback_extents
                             << dendl;
          entry->mod_desc.rollback_extents(
            entry->version.version, rollback_extents);
        }
        hinfo->set_total_chunk_size_clear_hash(
          sinfo.aligned_logical_offset_to_chunk_offset(new_size));
      } else {
        assert(hinfo->get_total_logical_size(sinfo) == new_size);
      }

      if (entry && !to_append.empty()) {
        ldpp_dout(dpp, 20) << __func__ << ": marking append "
                           << append_after
                           << dendl;
        entry->mod_desc.append(append_after);
      }

      // Persist the updated hash info as an xattr on every shard, unless
      // the op is a delete.
      if (!op.is_delete()) {
        bufferlist hbuf;
        ::encode(*hinfo, hbuf);
        for (auto &&i : *transactions) {
          i.second.setattr(
            coll_t(spg_t(pgid, i.first)),
            ghobject_t(oid, ghobject_t::NO_GEN, i.first),
            ECUtil::get_hinfo_key(),
            hbuf);
        }
      }
    });
}