initial code repo
[stor4nfv.git] / src / ceph / src / msg / async / PosixStack.cc
diff --git a/src/ceph/src/msg/async/PosixStack.cc b/src/ceph/src/msg/async/PosixStack.cc
new file mode 100644 (file)
index 0000000..5fb975a
--- /dev/null
@@ -0,0 +1,298 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <sys/socket.h>
+#include <netinet/tcp.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <errno.h>
+
+#include <algorithm>
+
+#include "PosixStack.h"
+
+#include "include/buffer.h"
+#include "include/str_list.h"
+#include "common/errno.h"
+#include "common/strtol.h"
+#include "common/dout.h"
+#include "common/simple_spin.h"
+#include "msg/Messenger.h"
+#include "include/sock_compat.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "PosixStack "
+
+class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
+  NetHandler &handler;
+  int _fd;
+  entity_addr_t sa;
+  bool connected;
+
+ public:
+  explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
+      : handler(h), _fd(f), sa(sa), connected(connected) {}
+
+  int is_connected() override {
+    if (connected)
+      return 1;
+
+    int r = handler.reconnect(sa, _fd);
+    if (r == 0) {
+      connected = true;
+      return 1;
+    } else if (r < 0) {
+      return r;
+    } else {
+      return 0;
+    }
+  }
+
+  ssize_t zero_copy_read(bufferptr&) override {
+    return -EOPNOTSUPP;
+  }
+
+  ssize_t read(char *buf, size_t len) override {
+    ssize_t r = ::read(_fd, buf, len);
+    if (r < 0)
+      r = -errno;
+    return r;
+  }
+
+  // return the sent length
+  // < 0 means error occured
+  static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
+  {
+    size_t sent = 0;
+    while (1) {
+      MSGR_SIGPIPE_STOPPER;
+      ssize_t r;
+      r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
+      if (r < 0) {
+        if (errno == EINTR) {
+          continue;
+        } else if (errno == EAGAIN) {
+          break;
+        }
+        return -errno;
+      }
+
+      sent += r;
+      if (len == sent) break;
+
+      while (r > 0) {
+        if (msg.msg_iov[0].iov_len <= (size_t)r) {
+          // drain this whole item
+          r -= msg.msg_iov[0].iov_len;
+          msg.msg_iov++;
+          msg.msg_iovlen--;
+        } else {
+          msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
+          msg.msg_iov[0].iov_len -= r;
+          break;
+        }
+      }
+    }
+    return (ssize_t)sent;
+  }
+
+  ssize_t send(bufferlist &bl, bool more) override {
+    size_t sent_bytes = 0;
+    std::list<bufferptr>::const_iterator pb = bl.buffers().begin();
+    uint64_t left_pbrs = bl.buffers().size();
+    while (left_pbrs) {
+      struct msghdr msg;
+      struct iovec msgvec[IOV_MAX];
+      uint64_t size = MIN(left_pbrs, IOV_MAX);
+      left_pbrs -= size;
+      memset(&msg, 0, sizeof(msg));
+      msg.msg_iovlen = 0;
+      msg.msg_iov = msgvec;
+      unsigned msglen = 0;
+      while (size > 0) {
+        msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
+        msgvec[msg.msg_iovlen].iov_len = pb->length();
+        msg.msg_iovlen++;
+        msglen += pb->length();
+        ++pb;
+        size--;
+      }
+
+      ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
+      if (r < 0)
+        return r;
+
+      // "r" is the remaining length
+      sent_bytes += r;
+      if (static_cast<unsigned>(r) < msglen)
+        break;
+      // only "r" == 0 continue
+    }
+
+    if (sent_bytes) {
+      bufferlist swapped;
+      if (sent_bytes < bl.length()) {
+        bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
+        bl.swap(swapped);
+      } else {
+        bl.clear();
+      }
+    }
+
+    return static_cast<ssize_t>(sent_bytes);
+  }
+  void shutdown() override {
+    ::shutdown(_fd, SHUT_RDWR);
+  }
+  void close() override {
+    ::close(_fd);
+  }
+  int fd() const override {
+    return _fd;
+  }
+  friend class PosixServerSocketImpl;
+  friend class PosixNetworkStack;
+};
+
+class PosixServerSocketImpl : public ServerSocketImpl {
+  NetHandler &handler;
+  int _fd;
+
+ public:
+  explicit PosixServerSocketImpl(NetHandler &h, int f): handler(h), _fd(f) {}
+  int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+  void abort_accept() override {
+    ::close(_fd);
+  }
+  int fd() const override {
+    return _fd;
+  }
+};
+
+int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
+  assert(sock);
+  sockaddr_storage ss;
+  socklen_t slen = sizeof(ss);
+  int sd = ::accept(_fd, (sockaddr*)&ss, &slen);
+  if (sd < 0) {
+    return -errno;
+  }
+
+  handler.set_close_on_exec(sd);
+  int r = handler.set_nonblock(sd);
+  if (r < 0) {
+    ::close(sd);
+    return -errno;
+  }
+
+  r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
+  if (r < 0) {
+    ::close(sd);
+    return -errno;
+  }
+
+  assert(NULL != out); //out should not be NULL in accept connection
+
+  out->set_sockaddr((sockaddr*)&ss);
+  handler.set_priority(sd, opt.priority, out->get_family());
+
+  std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
+  *sock = ConnectedSocket(std::move(csi));
+  return 0;
+}
+
+void PosixWorker::initialize()
+{
+}
+
+int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
+                        ServerSocket *sock)
+{
+  int listen_sd = net.create_socket(sa.get_family(), true);
+  if (listen_sd < 0) {
+    return -errno;
+  }
+
+  int r = net.set_nonblock(listen_sd);
+  if (r < 0) {
+    ::close(listen_sd);
+    return -errno;
+  }
+
+  net.set_close_on_exec(listen_sd);
+  r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
+  if (r < 0) {
+    ::close(listen_sd);
+    return -errno;
+  }
+
+  r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
+  if (r < 0) {
+    r = -errno;
+    ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
+                   << ": " << cpp_strerror(r) << dendl;
+    ::close(listen_sd);
+    return r;
+  }
+
+  r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
+  if (r < 0) {
+    r = -errno;
+    lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
+    ::close(listen_sd);
+    return r;
+  }
+
+  *sock = ServerSocket(
+          std::unique_ptr<PosixServerSocketImpl>(
+              new PosixServerSocketImpl(net, listen_sd)));
+  return 0;
+}
+
+int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
+  int sd;
+
+  if (opts.nonblock) {
+    sd = net.nonblock_connect(addr, opts.connect_bind_addr);
+  } else {
+    sd = net.connect(addr, opts.connect_bind_addr);
+  }
+
+  if (sd < 0) {
+    return -errno;
+  }
+
+  net.set_priority(sd, opts.priority, addr.get_family());
+  *socket = ConnectedSocket(
+      std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
+  return 0;
+}
+
+PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
+    : NetworkStack(c, t)
+{
+  vector<string> corestrs;
+  get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs);
+  for (auto & corestr : corestrs) {
+    string err;
+    int coreid = strict_strtol(corestr.c_str(), 10, &err);
+    if (err == "")
+      coreids.push_back(coreid);
+    else
+      lderr(cct) << __func__ << " failed to parse " << corestr << " in " << cct->_conf->ms_async_affinity_cores << dendl;
+  }
+}