Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / Message.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #ifndef CEPH_MESSAGE_H
16 #define CEPH_MESSAGE_H
17  
18 #include <stdlib.h>
19 #include <ostream>
20
21 #include <boost/intrusive_ptr.hpp>
22 #include <boost/intrusive/list.hpp>
23 // Because intrusive_ptr clobbers our assert...
24 #include "include/assert.h"
25
26 #include "include/types.h"
27 #include "include/buffer.h"
28 #include "common/Throttle.h"
29 #include "common/zipkin_trace.h"
30 #include "msg_types.h"
31
32 #include "common/RefCountedObj.h"
33 #include "msg/Connection.h"
34
35 #include "common/debug.h"
36 #include "common/config.h"
37
38 // monitor internal
39 #define MSG_MON_SCRUB              64
40 #define MSG_MON_ELECTION           65
41 #define MSG_MON_PAXOS              66
42 #define MSG_MON_PROBE              67
43 #define MSG_MON_JOIN               68
44 #define MSG_MON_SYNC               69
45
46 /* monitor <-> mon admin tool */
47 #define MSG_MON_COMMAND            50
48 #define MSG_MON_COMMAND_ACK        51
49 #define MSG_LOG                    52
50 #define MSG_LOGACK                 53
51
52 #define MSG_GETPOOLSTATS           58
53 #define MSG_GETPOOLSTATSREPLY      59
54
55 #define MSG_MON_GLOBAL_ID          60
56
57 #define MSG_ROUTE                  47
58 #define MSG_FORWARD                46
59
60 #define MSG_PAXOS                  40
61
62
63 // osd internal
64 #define MSG_OSD_PING         70
65 #define MSG_OSD_BOOT         71
66 #define MSG_OSD_FAILURE      72
67 #define MSG_OSD_ALIVE        73
68 #define MSG_OSD_MARK_ME_DOWN 74
69 #define MSG_OSD_FULL         75
70
71 #define MSG_OSD_SUBOP        76
72 #define MSG_OSD_SUBOPREPLY   77
73
74 #define MSG_OSD_PGTEMP       78
75
76 #define MSG_OSD_BEACON       79
77
78 #define MSG_OSD_PG_NOTIFY      80
79 #define MSG_OSD_PG_QUERY       81
80 #define MSG_OSD_PG_LOG         83
81 #define MSG_OSD_PG_REMOVE      84
82 #define MSG_OSD_PG_INFO        85
83 #define MSG_OSD_PG_TRIM        86
84
85 #define MSG_PGSTATS            87
86 #define MSG_PGSTATSACK         88
87
88 #define MSG_OSD_PG_CREATE      89
89 #define MSG_REMOVE_SNAPS       90
90
91 #define MSG_OSD_SCRUB          91
92 #define MSG_OSD_SCRUB_RESERVE  92  // previous PG_MISSING
93 #define MSG_OSD_REP_SCRUB      93
94
95 #define MSG_OSD_PG_SCAN        94
96 #define MSG_OSD_PG_BACKFILL    95
97 #define MSG_OSD_PG_BACKFILL_REMOVE 96
98
99 #define MSG_COMMAND            97
100 #define MSG_COMMAND_REPLY      98
101
102 #define MSG_OSD_BACKFILL_RESERVE 99
103 #define MSG_OSD_RECOVERY_RESERVE 150
104 #define MSG_OSD_FORCE_RECOVERY 151
105
106 #define MSG_OSD_PG_PUSH        105
107 #define MSG_OSD_PG_PULL        106
108 #define MSG_OSD_PG_PUSH_REPLY  107
109
110 #define MSG_OSD_EC_WRITE       108
111 #define MSG_OSD_EC_WRITE_REPLY 109
112 #define MSG_OSD_EC_READ        110
113 #define MSG_OSD_EC_READ_REPLY  111
114
115 #define MSG_OSD_REPOP         112
116 #define MSG_OSD_REPOPREPLY    113
117 #define MSG_OSD_PG_UPDATE_LOG_MISSING  114
118 #define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY  115
119
120 #define MSG_OSD_PG_CREATED      116
121 #define MSG_OSD_REP_SCRUBMAP    117
122 #define MSG_OSD_PG_RECOVERY_DELETE 118
123 #define MSG_OSD_PG_RECOVERY_DELETE_REPLY 119
124
125 // *** MDS ***
126
127 #define MSG_MDS_BEACON             100  // to monitor
128 #define MSG_MDS_SLAVE_REQUEST      101
129 #define MSG_MDS_TABLE_REQUEST      102
130
131                                 // 150 already in use (MSG_OSD_RECOVERY_RESERVE)
132
133 #define MSG_MDS_RESOLVE            0x200
134 #define MSG_MDS_RESOLVEACK         0x201
135 #define MSG_MDS_CACHEREJOIN        0x202
136 #define MSG_MDS_DISCOVER           0x203
137 #define MSG_MDS_DISCOVERREPLY      0x204
138 #define MSG_MDS_INODEUPDATE        0x205
139 #define MSG_MDS_DIRUPDATE          0x206
140 #define MSG_MDS_CACHEEXPIRE        0x207
141 #define MSG_MDS_DENTRYUNLINK       0x208
142 #define MSG_MDS_FRAGMENTNOTIFY     0x209
143 #define MSG_MDS_OFFLOAD_TARGETS    0x20a
144 #define MSG_MDS_DENTRYLINK         0x20c
145 #define MSG_MDS_FINDINO            0x20d
146 #define MSG_MDS_FINDINOREPLY       0x20e
147 #define MSG_MDS_OPENINO            0x20f
148 #define MSG_MDS_OPENINOREPLY       0x210
149
150 #define MSG_MDS_LOCK               0x300
151 #define MSG_MDS_INODEFILECAPS      0x301
152
153 #define MSG_MDS_EXPORTDIRDISCOVER     0x449
154 #define MSG_MDS_EXPORTDIRDISCOVERACK  0x450
155 #define MSG_MDS_EXPORTDIRCANCEL       0x451
156 #define MSG_MDS_EXPORTDIRPREP         0x452
157 #define MSG_MDS_EXPORTDIRPREPACK      0x453
158 #define MSG_MDS_EXPORTDIRWARNING      0x454
159 #define MSG_MDS_EXPORTDIRWARNINGACK   0x455
160 #define MSG_MDS_EXPORTDIR             0x456
161 #define MSG_MDS_EXPORTDIRACK          0x457
162 #define MSG_MDS_EXPORTDIRNOTIFY       0x458
163 #define MSG_MDS_EXPORTDIRNOTIFYACK    0x459
164 #define MSG_MDS_EXPORTDIRFINISH       0x460
165
166 #define MSG_MDS_EXPORTCAPS            0x470
167 #define MSG_MDS_EXPORTCAPSACK         0x471
168 #define MSG_MDS_GATHERCAPS            0x472
169
170 #define MSG_MDS_HEARTBEAT          0x500  // for mds load balancer
171
172 // *** generic ***
173 #define MSG_TIMECHECK             0x600
174 #define MSG_MON_HEALTH            0x601
175
176 // *** Message::encode() crcflags bits ***
177 #define MSG_CRC_DATA           (1 << 0)
178 #define MSG_CRC_HEADER         (1 << 1)
179 #define MSG_CRC_ALL            (MSG_CRC_DATA | MSG_CRC_HEADER)
180
181 // Xio Testing
182 #define MSG_DATA_PING             0x602
183
184 // Xio intends to define messages 0x603..0x606
185
186 // Special
187 #define MSG_NOP                   0x607
188
189 #define MSG_MON_HEALTH_CHECKS     0x608
190
191 // *** ceph-mgr <-> OSD/MDS daemons ***
192 #define MSG_MGR_OPEN              0x700
193 #define MSG_MGR_CONFIGURE         0x701
194 #define MSG_MGR_REPORT            0x702
195
196 // *** ceph-mgr <-> ceph-mon ***
197 #define MSG_MGR_BEACON            0x703
198
199 // *** ceph-mon(MgrMonitor) -> OSD/MDS daemons ***
200 #define MSG_MGR_MAP               0x704
201
202 // *** ceph-mon(MgrMonitor) -> ceph-mgr
203 #define MSG_MGR_DIGEST               0x705
204 // *** cephmgr -> ceph-mon
205 #define MSG_MON_MGR_REPORT        0x706
206 #define MSG_SERVICE_MAP           0x707
207
208 // ======================================================
209
210 // abstract Message class
211
212 namespace bi = boost::intrusive;
213
214 // XioMessenger conditional trace flags
215 #define MSG_MAGIC_XIO          0x0002
216 #define MSG_MAGIC_TRACE_XCON   0x0004
217 #define MSG_MAGIC_TRACE_DTOR   0x0008
218 #define MSG_MAGIC_TRACE_HDR    0x0010
219 #define MSG_MAGIC_TRACE_XIO    0x0020
220 #define MSG_MAGIC_TRACE_XMSGR  0x0040
221 #define MSG_MAGIC_TRACE_CTR    0x0080
222
223 // XioMessenger diagnostic "ping pong" flag (resend msg when send completes)
224 #define MSG_MAGIC_REDUPE       0x0100
225
226 class Message : public RefCountedObject {
227 protected:
228   ceph_msg_header  header;      // headerelope
229   ceph_msg_footer  footer;
230   bufferlist       payload;  // "front" unaligned blob
231   bufferlist       middle;   // "middle" unaligned blob
232   bufferlist       data;     // data payload (page-alignment will be preserved where possible)
233
234   /* recv_stamp is set when the Messenger starts reading the
235    * Message off the wire */
236   utime_t recv_stamp;
237   /* dispatch_stamp is set when the Messenger starts calling dispatch() on
238    * its endpoints */
239   utime_t dispatch_stamp;
240   /* throttle_stamp is the point at which we got throttle */
241   utime_t throttle_stamp;
242   /* time at which message was fully read */
243   utime_t recv_complete_stamp;
244
245   ConnectionRef connection;
246
247   uint32_t magic = 0;
248
249   bi::list_member_hook<> dispatch_q;
250
251 public:
252   // zipkin tracing
253   ZTracer::Trace trace;
254   void encode_trace(bufferlist &bl, uint64_t features) const;
255   void decode_trace(bufferlist::iterator &p, bool create = false);
256
257   class CompletionHook : public Context {
258   protected:
259     Message *m;
260     friend class Message;
261   public:
262     explicit CompletionHook(Message *_m) : m(_m) {}
263     virtual void set_message(Message *_m) { m = _m; }
264   };
265
266   typedef bi::list< Message,
267                     bi::member_hook< Message,
268                                      bi::list_member_hook<>,
269                                      &Message::dispatch_q > > Queue;
270
271 protected:
272   CompletionHook* completion_hook = nullptr; // owned by Messenger
273
274   // release our size in bytes back to this throttler when our payload
275   // is adjusted or when we are destroyed.
276   Throttle *byte_throttler = nullptr;
277
278   // release a count back to this throttler when we are destroyed
279   Throttle *msg_throttler = nullptr;
280
281   // keep track of how big this message was when we reserved space in
282   // the msgr dispatch_throttler, so that we can properly release it
283   // later.  this is necessary because messages can enter the dispatch
284   // queue locally (not via read_message()), and those are not
285   // currently throttled.
286   uint64_t dispatch_throttle_size = 0;
287
288   friend class Messenger;
289
290 public:
291   Message() {
292     memset(&header, 0, sizeof(header));
293     memset(&footer, 0, sizeof(footer));
294   }
295   Message(int t, int version=1, int compat_version=0) {
296     memset(&header, 0, sizeof(header));
297     header.type = t;
298     header.version = version;
299     header.compat_version = compat_version;
300     header.priority = 0;  // undef
301     header.data_off = 0;
302     memset(&footer, 0, sizeof(footer));
303   }
304
305   Message *get() {
306     return static_cast<Message *>(RefCountedObject::get());
307   }
308
309 protected:
310   ~Message() override {
311     if (byte_throttler)
312       byte_throttler->put(payload.length() + middle.length() + data.length());
313     release_message_throttle();
314     trace.event("message destructed");
315     /* call completion hooks (if any) */
316     if (completion_hook)
317       completion_hook->complete(0);
318   }
319 public:
320   const ConnectionRef& get_connection() const { return connection; }
321   void set_connection(const ConnectionRef& c) {
322     connection = c;
323   }
324   CompletionHook* get_completion_hook() { return completion_hook; }
325   void set_completion_hook(CompletionHook *hook) { completion_hook = hook; }
326   void set_byte_throttler(Throttle *t) { byte_throttler = t; }
327   Throttle *get_byte_throttler() { return byte_throttler; }
328   void set_message_throttler(Throttle *t) { msg_throttler = t; }
329   Throttle *get_message_throttler() { return msg_throttler; }
330
331   void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; }
332   uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size; }
333
334   const ceph_msg_header &get_header() const { return header; }
335   ceph_msg_header &get_header() { return header; }
336   void set_header(const ceph_msg_header &e) { header = e; }
337   void set_footer(const ceph_msg_footer &e) { footer = e; }
338   const ceph_msg_footer &get_footer() const { return footer; }
339   ceph_msg_footer &get_footer() { return footer; }
340   void set_src(const entity_name_t& src) { header.src = src; }
341
342   uint32_t get_magic() const { return magic; }
343   void set_magic(int _magic) { magic = _magic; }
344
345   /*
346    * If you use get_[data, middle, payload] you shouldn't
347    * use it to change those bufferlists unless you KNOW
348    * there is no throttle being used. The other
349    * functions are throttling-aware as appropriate.
350    */
351
352   void clear_payload() {
353     if (byte_throttler) {
354       byte_throttler->put(payload.length() + middle.length());
355     }
356     payload.clear();
357     middle.clear();
358   }
359
360   virtual void clear_buffers() {}
361   void clear_data() {
362     if (byte_throttler)
363       byte_throttler->put(data.length());
364     data.clear();
365     clear_buffers(); // let subclass drop buffers as well
366   }
367   void release_message_throttle() {
368     if (msg_throttler)
369       msg_throttler->put();
370     msg_throttler = nullptr;
371   }
372
373   bool empty_payload() const { return payload.length() == 0; }
374   bufferlist& get_payload() { return payload; }
375   void set_payload(bufferlist& bl) {
376     if (byte_throttler)
377       byte_throttler->put(payload.length());
378     payload.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE);
379     if (byte_throttler)
380       byte_throttler->take(payload.length());
381   }
382
383   void set_middle(bufferlist& bl) {
384     if (byte_throttler)
385       byte_throttler->put(middle.length());
386     middle.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE);
387     if (byte_throttler)
388       byte_throttler->take(middle.length());
389   }
390   bufferlist& get_middle() { return middle; }
391
392   void set_data(const bufferlist &bl) {
393     if (byte_throttler)
394       byte_throttler->put(data.length());
395     data.share(bl);
396     if (byte_throttler)
397       byte_throttler->take(data.length());
398   }
399
400   const bufferlist& get_data() const { return data; }
401   bufferlist& get_data() { return data; }
402   void claim_data(bufferlist& bl,
403                   unsigned int flags = buffer::list::CLAIM_DEFAULT) {
404     if (byte_throttler)
405       byte_throttler->put(data.length());
406     bl.claim(data, flags);
407   }
408   off_t get_data_len() const { return data.length(); }
409
410   void set_recv_stamp(utime_t t) { recv_stamp = t; }
411   const utime_t& get_recv_stamp() const { return recv_stamp; }
412   void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; }
413   const utime_t& get_dispatch_stamp() const { return dispatch_stamp; }
414   void set_throttle_stamp(utime_t t) { throttle_stamp = t; }
415   const utime_t& get_throttle_stamp() const { return throttle_stamp; }
416   void set_recv_complete_stamp(utime_t t) { recv_complete_stamp = t; }
417   const utime_t& get_recv_complete_stamp() const { return recv_complete_stamp; }
418
419   void calc_header_crc() {
420     header.crc = ceph_crc32c(0, (unsigned char*)&header,
421                              sizeof(header) - sizeof(header.crc));
422   }
423   void calc_front_crc() {
424     footer.front_crc = payload.crc32c(0);
425     footer.middle_crc = middle.crc32c(0);
426   }
427   void calc_data_crc() {
428     footer.data_crc = data.crc32c(0);
429   }
430
431   virtual int get_cost() const {
432     return data.length();
433   }
434
435   // type
436   int get_type() const { return header.type; }
437   void set_type(int t) { header.type = t; }
438
439   uint64_t get_tid() const { return header.tid; }
440   void set_tid(uint64_t t) { header.tid = t; }
441
442   uint64_t get_seq() const { return header.seq; }
443   void set_seq(uint64_t s) { header.seq = s; }
444
445   unsigned get_priority() const { return header.priority; }
446   void set_priority(__s16 p) { header.priority = p; }
447
448   // source/dest
449   entity_inst_t get_source_inst() const {
450     return entity_inst_t(get_source(), get_source_addr());
451   }
452   entity_name_t get_source() const {
453     return entity_name_t(header.src);
454   }
455   entity_addr_t get_source_addr() const {
456     if (connection)
457       return connection->get_peer_addr();
458     return entity_addr_t();
459   }
460
461   // forwarded?
462   entity_inst_t get_orig_source_inst() const {
463     return get_source_inst();
464   }
465   entity_name_t get_orig_source() const {
466     return get_source();
467   }
468   entity_addr_t get_orig_source_addr() const {
469     return get_source_addr();
470   }
471
472   // virtual bits
473   virtual void decode_payload() = 0;
474   virtual void encode_payload(uint64_t features) = 0;
475   virtual const char *get_type_name() const = 0;
476   virtual void print(ostream& out) const {
477     out << get_type_name() << " magic: " << magic;
478   }
479
480   virtual void dump(Formatter *f) const;
481
482   void encode(uint64_t features, int crcflags);
483 };
484 typedef boost::intrusive_ptr<Message> MessageRef;
485
486 extern Message *decode_message(CephContext *cct, int crcflags,
487                                ceph_msg_header &header,
488                                ceph_msg_footer& footer, bufferlist& front,
489                                bufferlist& middle, bufferlist& data,
490                                Connection* conn);
491 inline ostream& operator<<(ostream& out, const Message& m) {
492   m.print(out);
493   if (m.get_header().version)
494     out << " v" << m.get_header().version;
495   return out;
496 }
497
498 extern void encode_message(Message *m, uint64_t features, bufferlist& bl);
499 extern Message *decode_message(CephContext *cct, int crcflags,
500                                bufferlist::iterator& bl);
501
502 #endif