Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / dpdk / net.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3  * This file is open source software, licensed to you under the terms
4  * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
5  * distributed with this work for additional information regarding copyright
6  * ownership.  You may not use this file except in compliance with the License.
7  *
8  * You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 /*
20  * Copyright (C) 2014 Cloudius Systems, Ltd.
21  */
22 /*
23  * Ceph - scalable distributed file system
24  *
25  * Copyright (C) 2015 XSky <haomai@xsky.com>
26  *
27  * Author: Haomai Wang <haomaiwang@gmail.com>
28  *
29  * This is free software; you can redistribute it and/or
30  * modify it under the terms of the GNU Lesser General Public
31  * License version 2.1, as published by the Free Software
32  * Foundation.  See file COPYING.
33  *
34  */
35
36 #include "net.h"
37 #include "DPDK.h"
38 #include "DPDKStack.h"
39
40 #include "common/dout.h"
41 #include "include/assert.h"
42
43 #define dout_subsys ceph_subsys_dpdk
44 #undef dout_prefix
45 #define dout_prefix *_dout << "net "
46
47 interface::interface(CephContext *c, std::shared_ptr<DPDKDevice> dev, EventCenter *center)
48     : cct(c), _dev(dev),
49       _rx(_dev->receive(
50           center->get_id(),
51           [center, this] (Packet p) {
52             return dispatch_packet(center, std::move(p));
53           }
54       )),
55       _hw_address(_dev->hw_address()),
56       _hw_features(_dev->get_hw_features()) {
57   auto idx = 0u;
58   unsigned qid = center->get_id();
59   dev->queue_for_cpu(center->get_id()).register_packet_provider([this, idx, qid] () mutable {
60     Tub<Packet> p;
61     for (size_t i = 0; i < _pkt_providers.size(); i++) {
62       auto l3p = _pkt_providers[idx++]();
63       if (idx == _pkt_providers.size())
64         idx = 0;
65       if (l3p) {
66         auto l3pv = std::move(*l3p);
67         auto eh = l3pv.p.prepend_header<eth_hdr>();
68         eh->dst_mac = l3pv.to;
69         eh->src_mac = _hw_address;
70         eh->eth_proto = uint16_t(l3pv.proto_num);
71         *eh = eh->hton();
72         ldout(cct, 10) << "=== tx === proto " << std::hex << uint16_t(l3pv.proto_num)
73                        << " " << _hw_address << " -> " << l3pv.to
74                        << " length " << std::dec << l3pv.p.len() << dendl;
75         p = std::move(l3pv.p);
76         return p;
77       }
78     }
79     return p;
80   });
81 }
82
83 subscription<Packet, ethernet_address> interface::register_l3(
84     eth_protocol_num proto_num,
85     std::function<int (Packet p, ethernet_address from)> next,
86     std::function<bool (forward_hash&, Packet& p, size_t)> forward)
87 {
88   auto i = _proto_map.emplace(std::piecewise_construct, std::make_tuple(uint16_t(proto_num)), std::forward_as_tuple(std::move(forward)));
89   assert(i.second);
90   l3_rx_stream& l3_rx = i.first->second;
91   return l3_rx.packet_stream.listen(std::move(next));
92 }
93
94 unsigned interface::hash2cpu(uint32_t hash) {
95   return _dev->hash2cpu(hash);
96 }
97
98 const rss_key_type& interface::rss_key() const {
99   return _dev->rss_key();
100 }
101
102 uint16_t interface::hw_queues_count() const {
103   return _dev->hw_queues_count();
104 }
105
106 class C_handle_l2forward : public EventCallback {
107   std::shared_ptr<DPDKDevice> sdev;
108   unsigned &queue_depth;
109   Packet p;
110   unsigned dst;
111
112  public:
113   C_handle_l2forward(std::shared_ptr<DPDKDevice> &p, unsigned &qd, Packet pkt, unsigned target)
114       : sdev(p), queue_depth(qd), p(std::move(pkt)), dst(target) {}
115   void do_request(int fd) {
116     sdev->l2receive(dst, std::move(p));
117     queue_depth--;
118     delete this;
119   }
120 };
121
122 void interface::forward(EventCenter *source, unsigned target, Packet p) {
123   static __thread unsigned queue_depth;
124
125   if (queue_depth < 1000) {
126     queue_depth++;
127     // FIXME: need ensure this event not be called after EventCenter destruct
128     _dev->workers[target]->center.dispatch_event_external(
129         new C_handle_l2forward(_dev, queue_depth, std::move(p.free_on_cpu(source)), target));
130   }
131 }
132
133 int interface::dispatch_packet(EventCenter *center, Packet p) {
134   auto eh = p.get_header<eth_hdr>();
135   if (eh) {
136     auto i = _proto_map.find(ntoh(eh->eth_proto));
137     auto hwrss = p.rss_hash();
138     if (hwrss) {
139       ldout(cct, 10) << __func__ << " === rx === proto " << std::hex << ::ntoh(eh->eth_proto)
140                      << " "<< eh->src_mac.ntoh() << " -> " << eh->dst_mac.ntoh()
141                      << " length " << std::dec << p.len() << " rss_hash " << *p.rss_hash() << dendl;
142     } else {
143       ldout(cct, 10) << __func__ << " === rx === proto " << std::hex << ::ntoh(eh->eth_proto)
144                      << " "<< eh->src_mac.ntoh() << " -> " << eh->dst_mac.ntoh()
145                      << " length " << std::dec << p.len() << dendl;
146     }
147     if (i != _proto_map.end()) {
148       l3_rx_stream& l3 = i->second;
149       auto fw = _dev->forward_dst(center->get_id(), [&p, &l3, this] () {
150         auto hwrss = p.rss_hash();
151         if (hwrss) {
152           return *hwrss;
153         } else {
154           forward_hash data;
155           if (l3.forward(data, p, sizeof(eth_hdr))) {
156             return toeplitz_hash(rss_key(), data);
157           }
158           return 0u;
159         }
160       });
161       if (fw != center->get_id()) {
162         ldout(cct, 1) << __func__ << " forward to " << fw << dendl;
163         forward(center, fw, std::move(p));
164       } else {
165         auto h = eh->ntoh();
166         auto from = h.src_mac;
167         p.trim_front(sizeof(*eh));
168         // avoid chaining, since queue length is unlimited
169         // drop instead.
170         if (l3.ready()) {
171           return l3.packet_stream.produce(std::move(p), from);
172         }
173       }
174     }
175   }
176   return 0;
177 }
178
179 class C_arp_learn : public EventCallback {
180   DPDKWorker *worker;
181   ethernet_address l2_addr;
182   ipv4_address l3_addr;
183
184  public:
185   C_arp_learn(DPDKWorker *w, ethernet_address l2, ipv4_address l3)
186       : worker(w), l2_addr(l2), l3_addr(l3) {}
187   void do_request(int id) {
188     worker->arp_learn(l2_addr, l3_addr);
189     delete this;
190   }
191 };
192
193 void interface::arp_learn(ethernet_address l2, ipv4_address l3)
194 {
195   for (auto &&w : _dev->workers) {
196     w->center.dispatch_event_external(
197         new C_arp_learn(w, l2, l3));
198   }
199 }
200
201 l3_protocol::l3_protocol(interface* netif, eth_protocol_num proto_num, packet_provider_type func)
202     : _netif(netif), _proto_num(proto_num)  {
203   _netif->register_packet_provider(std::move(func));
204 }
205
206 subscription<Packet, ethernet_address> l3_protocol::receive(
207     std::function<int (Packet, ethernet_address)> rx_fn,
208     std::function<bool (forward_hash &h, Packet &p, size_t s)> forward) {
209   return _netif->register_l3(_proto_num, std::move(rx_fn), std::move(forward));
210 };