X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fxio%2FXioMessenger.h;fp=src%2Fceph%2Fsrc%2Fmsg%2Fxio%2FXioMessenger.h;h=ea20d36bd8c5ee00060566718802abf060f8e3f1;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/msg/xio/XioMessenger.h b/src/ceph/src/msg/xio/XioMessenger.h new file mode 100644 index 0000000..ea20d36 --- /dev/null +++ b/src/ceph/src/msg/xio/XioMessenger.h @@ -0,0 +1,169 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * Portions Copyright (C) 2013 CohortFS, LLC + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef XIO_MESSENGER_H +#define XIO_MESSENGER_H + +#include "msg/SimplePolicyMessenger.h" + +#include + +extern "C" { +#include "libxio.h" +} + +#include "XioConnection.h" +#include "XioPortal.h" +#include "QueueStrategy.h" +#include "common/Thread.h" +#include "common/Mutex.h" +#include "include/Spinlock.h" + +class XioInit { + /* safe to be called multiple times */ + void package_init(CephContext *cct); + +protected: + XioInit(CephContext *cct) { + this->package_init(cct); + } +}; + +class XioMessenger : public SimplePolicyMessenger, XioInit +{ +private: + static std::atomic nInstances = { 0 }; + std::atomic nsessions = { 0 }; + std::atomic shutdown_called = { false }; + Spinlock conns_sp; + XioConnection::ConnList conns_list; + XioConnection::EntitySet conns_entity_map; + XioPortals portals; + DispatchStrategy* dispatch_strategy; + XioLoopbackConnectionRef loop_con; + uint32_t special_handling; + Mutex sh_mtx; + Cond sh_cond; + bool need_addr; + bool did_bind; + + /// approximately unique ID set by the Constructor for use in entity_addr_t + uint64_t nonce; + + friend class XioConnection; + +public: + XioMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t nonce, + uint64_t cflags = 0, + DispatchStrategy* ds = new QueueStrategy(1)); + + virtual ~XioMessenger(); + + XioPortal* get_portal() { return portals.get_next_portal(); } + + virtual void set_myaddr(const entity_addr_t& a) { + Messenger::set_myaddr(a); + loop_con->set_peer_addr(a); + } + + int _send_message(Message *m, const entity_inst_t &dest); + int _send_message(Message *m, Connection *con); + int _send_message_impl(Message *m, XioConnection *xcon); + + uint32_t get_special_handling() { return special_handling; } + void set_special_handling(int n) { special_handling = n; } + int pool_hint(uint32_t size); + void try_insert(XioConnection *xcon); + + /* xio hooks */ + int new_session(struct xio_session *session, + struct xio_new_session_req *req, + void *cb_user_context); + + int session_event(struct xio_session *session, + struct xio_session_event_data *event_data, + void *cb_user_context); + + /* Messenger interface */ + virtual void set_addr_unknowns(const entity_addr_t &addr) override + { } /* XXX applicable? */ + virtual void set_addr(const entity_addr_t &addr) override + { } /* XXX applicable? */ + + virtual int get_dispatch_queue_len() + { return 0; } /* XXX bogus? */ + + virtual double get_dispatch_queue_max_age(utime_t now) + { return 0; } /* XXX bogus? */ + + virtual void set_cluster_protocol(int p) + { } + + virtual int bind(const entity_addr_t& addr); + + virtual int rebind(const set& avoid_ports); + + virtual int start(); + + virtual void wait(); + + virtual int shutdown(); + + virtual int send_message(Message *m, const entity_inst_t &dest) { + return _send_message(m, dest); + } + + virtual int lazy_send_message(Message *m, const entity_inst_t& dest) + { return EINVAL; } + + virtual int lazy_send_message(Message *m, Connection *con) + { return EINVAL; } + + virtual ConnectionRef get_connection(const entity_inst_t& dest); + + virtual ConnectionRef get_loopback_connection(); + + void unregister_xcon(XioConnection *xcon); + virtual void mark_down(const entity_addr_t& a); + virtual void mark_down(Connection *con); + virtual void mark_down_all(); + virtual void mark_down_on_empty(Connection *con); + virtual void mark_disposable(Connection *con); + + void ds_dispatch(Message *m) + { dispatch_strategy->ds_dispatch(m); } + + /** + * Tell the XioMessenger its full IP address. + * + * This is used by clients when connecting to other endpoints, and + * probably shouldn't be called by anybody else. + */ + void learned_addr(const entity_addr_t& peer_addr_for_me); + +private: + int get_nconns_per_portal(uint64_t cflags); + int get_nportals(uint64_t cflags); + +protected: + virtual void ready() + { } +}; + +XioCommand* pool_alloc_xio_command(XioConnection *xcon); + + +#endif /* XIO_MESSENGER_H */