initial code repo
[stor4nfv.git] / src / ceph / src / osd / ECTransaction.cc
diff --git a/src/ceph/src/osd/ECTransaction.cc b/src/ceph/src/osd/ECTransaction.cc
new file mode 100644 (file)
index 0000000..b23174f
--- /dev/null
@@ -0,0 +1,658 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <iostream>
+#include <vector>
+#include <vector>
+#include <sstream>
+
+#include "ECTransaction.h"
+#include "ECUtil.h"
+#include "os/ObjectStore.h"
+#include "common/inline_variant.h"
+
+
+void encode_and_write(
+  pg_t pgid,
+  const hobject_t &oid,
+  const ECUtil::stripe_info_t &sinfo,
+  ErasureCodeInterfaceRef &ecimpl,
+  const set<int> &want,
+  uint64_t offset,
+  bufferlist bl,
+  uint32_t flags,
+  ECUtil::HashInfoRef hinfo,
+  extent_map &written,
+  map<shard_id_t, ObjectStore::Transaction> *transactions,
+  DoutPrefixProvider *dpp) {
+  const uint64_t before_size = hinfo->get_total_logical_size(sinfo);
+  assert(sinfo.logical_offset_is_stripe_aligned(offset));
+  assert(sinfo.logical_offset_is_stripe_aligned(bl.length()));
+  assert(bl.length());
+
+  map<int, bufferlist> buffers;
+  int r = ECUtil::encode(
+    sinfo, ecimpl, bl, want, &buffers);
+  assert(r == 0);
+
+  written.insert(offset, bl.length(), bl);
+
+  ldpp_dout(dpp, 20) << __func__ << ": " << oid
+                    << " new_size "
+                    << offset + bl.length()
+                    << dendl;
+
+  if (offset >= before_size) {
+    assert(offset == before_size);
+    hinfo->append(
+      sinfo.aligned_logical_offset_to_chunk_offset(offset),
+      buffers);
+  }
+
+  for (auto &&i : *transactions) {
+    assert(buffers.count(i.first));
+    bufferlist &enc_bl = buffers[i.first];
+    if (offset >= before_size) {
+      i.second.set_alloc_hint(
+       coll_t(spg_t(pgid, i.first)),
+       ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+       0, 0,
+       CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
+       CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
+    }
+    i.second.write(
+      coll_t(spg_t(pgid, i.first)),
+      ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+      sinfo.logical_to_prev_chunk_offset(
+       offset),
+      enc_bl.length(),
+      enc_bl,
+      flags);
+  }
+}
+
+bool ECTransaction::requires_overwrite(
+  uint64_t prev_size,
+  const PGTransaction::ObjectOperation &op) {
+  // special handling for truncates to 0
+  if (op.truncate && op.truncate->first == 0)
+    return false;
+  return op.is_none() &&
+    ((!op.buffer_updates.empty() &&
+      (op.buffer_updates.begin().get_off() < prev_size)) ||
+     (op.truncate &&
+      (op.truncate->first < prev_size)));
+}
+
+void ECTransaction::generate_transactions(
+  WritePlan &plan,
+  ErasureCodeInterfaceRef &ecimpl,
+  pg_t pgid,
+  bool legacy_log_entries,
+  const ECUtil::stripe_info_t &sinfo,
+  const map<hobject_t,extent_map> &partial_extents,
+  vector<pg_log_entry_t> &entries,
+  map<hobject_t,extent_map> *written_map,
+  map<shard_id_t, ObjectStore::Transaction> *transactions,
+  set<hobject_t> *temp_added,
+  set<hobject_t> *temp_removed,
+  DoutPrefixProvider *dpp)
+{
+  assert(written_map);
+  assert(transactions);
+  assert(temp_added);
+  assert(temp_removed);
+  assert(plan.t);
+  auto &t = *(plan.t);
+
+  auto &hash_infos = plan.hash_infos;
+
+  assert(transactions);
+  assert(temp_added);
+  assert(temp_removed);
+
+  map<hobject_t, pg_log_entry_t*> obj_to_log;
+  for (auto &&i: entries) {
+    obj_to_log.insert(make_pair(i.soid, &i));
+  }
+
+  t.safe_create_traverse(
+    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &opair) {
+      const hobject_t &oid = opair.first;
+      auto &op = opair.second;
+      auto &obc_map = t.obc_map;
+      auto &written = (*written_map)[oid];
+
+      auto iter = obj_to_log.find(oid);
+      pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr;
+
+      ObjectContextRef obc;
+      auto obiter = t.obc_map.find(oid);
+      if (obiter != t.obc_map.end()) {
+       obc = obiter->second;
+      }
+      if (entry) {
+       assert(obc);
+      } else {
+       assert(oid.is_temp());
+      }
+
+      ECUtil::HashInfoRef hinfo;
+      {
+       auto iter = hash_infos.find(oid);
+       assert(iter != hash_infos.end());
+       hinfo = iter->second;
+      }
+
+      if (oid.is_temp()) {
+       if (op.is_fresh_object()) {
+         temp_added->insert(oid);
+       } else if (op.is_delete()) {
+         temp_removed->insert(oid);
+       }
+      }
+
+      if (entry &&
+         entry->is_modify() &&
+         op.updated_snaps) {
+       bufferlist bl(op.updated_snaps->second.size() * 8 + 8);
+       ::encode(op.updated_snaps->second, bl);
+       entry->snaps.swap(bl);
+       entry->snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
+      }
+
+      ldpp_dout(dpp, 20) << "generate_transactions: "
+                        << opair.first
+                        << ", current size is "
+                        << hinfo->get_total_logical_size(sinfo)
+                        << " buffers are "
+                        << op.buffer_updates
+                        << dendl;
+      if (op.truncate) {
+       ldpp_dout(dpp, 20) << "generate_transactions: "
+                          << " truncate is "
+                          << *(op.truncate)
+                          << dendl;
+      }
+
+      if (entry && op.updated_snaps) {
+       entry->mod_desc.update_snaps(op.updated_snaps->first);
+      }
+
+      map<string, boost::optional<bufferlist> > xattr_rollback;
+      assert(hinfo);
+      bufferlist old_hinfo;
+      ::encode(*hinfo, old_hinfo);
+      xattr_rollback[ECUtil::get_hinfo_key()] = old_hinfo;
+      
+      if (op.is_none() && op.truncate && op.truncate->first == 0) {
+       assert(op.truncate->first == 0);
+       assert(op.truncate->first ==
+              op.truncate->second);
+       assert(entry);
+       assert(obc);
+       
+       if (op.truncate->first != op.truncate->second) {
+         op.truncate->first = op.truncate->second;
+       } else {
+         op.truncate = boost::none;
+       }
+
+       op.delete_first = true;
+       op.init_type = PGTransaction::ObjectOperation::Init::Create();
+
+       if (obc) {
+         /* We need to reapply all of the cached xattrs.
+            * std::map insert fortunately only writes keys
+            * which don't already exist, so this should do
+            * the right thing. */
+         op.attr_updates.insert(
+           obc->attr_cache.begin(),
+           obc->attr_cache.end());
+       }
+      }
+
+      if (op.delete_first) {
+       /* We also want to remove the boost::none entries since
+          * the keys already won't exist */
+       for (auto j = op.attr_updates.begin();
+            j != op.attr_updates.end();
+         ) {
+         if (j->second) {
+           ++j;
+         } else {
+           op.attr_updates.erase(j++);
+         }
+       }
+       /* Fill in all current entries for xattr rollback */
+       if (obc) {
+         xattr_rollback.insert(
+           obc->attr_cache.begin(),
+           obc->attr_cache.end());
+         obc->attr_cache.clear();
+       }
+       if (entry) {
+         entry->mod_desc.rmobject(entry->version.version);
+         for (auto &&st: *transactions) {
+           st.second.collection_move_rename(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, entry->version.version, st.first));
+         }
+       } else {
+         for (auto &&st: *transactions) {
+           st.second.remove(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+         }
+       }
+       hinfo->clear();
+      }
+
+      if (op.is_fresh_object() && entry) {
+       entry->mod_desc.create();
+      }
+
+      match(
+       op.init_type,
+       [&](const PGTransaction::ObjectOperation::Init::None &) {},
+       [&](const PGTransaction::ObjectOperation::Init::Create &op) {
+         for (auto &&st: *transactions) {
+           st.second.touch(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+         }
+       },
+       [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
+         for (auto &&st: *transactions) {
+           st.second.clone(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+         }
+
+         auto siter = hash_infos.find(op.source);
+         assert(siter != hash_infos.end());
+         hinfo->update_to(*(siter->second));
+
+         if (obc) {
+           auto cobciter = obc_map.find(op.source);
+           assert(cobciter != obc_map.end());
+           obc->attr_cache = cobciter->second->attr_cache;
+         }
+       },
+       [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
+         assert(op.source.is_temp());
+         for (auto &&st: *transactions) {
+           st.second.collection_move_rename(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+         }
+         auto siter = hash_infos.find(op.source);
+         assert(siter != hash_infos.end());
+         hinfo->update_to(*(siter->second));
+         if (obc) {
+           auto cobciter = obc_map.find(op.source);
+           assert(cobciter == obc_map.end());
+           obc->attr_cache.clear();
+         }
+       });
+
+      // omap not supported (except 0, handled above)
+      assert(!(op.clear_omap));
+      assert(!(op.omap_header));
+      assert(op.omap_updates.empty());
+
+      if (!op.attr_updates.empty()) {
+       map<string, bufferlist> to_set;
+       for (auto &&j: op.attr_updates) {
+         if (j.second) {
+           to_set[j.first] = *(j.second);
+         } else {
+           for (auto &&st : *transactions) {
+             st.second.rmattr(
+               coll_t(spg_t(pgid, st.first)),
+               ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+               j.first);
+           }
+         }
+         if (obc) {
+           auto citer = obc->attr_cache.find(j.first);
+           if (entry) {
+             if (citer != obc->attr_cache.end()) {
+               // won't overwrite anything we put in earlier
+               xattr_rollback.insert(
+                 make_pair(
+                   j.first,
+                   boost::optional<bufferlist>(citer->second)));
+             } else {
+               // won't overwrite anything we put in earlier
+               xattr_rollback.insert(
+                 make_pair(
+                   j.first,
+                   boost::none));
+             }
+           }
+           if (j.second) {
+             obc->attr_cache[j.first] = *(j.second);
+           } else if (citer != obc->attr_cache.end()) {
+             obc->attr_cache.erase(citer);
+           }
+         } else {
+           assert(!entry);
+         }
+       }
+       for (auto &&st : *transactions) {
+         st.second.setattrs(
+           coll_t(spg_t(pgid, st.first)),
+           ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+           to_set);
+       }
+       assert(!xattr_rollback.empty());
+      }
+      if (entry && !xattr_rollback.empty()) {
+       entry->mod_desc.setattrs(xattr_rollback);
+      }
+
+      if (op.alloc_hint) {
+       /* logical_to_next_chunk_offset() scales down both aligned and
+          * unaligned offsets
+          
+          * we don't bother to roll this back at this time for two reasons:
+          * 1) it's advisory
+          * 2) we don't track the old value */
+       uint64_t object_size = sinfo.logical_to_next_chunk_offset(
+         op.alloc_hint->expected_object_size);
+       uint64_t write_size = sinfo.logical_to_next_chunk_offset(
+         op.alloc_hint->expected_write_size);
+       
+       for (auto &&st : *transactions) {
+         st.second.set_alloc_hint(
+           coll_t(spg_t(pgid, st.first)),
+           ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+           object_size,
+           write_size,
+           op.alloc_hint->flags);
+       }
+      }
+
+      extent_map to_write;
+      auto pextiter = partial_extents.find(oid);
+      if (pextiter != partial_extents.end()) {
+       to_write = pextiter->second;
+      }
+
+      vector<pair<uint64_t, uint64_t> > rollback_extents;
+      const uint64_t orig_size = hinfo->get_total_logical_size(sinfo);
+
+      uint64_t new_size = orig_size;
+      uint64_t append_after = new_size;
+      ldpp_dout(dpp, 20) << __func__ << ": new_size start " << new_size << dendl;
+      if (op.truncate && op.truncate->first < new_size) {
+       assert(!op.is_fresh_object());
+       new_size = sinfo.logical_to_next_stripe_offset(
+         op.truncate->first);
+       ldpp_dout(dpp, 20) << __func__ << ": new_size truncate down "
+                          << new_size << dendl;
+       if (new_size != op.truncate->first) { // 0 the unaligned part
+         bufferlist bl;
+         bl.append_zero(new_size - op.truncate->first);
+         to_write.insert(
+           op.truncate->first,
+           bl.length(),
+           bl);
+         append_after = sinfo.logical_to_prev_stripe_offset(
+           op.truncate->first);
+       } else {
+         append_after = new_size;
+       }
+       to_write.erase(
+         new_size,
+         std::numeric_limits<uint64_t>::max() - new_size);
+
+       if (entry && !op.is_fresh_object()) {
+         uint64_t restore_from = sinfo.logical_to_prev_chunk_offset(
+           op.truncate->first);
+         uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
+           orig_size -
+           sinfo.logical_to_prev_stripe_offset(op.truncate->first));
+         assert(rollback_extents.empty());
+
+         ldpp_dout(dpp, 20) << __func__ << ": saving extent "
+                            << make_pair(restore_from, restore_len)
+                            << dendl;
+         ldpp_dout(dpp, 20) << __func__ << ": truncating to "
+                            << new_size
+                            << dendl;
+         rollback_extents.emplace_back(
+           make_pair(restore_from, restore_len));
+         for (auto &&st : *transactions) {
+           st.second.touch(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, entry->version.version, st.first));
+           st.second.clone_range(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+             ghobject_t(oid, entry->version.version, st.first),
+             restore_from,
+             restore_len,
+             restore_from);
+           
+         }
+       } else {
+         ldpp_dout(dpp, 20) << __func__ << ": not saving extents, fresh object"
+                            << dendl;
+       }
+       for (auto &&st : *transactions) {
+         st.second.truncate(
+           coll_t(spg_t(pgid, st.first)),
+           ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+           sinfo.aligned_logical_offset_to_chunk_offset(new_size));
+       }
+      }
+
+      uint32_t fadvise_flags = 0;
+      for (auto &&extent: op.buffer_updates) {
+       using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
+       bufferlist bl;
+       match(
+         extent.get_val(),
+         [&](const BufferUpdate::Write &op) {
+           bl = op.buffer;
+           fadvise_flags |= op.fadvise_flags;
+         },
+         [&](const BufferUpdate::Zero &) {
+           bl.append_zero(extent.get_len());
+         },
+         [&](const BufferUpdate::CloneRange &) {
+           assert(
+             0 ==
+             "CloneRange is not allowed, do_op should have returned ENOTSUPP");
+         });
+
+       uint64_t off = extent.get_off();
+       uint64_t len = extent.get_len();
+       uint64_t end = off + len;
+       ldpp_dout(dpp, 20) << __func__ << ": adding buffer_update "
+                          << make_pair(off, len)
+                          << dendl;
+       assert(len > 0);
+       if (off > new_size) {
+         assert(off > append_after);
+         bl.prepend_zero(off - new_size);
+         len += off - new_size;
+         ldpp_dout(dpp, 20) << __func__ << ": prepending zeroes to align "
+                            << off << "->" << new_size
+                            << dendl;
+         off = new_size;
+       }
+       if (!sinfo.logical_offset_is_stripe_aligned(end) && (end > append_after)) {
+         uint64_t aligned_end = sinfo.logical_to_next_stripe_offset(
+           end);
+         uint64_t tail = aligned_end - end;
+         bl.append_zero(tail);
+         ldpp_dout(dpp, 20) << __func__ << ": appending zeroes to align end "
+                            << end << "->" << end+tail
+                            << ", len: " << len << "->" << len+tail
+                            << dendl;
+         end += tail;
+         len += tail;
+       }
+
+       to_write.insert(off, len, bl);
+       if (end > new_size)
+         new_size = end;
+      }
+
+      if (op.truncate &&
+         op.truncate->second > new_size) {
+       assert(op.truncate->second > append_after);
+       uint64_t truncate_to =
+         sinfo.logical_to_next_stripe_offset(
+           op.truncate->second);
+       uint64_t zeroes = truncate_to - new_size;
+       bufferlist bl;
+       bl.append_zero(zeroes);
+       to_write.insert(
+         new_size,
+         zeroes,
+         bl);
+       new_size = truncate_to;
+       ldpp_dout(dpp, 20) << __func__ << ": truncating out to "
+                          << truncate_to
+                          << dendl;
+      }
+
+      set<int> want;
+      for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
+       want.insert(i);
+      }
+      auto to_overwrite = to_write.intersect(0, append_after);
+      ldpp_dout(dpp, 20) << __func__ << ": to_overwrite: "
+                        << to_overwrite
+                        << dendl;
+      for (auto &&extent: to_overwrite) {
+       assert(extent.get_off() + extent.get_len() <= append_after);
+       assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
+       assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
+       if (entry) {
+         uint64_t restore_from = sinfo.aligned_logical_offset_to_chunk_offset(
+           extent.get_off());
+         uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
+           extent.get_len());
+         ldpp_dout(dpp, 20) << __func__ << ": overwriting "
+                            << restore_from << "~" << restore_len
+                            << dendl;
+         if (rollback_extents.empty()) {
+           for (auto &&st : *transactions) {
+             st.second.touch(
+               coll_t(spg_t(pgid, st.first)),
+               ghobject_t(oid, entry->version.version, st.first));
+           }
+         }
+         rollback_extents.emplace_back(make_pair(restore_from, restore_len));
+         for (auto &&st : *transactions) {
+           st.second.clone_range(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+             ghobject_t(oid, entry->version.version, st.first),
+             restore_from,
+             restore_len,
+             restore_from);
+         }
+       }
+       encode_and_write(
+         pgid,
+         oid,
+         sinfo,
+         ecimpl,
+         want,
+         extent.get_off(),
+         extent.get_val(),
+         fadvise_flags,
+         hinfo,
+         written,
+         transactions,
+         dpp);
+      }
+
+      auto to_append = to_write.intersect(
+       append_after,
+       std::numeric_limits<uint64_t>::max() - append_after);
+      ldpp_dout(dpp, 20) << __func__ << ": to_append: "
+                        << to_append
+                        << dendl;
+      for (auto &&extent: to_append) {
+       assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
+       assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
+       ldpp_dout(dpp, 20) << __func__ << ": appending "
+                          << extent.get_off() << "~" << extent.get_len()
+                          << dendl;
+       encode_and_write(
+         pgid,
+         oid,
+         sinfo,
+         ecimpl,
+         want,
+         extent.get_off(),
+         extent.get_val(),
+         fadvise_flags,
+         hinfo,
+         written,
+         transactions,
+         dpp);
+      }
+
+      ldpp_dout(dpp, 20) << __func__ << ": " << oid
+                        << " resetting hinfo to logical size "
+                        << new_size
+                        << dendl;
+      if (!rollback_extents.empty() && entry) {
+       if (entry) {
+         ldpp_dout(dpp, 20) << __func__ << ": " << oid
+                            << " marking rollback extents "
+                            << rollback_extents
+                            << dendl;
+         entry->mod_desc.rollback_extents(
+           entry->version.version, rollback_extents);
+       }
+       hinfo->set_total_chunk_size_clear_hash(
+         sinfo.aligned_logical_offset_to_chunk_offset(new_size));
+      } else {
+       assert(hinfo->get_total_logical_size(sinfo) == new_size);
+      }
+
+      if (entry && !to_append.empty()) {
+       ldpp_dout(dpp, 20) << __func__ << ": marking append "
+                          << append_after
+                          << dendl;
+       entry->mod_desc.append(append_after);
+      }
+
+      if (!op.is_delete()) {
+       bufferlist hbuf;
+       ::encode(*hinfo, hbuf);
+       for (auto &&i : *transactions) {
+         i.second.setattr(
+           coll_t(spg_t(pgid, i.first)),
+           ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+           ECUtil::get_hinfo_key(),
+           hbuf);
+       }
+      }
+    });
+}