Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / dpdk / IP.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 /*
3  * This file is open source software, licensed to you under the terms
4  * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
5  * distributed with this work for additional information regarding copyright
6  * ownership.  You may not use this file except in compliance with the License.
7  *
8  * You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 /*
20  * Copyright (C) 2014 Cloudius Systems, Ltd.
21  *
22  */
23 /*
24  * Ceph - scalable distributed file system
25  *
26  * Copyright (C) 2015 XSky <haomai@xsky.com>
27  *
28  * Author: Haomai Wang <haomaiwang@gmail.com>
29  *
30  * This is free software; you can redistribute it and/or
31  * modify it under the terms of the GNU Lesser General Public
32  * License version 2.1, as published by the Free Software
33  * Foundation.  See file COPYING.
34  *
35  */
36
37 #include "common/perf_counters.h"
38
39 #include "capture.h"
40 #include "IP.h"
41 #include "shared_ptr.h"
42 #include "toeplitz.h"
43
44 #include "common/dout.h"
45 #include "include/assert.h"
46
47 #define dout_subsys ceph_subsys_dpdk
48 #undef dout_prefix
49 #define dout_prefix *_dout << "dpdk "
50
51 std::ostream& operator<<(std::ostream& os, const ipv4_address& a) {
52   auto ip = a.ip;
53   return os << ((ip >> 24) & 0xff) << "." << ((ip >> 16) & 0xff)
54             << "." << ((ip >> 8) & 0xff) << "." << ((ip >> 0) & 0xff);
55 }
56
57 utime_t ipv4::_frag_timeout = utime_t(30, 0);
58 constexpr uint32_t ipv4::_frag_low_thresh;
59 constexpr uint32_t ipv4::_frag_high_thresh;
60
61 class C_handle_frag_timeout : public EventCallback {
62   ipv4 *_ipv4;
63
64  public:
65   C_handle_frag_timeout(ipv4 *i): _ipv4(i) {}
66   void do_request(int fd_or_id) {
67     _ipv4->frag_timeout();
68   }
69 };
70
71 enum {
72   l_dpdk_qp_first = 99000,
73   l_dpdk_total_linearize_operations,
74   l_dpdk_qp_last
75 };
76
77 ipv4::ipv4(CephContext *c, EventCenter *cen, interface* netif)
78   : cct(c), center(cen), _netif(netif), _global_arp(netif),
79     _arp(c, _global_arp, cen),
80     _host_address(0), _gw_address(0), _netmask(0),
81     _l3(netif, eth_protocol_num::ipv4, [this] { return get_packet(); }),
82     _rx_packets(
83       _l3.receive(
84         [this] (Packet p, ethernet_address ea) {
85           return handle_received_packet(std::move(p), ea);
86         },
87         [this] (forward_hash& out_hash_data, Packet& p, size_t off) {
88           return forward(out_hash_data, p, off);
89         }
90       )
91     ),
92     _tcp(*this, cen), _icmp(c, *this),
93     _l4({{ uint8_t(ip_protocol_num::tcp), &_tcp },
94          { uint8_t(ip_protocol_num::icmp), &_icmp }}),
95     _packet_filter(nullptr)
96 {
97   PerfCountersBuilder plb(cct, "ipv4", l_dpdk_qp_first, l_dpdk_qp_last);
98   plb.add_u64_counter(l_dpdk_total_linearize_operations, "dpdk_ip_linearize_operations", "DPDK IP Packet linearization operations");
99   perf_logger = plb.create_perf_counters();
100   cct->get_perfcounters_collection()->add(perf_logger);
101   frag_handler = new C_handle_frag_timeout(this);
102 }
103
104 bool ipv4::forward(forward_hash& out_hash_data, Packet& p, size_t off)
105 {
106   auto iph = p.get_header<ip_hdr>(off);
107
108   out_hash_data.push_back(iph->src_ip.ip);
109   out_hash_data.push_back(iph->dst_ip.ip);
110
111   auto h = iph->ntoh();
112   auto l4 = _l4[h.ip_proto];
113   if (l4) {
114     if (h.mf() == false && h.offset() == 0) {
115       // This IP datagram is atomic, forward according to tcp connection hash
116       l4->forward(out_hash_data, p, off + sizeof(ip_hdr));
117     }
118     // else forward according to ip fields only
119   }
120   return true;
121 }
122
123 int ipv4::handle_received_packet(Packet p, ethernet_address from)
124 {
125   auto iph = p.get_header<ip_hdr>(0);
126   if (!iph) {
127     return 0;
128   }
129
130   // Skip checking csum of reassembled IP datagram
131   if (!get_hw_features().rx_csum_offload && !p.offload_info_ref().reassembled) {
132     checksummer csum;
133     csum.sum(reinterpret_cast<char*>(iph), sizeof(*iph));
134     if (csum.get() != 0) {
135       return 0;
136     }
137   }
138
139   auto h = iph->ntoh();
140   unsigned ip_len = h.len;
141   unsigned ip_hdr_len = h.ihl * 4;
142   unsigned pkt_len = p.len();
143   auto offset = h.offset();
144
145   ldout(cct, 10) << __func__ << " get " << std::hex << int(h.ip_proto)
146                  << std::dec << " packet from "
147                  << h.src_ip << " -> " << h.dst_ip << " id=" << h.id
148                  << " ip_len=" << ip_len << " ip_hdr_len=" << ip_hdr_len
149                  << " pkt_len=" << pkt_len << " offset=" << offset << dendl;
150
151   if (pkt_len > ip_len) {
152     // Trim extra data in the packet beyond IP total length
153     p.trim_back(pkt_len - ip_len);
154   } else if (pkt_len < ip_len) {
155     // Drop if it contains less than IP total length
156     return 0;
157   }
158   // Drop if the reassembled datagram will be larger than maximum IP size
159   if (offset + p.len() > ip_packet_len_max) {
160     return 0;
161   }
162
163   // FIXME: process options
164   if (in_my_netmask(h.src_ip) && h.src_ip != _host_address) {
165     ldout(cct, 20) << __func__ << " learn mac " << from << " with " << h.src_ip << dendl;
166     _arp.learn(from, h.src_ip);
167   }
168
169   if (_packet_filter) {
170     bool handled = false;
171     _packet_filter->handle(p, &h, from, handled);
172     if (handled) {
173       return 0;
174     }
175   }
176
177   if (h.dst_ip != _host_address) {
178     // FIXME: forward
179     return 0;
180   }
181
182   // Does this IP datagram need reassembly
183   auto mf = h.mf();
184   if (mf == true || offset != 0) {
185     frag_limit_mem();
186     auto frag_id = ipv4_frag_id{h.src_ip, h.dst_ip, h.id, h.ip_proto};
187     auto& frag = _frags[frag_id];
188     if (mf == false) {
189       frag.last_frag_received = true;
190     }
191     // This is a newly created frag_id
192     if (frag.mem_size == 0) {
193       _frags_age.push_back(frag_id);
194       frag.rx_time = ceph_clock_now();
195     }
196     auto added_size = frag.merge(h, offset, std::move(p));
197     _frag_mem += added_size;
198     if (frag.is_complete()) {
199       // All the fragments are received
200       auto dropped_size = frag.mem_size;
201       auto& ip_data = frag.data.map.begin()->second;
202       // Choose a cpu to forward this packet
203       auto cpu_id = center->get_id();
204       auto l4 = _l4[h.ip_proto];
205       if (l4) {
206         size_t l4_offset = 0;
207         forward_hash hash_data;
208         hash_data.push_back(hton(h.src_ip.ip));
209         hash_data.push_back(hton(h.dst_ip.ip));
210         l4->forward(hash_data, ip_data, l4_offset);
211         cpu_id = _netif->hash2cpu(toeplitz_hash(_netif->rss_key(), hash_data));
212       }
213
214       // No need to forward if the dst cpu is the current cpu
215       if (cpu_id == center->get_id()) {
216         l4->received(std::move(ip_data), h.src_ip, h.dst_ip);
217       } else {
218         auto to = _netif->hw_address();
219         auto pkt = frag.get_assembled_packet(from, to);
220         _netif->forward(center, cpu_id, std::move(pkt));
221       }
222
223       // Delete this frag from _frags and _frags_age
224       frag_drop(frag_id, dropped_size);
225       _frags_age.remove(frag_id);
226       perf_logger->set(l_dpdk_total_linearize_operations,
227                        ipv4_packet_merger::linearizations());
228     } else {
229       // Some of the fragments are missing
230       if (frag_timefd) {
231         frag_arm();
232       }
233     }
234     return 0;
235   }
236
237   auto l4 = _l4[h.ip_proto];
238   if (l4) {
239     // Trim IP header and pass to upper layer
240     p.trim_front(ip_hdr_len);
241     l4->received(std::move(p), h.src_ip, h.dst_ip);
242   }
243   return 0;
244 }
245
246 void ipv4::wait_l2_dst_address(ipv4_address to, Packet p, resolution_cb cb) {
247   // Figure out where to send the packet to. If it is a directly connected
248   // host, send to it directly, otherwise send to the default gateway.
249   ipv4_address dst;
250   if (in_my_netmask(to)) {
251     dst = to;
252   } else {
253     dst = _gw_address;
254   }
255
256   _arp.wait(std::move(dst), std::move(p), std::move(cb));
257 }
258
259 const hw_features& ipv4::get_hw_features() const
260 {
261   return _netif->get_hw_features();
262 }
263
264 void ipv4::send(ipv4_address to, ip_protocol_num proto_num,
265         Packet p, ethernet_address e_dst) {
266   auto needs_frag = this->needs_frag(p, proto_num, get_hw_features());
267
268   auto send_pkt = [this, to, proto_num, needs_frag, e_dst] (Packet& pkt, uint16_t remaining, uint16_t offset) mutable  {
269     static uint16_t id = 0;
270     auto iph = pkt.prepend_header<ip_hdr>();
271     iph->ihl = sizeof(*iph) / 4;
272     iph->ver = 4;
273     iph->dscp = 0;
274     iph->ecn = 0;
275     iph->len = pkt.len();
276     // FIXME: a proper id
277     iph->id = id++;
278     if (needs_frag) {
279       uint16_t mf = remaining > 0;
280       // The fragment offset is measured in units of 8 octets (64 bits)
281       auto off = offset / 8;
282       iph->frag = (mf << uint8_t(ip_hdr::frag_bits::mf)) | off;
283     } else {
284       iph->frag = 0;
285     }
286     iph->ttl = 64;
287     iph->ip_proto = (uint8_t)proto_num;
288     iph->csum = 0;
289     iph->src_ip = _host_address;
290     iph->dst_ip = to;
291     ldout(cct, 20) << " ipv4::send " << " id=" << iph->id << " " << _host_address << " -> " << to
292                    << " len " << pkt.len() << dendl;
293     *iph = iph->hton();
294
295     if (get_hw_features().tx_csum_ip_offload) {
296       iph->csum = 0;
297       pkt.offload_info_ref().needs_ip_csum = true;
298     } else {
299       checksummer csum;
300       csum.sum(reinterpret_cast<char*>(iph), sizeof(*iph));
301       iph->csum = csum.get();
302     }
303
304     _packetq.push_back(
305             l3_protocol::l3packet{eth_protocol_num::ipv4, e_dst, std::move(pkt)});
306   };
307
308   if (needs_frag) {
309     uint16_t offset = 0;
310     uint16_t remaining = p.len();
311     auto mtu = get_hw_features().mtu;
312
313     while (remaining) {
314       auto can_send = std::min(uint16_t(mtu - ipv4_hdr_len_min), remaining);
315       remaining -= can_send;
316       auto pkt = p.share(offset, can_send);
317       send_pkt(pkt, remaining, offset);
318       offset += can_send;
319     }
320   } else {
321     // The whole packet can be send in one shot
322     send_pkt(p, 0, 0);
323   }
324 }
325
326 Tub<l3_protocol::l3packet> ipv4::get_packet() {
327   // _packetq will be mostly empty here unless it hold remnants of previously
328   // fragmented packet
329   if (_packetq.empty()) {
330     for (size_t i = 0; i < _pkt_providers.size(); i++) {
331       auto l4p = _pkt_providers[_pkt_provider_idx++]();
332       if (_pkt_provider_idx == _pkt_providers.size()) {
333         _pkt_provider_idx = 0;
334       }
335       if (l4p) {
336         ldout(cct, 20) << " ipv4::get_packet len " << l4p->p.len() << dendl;
337         send(l4p->to, l4p->proto_num, std::move(l4p->p), l4p->e_dst);
338         break;
339       }
340     }
341   }
342
343   Tub<l3_protocol::l3packet> p;
344   if (!_packetq.empty()) {
345     p = std::move(_packetq.front());
346     _packetq.pop_front();
347   }
348   return p;
349 }
350
351 void ipv4::frag_limit_mem() {
352   if (_frag_mem <= _frag_high_thresh) {
353     return;
354   }
355   auto drop = _frag_mem - _frag_low_thresh;
356   while (drop) {
357     if (_frags_age.empty()) {
358       return;
359     }
360     // Drop the oldest frag (first element) from _frags_age
361     auto frag_id = _frags_age.front();
362     _frags_age.pop_front();
363
364     // Drop from _frags as well
365     auto& frag = _frags[frag_id];
366     auto dropped_size = frag.mem_size;
367     frag_drop(frag_id, dropped_size);
368
369     drop -= std::min(drop, dropped_size);
370   }
371 }
372
373 void ipv4::frag_timeout() {
374   if (_frags.empty()) {
375     return;
376   }
377   auto now = ceph_clock_now();
378   for (auto it = _frags_age.begin(); it != _frags_age.end();) {
379     auto frag_id = *it;
380     auto& frag = _frags[frag_id];
381     if (now > frag.rx_time + _frag_timeout) {
382       auto dropped_size = frag.mem_size;
383       // Drop from _frags
384       frag_drop(frag_id, dropped_size);
385       // Drop from _frags_age
386       it = _frags_age.erase(it);
387     } else {
388       // The further items can only be younger
389       break;
390     }
391   }
392   if (_frags.size() != 0) {
393     frag_arm(now);
394   } else {
395     _frag_mem = 0;
396   }
397 }
398
399 int32_t ipv4::frag::merge(ip_hdr &h, uint16_t offset, Packet p) {
400   uint32_t old = mem_size;
401   unsigned ip_hdr_len = h.ihl * 4;
402   // Store IP header
403   if (offset == 0) {
404     header = p.share(0, ip_hdr_len);
405   }
406   // Sotre IP payload
407   p.trim_front(ip_hdr_len);
408   data.merge(offset, std::move(p));
409   // Update mem size
410   mem_size = header.memory();
411   for (const auto& x : data.map) {
412     mem_size += x.second.memory();
413   }
414   auto added_size = mem_size - old;
415   return added_size;
416 }
417
418 bool ipv4::frag::is_complete() {
419   // If all the fragments are received, ipv4::frag::merge() should merge all
420   // the fragments into a single packet
421   auto offset = data.map.begin()->first;
422   auto nr_packet = data.map.size();
423   return last_frag_received && nr_packet == 1 && offset == 0;
424 }
425
426 Packet ipv4::frag::get_assembled_packet(ethernet_address from, ethernet_address to) {
427   auto& ip_header = header;
428   auto& ip_data = data.map.begin()->second;
429   // Append a ethernet header, needed for forwarding
430   auto eh = ip_header.prepend_header<eth_hdr>();
431   eh->src_mac = from;
432   eh->dst_mac = to;
433   eh->eth_proto = uint16_t(eth_protocol_num::ipv4);
434   *eh = eh->hton();
435   // Prepare a packet contains both ethernet header, ip header and ip data
436   ip_header.append(std::move(ip_data));
437   auto pkt = std::move(ip_header);
438   auto iph = pkt.get_header<ip_hdr>(sizeof(eth_hdr));
439   // len is the sum of each fragment
440   iph->len = hton(uint16_t(pkt.len() - sizeof(eth_hdr)));
441   // No fragmentation for the assembled datagram
442   iph->frag = 0;
443   // Since each fragment's csum is checked, no need to csum
444   // again for the assembled datagram
445   offload_info oi;
446   oi.reassembled = true;
447   pkt.set_offload_info(oi);
448   return pkt;
449 }
450
451 void icmp::received(Packet p, ipaddr from, ipaddr to) {
452   auto hdr = p.get_header<icmp_hdr>(0);
453   if (!hdr || hdr->type != icmp_hdr::msg_type::echo_request) {
454     return;
455   }
456   hdr->type = icmp_hdr::msg_type::echo_reply;
457   hdr->code = 0;
458   hdr->csum = 0;
459   checksummer csum;
460   csum.sum(reinterpret_cast<char*>(hdr), p.len());
461   hdr->csum = csum.get();
462
463   if (_queue_space.get_or_fail(p.len())) { // drop packets that do not fit the queue
464     auto cb = [this, from] (const ethernet_address e_dst, Packet p, int r) mutable {
465         if (r == 0) {
466           _packetq.emplace_back(ipv4_traits::l4packet{from, std::move(p), e_dst, ip_protocol_num::icmp});
467         }
468     };
469     _inet.wait_l2_dst_address(from, std::move(p), cb);
470   }
471 }