X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2FMessage.cc;fp=src%2Fceph%2Fsrc%2Fmsg%2FMessage.cc;h=7763caf77cd579b5760ecb65b3717dac9cc3a49d;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/msg/Message.cc b/src/ceph/src/msg/Message.cc new file mode 100644 index 0000000..7763caf --- /dev/null +++ b/src/ceph/src/msg/Message.cc @@ -0,0 +1,960 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifdef ENCODE_DUMP +# include +# include +#endif + +#include +using namespace std; + +#include "include/types.h" + +#include "global/global_context.h" + +#include "Message.h" + +#include "messages/MPGStats.h" + +#include "messages/MGenericMessage.h" + +#include "messages/MPGStatsAck.h" + +#include "messages/MStatfs.h" +#include "messages/MStatfsReply.h" + +#include "messages/MGetPoolStats.h" +#include "messages/MGetPoolStatsReply.h" + + +#include "messages/MPoolOp.h" +#include "messages/MPoolOpReply.h" + +#include "messages/PaxosServiceMessage.h" +#include "messages/MMonCommand.h" +#include "messages/MMonCommandAck.h" +#include "messages/MMonPaxos.h" + +#include "messages/MMonProbe.h" +#include "messages/MMonJoin.h" +#include "messages/MMonElection.h" +#include "messages/MMonSync.h" +#include "messages/MMonScrub.h" + +#include "messages/MLog.h" +#include "messages/MLogAck.h" + +#include "messages/MPing.h" + +#include "messages/MCommand.h" +#include "messages/MCommandReply.h" +#include "messages/MBackfillReserve.h" +#include "messages/MRecoveryReserve.h" + +#include "messages/MRoute.h" +#include "messages/MForward.h" + +#include "messages/MOSDBoot.h" +#include "messages/MOSDAlive.h" +#include "messages/MOSDBeacon.h" +#include "messages/MOSDPGTemp.h" +#include "messages/MOSDFailure.h" +#include "messages/MOSDMarkMeDown.h" +#include "messages/MOSDFull.h" +#include "messages/MOSDPing.h" +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" +#include "messages/MOSDSubOp.h" +#include "messages/MOSDSubOpReply.h" +#include "messages/MOSDRepOp.h" +#include "messages/MOSDRepOpReply.h" +#include "messages/MOSDMap.h" +#include "messages/MMonGetOSDMap.h" + +#include "messages/MOSDPGCreated.h" +#include "messages/MOSDPGNotify.h" +#include "messages/MOSDPGQuery.h" +#include "messages/MOSDPGLog.h" +#include "messages/MOSDPGRemove.h" +#include "messages/MOSDPGInfo.h" +#include "messages/MOSDPGCreate.h" +#include "messages/MOSDPGTrim.h" +#include "messages/MOSDScrub.h" +#include "messages/MOSDScrubReserve.h" +#include "messages/MOSDRepScrub.h" +#include "messages/MOSDRepScrubMap.h" +#include "messages/MOSDForceRecovery.h" +#include "messages/MOSDPGScan.h" +#include "messages/MOSDPGBackfill.h" +#include "messages/MOSDBackoff.h" +#include "messages/MOSDPGBackfillRemove.h" +#include "messages/MOSDPGRecoveryDelete.h" +#include "messages/MOSDPGRecoveryDeleteReply.h" + +#include "messages/MRemoveSnaps.h" + +#include "messages/MMonMap.h" +#include "messages/MMonGetMap.h" +#include "messages/MMonGetVersion.h" +#include "messages/MMonGetVersionReply.h" +#include "messages/MMonHealth.h" +#include "messages/MMonHealthChecks.h" +#include "messages/MMonMetadata.h" +#include "messages/MDataPing.h" +#include "messages/MAuth.h" +#include "messages/MAuthReply.h" +#include "messages/MMonSubscribe.h" +#include "messages/MMonSubscribeAck.h" +#include "messages/MMonGlobalID.h" +#include "messages/MClientSession.h" +#include "messages/MClientReconnect.h" +#include "messages/MClientRequest.h" +#include "messages/MClientRequestForward.h" +#include "messages/MClientReply.h" +#include "messages/MClientCaps.h" +#include "messages/MClientCapRelease.h" +#include "messages/MClientLease.h" +#include "messages/MClientSnap.h" +#include "messages/MClientQuota.h" + +#include "messages/MMDSSlaveRequest.h" + +#include "messages/MMDSMap.h" +#include "messages/MFSMap.h" +#include "messages/MFSMapUser.h" +#include "messages/MMDSBeacon.h" +#include "messages/MMDSLoadTargets.h" +#include "messages/MMDSResolve.h" +#include "messages/MMDSResolveAck.h" +#include "messages/MMDSCacheRejoin.h" +#include "messages/MMDSFindIno.h" +#include "messages/MMDSFindInoReply.h" +#include "messages/MMDSOpenIno.h" +#include "messages/MMDSOpenInoReply.h" + +#include "messages/MDirUpdate.h" +#include "messages/MDiscover.h" +#include "messages/MDiscoverReply.h" + +#include "messages/MMDSFragmentNotify.h" + +#include "messages/MExportDirDiscover.h" +#include "messages/MExportDirDiscoverAck.h" +#include "messages/MExportDirCancel.h" +#include "messages/MExportDirPrep.h" +#include "messages/MExportDirPrepAck.h" +#include "messages/MExportDir.h" +#include "messages/MExportDirAck.h" +#include "messages/MExportDirNotify.h" +#include "messages/MExportDirNotifyAck.h" +#include "messages/MExportDirFinish.h" + +#include "messages/MExportCaps.h" +#include "messages/MExportCapsAck.h" +#include "messages/MGatherCaps.h" + + +#include "messages/MDentryUnlink.h" +#include "messages/MDentryLink.h" + +#include "messages/MHeartbeat.h" + +#include "messages/MMDSTableRequest.h" + +//#include "messages/MInodeUpdate.h" +#include "messages/MCacheExpire.h" +#include "messages/MInodeFileCaps.h" + +#include "messages/MMgrBeacon.h" +#include "messages/MMgrMap.h" +#include "messages/MMgrDigest.h" +#include "messages/MMgrReport.h" +#include "messages/MMgrOpen.h" +#include "messages/MMgrConfigure.h" +#include "messages/MMonMgrReport.h" +#include "messages/MServiceMap.h" + +#include "messages/MLock.h" + +#include "messages/MWatchNotify.h" +#include "messages/MTimeCheck.h" + +#include "common/config.h" + +#include "messages/MOSDPGPush.h" +#include "messages/MOSDPGPushReply.h" +#include "messages/MOSDPGPull.h" + +#include "messages/MOSDECSubOpWrite.h" +#include "messages/MOSDECSubOpWriteReply.h" +#include "messages/MOSDECSubOpRead.h" +#include "messages/MOSDECSubOpReadReply.h" + +#include "messages/MOSDPGUpdateLogMissing.h" +#include "messages/MOSDPGUpdateLogMissingReply.h" + +#define DEBUGLVL 10 // debug level of output + +#define dout_subsys ceph_subsys_ms + +void Message::encode(uint64_t features, int crcflags) +{ + // encode and copy out of *m + if (empty_payload()) { + assert(middle.length() == 0); + encode_payload(features); + + if (byte_throttler) { + byte_throttler->take(payload.length() + middle.length()); + } + + // if the encoder didn't specify past compatibility, we assume it + // is incompatible. + if (header.compat_version == 0) + header.compat_version = header.version; + } + if (crcflags & MSG_CRC_HEADER) + calc_front_crc(); + + // update envelope + header.front_len = get_payload().length(); + header.middle_len = get_middle().length(); + header.data_len = get_data().length(); + if (crcflags & MSG_CRC_HEADER) + calc_header_crc(); + + footer.flags = CEPH_MSG_FOOTER_COMPLETE; + + if (crcflags & MSG_CRC_DATA) { + calc_data_crc(); + +#ifdef ENCODE_DUMP + bufferlist bl; + ::encode(get_header(), bl); + + // dump the old footer format + ceph_msg_footer_old old_footer; + old_footer.front_crc = footer.front_crc; + old_footer.middle_crc = footer.middle_crc; + old_footer.data_crc = footer.data_crc; + old_footer.flags = footer.flags; + ::encode(old_footer, bl); + + ::encode(get_payload(), bl); + ::encode(get_middle(), bl); + ::encode(get_data(), bl); + + // this is almost an exponential backoff, except because we count + // bits we tend to sample things we encode later, which should be + // more representative. + static int i = 0; + i++; + int bits = 0; + for (unsigned t = i; t; bits++) + t &= t - 1; + if (bits <= 2) { + char fn[200]; + int status; + snprintf(fn, sizeof(fn), ENCODE_STRINGIFY(ENCODE_DUMP) "/%s__%d.%x", + abi::__cxa_demangle(typeid(*this).name(), 0, 0, &status), + getpid(), i++); + int fd = ::open(fn, O_WRONLY|O_TRUNC|O_CREAT, 0644); + if (fd >= 0) { + bl.write_fd(fd); + ::close(fd); + } + } +#endif + } else { + footer.flags = (unsigned)footer.flags | CEPH_MSG_FOOTER_NOCRC; + } +} + +void Message::dump(Formatter *f) const +{ + stringstream ss; + print(ss); + f->dump_string("summary", ss.str()); +} + +Message *decode_message(CephContext *cct, int crcflags, + ceph_msg_header& header, + ceph_msg_footer& footer, + bufferlist& front, bufferlist& middle, + bufferlist& data, Connection* conn) +{ + // verify crc + if (crcflags & MSG_CRC_HEADER) { + __u32 front_crc = front.crc32c(0); + __u32 middle_crc = middle.crc32c(0); + + if (front_crc != footer.front_crc) { + if (cct) { + ldout(cct, 0) << "bad crc in front " << front_crc << " != exp " << footer.front_crc << dendl; + ldout(cct, 20) << " "; + front.hexdump(*_dout); + *_dout << dendl; + } + return 0; + } + if (middle_crc != footer.middle_crc) { + if (cct) { + ldout(cct, 0) << "bad crc in middle " << middle_crc << " != exp " << footer.middle_crc << dendl; + ldout(cct, 20) << " "; + middle.hexdump(*_dout); + *_dout << dendl; + } + return 0; + } + } + if (crcflags & MSG_CRC_DATA) { + if ((footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0) { + __u32 data_crc = data.crc32c(0); + if (data_crc != footer.data_crc) { + if (cct) { + ldout(cct, 0) << "bad crc in data " << data_crc << " != exp " << footer.data_crc << dendl; + ldout(cct, 20) << " "; + data.hexdump(*_dout); + *_dout << dendl; + } + return 0; + } + } + } + + // make message + Message *m = 0; + int type = header.type; + switch (type) { + + // -- with payload -- + + case MSG_PGSTATS: + m = new MPGStats; + break; + case MSG_PGSTATSACK: + m = new MPGStatsAck; + break; + + case CEPH_MSG_STATFS: + m = new MStatfs; + break; + case CEPH_MSG_STATFS_REPLY: + m = new MStatfsReply; + break; + case MSG_GETPOOLSTATS: + m = new MGetPoolStats; + break; + case MSG_GETPOOLSTATSREPLY: + m = new MGetPoolStatsReply; + break; + case CEPH_MSG_POOLOP: + m = new MPoolOp; + break; + case CEPH_MSG_POOLOP_REPLY: + m = new MPoolOpReply; + break; + case MSG_MON_COMMAND: + m = new MMonCommand; + break; + case MSG_MON_COMMAND_ACK: + m = new MMonCommandAck; + break; + case MSG_MON_PAXOS: + m = new MMonPaxos; + break; + + case MSG_MON_PROBE: + m = new MMonProbe; + break; + case MSG_MON_JOIN: + m = new MMonJoin; + break; + case MSG_MON_ELECTION: + m = new MMonElection; + break; + case MSG_MON_SYNC: + m = new MMonSync; + break; + case MSG_MON_SCRUB: + m = new MMonScrub; + break; + + case MSG_LOG: + m = new MLog; + break; + case MSG_LOGACK: + m = new MLogAck; + break; + + case CEPH_MSG_PING: + m = new MPing(); + break; + case MSG_COMMAND: + m = new MCommand; + break; + case MSG_COMMAND_REPLY: + m = new MCommandReply; + break; + case MSG_OSD_BACKFILL_RESERVE: + m = new MBackfillReserve; + break; + case MSG_OSD_RECOVERY_RESERVE: + m = new MRecoveryReserve; + break; + case MSG_OSD_FORCE_RECOVERY: + m = new MOSDForceRecovery; + break; + + case MSG_ROUTE: + m = new MRoute; + break; + case MSG_FORWARD: + m = new MForward; + break; + + case CEPH_MSG_MON_MAP: + m = new MMonMap; + break; + case CEPH_MSG_MON_GET_MAP: + m = new MMonGetMap; + break; + case CEPH_MSG_MON_GET_OSDMAP: + m = new MMonGetOSDMap; + break; + case CEPH_MSG_MON_GET_VERSION: + m = new MMonGetVersion(); + break; + case CEPH_MSG_MON_GET_VERSION_REPLY: + m = new MMonGetVersionReply(); + break; + case CEPH_MSG_MON_METADATA: + m = new MMonMetadata(); + break; + + case MSG_OSD_BOOT: + m = new MOSDBoot(); + break; + case MSG_OSD_ALIVE: + m = new MOSDAlive(); + break; + case MSG_OSD_BEACON: + m = new MOSDBeacon(); + break; + case MSG_OSD_PGTEMP: + m = new MOSDPGTemp; + break; + case MSG_OSD_FAILURE: + m = new MOSDFailure(); + break; + case MSG_OSD_MARK_ME_DOWN: + m = new MOSDMarkMeDown(); + break; + case MSG_OSD_FULL: + m = new MOSDFull(); + break; + case MSG_OSD_PING: + m = new MOSDPing(); + break; + case CEPH_MSG_OSD_OP: + m = new MOSDOp(); + break; + case CEPH_MSG_OSD_OPREPLY: + m = new MOSDOpReply(); + break; + case MSG_OSD_SUBOP: + m = new MOSDSubOp(); + break; + case MSG_OSD_SUBOPREPLY: + m = new MOSDSubOpReply(); + break; + case MSG_OSD_REPOP: + m = new MOSDRepOp(); + break; + case MSG_OSD_REPOPREPLY: + m = new MOSDRepOpReply(); + break; + case MSG_OSD_PG_CREATED: + m = new MOSDPGCreated(); + break; + case MSG_OSD_PG_UPDATE_LOG_MISSING: + m = new MOSDPGUpdateLogMissing(); + break; + case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: + m = new MOSDPGUpdateLogMissingReply(); + break; + case CEPH_MSG_OSD_BACKOFF: + m = new MOSDBackoff; + break; + + case CEPH_MSG_OSD_MAP: + m = new MOSDMap; + break; + + case CEPH_MSG_WATCH_NOTIFY: + m = new MWatchNotify; + break; + + case MSG_OSD_PG_NOTIFY: + m = new MOSDPGNotify; + break; + case MSG_OSD_PG_QUERY: + m = new MOSDPGQuery; + break; + case MSG_OSD_PG_LOG: + m = new MOSDPGLog; + break; + case MSG_OSD_PG_REMOVE: + m = new MOSDPGRemove; + break; + case MSG_OSD_PG_INFO: + m = new MOSDPGInfo; + break; + case MSG_OSD_PG_CREATE: + m = new MOSDPGCreate; + break; + case MSG_OSD_PG_TRIM: + m = new MOSDPGTrim; + break; + + case MSG_OSD_SCRUB: + m = new MOSDScrub; + break; + case MSG_OSD_SCRUB_RESERVE: + m = new MOSDScrubReserve; + break; + case MSG_REMOVE_SNAPS: + m = new MRemoveSnaps; + break; + case MSG_OSD_REP_SCRUB: + m = new MOSDRepScrub; + break; + case MSG_OSD_REP_SCRUBMAP: + m = new MOSDRepScrubMap; + break; + case MSG_OSD_PG_SCAN: + m = new MOSDPGScan; + break; + case MSG_OSD_PG_BACKFILL: + m = new MOSDPGBackfill; + break; + case MSG_OSD_PG_BACKFILL_REMOVE: + m = new MOSDPGBackfillRemove; + break; + case MSG_OSD_PG_PUSH: + m = new MOSDPGPush; + break; + case MSG_OSD_PG_PULL: + m = new MOSDPGPull; + break; + case MSG_OSD_PG_PUSH_REPLY: + m = new MOSDPGPushReply; + break; + case MSG_OSD_PG_RECOVERY_DELETE: + m = new MOSDPGRecoveryDelete; + break; + case MSG_OSD_PG_RECOVERY_DELETE_REPLY: + m = new MOSDPGRecoveryDeleteReply; + break; + case MSG_OSD_EC_WRITE: + m = new MOSDECSubOpWrite; + break; + case MSG_OSD_EC_WRITE_REPLY: + m = new MOSDECSubOpWriteReply; + break; + case MSG_OSD_EC_READ: + m = new MOSDECSubOpRead; + break; + case MSG_OSD_EC_READ_REPLY: + m = new MOSDECSubOpReadReply; + break; + // auth + case CEPH_MSG_AUTH: + m = new MAuth; + break; + case CEPH_MSG_AUTH_REPLY: + m = new MAuthReply; + break; + + case MSG_MON_GLOBAL_ID: + m = new MMonGlobalID; + break; + + // clients + case CEPH_MSG_MON_SUBSCRIBE: + m = new MMonSubscribe; + break; + case CEPH_MSG_MON_SUBSCRIBE_ACK: + m = new MMonSubscribeAck; + break; + case CEPH_MSG_CLIENT_SESSION: + m = new MClientSession; + break; + case CEPH_MSG_CLIENT_RECONNECT: + m = new MClientReconnect; + break; + case CEPH_MSG_CLIENT_REQUEST: + m = new MClientRequest; + break; + case CEPH_MSG_CLIENT_REQUEST_FORWARD: + m = new MClientRequestForward; + break; + case CEPH_MSG_CLIENT_REPLY: + m = new MClientReply; + break; + case CEPH_MSG_CLIENT_CAPS: + m = new MClientCaps; + break; + case CEPH_MSG_CLIENT_CAPRELEASE: + m = new MClientCapRelease; + break; + case CEPH_MSG_CLIENT_LEASE: + m = new MClientLease; + break; + case CEPH_MSG_CLIENT_SNAP: + m = new MClientSnap; + break; + case CEPH_MSG_CLIENT_QUOTA: + m = new MClientQuota; + break; + + // mds + case MSG_MDS_SLAVE_REQUEST: + m = new MMDSSlaveRequest; + break; + + case CEPH_MSG_MDS_MAP: + m = new MMDSMap; + break; + case CEPH_MSG_FS_MAP: + m = new MFSMap; + break; + case CEPH_MSG_FS_MAP_USER: + m = new MFSMapUser; + break; + case MSG_MDS_BEACON: + m = new MMDSBeacon; + break; + case MSG_MDS_OFFLOAD_TARGETS: + m = new MMDSLoadTargets; + break; + case MSG_MDS_RESOLVE: + m = new MMDSResolve; + break; + case MSG_MDS_RESOLVEACK: + m = new MMDSResolveAck; + break; + case MSG_MDS_CACHEREJOIN: + m = new MMDSCacheRejoin; + break; + + case MSG_MDS_DIRUPDATE: + m = new MDirUpdate(); + break; + + case MSG_MDS_DISCOVER: + m = new MDiscover(); + break; + case MSG_MDS_DISCOVERREPLY: + m = new MDiscoverReply(); + break; + + case MSG_MDS_FINDINO: + m = new MMDSFindIno; + break; + case MSG_MDS_FINDINOREPLY: + m = new MMDSFindInoReply; + break; + + case MSG_MDS_OPENINO: + m = new MMDSOpenIno; + break; + case MSG_MDS_OPENINOREPLY: + m = new MMDSOpenInoReply; + break; + + case MSG_MDS_FRAGMENTNOTIFY: + m = new MMDSFragmentNotify; + break; + + case MSG_MDS_EXPORTDIRDISCOVER: + m = new MExportDirDiscover(); + break; + case MSG_MDS_EXPORTDIRDISCOVERACK: + m = new MExportDirDiscoverAck(); + break; + case MSG_MDS_EXPORTDIRCANCEL: + m = new MExportDirCancel(); + break; + + case MSG_MDS_EXPORTDIR: + m = new MExportDir; + break; + case MSG_MDS_EXPORTDIRACK: + m = new MExportDirAck; + break; + case MSG_MDS_EXPORTDIRFINISH: + m = new MExportDirFinish; + break; + + case MSG_MDS_EXPORTDIRNOTIFY: + m = new MExportDirNotify(); + break; + + case MSG_MDS_EXPORTDIRNOTIFYACK: + m = new MExportDirNotifyAck(); + break; + + case MSG_MDS_EXPORTDIRPREP: + m = new MExportDirPrep(); + break; + + case MSG_MDS_EXPORTDIRPREPACK: + m = new MExportDirPrepAck(); + break; + + case MSG_MDS_EXPORTCAPS: + m = new MExportCaps; + break; + case MSG_MDS_EXPORTCAPSACK: + m = new MExportCapsAck; + break; + case MSG_MDS_GATHERCAPS: + m = new MGatherCaps; + break; + + + case MSG_MDS_DENTRYUNLINK: + m = new MDentryUnlink; + break; + case MSG_MDS_DENTRYLINK: + m = new MDentryLink; + break; + + case MSG_MDS_HEARTBEAT: + m = new MHeartbeat(); + break; + + case MSG_MDS_CACHEEXPIRE: + m = new MCacheExpire(); + break; + + case MSG_MDS_TABLE_REQUEST: + m = new MMDSTableRequest; + break; + + /* case MSG_MDS_INODEUPDATE: + m = new MInodeUpdate(); + break; + */ + + case MSG_MDS_INODEFILECAPS: + m = new MInodeFileCaps(); + break; + + case MSG_MDS_LOCK: + m = new MLock(); + break; + + case MSG_MGR_BEACON: + m = new MMgrBeacon(); + break; + + case MSG_MON_MGR_REPORT: + m = new MMonMgrReport(); + break; + + case MSG_SERVICE_MAP: + m = new MServiceMap(); + break; + + case MSG_MGR_MAP: + m = new MMgrMap(); + break; + + case MSG_MGR_DIGEST: + m = new MMgrDigest(); + break; + + case MSG_MGR_OPEN: + m = new MMgrOpen(); + break; + + case MSG_MGR_REPORT: + m = new MMgrReport(); + break; + + case MSG_MGR_CONFIGURE: + m = new MMgrConfigure(); + break; + + case MSG_TIMECHECK: + m = new MTimeCheck(); + break; + + case MSG_MON_HEALTH: + m = new MMonHealth(); + break; + + case MSG_MON_HEALTH_CHECKS: + m = new MMonHealthChecks(); + break; + +#if defined(HAVE_XIO) + case MSG_DATA_PING: + m = new MDataPing(); + break; +#endif + // -- simple messages without payload -- + + case CEPH_MSG_SHUTDOWN: + m = new MGenericMessage(type); + break; + + default: + if (cct) { + ldout(cct, 0) << "can't decode unknown message type " << type << " MSG_AUTH=" << CEPH_MSG_AUTH << dendl; + if (cct->_conf->ms_die_on_bad_msg) + ceph_abort(); + } + return 0; + } + + m->set_cct(cct); + + // m->header.version, if non-zero, should be populated with the + // newest version of the encoding the code supports. If set, check + // it against compat_version. + if (m->get_header().version && + m->get_header().version < header.compat_version) { + if (cct) { + ldout(cct, 0) << "will not decode message of type " << type + << " version " << header.version + << " because compat_version " << header.compat_version + << " > supported version " << m->get_header().version << dendl; + if (cct->_conf->ms_die_on_bad_msg) + ceph_abort(); + } + m->put(); + return 0; + } + + m->set_connection(conn); + m->set_header(header); + m->set_footer(footer); + m->set_payload(front); + m->set_middle(middle); + m->set_data(data); + + try { + m->decode_payload(); + } + catch (const buffer::error &e) { + if (cct) { + lderr(cct) << "failed to decode message of type " << type + << " v" << header.version + << ": " << e.what() << dendl; + ldout(cct, cct->_conf->ms_dump_corrupt_message_level) << "dump: \n"; + m->get_payload().hexdump(*_dout); + *_dout << dendl; + if (cct->_conf->ms_die_on_bad_msg) + ceph_abort(); + } + m->put(); + return 0; + } + + // done! + return m; +} + +void Message::encode_trace(bufferlist &bl, uint64_t features) const +{ + auto p = trace.get_info(); + static const blkin_trace_info empty = { 0, 0, 0 }; + if (!p) { + p = ∅ + } + ::encode(*p, bl); +} + +void Message::decode_trace(bufferlist::iterator &p, bool create) +{ + blkin_trace_info info = {}; + ::decode(info, p); + +#ifdef WITH_BLKIN + if (!connection) + return; + + const auto msgr = connection->get_messenger(); + const auto endpoint = msgr->get_trace_endpoint(); + if (info.trace_id) { + trace.init(get_type_name(), endpoint, &info, true); + trace.event("decoded trace"); + } else if (create || (msgr->get_myname().is_osd() && + msgr->cct->_conf->osd_blkin_trace_all)) { + // create a trace even if we didn't get one on the wire + trace.init(get_type_name(), endpoint); + trace.event("created trace"); + } + trace.keyval("tid", get_tid()); + trace.keyval("entity type", get_source().type_str()); + trace.keyval("entity num", get_source().num()); +#endif +} + + +// This routine is not used for ordinary messages, but only when encapsulating a message +// for forwarding and routing. It's also used in a backward compatibility test, which only +// effectively tests backward compability for those functions. To avoid backward compatibility +// problems, we currently always encode and decode using the old footer format that doesn't +// allow for message authentication. Eventually we should fix that. PLR + +void encode_message(Message *msg, uint64_t features, bufferlist& payload) +{ + bufferlist front, middle, data; + ceph_msg_footer_old old_footer; + ceph_msg_footer footer; + msg->encode(features, MSG_CRC_ALL); + ::encode(msg->get_header(), payload); + + // Here's where we switch to the old footer format. PLR + + footer = msg->get_footer(); + old_footer.front_crc = footer.front_crc; + old_footer.middle_crc = footer.middle_crc; + old_footer.data_crc = footer.data_crc; + old_footer.flags = footer.flags; + ::encode(old_footer, payload); + + ::encode(msg->get_payload(), payload); + ::encode(msg->get_middle(), payload); + ::encode(msg->get_data(), payload); +} + +// See above for somewhat bogus use of the old message footer. We switch to the current footer +// after decoding the old one so the other form of decode_message() doesn't have to change. +// We've slipped in a 0 signature at this point, so any signature checking after this will +// fail. PLR + +Message *decode_message(CephContext *cct, int crcflags, bufferlist::iterator& p) +{ + ceph_msg_header h; + ceph_msg_footer_old fo; + ceph_msg_footer f; + bufferlist fr, mi, da; + ::decode(h, p); + ::decode(fo, p); + f.front_crc = fo.front_crc; + f.middle_crc = fo.middle_crc; + f.data_crc = fo.data_crc; + f.flags = fo.flags; + f.sig = 0; + ::decode(fr, p); + ::decode(mi, p); + ::decode(da, p); + return decode_message(cct, crcflags, h, f, fr, mi, da, nullptr); +} +