initial code repo
[stor4nfv.git] / src / ceph / src / messages / MOSDOp.h
diff --git a/src/ceph/src/messages/MOSDOp.h b/src/ceph/src/messages/MOSDOp.h
new file mode 100755 (executable)
index 0000000..80c6ecf
--- /dev/null
@@ -0,0 +1,589 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef CEPH_MOSDOP_H
+#define CEPH_MOSDOP_H
+
+#include "MOSDFastDispatchOp.h"
+#include "include/ceph_features.h"
+#include "common/hobject.h"
+#include <atomic>
+
+/*
+ * OSD op
+ *
+ * oid - object id
+ * op  - OSD_OP_DELETE, etc.
+ *
+ */
+
+class OSD;
+
+class MOSDOp : public MOSDFastDispatchOp {
+
+  static const int HEAD_VERSION = 8;
+  static const int COMPAT_VERSION = 3;
+
+private:
+  uint32_t client_inc = 0;
+  __u32 osdmap_epoch = 0;
+  __u32 flags = 0;
+  utime_t mtime;
+  int32_t retry_attempt = -1;   // 0 is first attempt.  -1 if we don't know.
+
+  hobject_t hobj;
+  spg_t pgid;
+  bufferlist::iterator p;
+  // Decoding flags. Decoding is only needed for messages catched by pipe reader.
+  // Transition from true -> false without locks being held
+  // Can never see final_decode_needed == false and partial_decode_needed == true
+  atomic<bool> partial_decode_needed;
+  atomic<bool> final_decode_needed;
+  //
+public:
+  vector<OSDOp> ops;
+private:
+  snapid_t snap_seq;
+  vector<snapid_t> snaps;
+
+  uint64_t features;
+
+  osd_reqid_t reqid; // reqid explicitly set by sender
+
+public:
+  friend class MOSDOpReply;
+
+  ceph_tid_t get_client_tid() { return header.tid; }
+  void set_snapid(const snapid_t& s) {
+    hobj.snap = s;
+  }
+  void set_snaps(const vector<snapid_t>& i) {
+    snaps = i;
+  }
+  void set_snap_seq(const snapid_t& s) { snap_seq = s; }
+  void set_reqid(const osd_reqid_t rid) {
+    reqid = rid;
+  }
+  void set_spg(spg_t p) {
+    pgid = p;
+  }
+
+  // Fields decoded in partial decoding
+  pg_t get_pg() const {
+    assert(!partial_decode_needed);
+    return pgid.pgid;
+  }
+  spg_t get_spg() const override {
+    assert(!partial_decode_needed);
+    return pgid;
+  }
+  pg_t get_raw_pg() const {
+    assert(!partial_decode_needed);
+    return pg_t(hobj.get_hash(), pgid.pgid.pool());
+  }
+  epoch_t get_map_epoch() const override {
+    assert(!partial_decode_needed);
+    return osdmap_epoch;
+  }
+  int get_flags() const {
+    assert(!partial_decode_needed);
+    return flags;
+  }
+  osd_reqid_t get_reqid() const {
+    assert(!partial_decode_needed);
+    if (reqid.name != entity_name_t() || reqid.tid != 0) {
+      return reqid;
+    } else {
+      if (!final_decode_needed)
+       assert(reqid.inc == (int32_t)client_inc);  // decode() should have done this
+      return osd_reqid_t(get_orig_source(),
+                         reqid.inc,
+                        header.tid);
+    }
+  }
+
+  // Fields decoded in final decoding
+  int get_client_inc() const {
+    assert(!final_decode_needed);
+    return client_inc;
+  }
+  utime_t get_mtime() const {
+    assert(!final_decode_needed);
+    return mtime;
+  }
+  object_locator_t get_object_locator() const {
+    assert(!final_decode_needed);
+    if (hobj.oid.name.empty())
+      return object_locator_t(hobj.pool, hobj.nspace, hobj.get_hash());
+    else
+      return object_locator_t(hobj);
+  }
+  const object_t& get_oid() const {
+    assert(!final_decode_needed);
+    return hobj.oid;
+  }
+  const hobject_t &get_hobj() const {
+    return hobj;
+  }
+  snapid_t get_snapid() const {
+    assert(!final_decode_needed);
+    return hobj.snap;
+  }
+  const snapid_t& get_snap_seq() const {
+    assert(!final_decode_needed);
+    return snap_seq;
+  }
+  const vector<snapid_t> &get_snaps() const {
+    assert(!final_decode_needed);
+    return snaps;
+  }
+
+  /**
+   * get retry attempt
+   *
+   * 0 is the first attempt.
+   *
+   * @return retry attempt, or -1 if we don't know
+   */
+  int get_retry_attempt() const {
+    return retry_attempt;
+  }
+  uint64_t get_features() const {
+    if (features)
+      return features;
+    return get_connection()->get_features();
+  }
+
+  MOSDOp()
+    : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
+      partial_decode_needed(true),
+      final_decode_needed(true) { }
+  MOSDOp(int inc, long tid, const hobject_t& ho, spg_t& _pgid,
+        epoch_t _osdmap_epoch,
+        int _flags, uint64_t feat)
+    : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
+      client_inc(inc),
+      osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
+      hobj(ho),
+      pgid(_pgid),
+      partial_decode_needed(false),
+      final_decode_needed(false),
+      features(feat) {
+    set_tid(tid);
+
+    // also put the client_inc in reqid.inc, so that get_reqid() can
+    // be used before the full message is decoded.
+    reqid.inc = inc;
+  }
+private:
+  ~MOSDOp() override {}
+
+public:
+  void set_mtime(utime_t mt) { mtime = mt; }
+  void set_mtime(ceph::real_time mt) {
+    mtime = ceph::real_clock::to_timespec(mt);
+  }
+
+  // ops
+  void add_simple_op(int o, uint64_t off, uint64_t len) {
+    OSDOp osd_op;
+    osd_op.op.op = o;
+    osd_op.op.extent.offset = off;
+    osd_op.op.extent.length = len;
+    ops.push_back(osd_op);
+  }
+  void write(uint64_t off, uint64_t len, bufferlist& bl) {
+    add_simple_op(CEPH_OSD_OP_WRITE, off, len);
+    data.claim(bl);
+    header.data_off = off;
+  }
+  void writefull(bufferlist& bl) {
+    add_simple_op(CEPH_OSD_OP_WRITEFULL, 0, bl.length());
+    data.claim(bl);
+    header.data_off = 0;
+  }
+  void zero(uint64_t off, uint64_t len) {
+    add_simple_op(CEPH_OSD_OP_ZERO, off, len);
+  }
+  void truncate(uint64_t off) {
+    add_simple_op(CEPH_OSD_OP_TRUNCATE, off, 0);
+  }
+  void remove() {
+    add_simple_op(CEPH_OSD_OP_DELETE, 0, 0);
+  }
+
+  void read(uint64_t off, uint64_t len) {
+    add_simple_op(CEPH_OSD_OP_READ, off, len);
+  }
+  void stat() {
+    add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
+  }
+
+  bool has_flag(__u32 flag) const { return flags & flag; };
+
+  bool is_retry_attempt() const { return flags & CEPH_OSD_FLAG_RETRY; }
+  void set_retry_attempt(unsigned a) { 
+    if (a)
+      flags |= CEPH_OSD_FLAG_RETRY;
+    else
+      flags &= ~CEPH_OSD_FLAG_RETRY;
+    retry_attempt = a;
+  }
+
+  // marshalling
+  void encode_payload(uint64_t features) override {
+
+    OSDOp::merge_osd_op_vector_in_data(ops, data);
+
+    if ((features & CEPH_FEATURE_OBJECTLOCATOR) == 0) {
+      // here is the old structure we are encoding to: //
+#if 0
+struct ceph_osd_request_head {
+       __le32 client_inc;                 /* client incarnation */
+       struct ceph_object_layout layout;  /* pgid */
+       __le32 osdmap_epoch;               /* client's osdmap epoch */
+
+       __le32 flags;
+
+       struct ceph_timespec mtime;        /* for mutations only */
+       struct ceph_eversion reassert_version; /* if we are replaying op */
+
+       __le32 object_len;     /* length of object name */
+
+       __le64 snapid;         /* snapid to read */
+       __le64 snap_seq;       /* writer's snap context */
+       __le32 num_snaps;
+
+       __le16 num_ops;
+       struct ceph_osd_op ops[];  /* followed by ops[], obj, ticket, snaps */
+} __attribute__ ((packed));
+#endif
+      header.version = 1;
+
+      ::encode(client_inc, payload);
+
+      __u32 su = 0;
+      ::encode(get_raw_pg(), payload);
+      ::encode(su, payload);
+
+      ::encode(osdmap_epoch, payload);
+      ::encode(flags, payload);
+      ::encode(mtime, payload);
+      ::encode(eversion_t(), payload);  // reassert_version
+
+      __u32 oid_len = hobj.oid.name.length();
+      ::encode(oid_len, payload);
+      ::encode(hobj.snap, payload);
+      ::encode(snap_seq, payload);
+      __u32 num_snaps = snaps.size();
+      ::encode(num_snaps, payload);
+      
+      //::encode(ops, payload);
+      __u16 num_ops = ops.size();
+      ::encode(num_ops, payload);
+      for (unsigned i = 0; i < ops.size(); i++)
+       ::encode(ops[i].op, payload);
+
+      ::encode_nohead(hobj.oid.name, payload);
+      ::encode_nohead(snaps, payload);
+    } else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) {
+      header.version = 6;
+      ::encode(client_inc, payload);
+      ::encode(osdmap_epoch, payload);
+      ::encode(flags, payload);
+      ::encode(mtime, payload);
+      ::encode(eversion_t(), payload); // reassert_version
+      ::encode(get_object_locator(), payload);
+      ::encode(get_raw_pg(), payload);
+
+      ::encode(hobj.oid, payload);
+
+      __u16 num_ops = ops.size();
+      ::encode(num_ops, payload);
+      for (unsigned i = 0; i < ops.size(); i++)
+        ::encode(ops[i].op, payload);
+
+      ::encode(hobj.snap, payload);
+      ::encode(snap_seq, payload);
+      ::encode(snaps, payload);
+
+      ::encode(retry_attempt, payload);
+      ::encode(features, payload);
+      if (reqid.name != entity_name_t() || reqid.tid != 0) {
+       ::encode(reqid, payload);
+      } else {
+       // don't include client_inc in the reqid for the legacy v6
+       // encoding or else we'll confuse older peers.
+       ::encode(osd_reqid_t(), payload);
+      }
+    } else if (!HAVE_FEATURE(features, RESEND_ON_SPLIT)) {
+      // reordered, v7 message encoding
+      header.version = 7;
+      ::encode(get_raw_pg(), payload);
+      ::encode(osdmap_epoch, payload);
+      ::encode(flags, payload);
+      ::encode(eversion_t(), payload); // reassert_version
+      ::encode(reqid, payload);
+      ::encode(client_inc, payload);
+      ::encode(mtime, payload);
+      ::encode(get_object_locator(), payload);
+      ::encode(hobj.oid, payload);
+
+      __u16 num_ops = ops.size();
+      ::encode(num_ops, payload);
+      for (unsigned i = 0; i < ops.size(); i++)
+       ::encode(ops[i].op, payload);
+
+      ::encode(hobj.snap, payload);
+      ::encode(snap_seq, payload);
+      ::encode(snaps, payload);
+
+      ::encode(retry_attempt, payload);
+      ::encode(features, payload);
+    } else {
+      // latest v8 encoding with hobject_t hash separate from pgid, no
+      // reassert version
+      header.version = HEAD_VERSION;
+      ::encode(pgid, payload);
+      ::encode(hobj.get_hash(), payload);
+      ::encode(osdmap_epoch, payload);
+      ::encode(flags, payload);
+      ::encode(reqid, payload);
+      encode_trace(payload, features);
+
+      // -- above decoded up front; below decoded post-dispatch thread --
+
+      ::encode(client_inc, payload);
+      ::encode(mtime, payload);
+      ::encode(get_object_locator(), payload);
+      ::encode(hobj.oid, payload);
+
+      __u16 num_ops = ops.size();
+      ::encode(num_ops, payload);
+      for (unsigned i = 0; i < ops.size(); i++)
+       ::encode(ops[i].op, payload);
+
+      ::encode(hobj.snap, payload);
+      ::encode(snap_seq, payload);
+      ::encode(snaps, payload);
+
+      ::encode(retry_attempt, payload);
+      ::encode(features, payload);
+    }
+  }
+
+  void decode_payload() override {
+    assert(partial_decode_needed && final_decode_needed);
+    p = payload.begin();
+
+    // Always keep here the newest version of decoding order/rule
+    if (header.version == HEAD_VERSION) {
+      ::decode(pgid, p);      // actual pgid
+      uint32_t hash;
+      ::decode(hash, p); // raw hash value
+      hobj.set_hash(hash);
+      ::decode(osdmap_epoch, p);
+      ::decode(flags, p);
+      ::decode(reqid, p);
+      decode_trace(p);
+    } else if (header.version == 7) {
+      ::decode(pgid.pgid, p);      // raw pgid
+      hobj.set_hash(pgid.pgid.ps());
+      ::decode(osdmap_epoch, p);
+      ::decode(flags, p);
+      eversion_t reassert_version;
+      ::decode(reassert_version, p);
+      ::decode(reqid, p);
+    } else if (header.version < 2) {
+      // old decode
+      ::decode(client_inc, p);
+
+      old_pg_t opgid;
+      ::decode_raw(opgid, p);
+      pgid.pgid = opgid;
+
+      __u32 su;
+      ::decode(su, p);
+
+      ::decode(osdmap_epoch, p);
+      ::decode(flags, p);
+      ::decode(mtime, p);
+      eversion_t reassert_version;
+      ::decode(reassert_version, p);
+
+      __u32 oid_len;
+      ::decode(oid_len, p);
+      ::decode(hobj.snap, p);
+      ::decode(snap_seq, p);
+      __u32 num_snaps;
+      ::decode(num_snaps, p);
+      
+      //::decode(ops, p);
+      __u16 num_ops;
+      ::decode(num_ops, p);
+      ops.resize(num_ops);
+      for (unsigned i = 0; i < num_ops; i++)
+       ::decode(ops[i].op, p);
+
+      decode_nohead(oid_len, hobj.oid.name, p);
+      decode_nohead(num_snaps, snaps, p);
+
+      // recalculate pgid hash value
+      pgid.pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS,
+                                    hobj.oid.name.c_str(),
+                                    hobj.oid.name.length()));
+      hobj.pool = pgid.pgid.pool();
+      hobj.set_hash(pgid.pgid.ps());
+
+      retry_attempt = -1;
+      features = 0;
+      OSDOp::split_osd_op_vector_in_data(ops, data);
+
+      // we did the full decode
+      final_decode_needed = false;
+
+      // put client_inc in reqid.inc for get_reqid()'s benefit
+      reqid = osd_reqid_t();
+      reqid.inc = client_inc;
+    } else if (header.version < 7) {
+      ::decode(client_inc, p);
+      ::decode(osdmap_epoch, p);
+      ::decode(flags, p);
+      ::decode(mtime, p);
+      eversion_t reassert_version;
+      ::decode(reassert_version, p);
+
+      object_locator_t oloc;
+      ::decode(oloc, p);
+
+      if (header.version < 3) {
+       old_pg_t opgid;
+       ::decode_raw(opgid, p);
+       pgid.pgid = opgid;
+      } else {
+       ::decode(pgid.pgid, p);
+      }
+
+      ::decode(hobj.oid, p);
+
+      //::decode(ops, p);
+      __u16 num_ops;
+      ::decode(num_ops, p);
+      ops.resize(num_ops);
+      for (unsigned i = 0; i < num_ops; i++)
+        ::decode(ops[i].op, p);
+
+      ::decode(hobj.snap, p);
+      ::decode(snap_seq, p);
+      ::decode(snaps, p);
+
+      if (header.version >= 4)
+        ::decode(retry_attempt, p);
+      else
+        retry_attempt = -1;
+
+      if (header.version >= 5)
+        ::decode(features, p);
+      else
+       features = 0;
+
+      if (header.version >= 6)
+       ::decode(reqid, p);
+      else
+       reqid = osd_reqid_t();
+
+      hobj.pool = pgid.pgid.pool();
+      hobj.set_key(oloc.key);
+      hobj.nspace = oloc.nspace;
+      hobj.set_hash(pgid.pgid.ps());
+
+      OSDOp::split_osd_op_vector_in_data(ops, data);
+
+      // we did the full decode
+      final_decode_needed = false;
+
+      // put client_inc in reqid.inc for get_reqid()'s benefit
+      if (reqid.name == entity_name_t() && reqid.tid == 0)
+       reqid.inc = client_inc;
+    }
+
+    partial_decode_needed = false;
+  }
+
+  bool finish_decode() {
+    assert(!partial_decode_needed); // partial decoding required
+    if (!final_decode_needed)
+      return false; // Message is already final decoded
+    assert(header.version >= 7);
+
+    ::decode(client_inc, p);
+    ::decode(mtime, p);
+    object_locator_t oloc;
+    ::decode(oloc, p);
+    ::decode(hobj.oid, p);
+
+    __u16 num_ops;
+    ::decode(num_ops, p);
+    ops.resize(num_ops);
+    for (unsigned i = 0; i < num_ops; i++)
+      ::decode(ops[i].op, p);
+
+    ::decode(hobj.snap, p);
+    ::decode(snap_seq, p);
+    ::decode(snaps, p);
+
+    ::decode(retry_attempt, p);
+
+    ::decode(features, p);
+
+    hobj.pool = pgid.pgid.pool();
+    hobj.set_key(oloc.key);
+    hobj.nspace = oloc.nspace;
+
+    OSDOp::split_osd_op_vector_in_data(ops, data);
+
+    final_decode_needed = false;
+    return true;
+  }
+
+  void clear_buffers() override {
+    OSDOp::clear_data(ops);
+  }
+
+  const char *get_type_name() const override { return "osd_op"; }
+  void print(ostream& out) const override {
+    out << "osd_op(";
+    if (!partial_decode_needed) {
+      out << get_reqid() << ' ';
+      out << pgid;
+      if (!final_decode_needed) {
+       out << ' ';
+       out << hobj
+           << " " << ops
+           << " snapc " << get_snap_seq() << "=" << snaps;
+       if (is_retry_attempt())
+         out << " RETRY=" << get_retry_attempt();
+      } else {
+       out << " " << get_raw_pg() << " (undecoded)";
+      }
+      out << " " << ceph_osd_flag_string(get_flags());
+      out << " e" << osdmap_epoch;
+    }
+    out << ")";
+  }
+};
+
+
+#endif