Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / dpdk / DPDK.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3  * This file is open source software, licensed to you under the terms
4  * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
5  * distributed with this work for additional information regarding copyright
6  * ownership.  You may not use this file except in compliance with the License.
7  *
8  * You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 /*
20  * Copyright (C) 2014 Cloudius Systems, Ltd.
21  */
22 /*
23  * Ceph - scalable distributed file system
24  *
25  * Copyright (C) 2015 XSky <haomai@xsky.com>
26  *
27  * Author: Haomai Wang <haomaiwang@gmail.com>
28  *
29  * This is free software; you can redistribute it and/or
30  * modify it under the terms of the GNU Lesser General Public
31  * License version 2.1, as published by the Free Software
32  * Foundation.  See file COPYING.
33  *
34  */
35
36 #ifndef CEPH_DPDK_DEV_H
37 #define CEPH_DPDK_DEV_H
38
39 #include <memory>
40 #include <functional>
41 #include <rte_config.h>
42 #include <rte_common.h>
43 #include <rte_ethdev.h>
44 #include <rte_malloc.h>
45 #include <rte_version.h>
46
47 #include "include/page.h"
48 #include "common/Tub.h"
49 #include "common/perf_counters.h"
50 #include "msg/async/Event.h"
51 #include "const.h"
52 #include "circular_buffer.h"
53 #include "ethernet.h"
54 #include "memory.h"
55 #include "Packet.h"
56 #include "stream.h"
57 #include "net.h"
58 #include "toeplitz.h"
59
60
61 struct free_deleter {
62   void operator()(void* p) { ::free(p); }
63 };
64
65
66 enum {
67   l_dpdk_dev_first = 58800,
68   l_dpdk_dev_rx_mcast,
69   l_dpdk_dev_rx_total_errors,
70   l_dpdk_dev_tx_total_errors,
71   l_dpdk_dev_rx_badcrc_errors,
72   l_dpdk_dev_rx_dropped_errors,
73   l_dpdk_dev_rx_nombuf_errors,
74   l_dpdk_dev_last
75 };
76
77 enum {
78   l_dpdk_qp_first = 58900,
79   l_dpdk_qp_rx_packets,
80   l_dpdk_qp_tx_packets,
81   l_dpdk_qp_rx_bad_checksum_errors,
82   l_dpdk_qp_rx_no_memory_errors,
83   l_dpdk_qp_rx_bytes,
84   l_dpdk_qp_tx_bytes,
85   l_dpdk_qp_rx_last_bunch,
86   l_dpdk_qp_tx_last_bunch,
87   l_dpdk_qp_rx_fragments,
88   l_dpdk_qp_tx_fragments,
89   l_dpdk_qp_rx_copy_ops,
90   l_dpdk_qp_tx_copy_ops,
91   l_dpdk_qp_rx_copy_bytes,
92   l_dpdk_qp_tx_copy_bytes,
93   l_dpdk_qp_rx_linearize_ops,
94   l_dpdk_qp_tx_linearize_ops,
95   l_dpdk_qp_tx_queue_length,
96   l_dpdk_qp_last
97 };
98
99 class DPDKDevice;
100 class DPDKWorker;
101
102 class DPDKQueuePair {
103   using packet_provider_type = std::function<Tub<Packet> ()>;
104  public:
105   void configure_proxies(const std::map<unsigned, float>& cpu_weights);
106   // build REdirection TAble for cpu_weights map: target cpu -> weight
107   void build_sw_reta(const std::map<unsigned, float>& cpu_weights);
108   void proxy_send(Packet p) {
109     _proxy_packetq.push_back(std::move(p));
110   }
111   void register_packet_provider(packet_provider_type func) {
112     _pkt_providers.push_back(std::move(func));
113   }
114   bool poll_tx();
115   friend class DPDKDevice;
116
117   class tx_buf_factory;
118
119   class tx_buf {
120     friend class DPDKQueuePair;
121    public:
122     static tx_buf* me(rte_mbuf* mbuf) {
123       return reinterpret_cast<tx_buf*>(mbuf);
124     }
125
126    private:
127     /**
128      * Checks if the original packet of a given cluster should be linearized
129      * due to HW limitations.
130      *
131      * @param head head of a cluster to check
132      *
133      * @return TRUE if a packet should be linearized.
134      */
135     static bool i40e_should_linearize(rte_mbuf *head);
136
137     /**
138      * Sets the offload info in the head buffer of an rte_mbufs cluster.
139      *
140      * @param p an original packet the cluster is built for
141      * @param qp QP handle
142      * @param head a head of an rte_mbufs cluster
143      */
144     static void set_cluster_offload_info(const Packet& p, const DPDKQueuePair& qp, rte_mbuf* head);
145
146     /**
147      * Creates a tx_buf cluster representing a given packet in a "zero-copy"
148      * way.
149      *
150      * @param p packet to translate
151      * @param qp DPDKQueuePair handle
152      *
153      * @return the HEAD tx_buf of the cluster or nullptr in case of a
154      *         failure
155      */
156     static tx_buf* from_packet_zc(
157             CephContext *cct, Packet&& p, DPDKQueuePair& qp);
158
159     /**
160      * Copy the contents of the "packet" into the given cluster of
161      * rte_mbuf's.
162      *
163      * @note Size of the cluster has to be big enough to accommodate all the
164      *       contents of the given packet.
165      *
166      * @param p packet to copy
167      * @param head head of the rte_mbuf's cluster
168      */
169     static void copy_packet_to_cluster(const Packet& p, rte_mbuf* head);
170
171     /**
172      * Creates a tx_buf cluster representing a given packet in a "copy" way.
173      *
174      * @param p packet to translate
175      * @param qp DPDKQueuePair handle
176      *
177      * @return the HEAD tx_buf of the cluster or nullptr in case of a
178      *         failure
179      */
180     static tx_buf* from_packet_copy(Packet&& p, DPDKQueuePair& qp);
181
182     /**
183      * Zero-copy handling of a single fragment.
184      *
185      * @param do_one_buf Functor responsible for a single rte_mbuf
186      *                   handling
187      * @param qp DPDKQueuePair handle (in)
188      * @param frag Fragment to copy (in)
189      * @param head Head of the cluster (out)
190      * @param last_seg Last segment of the cluster (out)
191      * @param nsegs Number of segments in the cluster (out)
192      *
193      * @return TRUE in case of success
194      */
195     template <class DoOneBufFunc>
196     static bool do_one_frag(DoOneBufFunc do_one_buf, DPDKQueuePair& qp,
197                             fragment& frag, rte_mbuf*& head,
198                             rte_mbuf*& last_seg, unsigned& nsegs) {
199       size_t len, left_to_set = frag.size;
200       char* base = frag.base;
201
202       rte_mbuf* m;
203
204       // TODO: assert() in a fast path! Remove me ASAP!
205       assert(frag.size);
206
207       // Create a HEAD of mbufs' cluster and set the first bytes into it
208       len = do_one_buf(qp, head, base, left_to_set);
209       if (!len) {
210         return false;
211       }
212
213       left_to_set -= len;
214       base += len;
215       nsegs = 1;
216
217       //
218       // Set the rest of the data into the new mbufs and chain them to
219       // the cluster.
220       //
221       rte_mbuf* prev_seg = head;
222       while (left_to_set) {
223         len = do_one_buf(qp, m, base, left_to_set);
224         if (!len) {
225           me(head)->recycle();
226           return false;
227         }
228
229         left_to_set -= len;
230         base += len;
231         nsegs++;
232
233         prev_seg->next = m;
234         prev_seg = m;
235       }
236
237       // Return the last mbuf in the cluster
238       last_seg = prev_seg;
239
240       return true;
241     }
242
243     /**
244      * Zero-copy handling of a single fragment.
245      *
246      * @param qp DPDKQueuePair handle (in)
247      * @param frag Fragment to copy (in)
248      * @param head Head of the cluster (out)
249      * @param last_seg Last segment of the cluster (out)
250      * @param nsegs Number of segments in the cluster (out)
251      *
252      * @return TRUE in case of success
253      */
254     static bool translate_one_frag(DPDKQueuePair& qp, fragment& frag,
255                                    rte_mbuf*& head, rte_mbuf*& last_seg,
256                                    unsigned& nsegs) {
257       return do_one_frag(set_one_data_buf, qp, frag, head,
258                          last_seg, nsegs);
259     }
260
261     /**
262      * Copies one fragment into the cluster of rte_mbuf's.
263      *
264      * @param qp DPDKQueuePair handle (in)
265      * @param frag Fragment to copy (in)
266      * @param head Head of the cluster (out)
267      * @param last_seg Last segment of the cluster (out)
268      * @param nsegs Number of segments in the cluster (out)
269      *
270      * We return the "last_seg" to avoid traversing the cluster in order to get
271      * it.
272      *
273      * @return TRUE in case of success
274      */
275     static bool copy_one_frag(DPDKQueuePair& qp, fragment& frag,
276                               rte_mbuf*& head, rte_mbuf*& last_seg,
277                               unsigned& nsegs) {
278       return do_one_frag(copy_one_data_buf, qp, frag, head,
279                          last_seg, nsegs);
280     }
281
282     /**
283      * Allocates a single rte_mbuf and sets it to point to a given data
284      * buffer.
285      *
286      * @param qp DPDKQueuePair handle (in)
287      * @param m New allocated rte_mbuf (out)
288      * @param va virtual address of a data buffer (in)
289      * @param buf_len length of the data to copy (in)
290      *
291      * @return The actual number of bytes that has been set in the mbuf
292      */
293     static size_t set_one_data_buf(
294         DPDKQueuePair& qp, rte_mbuf*& m, char* va, size_t buf_len) {
295       static constexpr size_t max_frag_len = 15 * 1024; // 15K
296
297       // FIXME: current all tx buf is alloced without rte_malloc
298       return copy_one_data_buf(qp, m, va, buf_len);
299       //
300       // Currently we break a buffer on a 15K boundary because 82599
301       // devices have a 15.5K limitation on a maximum single fragment
302       // size.
303       //
304       phys_addr_t pa = rte_malloc_virt2phy(va);
305       if (!pa)
306         return copy_one_data_buf(qp, m, va, buf_len);
307
308       assert(buf_len);
309       tx_buf* buf = qp.get_tx_buf();
310       if (!buf) {
311         return 0;
312       }
313
314       size_t len = std::min(buf_len, max_frag_len);
315
316       buf->set_zc_info(va, pa, len);
317       m = buf->rte_mbuf_p();
318
319       return len;
320     }
321
322     /**
323      *  Allocates a single rte_mbuf and copies a given data into it.
324      *
325      * @param qp DPDKQueuePair handle (in)
326      * @param m New allocated rte_mbuf (out)
327      * @param data Data to copy from (in)
328      * @param buf_len length of the data to copy (in)
329      *
330      * @return The actual number of bytes that has been copied
331      */
332     static size_t copy_one_data_buf(
333         DPDKQueuePair& qp, rte_mbuf*& m, char* data, size_t buf_len);
334
335     /**
336      * Checks if the first fragment of the given packet satisfies the
337      * zero-copy flow requirement: its first 128 bytes should not cross the
338      * 4K page boundary. This is required in order to avoid splitting packet
339      * headers.
340      *
341      * @param p packet to check
342      *
343      * @return TRUE if packet is ok and FALSE otherwise.
344      */
345     static bool check_frag0(Packet& p)
346     {
347       //
348       // First frag is special - it has headers that should not be split.
349       // If the addressing is such that the first fragment has to be
350       // split, then send this packet in a (non-zero) copy flow. We'll
351       // check if the first 128 bytes of the first fragment reside in the
352       // physically contiguous area. If that's the case - we are good to
353       // go.
354       //
355       if (p.frag(0).size < 128)
356         return false;
357
358       return true;
359     }
360
361    public:
362     tx_buf(tx_buf_factory& fc) : _fc(fc) {
363
364       _buf_physaddr = _mbuf.buf_physaddr;
365       _data_off     = _mbuf.data_off;
366     }
367
368     rte_mbuf* rte_mbuf_p() { return &_mbuf; }
369
370     void set_zc_info(void* va, phys_addr_t pa, size_t len) {
371       // mbuf_put()
372       _mbuf.data_len           = len;
373       _mbuf.pkt_len            = len;
374
375       // Set the mbuf to point to our data
376       _mbuf.buf_addr           = va;
377       _mbuf.buf_physaddr       = pa;
378       _mbuf.data_off           = 0;
379       _is_zc                   = true;
380     }
381
382     void reset_zc() {
383
384       //
385       // If this mbuf was the last in a cluster and contains an
386       // original packet object then call the destructor of the
387       // original packet object.
388       //
389       if (_p) {
390         //
391         // Reset the std::optional. This in particular is going
392         // to call the "packet"'s destructor and reset the
393         // "optional" state to "nonengaged".
394         //
395         _p.destroy();
396
397       } else if (!_is_zc) {
398         return;
399       }
400
401       // Restore the rte_mbuf fields we trashed in set_zc_info()
402       _mbuf.buf_physaddr = _buf_physaddr;
403       _mbuf.buf_addr     = rte_mbuf_to_baddr(&_mbuf);
404       _mbuf.data_off     = _data_off;
405
406       _is_zc             = false;
407     }
408
409     void recycle() {
410       struct rte_mbuf *m = &_mbuf, *m_next;
411
412       while (m != nullptr) {
413         m_next = m->next;
414         rte_pktmbuf_reset(m);
415         _fc.put(me(m));
416         m = m_next;
417       }
418     }
419
420     void set_packet(Packet&& p) {
421       _p = std::move(p);
422     }
423
424    private:
425     struct rte_mbuf _mbuf;
426     MARKER private_start;
427     Tub<Packet> _p;
428     phys_addr_t _buf_physaddr;
429     uint16_t _data_off;
430     // TRUE if underlying mbuf has been used in the zero-copy flow
431     bool _is_zc = false;
432     // buffers' factory the buffer came from
433     tx_buf_factory& _fc;
434     MARKER private_end;
435   };
436
437   class tx_buf_factory {
438     //
439     // Number of buffers to free in each GC iteration:
440     // We want the buffers to be allocated from the mempool as many as
441     // possible.
442     //
443     // On the other hand if there is no Tx for some time we want the
444     // completions to be eventually handled. Thus we choose the smallest
445     // possible packets count number here.
446     //
447     static constexpr int gc_count = 1;
448    public:
449     tx_buf_factory(CephContext *c, DPDKDevice *dev, uint8_t qid);
450     ~tx_buf_factory() {
451       // put all mbuf back into mempool in order to make the next factory work
452       while (gc());
453       rte_mempool_put_bulk(_pool, (void**)_ring.data(),
454                            _ring.size());
455     }
456
457
458     /**
459      * @note Should not be called if there are no free tx_buf's
460      *
461      * @return a free tx_buf object
462      */
463     tx_buf* get() {
464       // Take completed from the HW first
465       tx_buf *pkt = get_one_completed();
466       if (pkt) {
467         pkt->reset_zc();
468         return pkt;
469       }
470
471       //
472       // If there are no completed at the moment - take from the
473       // factory's cache.
474       //
475       if (_ring.empty()) {
476         return nullptr;
477       }
478
479       pkt = _ring.back();
480       _ring.pop_back();
481
482       return pkt;
483     }
484
485     void put(tx_buf* buf) {
486       buf->reset_zc();
487       _ring.push_back(buf);
488     }
489
490     bool gc() {
491       for (int cnt = 0; cnt < gc_count; ++cnt) {
492         auto tx_buf_p = get_one_completed();
493         if (!tx_buf_p) {
494           return false;
495         }
496
497         put(tx_buf_p);
498       }
499
500       return true;
501     }
502    private:
503     /**
504      * Fill the mbufs circular buffer: after this the _pool will become
505      * empty. We will use it to catch the completed buffers:
506      *
507      * - Underlying PMD drivers will "free" the mbufs once they are
508      *   completed.
509      * - We will poll the _pktmbuf_pool_tx till it's empty and release
510      *   all the buffers from the freed mbufs.
511      */
512     void init_factory() {
513       while (rte_mbuf* mbuf = rte_pktmbuf_alloc(_pool)) {
514         _ring.push_back(new(tx_buf::me(mbuf)) tx_buf{*this});
515       }
516     }
517
518     /**
519      * PMD puts the completed buffers back into the mempool they have
520      * originally come from.
521      *
522      * @note rte_pktmbuf_alloc() resets the mbuf so there is no need to call
523      *       rte_pktmbuf_reset() here again.
524      *
525      * @return a single tx_buf that has been completed by HW.
526      */
527     tx_buf* get_one_completed() {
528       return tx_buf::me(rte_pktmbuf_alloc(_pool));
529     }
530
531    private:
532     CephContext *cct;
533     std::vector<tx_buf*> _ring;
534     rte_mempool* _pool = nullptr;
535   };
536
537  public:
538   explicit DPDKQueuePair(CephContext *c, EventCenter *cen, DPDKDevice* dev, uint8_t qid);
539   ~DPDKQueuePair() {
540     if (device_stat_time_fd) {
541       center->delete_time_event(device_stat_time_fd);
542     }
543     rx_gc(true);
544   }
545
546   void rx_start() {
547     _rx_poller.construct(this);
548   }
549
550   uint32_t send(circular_buffer<Packet>& pb) {
551     // Zero-copy send
552     return _send(pb, [&] (Packet&& p) {
553       return tx_buf::from_packet_zc(cct, std::move(p), *this);
554     });
555   }
556
557   DPDKDevice& port() const { return *_dev; }
558   tx_buf* get_tx_buf() { return _tx_buf_factory.get(); }
559
560   void handle_stats();
561
562  private:
563   template <class Func>
564   uint32_t _send(circular_buffer<Packet>& pb, Func &&packet_to_tx_buf_p) {
565     if (_tx_burst.size() == 0) {
566       for (auto&& p : pb) {
567         // TODO: assert() in a fast path! Remove me ASAP!
568         assert(p.len());
569
570         tx_buf* buf = packet_to_tx_buf_p(std::move(p));
571         if (!buf) {
572           break;
573         }
574
575         _tx_burst.push_back(buf->rte_mbuf_p());
576       }
577     }
578
579     uint16_t sent = rte_eth_tx_burst(_dev_port_idx, _qid,
580                                      _tx_burst.data() + _tx_burst_idx,
581                                      _tx_burst.size() - _tx_burst_idx);
582
583     uint64_t nr_frags = 0, bytes = 0;
584
585     for (int i = 0; i < sent; i++) {
586       rte_mbuf* m = _tx_burst[_tx_burst_idx + i];
587       bytes    += m->pkt_len;
588       nr_frags += m->nb_segs;
589       pb.pop_front();
590     }
591
592     perf_logger->inc(l_dpdk_qp_tx_fragments, nr_frags);
593     perf_logger->inc(l_dpdk_qp_tx_bytes, bytes);
594
595     _tx_burst_idx += sent;
596
597     if (_tx_burst_idx == _tx_burst.size()) {
598       _tx_burst_idx = 0;
599       _tx_burst.clear();
600     }
601
602     return sent;
603   }
604
605   /**
606    * Allocate a new data buffer and set the mbuf to point to it.
607    *
608    * Do some DPDK hacks to work on PMD: it assumes that the buf_addr
609    * points to the private data of RTE_PKTMBUF_HEADROOM before the actual
610    * data buffer.
611    *
612    * @param m mbuf to update
613    */
614   static bool refill_rx_mbuf(rte_mbuf* m, size_t size,
615                              std::vector<void*> &datas) {
616     if (datas.empty())
617       return false;
618     void *data = datas.back();
619     datas.pop_back();
620
621     //
622     // Set the mbuf to point to our data.
623     //
624     // Do some DPDK hacks to work on PMD: it assumes that the buf_addr
625     // points to the private data of RTE_PKTMBUF_HEADROOM before the
626     // actual data buffer.
627     //
628     m->buf_addr      = (char*)data - RTE_PKTMBUF_HEADROOM;
629     m->buf_physaddr  = rte_malloc_virt2phy(data) - RTE_PKTMBUF_HEADROOM;
630     return true;
631   }
632
633   static bool init_noninline_rx_mbuf(rte_mbuf* m, size_t size,
634                                      std::vector<void*> &datas) {
635     if (!refill_rx_mbuf(m, size, datas)) {
636       return false;
637     }
638     // The below fields stay constant during the execution.
639     m->buf_len       = size + RTE_PKTMBUF_HEADROOM;
640     m->data_off      = RTE_PKTMBUF_HEADROOM;
641     return true;
642   }
643
644   bool init_rx_mbuf_pool();
645   bool rx_gc(bool force=false);
646   bool refill_one_cluster(rte_mbuf* head);
647
648   /**
649    * Polls for a burst of incoming packets. This function will not block and
650    * will immediately return after processing all available packets.
651    *
652    */
653   bool poll_rx_once();
654
655   /**
656    * Translates an rte_mbuf's into packet and feeds them to _rx_stream.
657    *
658    * @param bufs An array of received rte_mbuf's
659    * @param count Number of buffers in the bufs[]
660    */
661   void process_packets(struct rte_mbuf **bufs, uint16_t count);
662
663   /**
664    * Translate rte_mbuf into the "packet".
665    * @param m mbuf to translate
666    *
667    * @return a "optional" object representing the newly received data if in an
668    *         "engaged" state or an error if in a "disengaged" state.
669    */
670   Tub<Packet> from_mbuf(rte_mbuf* m);
671
672   /**
673    * Transform an LRO rte_mbuf cluster into the "packet" object.
674    * @param m HEAD of the mbufs' cluster to transform
675    *
676    * @return a "optional" object representing the newly received LRO packet if
677    *         in an "engaged" state or an error if in a "disengaged" state.
678    */
679   Tub<Packet> from_mbuf_lro(rte_mbuf* m);
680
681  private:
682   CephContext *cct;
683   std::vector<packet_provider_type> _pkt_providers;
684   Tub<std::array<uint8_t, 128>> _sw_reta;
685   circular_buffer<Packet> _proxy_packetq;
686   stream<Packet> _rx_stream;
687   circular_buffer<Packet> _tx_packetq;
688   std::vector<void*> _alloc_bufs;
689
690   PerfCounters *perf_logger;
691   DPDKDevice* _dev;
692   uint8_t _dev_port_idx;
693   EventCenter *center;
694   uint8_t _qid;
695   rte_mempool *_pktmbuf_pool_rx;
696   std::vector<rte_mbuf*> _rx_free_pkts;
697   std::vector<rte_mbuf*> _rx_free_bufs;
698   std::vector<fragment> _frags;
699   std::vector<char*> _bufs;
700   size_t _num_rx_free_segs = 0;
701   uint64_t device_stat_time_fd = 0;
702
703 #ifdef CEPH_PERF_DEV
704   uint64_t rx_cycles = 0;
705   uint64_t rx_count = 0;
706   uint64_t tx_cycles = 0;
707   uint64_t tx_count = 0;
708 #endif
709
710   class DPDKTXPoller : public EventCenter::Poller {
711     DPDKQueuePair *qp;
712
713    public:
714     explicit DPDKTXPoller(DPDKQueuePair *qp)
715         : EventCenter::Poller(qp->center, "DPDK::DPDKTXPoller"), qp(qp) {}
716
717     virtual int poll() {
718       return qp->poll_tx();
719     }
720   } _tx_poller;
721
722   class DPDKRXGCPoller : public EventCenter::Poller {
723     DPDKQueuePair *qp;
724
725    public:
726     explicit DPDKRXGCPoller(DPDKQueuePair *qp)
727         : EventCenter::Poller(qp->center, "DPDK::DPDKRXGCPoller"), qp(qp) {}
728
729     virtual int poll() {
730       return qp->rx_gc();
731     }
732   } _rx_gc_poller;
733   tx_buf_factory _tx_buf_factory;
734   class DPDKRXPoller : public EventCenter::Poller {
735     DPDKQueuePair *qp;
736
737    public:
738     explicit DPDKRXPoller(DPDKQueuePair *qp)
739         : EventCenter::Poller(qp->center, "DPDK::DPDKRXPoller"), qp(qp) {}
740
741     virtual int poll() {
742       return qp->poll_rx_once();
743     }
744   };
745   Tub<DPDKRXPoller> _rx_poller;
746   class DPDKTXGCPoller : public EventCenter::Poller {
747     DPDKQueuePair *qp;
748
749    public:
750     explicit DPDKTXGCPoller(DPDKQueuePair *qp)
751         : EventCenter::Poller(qp->center, "DPDK::DPDKTXGCPoller"), qp(qp) {}
752
753     virtual int poll() {
754       return qp->_tx_buf_factory.gc();
755     }
756   } _tx_gc_poller;
757   std::vector<rte_mbuf*> _tx_burst;
758   uint16_t _tx_burst_idx = 0;
759 };
760
761 class DPDKDevice {
762  public:
763   CephContext *cct;
764   PerfCounters *perf_logger;
765   std::vector<std::unique_ptr<DPDKQueuePair>> _queues;
766   std::vector<DPDKWorker*> workers;
767   size_t _rss_table_bits = 0;
768   uint8_t _port_idx;
769   uint16_t _num_queues;
770   unsigned cores;
771   hw_features _hw_features;
772   uint8_t _queues_ready = 0;
773   unsigned _home_cpu;
774   bool _use_lro;
775   bool _enable_fc;
776   std::vector<uint8_t> _redir_table;
777   rss_key_type _rss_key;
778   bool _is_i40e_device = false;
779   bool _is_vmxnet3_device = false;
780
781  public:
782   rte_eth_dev_info _dev_info = {};
783
784   /**
785    * The final stage of a port initialization.
786    * @note Must be called *after* all queues from stage (2) have been
787    *       initialized.
788    */
789   int init_port_fini();
790
791  private:
792   /**
793    * Port initialization consists of 3 main stages:
794    * 1) General port initialization which ends with a call to
795    *    rte_eth_dev_configure() where we request the needed number of Rx and
796    *    Tx queues.
797    * 2) Individual queues initialization. This is done in the constructor of
798    *    DPDKQueuePair class. In particular the memory pools for queues are allocated
799    *    in this stage.
800    * 3) The final stage of the initialization which starts with the call of
801    *    rte_eth_dev_start() after which the port becomes fully functional. We
802    *    will also wait for a link to get up in this stage.
803    */
804
805
806   /**
807    * First stage of the port initialization.
808    *
809    * @return 0 in case of success and an appropriate error code in case of an
810    *         error.
811    */
812   int init_port_start();
813
814   /**
815    * Check the link status of out port in up to 9s, and print them finally.
816    */
817   int check_port_link_status();
818
819   /**
820    * Configures the HW Flow Control
821    */
822   void set_hw_flow_control();
823
824  public:
825   DPDKDevice(CephContext *c, uint8_t port_idx, uint16_t num_queues, bool use_lro, bool enable_fc):
826       cct(c), _port_idx(port_idx), _num_queues(num_queues),
827       _home_cpu(0), _use_lro(use_lro),
828       _enable_fc(enable_fc) {
829     _queues = std::vector<std::unique_ptr<DPDKQueuePair>>(_num_queues);
830     /* now initialise the port we will use */
831     int ret = init_port_start();
832     if (ret != 0) {
833       rte_exit(EXIT_FAILURE, "Cannot initialise port %u\n", _port_idx);
834     }
835     string name(std::string("port") + std::to_string(port_idx));
836     PerfCountersBuilder plb(cct, name, l_dpdk_dev_first, l_dpdk_dev_last);
837
838     plb.add_u64_counter(l_dpdk_dev_rx_mcast, "dpdk_device_receive_multicast_packets", "DPDK received multicast packets");
839     plb.add_u64_counter(l_dpdk_dev_rx_badcrc_errors, "dpdk_device_receive_badcrc_errors", "DPDK received bad crc errors");
840
841     plb.add_u64_counter(l_dpdk_dev_rx_total_errors, "dpdk_device_receive_total_errors", "DPDK received total_errors");
842     plb.add_u64_counter(l_dpdk_dev_tx_total_errors, "dpdk_device_send_total_errors", "DPDK sendd total_errors");
843     plb.add_u64_counter(l_dpdk_dev_rx_dropped_errors, "dpdk_device_receive_dropped_errors", "DPDK received dropped errors");
844     plb.add_u64_counter(l_dpdk_dev_rx_nombuf_errors, "dpdk_device_receive_nombuf_errors", "DPDK received RX mbuf allocation errors");
845
846     perf_logger = plb.create_perf_counters();
847     cct->get_perfcounters_collection()->add(perf_logger);
848   }
849
850   ~DPDKDevice() {
851     rte_eth_dev_stop(_port_idx);
852   }
853
854   DPDKQueuePair& queue_for_cpu(unsigned cpu) { return *_queues[cpu]; }
855   void l2receive(int qid, Packet p) {
856     _queues[qid]->_rx_stream.produce(std::move(p));
857   }
858   subscription<Packet> receive(unsigned cpuid, std::function<int (Packet)> next_packet) {
859     auto sub = _queues[cpuid]->_rx_stream.listen(std::move(next_packet));
860     _queues[cpuid]->rx_start();
861     return std::move(sub);
862   }
863   ethernet_address hw_address() {
864     struct ether_addr mac;
865     rte_eth_macaddr_get(_port_idx, &mac);
866
867     return mac.addr_bytes;
868   }
869   hw_features get_hw_features() {
870     return _hw_features;
871   }
872   const rss_key_type& rss_key() const { return _rss_key; }
873   uint16_t hw_queues_count() { return _num_queues; }
874   std::unique_ptr<DPDKQueuePair> init_local_queue(CephContext *c, EventCenter *center, string hugepages, uint16_t qid) {
875     std::unique_ptr<DPDKQueuePair> qp;
876     qp = std::unique_ptr<DPDKQueuePair>(new DPDKQueuePair(c, center, this, qid));
877     return std::move(qp);
878   }
879   unsigned hash2qid(uint32_t hash) {
880     // return hash % hw_queues_count();
881     return _redir_table[hash & (_redir_table.size() - 1)];
882   }
883   void set_local_queue(unsigned i, std::unique_ptr<DPDKQueuePair> qp) {
884     assert(!_queues[i]);
885     _queues[i] = std::move(qp);
886   }
887   void unset_local_queue(unsigned i) {
888     assert(_queues[i]);
889     _queues[i].reset();
890   }
891   template <typename Func>
892   unsigned forward_dst(unsigned src_cpuid, Func&& hashfn) {
893     auto& qp = queue_for_cpu(src_cpuid);
894     if (!qp._sw_reta)
895       return src_cpuid;
896
897     assert(!qp._sw_reta);
898     auto hash = hashfn() >> _rss_table_bits;
899     auto& reta = *qp._sw_reta;
900     return reta[hash % reta.size()];
901   }
902   unsigned hash2cpu(uint32_t hash) {
903     // there is an assumption here that qid == get_id() which will
904     // not necessary be true in the future
905     return forward_dst(hash2qid(hash), [hash] { return hash; });
906   }
907
908   hw_features& hw_features_ref() { return _hw_features; }
909
910   const rte_eth_rxconf* def_rx_conf() const {
911     return &_dev_info.default_rxconf;
912   }
913
914   const rte_eth_txconf* def_tx_conf() const {
915     return &_dev_info.default_txconf;
916   }
917
918   /**
919    *  Set the RSS table in the device and store it in the internal vector.
920    */
921   void set_rss_table();
922
923   uint8_t port_idx() { return _port_idx; }
924   bool is_i40e_device() const {
925     return _is_i40e_device;
926   }
927   bool is_vmxnet3_device() const {
928     return _is_vmxnet3_device;
929   }
930 };
931
932
933 std::unique_ptr<DPDKDevice> create_dpdk_net_device(
934     CephContext *c, unsigned cores, uint8_t port_idx = 0,
935     bool use_lro = true, bool enable_fc = true);
936
937
938 /**
939  * @return Number of bytes needed for mempool objects of each QP.
940  */
941 uint32_t qp_mempool_obj_size();
942
943 #endif // CEPH_DPDK_DEV_H