X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FPosixStack.cc;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FPosixStack.cc;h=5fb975ae906eaf89cb404794c2f5bddda6827535;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/msg/async/PosixStack.cc b/src/ceph/src/msg/async/PosixStack.cc new file mode 100644 index 0000000..5fb975a --- /dev/null +++ b/src/ceph/src/msg/async/PosixStack.cc @@ -0,0 +1,298 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include +#include +#include + +#include + +#include "PosixStack.h" + +#include "include/buffer.h" +#include "include/str_list.h" +#include "common/errno.h" +#include "common/strtol.h" +#include "common/dout.h" +#include "common/simple_spin.h" +#include "msg/Messenger.h" +#include "include/sock_compat.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << "PosixStack " + +class PosixConnectedSocketImpl final : public ConnectedSocketImpl { + NetHandler &handler; + int _fd; + entity_addr_t sa; + bool connected; + + public: + explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected) + : handler(h), _fd(f), sa(sa), connected(connected) {} + + int is_connected() override { + if (connected) + return 1; + + int r = handler.reconnect(sa, _fd); + if (r == 0) { + connected = true; + return 1; + } else if (r < 0) { + return r; + } else { + return 0; + } + } + + ssize_t zero_copy_read(bufferptr&) override { + return -EOPNOTSUPP; + } + + ssize_t read(char *buf, size_t len) override { + ssize_t r = ::read(_fd, buf, len); + if (r < 0) + r = -errno; + return r; + } + + // return the sent length + // < 0 means error occured + static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) + { + size_t sent = 0; + while (1) { + MSGR_SIGPIPE_STOPPER; + ssize_t r; + r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); + if (r < 0) { + if (errno == EINTR) { + continue; + } else if (errno == EAGAIN) { + break; + } + return -errno; + } + + sent += r; + if (len == sent) break; + + while (r > 0) { + if (msg.msg_iov[0].iov_len <= (size_t)r) { + // drain this whole item + r -= msg.msg_iov[0].iov_len; + msg.msg_iov++; + msg.msg_iovlen--; + } else { + msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r; + msg.msg_iov[0].iov_len -= r; + break; + } + } + } + return (ssize_t)sent; + } + + ssize_t send(bufferlist &bl, bool more) override { + size_t sent_bytes = 0; + std::list::const_iterator pb = bl.buffers().begin(); + uint64_t left_pbrs = bl.buffers().size(); + while (left_pbrs) { + struct msghdr msg; + struct iovec msgvec[IOV_MAX]; + uint64_t size = MIN(left_pbrs, IOV_MAX); + left_pbrs -= size; + memset(&msg, 0, sizeof(msg)); + msg.msg_iovlen = 0; + msg.msg_iov = msgvec; + unsigned msglen = 0; + while (size > 0) { + msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()); + msgvec[msg.msg_iovlen].iov_len = pb->length(); + msg.msg_iovlen++; + msglen += pb->length(); + ++pb; + size--; + } + + ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more); + if (r < 0) + return r; + + // "r" is the remaining length + sent_bytes += r; + if (static_cast(r) < msglen) + break; + // only "r" == 0 continue + } + + if (sent_bytes) { + bufferlist swapped; + if (sent_bytes < bl.length()) { + bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped); + bl.swap(swapped); + } else { + bl.clear(); + } + } + + return static_cast(sent_bytes); + } + void shutdown() override { + ::shutdown(_fd, SHUT_RDWR); + } + void close() override { + ::close(_fd); + } + int fd() const override { + return _fd; + } + friend class PosixServerSocketImpl; + friend class PosixNetworkStack; +}; + +class PosixServerSocketImpl : public ServerSocketImpl { + NetHandler &handler; + int _fd; + + public: + explicit PosixServerSocketImpl(NetHandler &h, int f): handler(h), _fd(f) {} + int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; + void abort_accept() override { + ::close(_fd); + } + int fd() const override { + return _fd; + } +}; + +int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) { + assert(sock); + sockaddr_storage ss; + socklen_t slen = sizeof(ss); + int sd = ::accept(_fd, (sockaddr*)&ss, &slen); + if (sd < 0) { + return -errno; + } + + handler.set_close_on_exec(sd); + int r = handler.set_nonblock(sd); + if (r < 0) { + ::close(sd); + return -errno; + } + + r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size); + if (r < 0) { + ::close(sd); + return -errno; + } + + assert(NULL != out); //out should not be NULL in accept connection + + out->set_sockaddr((sockaddr*)&ss); + handler.set_priority(sd, opt.priority, out->get_family()); + + std::unique_ptr csi(new PosixConnectedSocketImpl(handler, *out, sd, true)); + *sock = ConnectedSocket(std::move(csi)); + return 0; +} + +void PosixWorker::initialize() +{ +} + +int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt, + ServerSocket *sock) +{ + int listen_sd = net.create_socket(sa.get_family(), true); + if (listen_sd < 0) { + return -errno; + } + + int r = net.set_nonblock(listen_sd); + if (r < 0) { + ::close(listen_sd); + return -errno; + } + + net.set_close_on_exec(listen_sd); + r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size); + if (r < 0) { + ::close(listen_sd); + return -errno; + } + + r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len()); + if (r < 0) { + r = -errno; + ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr() + << ": " << cpp_strerror(r) << dendl; + ::close(listen_sd); + return r; + } + + r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog); + if (r < 0) { + r = -errno; + lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl; + ::close(listen_sd); + return r; + } + + *sock = ServerSocket( + std::unique_ptr( + new PosixServerSocketImpl(net, listen_sd))); + return 0; +} + +int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { + int sd; + + if (opts.nonblock) { + sd = net.nonblock_connect(addr, opts.connect_bind_addr); + } else { + sd = net.connect(addr, opts.connect_bind_addr); + } + + if (sd < 0) { + return -errno; + } + + net.set_priority(sd, opts.priority, addr.get_family()); + *socket = ConnectedSocket( + std::unique_ptr(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock))); + return 0; +} + +PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t) + : NetworkStack(c, t) +{ + vector corestrs; + get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs); + for (auto & corestr : corestrs) { + string err; + int coreid = strict_strtol(corestr.c_str(), 10, &err); + if (err == "") + coreids.push_back(coreid); + else + lderr(cct) << __func__ << " failed to parse " << corestr << " in " << cct->_conf->ms_async_affinity_cores << dendl; + } +}