1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #include <sys/socket.h>
18 #include <netinet/tcp.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
25 #include "PosixStack.h"
27 #include "include/buffer.h"
28 #include "include/str_list.h"
29 #include "common/errno.h"
30 #include "common/strtol.h"
31 #include "common/dout.h"
32 #include "common/simple_spin.h"
33 #include "msg/Messenger.h"
34 #include "include/sock_compat.h"
36 #define dout_subsys ceph_subsys_ms
38 #define dout_prefix *_dout << "PosixStack "
// Connected socket implemented directly on a POSIX file descriptor: reads use
// ::read and writes use ::sendmsg (see do_sendmsg / send).
// NOTE(review): this excerpt omits many original lines (member declarations,
// closing braces, error branches); comments below describe visible lines only.
40 class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
// Stores the NetHandler, the descriptor, the peer address `sa`, and whether
// the connection is already established.
47 explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
48 : handler(h), _fd(f), sa(sa), connected(connected) {}
// Connection-state check; asks the handler to (re)connect `_fd` to `sa`.
// NOTE(review): how `r` is mapped to the return value is not shown here.
50 int is_connected() override {
54 int r = handler.reconnect(sa, _fd);
// Zero-copy read hook; its body is not part of this excerpt.
65 ssize_t zero_copy_read(bufferptr&) override {
// Reads up to `len` bytes from the socket into `buf` with ::read.
// NOTE(review): the handling of r < 0 / errno is not visible here.
69 ssize_t read(char *buf, size_t len) override {
70 ssize_t r = ::read(_fd, buf, len);
76 // return the sent length
77 // < 0 means error occurred
// Sends the iovecs referenced by `msg` (expected to total `len` bytes) on
// `fd`. MSG_NOSIGNAL is always passed; MSG_MORE is added when `more` is true
// to hint that further data follows. The iovecs in `msg` are adjusted in
// place to skip bytes that were already written.
78 static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
84 r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
// EAGAIN: the socket cannot accept more data right now (branch body not shown).
88 } else if (errno == EAGAIN) {
95 if (len == sent) break;
// Advance past the `r` bytes just written: fully-sent iovecs are consumed,
// and a partially-sent one has its base/length trimmed.
98 if (msg.msg_iov[0].iov_len <= (size_t)r) {
99 // drain this whole item
100 r -= msg.msg_iov[0].iov_len;
104 msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
105 msg.msg_iov[0].iov_len -= r;
110 return (ssize_t)sent;
// Writes as much of `bl` as the socket accepts and returns the byte count.
// Buffer segments are sent in batches of at most IOV_MAX iovecs per
// do_sendmsg call.
113 ssize_t send(bufferlist &bl, bool more) override {
114 size_t sent_bytes = 0;
115 std::list<bufferptr>::const_iterator pb = bl.buffers().begin();
116 uint64_t left_pbrs = bl.buffers().size();
// One iovec per bufferptr, capped at IOV_MAX entries for this batch.
119 struct iovec msgvec[IOV_MAX];
120 uint64_t size = MIN(left_pbrs, IOV_MAX);
122 memset(&msg, 0, sizeof(msg));
124 msg.msg_iov = msgvec;
127 msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
128 msgvec[msg.msg_iovlen].iov_len = pb->length();
130 msglen += pb->length();
// Request MSG_MORE while segments remain in `bl` or the caller passed `more`.
135 ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
139 // "r" is the number of bytes sent from this batch (do_sendmsg returns the sent length)
141 if (static_cast<unsigned>(r) < msglen)
143 // only continue with the next batch when r == msglen, i.e. this batch was sent in full
// Split the unsent tail [sent_bytes, end) off into `swapped`; presumably `bl`
// is then replaced by it (the following lines are not shown).
148 if (sent_bytes < bl.length()) {
149 bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
156 return static_cast<ssize_t>(sent_bytes);
// Shuts down both directions of the socket.
158 void shutdown() override {
159 ::shutdown(_fd, SHUT_RDWR);
161 void close() override {
164 int fd() const override {
167 friend class PosixServerSocketImpl;
168 friend class PosixNetworkStack;
// Listening-socket wrapper around a POSIX descriptor; accept() is defined out
// of line. NOTE(review): member declarations and method bodies are missing
// from this excerpt.
171 class PosixServerSocketImpl : public ServerSocketImpl {
176 explicit PosixServerSocketImpl(NetHandler &h, int f): handler(h), _fd(f) {}
177 int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
178 void abort_accept() override {
181 int fd() const override {
// Accepts one pending connection on the listening descriptor and configures it
// via the NetHandler (close-on-exec, non-blocking, socket options, priority).
// Records the peer address in *out and hands the new fd to *sock as an
// already-connected PosixConnectedSocketImpl.
// NOTE(review): the error-return branches after ::accept / set_nonblock /
// set_socket_options are not visible here; `w` is not used in visible lines.
186 int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
189 socklen_t slen = sizeof(ss);
190 int sd = ::accept(_fd, (sockaddr*)&ss, &slen);
195 handler.set_close_on_exec(sd);
196 int r = handler.set_nonblock(sd);
202 r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
208 assert(NULL != out); //out should not be NULL in accept connection
210 out->set_sockaddr((sockaddr*)&ss);
211 handler.set_priority(sd, opt.priority, out->get_family());
// An accepted socket is already connected, hence `true`.
213 std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
214 *sock = ConnectedSocket(std::move(csi));
// NOTE(review): the body of PosixWorker::initialize() is not part of this
// excerpt, so its behavior cannot be documented from here.
218 void PosixWorker::initialize()
// Creates a listening socket for `sa`. It is configured via
// net.set_nonblock / set_close_on_exec / set_socket_options, bound to `sa`,
// put into listen state with backlog ms_tcp_listen_backlog, and handed to
// *sock as a ServerSocket.
// NOTE(review): the remaining parameters and the error-return paths are not
// visible in this excerpt.
222 int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
225 int listen_sd = net.create_socket(sa.get_family(), true);
230 int r = net.set_nonblock(listen_sd);
236 net.set_close_on_exec(listen_sd);
237 r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
243 r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
// NOTE(review): this streams sa.get_sockaddr() (a sockaddr pointer, as passed
// to ::bind above), whereas the listen failure below streams `sa` itself;
// logging `sa` here would be consistent and more readable.
246 ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
247 << ": " << cpp_strerror(r) << dendl;
252 r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
255 lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
260 *sock = ServerSocket(
261 std::unique_ptr<PosixServerSocketImpl>(
262 new PosixServerSocketImpl(net, listen_sd)));
// Opens a socket to `addr` using either net.nonblock_connect or net.connect.
// The selecting condition is not shown; presumably it is opts.nonblock. The
// socket is then wrapped in a PosixConnectedSocketImpl. It is marked connected
// only when !opts.nonblock, so a non-blocking connect is presumably completed
// later through is_connected().
// NOTE(review): the error handling for sd < 0 is not visible in this excerpt.
266 int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
270 sd = net.nonblock_connect(addr, opts.connect_bind_addr);
272 sd = net.connect(addr, opts.connect_bind_addr);
279 net.set_priority(sd, opts.priority, addr.get_family());
280 *socket = ConnectedSocket(
281 std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
285 PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
288 vector<string> corestrs;
289 get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs);
290 for (auto & corestr : corestrs) {
292 int coreid = strict_strtol(corestr.c_str(), 10, &err);
294 coreids.push_back(coreid);
296 lderr(cct) << __func__ << " failed to parse " << corestr << " in " << cct->_conf->ms_async_affinity_cores << dendl;