initial code repo
[stor4nfv.git] / src / ceph / src / msg / Message.h
diff --git a/src/ceph/src/msg/Message.h b/src/ceph/src/msg/Message.h
new file mode 100644 (file)
index 0000000..10a7004
--- /dev/null
@@ -0,0 +1,502 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef CEPH_MESSAGE_H
+#define CEPH_MESSAGE_H
+#include <stdlib.h>
+#include <ostream>
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/intrusive/list.hpp>
+// Because intrusive_ptr clobbers our assert...
+#include "include/assert.h"
+
+#include "include/types.h"
+#include "include/buffer.h"
+#include "common/Throttle.h"
+#include "common/zipkin_trace.h"
+#include "msg_types.h"
+
+#include "common/RefCountedObj.h"
+#include "msg/Connection.h"
+
+#include "common/debug.h"
+#include "common/config.h"
+
+// monitor internal
+#define MSG_MON_SCRUB              64
+#define MSG_MON_ELECTION           65
+#define MSG_MON_PAXOS              66
+#define MSG_MON_PROBE              67
+#define MSG_MON_JOIN               68
+#define MSG_MON_SYNC              69
+
+/* monitor <-> mon admin tool */
+#define MSG_MON_COMMAND            50
+#define MSG_MON_COMMAND_ACK        51
+#define MSG_LOG                    52
+#define MSG_LOGACK                 53
+
+#define MSG_GETPOOLSTATS           58
+#define MSG_GETPOOLSTATSREPLY      59
+
+#define MSG_MON_GLOBAL_ID          60
+
+#define MSG_ROUTE                  47
+#define MSG_FORWARD                46
+
+#define MSG_PAXOS                  40
+
+
+// osd internal
+#define MSG_OSD_PING         70
+#define MSG_OSD_BOOT         71
+#define MSG_OSD_FAILURE      72
+#define MSG_OSD_ALIVE        73
+#define MSG_OSD_MARK_ME_DOWN 74
+#define MSG_OSD_FULL         75
+
+#define MSG_OSD_SUBOP        76
+#define MSG_OSD_SUBOPREPLY   77
+
+#define MSG_OSD_PGTEMP       78
+
+#define MSG_OSD_BEACON       79
+
+#define MSG_OSD_PG_NOTIFY      80
+#define MSG_OSD_PG_QUERY       81
+#define MSG_OSD_PG_LOG         83
+#define MSG_OSD_PG_REMOVE      84
+#define MSG_OSD_PG_INFO        85
+#define MSG_OSD_PG_TRIM        86
+
+#define MSG_PGSTATS            87
+#define MSG_PGSTATSACK         88
+
+#define MSG_OSD_PG_CREATE      89
+#define MSG_REMOVE_SNAPS       90
+
+#define MSG_OSD_SCRUB          91
+#define MSG_OSD_SCRUB_RESERVE  92  // previous PG_MISSING
+#define MSG_OSD_REP_SCRUB      93
+
+#define MSG_OSD_PG_SCAN        94
+#define MSG_OSD_PG_BACKFILL    95
+#define MSG_OSD_PG_BACKFILL_REMOVE 96
+
+#define MSG_COMMAND            97
+#define MSG_COMMAND_REPLY      98
+
+#define MSG_OSD_BACKFILL_RESERVE 99
+#define MSG_OSD_RECOVERY_RESERVE 150
+#define MSG_OSD_FORCE_RECOVERY 151
+
+#define MSG_OSD_PG_PUSH        105
+#define MSG_OSD_PG_PULL        106
+#define MSG_OSD_PG_PUSH_REPLY  107
+
+#define MSG_OSD_EC_WRITE       108
+#define MSG_OSD_EC_WRITE_REPLY 109
+#define MSG_OSD_EC_READ        110
+#define MSG_OSD_EC_READ_REPLY  111
+
+#define MSG_OSD_REPOP         112
+#define MSG_OSD_REPOPREPLY    113
+#define MSG_OSD_PG_UPDATE_LOG_MISSING  114
+#define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY  115
+
+#define MSG_OSD_PG_CREATED      116
+#define MSG_OSD_REP_SCRUBMAP    117
+#define MSG_OSD_PG_RECOVERY_DELETE 118
+#define MSG_OSD_PG_RECOVERY_DELETE_REPLY 119
+
+// *** MDS ***
+
+#define MSG_MDS_BEACON             100  // to monitor
+#define MSG_MDS_SLAVE_REQUEST      101
+#define MSG_MDS_TABLE_REQUEST      102
+
+                                // 150 already in use (MSG_OSD_RECOVERY_RESERVE)
+
+#define MSG_MDS_RESOLVE            0x200
+#define MSG_MDS_RESOLVEACK         0x201
+#define MSG_MDS_CACHEREJOIN        0x202
+#define MSG_MDS_DISCOVER           0x203
+#define MSG_MDS_DISCOVERREPLY      0x204
+#define MSG_MDS_INODEUPDATE        0x205
+#define MSG_MDS_DIRUPDATE          0x206
+#define MSG_MDS_CACHEEXPIRE        0x207
+#define MSG_MDS_DENTRYUNLINK       0x208
+#define MSG_MDS_FRAGMENTNOTIFY     0x209
+#define MSG_MDS_OFFLOAD_TARGETS    0x20a
+#define MSG_MDS_DENTRYLINK         0x20c
+#define MSG_MDS_FINDINO            0x20d
+#define MSG_MDS_FINDINOREPLY       0x20e
+#define MSG_MDS_OPENINO            0x20f
+#define MSG_MDS_OPENINOREPLY       0x210
+
+#define MSG_MDS_LOCK               0x300
+#define MSG_MDS_INODEFILECAPS      0x301
+
+#define MSG_MDS_EXPORTDIRDISCOVER     0x449
+#define MSG_MDS_EXPORTDIRDISCOVERACK  0x450
+#define MSG_MDS_EXPORTDIRCANCEL       0x451
+#define MSG_MDS_EXPORTDIRPREP         0x452
+#define MSG_MDS_EXPORTDIRPREPACK      0x453
+#define MSG_MDS_EXPORTDIRWARNING      0x454
+#define MSG_MDS_EXPORTDIRWARNINGACK   0x455
+#define MSG_MDS_EXPORTDIR             0x456
+#define MSG_MDS_EXPORTDIRACK          0x457
+#define MSG_MDS_EXPORTDIRNOTIFY       0x458
+#define MSG_MDS_EXPORTDIRNOTIFYACK    0x459
+#define MSG_MDS_EXPORTDIRFINISH       0x460
+
+#define MSG_MDS_EXPORTCAPS            0x470
+#define MSG_MDS_EXPORTCAPSACK         0x471
+#define MSG_MDS_GATHERCAPS            0x472
+
+#define MSG_MDS_HEARTBEAT          0x500  // for mds load balancer
+
+// *** generic ***
+#define MSG_TIMECHECK             0x600
+#define MSG_MON_HEALTH            0x601
+
+// *** Message::encode() crcflags bits ***
+#define MSG_CRC_DATA           (1 << 0)
+#define MSG_CRC_HEADER         (1 << 1)
+#define MSG_CRC_ALL            (MSG_CRC_DATA | MSG_CRC_HEADER)
+
+// Xio Testing
+#define MSG_DATA_PING            0x602
+
+// Xio intends to define messages 0x603..0x606
+
+// Special
+#define MSG_NOP                   0x607
+
+#define MSG_MON_HEALTH_CHECKS     0x608
+
+// *** ceph-mgr <-> OSD/MDS daemons ***
+#define MSG_MGR_OPEN              0x700
+#define MSG_MGR_CONFIGURE         0x701
+#define MSG_MGR_REPORT            0x702
+
+// *** ceph-mgr <-> ceph-mon ***
+#define MSG_MGR_BEACON            0x703
+
+// *** ceph-mon(MgrMonitor) -> OSD/MDS daemons ***
+#define MSG_MGR_MAP               0x704
+
+// *** ceph-mon(MgrMonitor) -> ceph-mgr
+#define MSG_MGR_DIGEST               0x705
+// *** cephmgr -> ceph-mon
+#define MSG_MON_MGR_REPORT        0x706
+#define MSG_SERVICE_MAP           0x707
+
+// ======================================================
+
+// abstract Message class
+
+namespace bi = boost::intrusive;
+
+// XioMessenger conditional trace flags
+#define MSG_MAGIC_XIO          0x0002
+#define MSG_MAGIC_TRACE_XCON   0x0004
+#define MSG_MAGIC_TRACE_DTOR   0x0008
+#define MSG_MAGIC_TRACE_HDR    0x0010
+#define MSG_MAGIC_TRACE_XIO    0x0020
+#define MSG_MAGIC_TRACE_XMSGR  0x0040
+#define MSG_MAGIC_TRACE_CTR    0x0080
+
+// XioMessenger diagnostic "ping pong" flag (resend msg when send completes)
+#define MSG_MAGIC_REDUPE       0x0100
+
+class Message : public RefCountedObject {
+protected:
+  ceph_msg_header  header;      // headerelope
+  ceph_msg_footer  footer;
+  bufferlist       payload;  // "front" unaligned blob
+  bufferlist       middle;   // "middle" unaligned blob
+  bufferlist       data;     // data payload (page-alignment will be preserved where possible)
+
+  /* recv_stamp is set when the Messenger starts reading the
+   * Message off the wire */
+  utime_t recv_stamp;
+  /* dispatch_stamp is set when the Messenger starts calling dispatch() on
+   * its endpoints */
+  utime_t dispatch_stamp;
+  /* throttle_stamp is the point at which we got throttle */
+  utime_t throttle_stamp;
+  /* time at which message was fully read */
+  utime_t recv_complete_stamp;
+
+  ConnectionRef connection;
+
+  uint32_t magic = 0;
+
+  bi::list_member_hook<> dispatch_q;
+
+public:
+  // zipkin tracing
+  ZTracer::Trace trace;
+  void encode_trace(bufferlist &bl, uint64_t features) const;
+  void decode_trace(bufferlist::iterator &p, bool create = false);
+
+  class CompletionHook : public Context {
+  protected:
+    Message *m;
+    friend class Message;
+  public:
+    explicit CompletionHook(Message *_m) : m(_m) {}
+    virtual void set_message(Message *_m) { m = _m; }
+  };
+
+  typedef bi::list< Message,
+                   bi::member_hook< Message,
+                                    bi::list_member_hook<>,
+                                    &Message::dispatch_q > > Queue;
+
+protected:
+  CompletionHook* completion_hook = nullptr; // owned by Messenger
+
+  // release our size in bytes back to this throttler when our payload
+  // is adjusted or when we are destroyed.
+  Throttle *byte_throttler = nullptr;
+
+  // release a count back to this throttler when we are destroyed
+  Throttle *msg_throttler = nullptr;
+
+  // keep track of how big this message was when we reserved space in
+  // the msgr dispatch_throttler, so that we can properly release it
+  // later.  this is necessary because messages can enter the dispatch
+  // queue locally (not via read_message()), and those are not
+  // currently throttled.
+  uint64_t dispatch_throttle_size = 0;
+
+  friend class Messenger;
+
+public:
+  Message() {
+    memset(&header, 0, sizeof(header));
+    memset(&footer, 0, sizeof(footer));
+  }
+  Message(int t, int version=1, int compat_version=0) {
+    memset(&header, 0, sizeof(header));
+    header.type = t;
+    header.version = version;
+    header.compat_version = compat_version;
+    header.priority = 0;  // undef
+    header.data_off = 0;
+    memset(&footer, 0, sizeof(footer));
+  }
+
+  Message *get() {
+    return static_cast<Message *>(RefCountedObject::get());
+  }
+
+protected:
+  ~Message() override {
+    if (byte_throttler)
+      byte_throttler->put(payload.length() + middle.length() + data.length());
+    release_message_throttle();
+    trace.event("message destructed");
+    /* call completion hooks (if any) */
+    if (completion_hook)
+      completion_hook->complete(0);
+  }
+public:
+  const ConnectionRef& get_connection() const { return connection; }
+  void set_connection(const ConnectionRef& c) {
+    connection = c;
+  }
+  CompletionHook* get_completion_hook() { return completion_hook; }
+  void set_completion_hook(CompletionHook *hook) { completion_hook = hook; }
+  void set_byte_throttler(Throttle *t) { byte_throttler = t; }
+  Throttle *get_byte_throttler() { return byte_throttler; }
+  void set_message_throttler(Throttle *t) { msg_throttler = t; }
+  Throttle *get_message_throttler() { return msg_throttler; }
+
+  void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; }
+  uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size; }
+
+  const ceph_msg_header &get_header() const { return header; }
+  ceph_msg_header &get_header() { return header; }
+  void set_header(const ceph_msg_header &e) { header = e; }
+  void set_footer(const ceph_msg_footer &e) { footer = e; }
+  const ceph_msg_footer &get_footer() const { return footer; }
+  ceph_msg_footer &get_footer() { return footer; }
+  void set_src(const entity_name_t& src) { header.src = src; }
+
+  uint32_t get_magic() const { return magic; }
+  void set_magic(int _magic) { magic = _magic; }
+
+  /*
+   * If you use get_[data, middle, payload] you shouldn't
+   * use it to change those bufferlists unless you KNOW
+   * there is no throttle being used. The other
+   * functions are throttling-aware as appropriate.
+   */
+
+  void clear_payload() {
+    if (byte_throttler) {
+      byte_throttler->put(payload.length() + middle.length());
+    }
+    payload.clear();
+    middle.clear();
+  }
+
+  virtual void clear_buffers() {}
+  void clear_data() {
+    if (byte_throttler)
+      byte_throttler->put(data.length());
+    data.clear();
+    clear_buffers(); // let subclass drop buffers as well
+  }
+  void release_message_throttle() {
+    if (msg_throttler)
+      msg_throttler->put();
+    msg_throttler = nullptr;
+  }
+
+  bool empty_payload() const { return payload.length() == 0; }
+  bufferlist& get_payload() { return payload; }
+  void set_payload(bufferlist& bl) {
+    if (byte_throttler)
+      byte_throttler->put(payload.length());
+    payload.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE);
+    if (byte_throttler)
+      byte_throttler->take(payload.length());
+  }
+
+  void set_middle(bufferlist& bl) {
+    if (byte_throttler)
+      byte_throttler->put(middle.length());
+    middle.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE);
+    if (byte_throttler)
+      byte_throttler->take(middle.length());
+  }
+  bufferlist& get_middle() { return middle; }
+
+  void set_data(const bufferlist &bl) {
+    if (byte_throttler)
+      byte_throttler->put(data.length());
+    data.share(bl);
+    if (byte_throttler)
+      byte_throttler->take(data.length());
+  }
+
+  const bufferlist& get_data() const { return data; }
+  bufferlist& get_data() { return data; }
+  void claim_data(bufferlist& bl,
+                 unsigned int flags = buffer::list::CLAIM_DEFAULT) {
+    if (byte_throttler)
+      byte_throttler->put(data.length());
+    bl.claim(data, flags);
+  }
+  off_t get_data_len() const { return data.length(); }
+
+  void set_recv_stamp(utime_t t) { recv_stamp = t; }
+  const utime_t& get_recv_stamp() const { return recv_stamp; }
+  void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; }
+  const utime_t& get_dispatch_stamp() const { return dispatch_stamp; }
+  void set_throttle_stamp(utime_t t) { throttle_stamp = t; }
+  const utime_t& get_throttle_stamp() const { return throttle_stamp; }
+  void set_recv_complete_stamp(utime_t t) { recv_complete_stamp = t; }
+  const utime_t& get_recv_complete_stamp() const { return recv_complete_stamp; }
+
+  void calc_header_crc() {
+    header.crc = ceph_crc32c(0, (unsigned char*)&header,
+                            sizeof(header) - sizeof(header.crc));
+  }
+  void calc_front_crc() {
+    footer.front_crc = payload.crc32c(0);
+    footer.middle_crc = middle.crc32c(0);
+  }
+  void calc_data_crc() {
+    footer.data_crc = data.crc32c(0);
+  }
+
+  virtual int get_cost() const {
+    return data.length();
+  }
+
+  // type
+  int get_type() const { return header.type; }
+  void set_type(int t) { header.type = t; }
+
+  uint64_t get_tid() const { return header.tid; }
+  void set_tid(uint64_t t) { header.tid = t; }
+
+  uint64_t get_seq() const { return header.seq; }
+  void set_seq(uint64_t s) { header.seq = s; }
+
+  unsigned get_priority() const { return header.priority; }
+  void set_priority(__s16 p) { header.priority = p; }
+
+  // source/dest
+  entity_inst_t get_source_inst() const {
+    return entity_inst_t(get_source(), get_source_addr());
+  }
+  entity_name_t get_source() const {
+    return entity_name_t(header.src);
+  }
+  entity_addr_t get_source_addr() const {
+    if (connection)
+      return connection->get_peer_addr();
+    return entity_addr_t();
+  }
+
+  // forwarded?
+  entity_inst_t get_orig_source_inst() const {
+    return get_source_inst();
+  }
+  entity_name_t get_orig_source() const {
+    return get_source();
+  }
+  entity_addr_t get_orig_source_addr() const {
+    return get_source_addr();
+  }
+
+  // virtual bits
+  virtual void decode_payload() = 0;
+  virtual void encode_payload(uint64_t features) = 0;
+  virtual const char *get_type_name() const = 0;
+  virtual void print(ostream& out) const {
+    out << get_type_name() << " magic: " << magic;
+  }
+
+  virtual void dump(Formatter *f) const;
+
+  void encode(uint64_t features, int crcflags);
+};
+typedef boost::intrusive_ptr<Message> MessageRef;
+
+extern Message *decode_message(CephContext *cct, int crcflags,
+                              ceph_msg_header &header,
+                              ceph_msg_footer& footer, bufferlist& front,
+                              bufferlist& middle, bufferlist& data,
+                              Connection* conn);
+inline ostream& operator<<(ostream& out, const Message& m) {
+  m.print(out);
+  if (m.get_header().version)
+    out << " v" << m.get_header().version;
+  return out;
+}
+
+extern void encode_message(Message *m, uint64_t features, bufferlist& bl);
+extern Message *decode_message(CephContext *cct, int crcflags,
+                               bufferlist::iterator& bl);
+
+#endif