Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / PosixStack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 XSKY <haomai@xsky.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #include <sys/socket.h>
18 #include <netinet/tcp.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
21 #include <errno.h>
22
23 #include <algorithm>
24
25 #include "PosixStack.h"
26
27 #include "include/buffer.h"
28 #include "include/str_list.h"
29 #include "common/errno.h"
30 #include "common/strtol.h"
31 #include "common/dout.h"
32 #include "common/simple_spin.h"
33 #include "msg/Messenger.h"
34 #include "include/sock_compat.h"
35
36 #define dout_subsys ceph_subsys_ms
37 #undef dout_prefix
38 #define dout_prefix *_dout << "PosixStack "
39
40 class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
41   NetHandler &handler;
42   int _fd;
43   entity_addr_t sa;
44   bool connected;
45
46  public:
47   explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
48       : handler(h), _fd(f), sa(sa), connected(connected) {}
49
50   int is_connected() override {
51     if (connected)
52       return 1;
53
54     int r = handler.reconnect(sa, _fd);
55     if (r == 0) {
56       connected = true;
57       return 1;
58     } else if (r < 0) {
59       return r;
60     } else {
61       return 0;
62     }
63   }
64
65   ssize_t zero_copy_read(bufferptr&) override {
66     return -EOPNOTSUPP;
67   }
68
69   ssize_t read(char *buf, size_t len) override {
70     ssize_t r = ::read(_fd, buf, len);
71     if (r < 0)
72       r = -errno;
73     return r;
74   }
75
76   // return the sent length
77   // < 0 means error occured
78   static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
79   {
80     size_t sent = 0;
81     while (1) {
82       MSGR_SIGPIPE_STOPPER;
83       ssize_t r;
84       r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
85       if (r < 0) {
86         if (errno == EINTR) {
87           continue;
88         } else if (errno == EAGAIN) {
89           break;
90         }
91         return -errno;
92       }
93
94       sent += r;
95       if (len == sent) break;
96
97       while (r > 0) {
98         if (msg.msg_iov[0].iov_len <= (size_t)r) {
99           // drain this whole item
100           r -= msg.msg_iov[0].iov_len;
101           msg.msg_iov++;
102           msg.msg_iovlen--;
103         } else {
104           msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
105           msg.msg_iov[0].iov_len -= r;
106           break;
107         }
108       }
109     }
110     return (ssize_t)sent;
111   }
112
113   ssize_t send(bufferlist &bl, bool more) override {
114     size_t sent_bytes = 0;
115     std::list<bufferptr>::const_iterator pb = bl.buffers().begin();
116     uint64_t left_pbrs = bl.buffers().size();
117     while (left_pbrs) {
118       struct msghdr msg;
119       struct iovec msgvec[IOV_MAX];
120       uint64_t size = MIN(left_pbrs, IOV_MAX);
121       left_pbrs -= size;
122       memset(&msg, 0, sizeof(msg));
123       msg.msg_iovlen = 0;
124       msg.msg_iov = msgvec;
125       unsigned msglen = 0;
126       while (size > 0) {
127         msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
128         msgvec[msg.msg_iovlen].iov_len = pb->length();
129         msg.msg_iovlen++;
130         msglen += pb->length();
131         ++pb;
132         size--;
133       }
134
135       ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
136       if (r < 0)
137         return r;
138
139       // "r" is the remaining length
140       sent_bytes += r;
141       if (static_cast<unsigned>(r) < msglen)
142         break;
143       // only "r" == 0 continue
144     }
145
146     if (sent_bytes) {
147       bufferlist swapped;
148       if (sent_bytes < bl.length()) {
149         bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
150         bl.swap(swapped);
151       } else {
152         bl.clear();
153       }
154     }
155
156     return static_cast<ssize_t>(sent_bytes);
157   }
158   void shutdown() override {
159     ::shutdown(_fd, SHUT_RDWR);
160   }
161   void close() override {
162     ::close(_fd);
163   }
164   int fd() const override {
165     return _fd;
166   }
167   friend class PosixServerSocketImpl;
168   friend class PosixNetworkStack;
169 };
170
171 class PosixServerSocketImpl : public ServerSocketImpl {
172   NetHandler &handler;
173   int _fd;
174
175  public:
176   explicit PosixServerSocketImpl(NetHandler &h, int f): handler(h), _fd(f) {}
177   int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
178   void abort_accept() override {
179     ::close(_fd);
180   }
181   int fd() const override {
182     return _fd;
183   }
184 };
185
186 int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
187   assert(sock);
188   sockaddr_storage ss;
189   socklen_t slen = sizeof(ss);
190   int sd = ::accept(_fd, (sockaddr*)&ss, &slen);
191   if (sd < 0) {
192     return -errno;
193   }
194
195   handler.set_close_on_exec(sd);
196   int r = handler.set_nonblock(sd);
197   if (r < 0) {
198     ::close(sd);
199     return -errno;
200   }
201
202   r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
203   if (r < 0) {
204     ::close(sd);
205     return -errno;
206   }
207
208   assert(NULL != out); //out should not be NULL in accept connection
209
210   out->set_sockaddr((sockaddr*)&ss);
211   handler.set_priority(sd, opt.priority, out->get_family());
212
213   std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
214   *sock = ConnectedSocket(std::move(csi));
215   return 0;
216 }
217
218 void PosixWorker::initialize()
219 {
220 }
221
222 int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
223                         ServerSocket *sock)
224 {
225   int listen_sd = net.create_socket(sa.get_family(), true);
226   if (listen_sd < 0) {
227     return -errno;
228   }
229
230   int r = net.set_nonblock(listen_sd);
231   if (r < 0) {
232     ::close(listen_sd);
233     return -errno;
234   }
235
236   net.set_close_on_exec(listen_sd);
237   r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
238   if (r < 0) {
239     ::close(listen_sd);
240     return -errno;
241   }
242
243   r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
244   if (r < 0) {
245     r = -errno;
246     ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
247                    << ": " << cpp_strerror(r) << dendl;
248     ::close(listen_sd);
249     return r;
250   }
251
252   r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
253   if (r < 0) {
254     r = -errno;
255     lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
256     ::close(listen_sd);
257     return r;
258   }
259
260   *sock = ServerSocket(
261           std::unique_ptr<PosixServerSocketImpl>(
262               new PosixServerSocketImpl(net, listen_sd)));
263   return 0;
264 }
265
266 int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
267   int sd;
268
269   if (opts.nonblock) {
270     sd = net.nonblock_connect(addr, opts.connect_bind_addr);
271   } else {
272     sd = net.connect(addr, opts.connect_bind_addr);
273   }
274
275   if (sd < 0) {
276     return -errno;
277   }
278
279   net.set_priority(sd, opts.priority, addr.get_family());
280   *socket = ConnectedSocket(
281       std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
282   return 0;
283 }
284
285 PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
286     : NetworkStack(c, t)
287 {
288   vector<string> corestrs;
289   get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs);
290   for (auto & corestr : corestrs) {
291     string err;
292     int coreid = strict_strtol(corestr.c_str(), 10, &err);
293     if (err == "")
294       coreids.push_back(coreid);
295     else
296       lderr(cct) << __func__ << " failed to parse " << corestr << " in " << cct->_conf->ms_async_affinity_cores << dendl;
297   }
298 }