Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / dpdk / DPDKStack.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3  * Ceph - scalable distributed file system
4  *
5  * Copyright (C) 2015 XSky <haomai@xsky.com>
6  *
7  * Author: Haomai Wang <haomaiwang@gmail.com>
8  *
9  * This is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software
12  * Foundation.  See file COPYING.
13  *
14  */
15 #ifndef CEPH_MSG_DPDKSTACK_H
16 #define CEPH_MSG_DPDKSTACK_H
17
18 #include <functional>
19
20 #include "common/ceph_context.h"
21 #include "common/Tub.h"
22
23 #include "msg/async/Stack.h"
24 #include "dpdk_rte.h"
25 #include "DPDK.h"
26 #include "net.h"
27 #include "const.h"
28 #include "IP.h"
29 #include "Packet.h"
30
31 class interface;
32
33 template <typename Protocol>
34 class NativeConnectedSocketImpl;
35
36 // DPDKServerSocketImpl
37 template <typename Protocol>
38 class DPDKServerSocketImpl : public ServerSocketImpl {
39   typename Protocol::listener _listener;
40  public:
41   DPDKServerSocketImpl(Protocol& proto, uint16_t port, const SocketOptions &opt);
42   int listen() {
43     return _listener.listen();
44   }
45   virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
46   virtual void abort_accept() override;
47   virtual int fd() const override {
48     return _listener.fd();
49   }
50 };
51
52 // NativeConnectedSocketImpl
53 template <typename Protocol>
54 class NativeConnectedSocketImpl : public ConnectedSocketImpl {
55   typename Protocol::connection _conn;
56   uint32_t _cur_frag = 0;
57   uint32_t _cur_off = 0;
58   Tub<Packet> _buf;
59   Tub<bufferptr> _cache_ptr;
60
61  public:
62   explicit NativeConnectedSocketImpl(typename Protocol::connection conn)
63           : _conn(std::move(conn)) {}
64   NativeConnectedSocketImpl(NativeConnectedSocketImpl &&rhs)
65       : _conn(std::move(rhs._conn)), _buf(std::move(rhs.buf))  {}
66   virtual int is_connected() override {
67     return _conn.is_connected();
68   }
69
70   virtual ssize_t read(char *buf, size_t len) override {
71     size_t left = len;
72     ssize_t r = 0;
73     size_t off = 0;
74     while (left > 0) {
75       if (!_cache_ptr) {
76         _cache_ptr.construct();
77         r = zero_copy_read(*_cache_ptr);
78         if (r <= 0) {
79           _cache_ptr.destroy();
80           if (r == -EAGAIN)
81             break;
82           return r;
83         }
84       }
85       if (_cache_ptr->length() <= left) {
86         _cache_ptr->copy_out(0, _cache_ptr->length(), buf+off);
87         left -= _cache_ptr->length();
88         off += _cache_ptr->length();
89         _cache_ptr.destroy();
90       } else {
91         _cache_ptr->copy_out(0, left, buf+off);
92         _cache_ptr->set_offset(_cache_ptr->offset() + left);
93         _cache_ptr->set_length(_cache_ptr->length() - left);
94         left = 0;
95         break;
96       }
97     }
98     return len - left ? len - left : -EAGAIN;
99   }
100
101   virtual ssize_t zero_copy_read(bufferptr &data) override {
102     auto err = _conn.get_errno();
103     if (err <= 0)
104       return err;
105
106     if (!_buf) {
107       _buf = std::move(_conn.read());
108       if (!_buf)
109         return -EAGAIN;
110     }
111
112     fragment &f = _buf->frag(_cur_frag);
113     Packet p = _buf->share(_cur_off, f.size);
114     auto del = std::bind(
115             [](Packet &p) {}, std::move(p));
116     data = buffer::claim_buffer(
117             f.size, f.base, make_deleter(std::move(del)));
118     if (++_cur_frag == _buf->nr_frags()) {
119       _cur_frag = 0;
120       _cur_off = 0;
121       _buf.destroy();
122     } else {
123       _cur_off += f.size;
124     }
125     assert(data.length());
126     return data.length();
127   }
128   virtual ssize_t send(bufferlist &bl, bool more) override {
129     auto err = _conn.get_errno();
130     if (err < 0)
131       return (ssize_t)err;
132
133     size_t available = _conn.peek_sent_available();
134     if (available == 0) {
135       return 0;
136     }
137
138     std::vector<fragment> frags;
139     std::list<bufferptr>::const_iterator pb = bl.buffers().begin();
140     uint64_t left_pbrs = bl.buffers().size();
141     uint64_t len = 0;
142     uint64_t seglen = 0;
143     while (len < available && left_pbrs--) {
144       seglen = pb->length();
145       if (len + seglen > available) {
146         // don't continue if we enough at least 1 fragment since no available
147         // space for next ptr.
148         if (len > 0)
149           break;
150         seglen = MIN(seglen, available);
151       }
152       len += seglen;
153       frags.push_back(fragment{(char*)pb->c_str(), seglen});
154       ++pb;
155     }
156
157     if (len != bl.length()) {
158       bufferlist swapped;
159       bl.splice(0, len, &swapped);
160       auto del = std::bind(
161               [](bufferlist &bl) {}, std::move(swapped));
162       return _conn.send(Packet(std::move(frags), make_deleter(std::move(del))));
163     } else {
164       auto del = std::bind(
165               [](bufferlist &bl) {}, std::move(bl));
166
167       return _conn.send(Packet(std::move(frags), make_deleter(std::move(del))));
168     }
169   }
170   virtual void shutdown() override {
171     _conn.close_write();
172   }
173   // FIXME need to impl close
174   virtual void close() override {
175     _conn.close_write();
176   }
177   virtual int fd() const override {
178     return _conn.fd();
179   }
180 };
181
182 template <typename Protocol>
183 DPDKServerSocketImpl<Protocol>::DPDKServerSocketImpl(
184         Protocol& proto, uint16_t port, const SocketOptions &opt)
185         : _listener(proto.listen(port)) {}
186
187 template <typename Protocol>
188 int DPDKServerSocketImpl<Protocol>::accept(ConnectedSocket *s, const SocketOptions &options, entity_addr_t *out, Worker *w) {
189   if (_listener.get_errno() < 0)
190     return _listener.get_errno();
191   auto c = _listener.accept();
192   if (!c)
193     return -EAGAIN;
194
195   if (out)
196     *out = c->remote_addr();
197   std::unique_ptr<NativeConnectedSocketImpl<Protocol>> csi(
198           new NativeConnectedSocketImpl<Protocol>(std::move(*c)));
199   *s = ConnectedSocket(std::move(csi));
200   return 0;
201 }
202
203 template <typename Protocol>
204 void DPDKServerSocketImpl<Protocol>::abort_accept() {
205   _listener.abort_accept();
206 }
207
208 class DPDKWorker : public Worker {
209   struct Impl {
210     unsigned id;
211     interface _netif;
212     std::shared_ptr<DPDKDevice> _dev;
213     ipv4 _inet;
214     Impl(CephContext *cct, unsigned i, EventCenter *c, std::shared_ptr<DPDKDevice> dev);
215     ~Impl() {
216       _dev->unset_local_queue(id);
217     }
218   };
219   std::unique_ptr<Impl> _impl;
220
221   virtual void initialize();
222   void set_ipv4_packet_filter(ip_packet_filter* filter) {
223     _impl->_inet.set_packet_filter(filter);
224   }
225   using tcp4 = tcp<ipv4_traits>;
226
227  public:
228   explicit DPDKWorker(CephContext *c, unsigned i): Worker(c, i) {}
229   virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
230   virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
231   void arp_learn(ethernet_address l2, ipv4_address l3) {
232     _impl->_inet.learn(l2, l3);
233   }
234   virtual void destroy() override {
235     _impl.reset();
236   }
237
238   friend class DPDKServerSocketImpl<tcp4>;
239 };
240
241 class DPDKStack : public NetworkStack {
242   vector<std::function<void()> > funcs;
243  public:
244   explicit DPDKStack(CephContext *cct, const string &t): NetworkStack(cct, t) {
245     funcs.resize(cct->_conf->ms_async_max_op_threads);
246   }
247   virtual bool support_zero_copy_read() const override { return true; }
248   virtual bool support_local_listen_table() const { return true; }
249
250   virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
251   virtual void join_worker(unsigned i) override {
252     dpdk::eal::execute_on_master([&]() {
253       rte_eal_wait_lcore(i+1);
254     });
255   }
256 };
257
258 #endif