Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / Message.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3
4 #ifdef ENCODE_DUMP
5 # include <typeinfo>
6 # include <cxxabi.h>
7 #endif
8
9 #include <iostream>
10 using namespace std;
11
12 #include "include/types.h"
13
14 #include "global/global_context.h"
15
16 #include "Message.h"
17
18 #include "messages/MPGStats.h"
19
20 #include "messages/MGenericMessage.h"
21
22 #include "messages/MPGStatsAck.h"
23
24 #include "messages/MStatfs.h"
25 #include "messages/MStatfsReply.h"
26
27 #include "messages/MGetPoolStats.h"
28 #include "messages/MGetPoolStatsReply.h"
29
30
31 #include "messages/MPoolOp.h"
32 #include "messages/MPoolOpReply.h"
33
34 #include "messages/PaxosServiceMessage.h"
35 #include "messages/MMonCommand.h"
36 #include "messages/MMonCommandAck.h"
37 #include "messages/MMonPaxos.h"
38
39 #include "messages/MMonProbe.h"
40 #include "messages/MMonJoin.h"
41 #include "messages/MMonElection.h"
42 #include "messages/MMonSync.h"
43 #include "messages/MMonScrub.h"
44
45 #include "messages/MLog.h"
46 #include "messages/MLogAck.h"
47
48 #include "messages/MPing.h"
49
50 #include "messages/MCommand.h"
51 #include "messages/MCommandReply.h"
52 #include "messages/MBackfillReserve.h"
53 #include "messages/MRecoveryReserve.h"
54
55 #include "messages/MRoute.h"
56 #include "messages/MForward.h"
57
58 #include "messages/MOSDBoot.h"
59 #include "messages/MOSDAlive.h"
60 #include "messages/MOSDBeacon.h"
61 #include "messages/MOSDPGTemp.h"
62 #include "messages/MOSDFailure.h"
63 #include "messages/MOSDMarkMeDown.h"
64 #include "messages/MOSDFull.h"
65 #include "messages/MOSDPing.h"
66 #include "messages/MOSDOp.h"
67 #include "messages/MOSDOpReply.h"
68 #include "messages/MOSDSubOp.h"
69 #include "messages/MOSDSubOpReply.h"
70 #include "messages/MOSDRepOp.h"
71 #include "messages/MOSDRepOpReply.h"
72 #include "messages/MOSDMap.h"
73 #include "messages/MMonGetOSDMap.h"
74
75 #include "messages/MOSDPGCreated.h"
76 #include "messages/MOSDPGNotify.h"
77 #include "messages/MOSDPGQuery.h"
78 #include "messages/MOSDPGLog.h"
79 #include "messages/MOSDPGRemove.h"
80 #include "messages/MOSDPGInfo.h"
81 #include "messages/MOSDPGCreate.h"
82 #include "messages/MOSDPGTrim.h"
83 #include "messages/MOSDScrub.h"
84 #include "messages/MOSDScrubReserve.h"
85 #include "messages/MOSDRepScrub.h"
86 #include "messages/MOSDRepScrubMap.h"
87 #include "messages/MOSDForceRecovery.h"
88 #include "messages/MOSDPGScan.h"
89 #include "messages/MOSDPGBackfill.h"
90 #include "messages/MOSDBackoff.h"
91 #include "messages/MOSDPGBackfillRemove.h"
92 #include "messages/MOSDPGRecoveryDelete.h"
93 #include "messages/MOSDPGRecoveryDeleteReply.h"
94
95 #include "messages/MRemoveSnaps.h"
96
97 #include "messages/MMonMap.h"
98 #include "messages/MMonGetMap.h"
99 #include "messages/MMonGetVersion.h"
100 #include "messages/MMonGetVersionReply.h"
101 #include "messages/MMonHealth.h"
102 #include "messages/MMonHealthChecks.h"
103 #include "messages/MMonMetadata.h"
104 #include "messages/MDataPing.h"
105 #include "messages/MAuth.h"
106 #include "messages/MAuthReply.h"
107 #include "messages/MMonSubscribe.h"
108 #include "messages/MMonSubscribeAck.h"
109 #include "messages/MMonGlobalID.h"
110 #include "messages/MClientSession.h"
111 #include "messages/MClientReconnect.h"
112 #include "messages/MClientRequest.h"
113 #include "messages/MClientRequestForward.h"
114 #include "messages/MClientReply.h"
115 #include "messages/MClientCaps.h"
116 #include "messages/MClientCapRelease.h"
117 #include "messages/MClientLease.h"
118 #include "messages/MClientSnap.h"
119 #include "messages/MClientQuota.h"
120
121 #include "messages/MMDSSlaveRequest.h"
122
123 #include "messages/MMDSMap.h"
124 #include "messages/MFSMap.h"
125 #include "messages/MFSMapUser.h"
126 #include "messages/MMDSBeacon.h"
127 #include "messages/MMDSLoadTargets.h"
128 #include "messages/MMDSResolve.h"
129 #include "messages/MMDSResolveAck.h"
130 #include "messages/MMDSCacheRejoin.h"
131 #include "messages/MMDSFindIno.h"
132 #include "messages/MMDSFindInoReply.h"
133 #include "messages/MMDSOpenIno.h"
134 #include "messages/MMDSOpenInoReply.h"
135
136 #include "messages/MDirUpdate.h"
137 #include "messages/MDiscover.h"
138 #include "messages/MDiscoverReply.h"
139
140 #include "messages/MMDSFragmentNotify.h"
141
142 #include "messages/MExportDirDiscover.h"
143 #include "messages/MExportDirDiscoverAck.h"
144 #include "messages/MExportDirCancel.h"
145 #include "messages/MExportDirPrep.h"
146 #include "messages/MExportDirPrepAck.h"
147 #include "messages/MExportDir.h"
148 #include "messages/MExportDirAck.h"
149 #include "messages/MExportDirNotify.h"
150 #include "messages/MExportDirNotifyAck.h"
151 #include "messages/MExportDirFinish.h"
152
153 #include "messages/MExportCaps.h"
154 #include "messages/MExportCapsAck.h"
155 #include "messages/MGatherCaps.h"
156
157
158 #include "messages/MDentryUnlink.h"
159 #include "messages/MDentryLink.h"
160
161 #include "messages/MHeartbeat.h"
162
163 #include "messages/MMDSTableRequest.h"
164
165 //#include "messages/MInodeUpdate.h"
166 #include "messages/MCacheExpire.h"
167 #include "messages/MInodeFileCaps.h"
168
169 #include "messages/MMgrBeacon.h"
170 #include "messages/MMgrMap.h"
171 #include "messages/MMgrDigest.h"
172 #include "messages/MMgrReport.h"
173 #include "messages/MMgrOpen.h"
174 #include "messages/MMgrConfigure.h"
175 #include "messages/MMonMgrReport.h"
176 #include "messages/MServiceMap.h"
177
178 #include "messages/MLock.h"
179
180 #include "messages/MWatchNotify.h"
181 #include "messages/MTimeCheck.h"
182
183 #include "common/config.h"
184
185 #include "messages/MOSDPGPush.h"
186 #include "messages/MOSDPGPushReply.h"
187 #include "messages/MOSDPGPull.h"
188
189 #include "messages/MOSDECSubOpWrite.h"
190 #include "messages/MOSDECSubOpWriteReply.h"
191 #include "messages/MOSDECSubOpRead.h"
192 #include "messages/MOSDECSubOpReadReply.h"
193
194 #include "messages/MOSDPGUpdateLogMissing.h"
195 #include "messages/MOSDPGUpdateLogMissingReply.h"
196
197 #define DEBUGLVL  10    // debug level of output
198
199 #define dout_subsys ceph_subsys_ms
200
201 void Message::encode(uint64_t features, int crcflags)
202 {
203   // encode and copy out of *m
204   if (empty_payload()) {
205     assert(middle.length() == 0);
206     encode_payload(features);
207
208     if (byte_throttler) {
209       byte_throttler->take(payload.length() + middle.length());
210     }
211
212     // if the encoder didn't specify past compatibility, we assume it
213     // is incompatible.
214     if (header.compat_version == 0)
215       header.compat_version = header.version;
216   }
217   if (crcflags & MSG_CRC_HEADER)
218     calc_front_crc();
219
220   // update envelope
221   header.front_len = get_payload().length();
222   header.middle_len = get_middle().length();
223   header.data_len = get_data().length();
224   if (crcflags & MSG_CRC_HEADER)
225     calc_header_crc();
226
227   footer.flags = CEPH_MSG_FOOTER_COMPLETE;
228
229   if (crcflags & MSG_CRC_DATA) {
230     calc_data_crc();
231
232 #ifdef ENCODE_DUMP
233     bufferlist bl;
234     ::encode(get_header(), bl);
235
236     // dump the old footer format
237     ceph_msg_footer_old old_footer;
238     old_footer.front_crc = footer.front_crc;
239     old_footer.middle_crc = footer.middle_crc;
240     old_footer.data_crc = footer.data_crc;
241     old_footer.flags = footer.flags;
242     ::encode(old_footer, bl);
243
244     ::encode(get_payload(), bl);
245     ::encode(get_middle(), bl);
246     ::encode(get_data(), bl);
247
248     // this is almost an exponential backoff, except because we count
249     // bits we tend to sample things we encode later, which should be
250     // more representative.
251     static int i = 0;
252     i++;
253     int bits = 0;
254     for (unsigned t = i; t; bits++)
255       t &= t - 1;
256     if (bits <= 2) {
257       char fn[200];
258       int status;
259       snprintf(fn, sizeof(fn), ENCODE_STRINGIFY(ENCODE_DUMP) "/%s__%d.%x",
260                abi::__cxa_demangle(typeid(*this).name(), 0, 0, &status),
261                getpid(), i++);
262       int fd = ::open(fn, O_WRONLY|O_TRUNC|O_CREAT, 0644);
263       if (fd >= 0) {
264         bl.write_fd(fd);
265         ::close(fd);
266       }
267     }
268 #endif
269   } else {
270     footer.flags = (unsigned)footer.flags | CEPH_MSG_FOOTER_NOCRC;
271   }
272 }
273
274 void Message::dump(Formatter *f) const
275 {
276   stringstream ss;
277   print(ss);
278   f->dump_string("summary", ss.str());
279 }
280
281 Message *decode_message(CephContext *cct, int crcflags,
282                         ceph_msg_header& header,
283                         ceph_msg_footer& footer,
284                         bufferlist& front, bufferlist& middle,
285                         bufferlist& data, Connection* conn)
286 {
287   // verify crc
288   if (crcflags & MSG_CRC_HEADER) {
289     __u32 front_crc = front.crc32c(0);
290     __u32 middle_crc = middle.crc32c(0);
291
292     if (front_crc != footer.front_crc) {
293       if (cct) {
294         ldout(cct, 0) << "bad crc in front " << front_crc << " != exp " << footer.front_crc << dendl;
295         ldout(cct, 20) << " ";
296         front.hexdump(*_dout);
297         *_dout << dendl;
298       }
299       return 0;
300     }
301     if (middle_crc != footer.middle_crc) {
302       if (cct) {
303         ldout(cct, 0) << "bad crc in middle " << middle_crc << " != exp " << footer.middle_crc << dendl;
304         ldout(cct, 20) << " ";
305         middle.hexdump(*_dout);
306         *_dout << dendl;
307       }
308       return 0;
309     }
310   }
311   if (crcflags & MSG_CRC_DATA) {
312     if ((footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0) {
313       __u32 data_crc = data.crc32c(0);
314       if (data_crc != footer.data_crc) {
315         if (cct) {
316           ldout(cct, 0) << "bad crc in data " << data_crc << " != exp " << footer.data_crc << dendl;
317           ldout(cct, 20) << " ";
318           data.hexdump(*_dout);
319           *_dout << dendl;
320         }
321         return 0;
322       }
323     }
324   }
325
326   // make message
327   Message *m = 0;
328   int type = header.type;
329   switch (type) {
330
331     // -- with payload --
332
333   case MSG_PGSTATS:
334     m = new MPGStats;
335     break;
336   case MSG_PGSTATSACK:
337     m = new MPGStatsAck;
338     break;
339
340   case CEPH_MSG_STATFS:
341     m = new MStatfs;
342     break;
343   case CEPH_MSG_STATFS_REPLY:
344     m = new MStatfsReply;
345     break;
346   case MSG_GETPOOLSTATS:
347     m = new MGetPoolStats;
348     break;
349   case MSG_GETPOOLSTATSREPLY:
350     m = new MGetPoolStatsReply;
351     break;
352   case CEPH_MSG_POOLOP:
353     m = new MPoolOp;
354     break;
355   case CEPH_MSG_POOLOP_REPLY:
356     m = new MPoolOpReply;
357     break;
358   case MSG_MON_COMMAND:
359     m = new MMonCommand;
360     break;
361   case MSG_MON_COMMAND_ACK:
362     m = new MMonCommandAck;
363     break;
364   case MSG_MON_PAXOS:
365     m = new MMonPaxos;
366     break;
367
368   case MSG_MON_PROBE:
369     m = new MMonProbe;
370     break;
371   case MSG_MON_JOIN:
372     m = new MMonJoin;
373     break;
374   case MSG_MON_ELECTION:
375     m = new MMonElection;
376     break;
377   case MSG_MON_SYNC:
378     m = new MMonSync;
379     break;
380   case MSG_MON_SCRUB:
381     m = new MMonScrub;
382     break;
383
384   case MSG_LOG:
385     m = new MLog;
386     break;
387   case MSG_LOGACK:
388     m = new MLogAck;
389     break;
390
391   case CEPH_MSG_PING:
392     m = new MPing();
393     break;
394   case MSG_COMMAND:
395     m = new MCommand;
396     break;
397   case MSG_COMMAND_REPLY:
398     m = new MCommandReply;
399     break;
400   case MSG_OSD_BACKFILL_RESERVE:
401     m = new MBackfillReserve;
402     break;
403   case MSG_OSD_RECOVERY_RESERVE:
404     m = new MRecoveryReserve;
405     break;
406   case MSG_OSD_FORCE_RECOVERY:
407     m = new MOSDForceRecovery;
408     break;
409
410   case MSG_ROUTE:
411     m = new MRoute;
412     break;
413   case MSG_FORWARD:
414     m = new MForward;
415     break;
416     
417   case CEPH_MSG_MON_MAP:
418     m = new MMonMap;
419     break;
420   case CEPH_MSG_MON_GET_MAP:
421     m = new MMonGetMap;
422     break;
423   case CEPH_MSG_MON_GET_OSDMAP:
424     m = new MMonGetOSDMap;
425     break;
426   case CEPH_MSG_MON_GET_VERSION:
427     m = new MMonGetVersion();
428     break;
429   case CEPH_MSG_MON_GET_VERSION_REPLY:
430     m = new MMonGetVersionReply();
431     break;
432   case CEPH_MSG_MON_METADATA:
433     m = new MMonMetadata();
434     break;
435
436   case MSG_OSD_BOOT:
437     m = new MOSDBoot();
438     break;
439   case MSG_OSD_ALIVE:
440     m = new MOSDAlive();
441     break;
442   case MSG_OSD_BEACON:
443     m = new MOSDBeacon();
444     break;
445   case MSG_OSD_PGTEMP:
446     m = new MOSDPGTemp;
447     break;
448   case MSG_OSD_FAILURE:
449     m = new MOSDFailure();
450     break;
451   case MSG_OSD_MARK_ME_DOWN:
452     m = new MOSDMarkMeDown();
453     break;
454   case MSG_OSD_FULL:
455     m = new MOSDFull();
456     break;
457   case MSG_OSD_PING:
458     m = new MOSDPing();
459     break;
460   case CEPH_MSG_OSD_OP:
461     m = new MOSDOp();
462     break;
463   case CEPH_MSG_OSD_OPREPLY:
464     m = new MOSDOpReply();
465     break;
466   case MSG_OSD_SUBOP:
467     m = new MOSDSubOp();
468     break;
469   case MSG_OSD_SUBOPREPLY:
470     m = new MOSDSubOpReply();
471     break;
472   case MSG_OSD_REPOP:
473     m = new MOSDRepOp();
474     break;
475   case MSG_OSD_REPOPREPLY:
476     m = new MOSDRepOpReply();
477     break;
478   case MSG_OSD_PG_CREATED:
479     m = new MOSDPGCreated();
480     break;
481   case MSG_OSD_PG_UPDATE_LOG_MISSING:
482     m = new MOSDPGUpdateLogMissing();
483     break;
484   case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
485     m = new MOSDPGUpdateLogMissingReply();
486     break;
487   case CEPH_MSG_OSD_BACKOFF:
488     m = new MOSDBackoff;
489     break;
490
491   case CEPH_MSG_OSD_MAP:
492     m = new MOSDMap;
493     break;
494
495   case CEPH_MSG_WATCH_NOTIFY:
496     m = new MWatchNotify;
497     break;
498
499   case MSG_OSD_PG_NOTIFY:
500     m = new MOSDPGNotify;
501     break;
502   case MSG_OSD_PG_QUERY:
503     m = new MOSDPGQuery;
504     break;
505   case MSG_OSD_PG_LOG:
506     m = new MOSDPGLog;
507     break;
508   case MSG_OSD_PG_REMOVE:
509     m = new MOSDPGRemove;
510     break;
511   case MSG_OSD_PG_INFO:
512     m = new MOSDPGInfo;
513     break;
514   case MSG_OSD_PG_CREATE:
515     m = new MOSDPGCreate;
516     break;
517   case MSG_OSD_PG_TRIM:
518     m = new MOSDPGTrim;
519     break;
520
521   case MSG_OSD_SCRUB:
522     m = new MOSDScrub;
523     break;
524   case MSG_OSD_SCRUB_RESERVE:
525     m = new MOSDScrubReserve;
526     break;
527   case MSG_REMOVE_SNAPS:
528     m = new MRemoveSnaps;
529     break;
530   case MSG_OSD_REP_SCRUB:
531     m = new MOSDRepScrub;
532     break;
533   case MSG_OSD_REP_SCRUBMAP:
534     m = new MOSDRepScrubMap;
535     break;
536   case MSG_OSD_PG_SCAN:
537     m = new MOSDPGScan;
538     break;
539   case MSG_OSD_PG_BACKFILL:
540     m = new MOSDPGBackfill;
541     break;
542   case MSG_OSD_PG_BACKFILL_REMOVE:
543     m = new MOSDPGBackfillRemove;
544     break;
545   case MSG_OSD_PG_PUSH:
546     m = new MOSDPGPush;
547     break;
548   case MSG_OSD_PG_PULL:
549     m = new MOSDPGPull;
550     break;
551   case MSG_OSD_PG_PUSH_REPLY:
552     m = new MOSDPGPushReply;
553     break;
554   case MSG_OSD_PG_RECOVERY_DELETE:
555     m = new MOSDPGRecoveryDelete;
556     break;
557   case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
558     m = new MOSDPGRecoveryDeleteReply;
559     break;
560   case MSG_OSD_EC_WRITE:
561     m = new MOSDECSubOpWrite;
562     break;
563   case MSG_OSD_EC_WRITE_REPLY:
564     m = new MOSDECSubOpWriteReply;
565     break;
566   case MSG_OSD_EC_READ:
567     m = new MOSDECSubOpRead;
568     break;
569   case MSG_OSD_EC_READ_REPLY:
570     m = new MOSDECSubOpReadReply;
571     break;
572    // auth
573   case CEPH_MSG_AUTH:
574     m = new MAuth;
575     break;
576   case CEPH_MSG_AUTH_REPLY:
577     m = new MAuthReply;
578     break;
579
580   case MSG_MON_GLOBAL_ID:
581     m = new MMonGlobalID;
582     break; 
583
584     // clients
585   case CEPH_MSG_MON_SUBSCRIBE:
586     m = new MMonSubscribe;
587     break;
588   case CEPH_MSG_MON_SUBSCRIBE_ACK:
589     m = new MMonSubscribeAck;
590     break;
591   case CEPH_MSG_CLIENT_SESSION:
592     m = new MClientSession;
593     break;
594   case CEPH_MSG_CLIENT_RECONNECT:
595     m = new MClientReconnect;
596     break;
597   case CEPH_MSG_CLIENT_REQUEST:
598     m = new MClientRequest;
599     break;
600   case CEPH_MSG_CLIENT_REQUEST_FORWARD:
601     m = new MClientRequestForward;
602     break;
603   case CEPH_MSG_CLIENT_REPLY:
604     m = new MClientReply;
605     break;
606   case CEPH_MSG_CLIENT_CAPS:
607     m = new MClientCaps;
608     break;
609   case CEPH_MSG_CLIENT_CAPRELEASE:
610     m = new MClientCapRelease;
611     break;
612   case CEPH_MSG_CLIENT_LEASE:
613     m = new MClientLease;
614     break;
615   case CEPH_MSG_CLIENT_SNAP:
616     m = new MClientSnap;
617     break;
618   case CEPH_MSG_CLIENT_QUOTA:
619     m = new MClientQuota;
620     break;
621
622     // mds
623   case MSG_MDS_SLAVE_REQUEST:
624     m = new MMDSSlaveRequest;
625     break;
626
627   case CEPH_MSG_MDS_MAP:
628     m = new MMDSMap;
629     break;
630   case CEPH_MSG_FS_MAP:
631     m = new MFSMap;
632     break;
633   case CEPH_MSG_FS_MAP_USER:
634     m = new MFSMapUser;
635     break;
636   case MSG_MDS_BEACON:
637     m = new MMDSBeacon;
638     break;
639   case MSG_MDS_OFFLOAD_TARGETS:
640     m = new MMDSLoadTargets;
641     break;
642   case MSG_MDS_RESOLVE:
643     m = new MMDSResolve;
644     break;
645   case MSG_MDS_RESOLVEACK:
646     m = new MMDSResolveAck;
647     break;
648   case MSG_MDS_CACHEREJOIN:
649     m = new MMDSCacheRejoin;
650         break;
651   
652   case MSG_MDS_DIRUPDATE:
653     m = new MDirUpdate();
654     break;
655
656   case MSG_MDS_DISCOVER:
657     m = new MDiscover();
658     break;
659   case MSG_MDS_DISCOVERREPLY:
660     m = new MDiscoverReply();
661     break;
662
663   case MSG_MDS_FINDINO:
664     m = new MMDSFindIno;
665     break;
666   case MSG_MDS_FINDINOREPLY:
667     m = new MMDSFindInoReply;
668     break;
669
670   case MSG_MDS_OPENINO:
671     m = new MMDSOpenIno;
672     break;
673   case MSG_MDS_OPENINOREPLY:
674     m = new MMDSOpenInoReply;
675     break;
676
677   case MSG_MDS_FRAGMENTNOTIFY:
678     m = new MMDSFragmentNotify;
679     break;
680
681   case MSG_MDS_EXPORTDIRDISCOVER:
682     m = new MExportDirDiscover();
683     break;
684   case MSG_MDS_EXPORTDIRDISCOVERACK:
685     m = new MExportDirDiscoverAck();
686     break;
687   case MSG_MDS_EXPORTDIRCANCEL:
688     m = new MExportDirCancel();
689     break;
690
691   case MSG_MDS_EXPORTDIR:
692     m = new MExportDir;
693     break;
694   case MSG_MDS_EXPORTDIRACK:
695     m = new MExportDirAck;
696     break;
697   case MSG_MDS_EXPORTDIRFINISH:
698     m = new MExportDirFinish;
699     break;
700
701   case MSG_MDS_EXPORTDIRNOTIFY:
702     m = new MExportDirNotify();
703     break;
704
705   case MSG_MDS_EXPORTDIRNOTIFYACK:
706     m = new MExportDirNotifyAck();
707     break;
708
709   case MSG_MDS_EXPORTDIRPREP:
710     m = new MExportDirPrep();
711     break;
712
713   case MSG_MDS_EXPORTDIRPREPACK:
714     m = new MExportDirPrepAck();
715     break;
716
717   case MSG_MDS_EXPORTCAPS:
718     m = new MExportCaps;
719     break;
720   case MSG_MDS_EXPORTCAPSACK:
721     m = new MExportCapsAck;
722     break;
723   case MSG_MDS_GATHERCAPS:
724     m = new MGatherCaps;
725     break;
726
727
728   case MSG_MDS_DENTRYUNLINK:
729     m = new MDentryUnlink;
730     break;
731   case MSG_MDS_DENTRYLINK:
732     m = new MDentryLink;
733     break;
734
735   case MSG_MDS_HEARTBEAT:
736     m = new MHeartbeat();
737     break;
738
739   case MSG_MDS_CACHEEXPIRE:
740     m = new MCacheExpire();
741     break;
742
743   case MSG_MDS_TABLE_REQUEST:
744     m = new MMDSTableRequest;
745     break;
746
747         /*  case MSG_MDS_INODEUPDATE:
748     m = new MInodeUpdate();
749     break;
750         */
751
752   case MSG_MDS_INODEFILECAPS:
753     m = new MInodeFileCaps();
754     break;
755
756   case MSG_MDS_LOCK:
757     m = new MLock();
758     break;
759
760   case MSG_MGR_BEACON:
761     m = new MMgrBeacon();
762     break;
763
764   case MSG_MON_MGR_REPORT:
765     m = new MMonMgrReport();
766     break;
767
768   case MSG_SERVICE_MAP:
769     m = new MServiceMap();
770     break;
771
772   case MSG_MGR_MAP:
773     m = new MMgrMap();
774     break;
775
776   case MSG_MGR_DIGEST:
777     m = new MMgrDigest();
778     break;
779
780   case MSG_MGR_OPEN:
781     m = new MMgrOpen();
782     break;
783
784   case MSG_MGR_REPORT:
785     m = new MMgrReport();
786     break;
787
788   case MSG_MGR_CONFIGURE:
789     m = new MMgrConfigure();
790     break;
791
792   case MSG_TIMECHECK:
793     m = new MTimeCheck();
794     break;
795
796   case MSG_MON_HEALTH:
797     m = new MMonHealth();
798     break;
799
800   case MSG_MON_HEALTH_CHECKS:
801     m = new MMonHealthChecks();
802     break;
803
804 #if defined(HAVE_XIO)
805   case MSG_DATA_PING:
806     m = new MDataPing();
807     break;
808 #endif
809     // -- simple messages without payload --
810
811   case CEPH_MSG_SHUTDOWN:
812     m = new MGenericMessage(type);
813     break;
814
815   default:
816     if (cct) {
817       ldout(cct, 0) << "can't decode unknown message type " << type << " MSG_AUTH=" << CEPH_MSG_AUTH << dendl;
818       if (cct->_conf->ms_die_on_bad_msg)
819         ceph_abort();
820     }
821     return 0;
822   }
823
824   m->set_cct(cct);
825
826   // m->header.version, if non-zero, should be populated with the
827   // newest version of the encoding the code supports.  If set, check
828   // it against compat_version.
829   if (m->get_header().version &&
830       m->get_header().version < header.compat_version) {
831     if (cct) {
832       ldout(cct, 0) << "will not decode message of type " << type
833                     << " version " << header.version
834                     << " because compat_version " << header.compat_version
835                     << " > supported version " << m->get_header().version << dendl;
836       if (cct->_conf->ms_die_on_bad_msg)
837         ceph_abort();
838     }
839     m->put();
840     return 0;
841   }
842
843   m->set_connection(conn);
844   m->set_header(header);
845   m->set_footer(footer);
846   m->set_payload(front);
847   m->set_middle(middle);
848   m->set_data(data);
849
850   try {
851     m->decode_payload();
852   }
853   catch (const buffer::error &e) {
854     if (cct) {
855       lderr(cct) << "failed to decode message of type " << type
856                  << " v" << header.version
857                  << ": " << e.what() << dendl;
858       ldout(cct, cct->_conf->ms_dump_corrupt_message_level) << "dump: \n";
859       m->get_payload().hexdump(*_dout);
860       *_dout << dendl;
861       if (cct->_conf->ms_die_on_bad_msg)
862         ceph_abort();
863     }
864     m->put();
865     return 0;
866   }
867
868   // done!
869   return m;
870 }
871
872 void Message::encode_trace(bufferlist &bl, uint64_t features) const
873 {
874   auto p = trace.get_info();
875   static const blkin_trace_info empty = { 0, 0, 0 };
876   if (!p) {
877     p = &empty;
878   }
879   ::encode(*p, bl);
880 }
881
882 void Message::decode_trace(bufferlist::iterator &p, bool create)
883 {
884   blkin_trace_info info = {};
885   ::decode(info, p);
886
887 #ifdef WITH_BLKIN
888   if (!connection)
889     return;
890
891   const auto msgr = connection->get_messenger();
892   const auto endpoint = msgr->get_trace_endpoint();
893   if (info.trace_id) {
894     trace.init(get_type_name(), endpoint, &info, true);
895     trace.event("decoded trace");
896   } else if (create || (msgr->get_myname().is_osd() &&
897                         msgr->cct->_conf->osd_blkin_trace_all)) {
898     // create a trace even if we didn't get one on the wire
899     trace.init(get_type_name(), endpoint);
900     trace.event("created trace");
901   }
902   trace.keyval("tid", get_tid());
903   trace.keyval("entity type", get_source().type_str());
904   trace.keyval("entity num", get_source().num());
905 #endif
906 }
907
908
909 // This routine is not used for ordinary messages, but only when encapsulating a message
910 // for forwarding and routing.  It's also used in a backward compatibility test, which only
911 // effectively tests backward compability for those functions.  To avoid backward compatibility
912 // problems, we currently always encode and decode using the old footer format that doesn't
913 // allow for message authentication.  Eventually we should fix that.  PLR
914
915 void encode_message(Message *msg, uint64_t features, bufferlist& payload)
916 {
917   bufferlist front, middle, data;
918   ceph_msg_footer_old old_footer;
919   ceph_msg_footer footer;
920   msg->encode(features, MSG_CRC_ALL);
921   ::encode(msg->get_header(), payload);
922
923   // Here's where we switch to the old footer format.  PLR
924
925   footer = msg->get_footer();
926   old_footer.front_crc = footer.front_crc;   
927   old_footer.middle_crc = footer.middle_crc;   
928   old_footer.data_crc = footer.data_crc;   
929   old_footer.flags = footer.flags;   
930   ::encode(old_footer, payload);
931
932   ::encode(msg->get_payload(), payload);
933   ::encode(msg->get_middle(), payload);
934   ::encode(msg->get_data(), payload);
935 }
936
937 // See above for somewhat bogus use of the old message footer.  We switch to the current footer
938 // after decoding the old one so the other form of decode_message() doesn't have to change.
939 // We've slipped in a 0 signature at this point, so any signature checking after this will
940 // fail.  PLR
941
942 Message *decode_message(CephContext *cct, int crcflags, bufferlist::iterator& p)
943 {
944   ceph_msg_header h;
945   ceph_msg_footer_old fo;
946   ceph_msg_footer f;
947   bufferlist fr, mi, da;
948   ::decode(h, p);
949   ::decode(fo, p);
950   f.front_crc = fo.front_crc;
951   f.middle_crc = fo.middle_crc;
952   f.data_crc = fo.data_crc;
953   f.flags = fo.flags;
954   f.sig = 0;
955   ::decode(fr, p);
956   ::decode(mi, p);
957   ::decode(da, p);
958   return decode_message(cct, crcflags, h, f, fr, mi, da, nullptr);
959 }
960