X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2Fdpdk%2FDPDKStack.h;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2Fdpdk%2FDPDKStack.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=af5a5fd2400224a8d8dd8ddbdfe5de7eac748eae;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/async/dpdk/DPDKStack.h b/src/ceph/src/msg/async/dpdk/DPDKStack.h deleted file mode 100644 index af5a5fd..0000000 --- a/src/ceph/src/msg/async/dpdk/DPDKStack.h +++ /dev/null @@ -1,258 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2015 XSky - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ -#ifndef CEPH_MSG_DPDKSTACK_H -#define CEPH_MSG_DPDKSTACK_H - -#include - -#include "common/ceph_context.h" -#include "common/Tub.h" - -#include "msg/async/Stack.h" -#include "dpdk_rte.h" -#include "DPDK.h" -#include "net.h" -#include "const.h" -#include "IP.h" -#include "Packet.h" - -class interface; - -template -class NativeConnectedSocketImpl; - -// DPDKServerSocketImpl -template -class DPDKServerSocketImpl : public ServerSocketImpl { - typename Protocol::listener _listener; - public: - DPDKServerSocketImpl(Protocol& proto, uint16_t port, const SocketOptions &opt); - int listen() { - return _listener.listen(); - } - virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; - virtual void abort_accept() override; - virtual int fd() const override { - return _listener.fd(); - } -}; - -// NativeConnectedSocketImpl -template -class NativeConnectedSocketImpl : public ConnectedSocketImpl { - typename Protocol::connection _conn; - uint32_t _cur_frag = 0; - uint32_t _cur_off = 0; - Tub _buf; - Tub _cache_ptr; - - public: - explicit NativeConnectedSocketImpl(typename Protocol::connection conn) - : _conn(std::move(conn)) {} - NativeConnectedSocketImpl(NativeConnectedSocketImpl &&rhs) - : _conn(std::move(rhs._conn)), _buf(std::move(rhs.buf)) {} - virtual int is_connected() override { - return _conn.is_connected(); - } - - virtual ssize_t read(char *buf, size_t len) override { - size_t left = len; - ssize_t r = 0; - size_t off = 0; - while (left > 0) { - if (!_cache_ptr) { - _cache_ptr.construct(); - r = zero_copy_read(*_cache_ptr); - if (r <= 0) { - _cache_ptr.destroy(); - if (r == -EAGAIN) - break; - return r; - } - } - if (_cache_ptr->length() <= left) { - _cache_ptr->copy_out(0, _cache_ptr->length(), buf+off); - left -= _cache_ptr->length(); - off += _cache_ptr->length(); - _cache_ptr.destroy(); - } else { - _cache_ptr->copy_out(0, left, buf+off); - _cache_ptr->set_offset(_cache_ptr->offset() + left); - _cache_ptr->set_length(_cache_ptr->length() - left); - left = 0; - break; - } - } - return len - left ? len - left : -EAGAIN; - } - - virtual ssize_t zero_copy_read(bufferptr &data) override { - auto err = _conn.get_errno(); - if (err <= 0) - return err; - - if (!_buf) { - _buf = std::move(_conn.read()); - if (!_buf) - return -EAGAIN; - } - - fragment &f = _buf->frag(_cur_frag); - Packet p = _buf->share(_cur_off, f.size); - auto del = std::bind( - [](Packet &p) {}, std::move(p)); - data = buffer::claim_buffer( - f.size, f.base, make_deleter(std::move(del))); - if (++_cur_frag == _buf->nr_frags()) { - _cur_frag = 0; - _cur_off = 0; - _buf.destroy(); - } else { - _cur_off += f.size; - } - assert(data.length()); - return data.length(); - } - virtual ssize_t send(bufferlist &bl, bool more) override { - auto err = _conn.get_errno(); - if (err < 0) - return (ssize_t)err; - - size_t available = _conn.peek_sent_available(); - if (available == 0) { - return 0; - } - - std::vector frags; - std::list::const_iterator pb = bl.buffers().begin(); - uint64_t left_pbrs = bl.buffers().size(); - uint64_t len = 0; - uint64_t seglen = 0; - while (len < available && left_pbrs--) { - seglen = pb->length(); - if (len + seglen > available) { - // don't continue if we enough at least 1 fragment since no available - // space for next ptr. - if (len > 0) - break; - seglen = MIN(seglen, available); - } - len += seglen; - frags.push_back(fragment{(char*)pb->c_str(), seglen}); - ++pb; - } - - if (len != bl.length()) { - bufferlist swapped; - bl.splice(0, len, &swapped); - auto del = std::bind( - [](bufferlist &bl) {}, std::move(swapped)); - return _conn.send(Packet(std::move(frags), make_deleter(std::move(del)))); - } else { - auto del = std::bind( - [](bufferlist &bl) {}, std::move(bl)); - - return _conn.send(Packet(std::move(frags), make_deleter(std::move(del)))); - } - } - virtual void shutdown() override { - _conn.close_write(); - } - // FIXME need to impl close - virtual void close() override { - _conn.close_write(); - } - virtual int fd() const override { - return _conn.fd(); - } -}; - -template -DPDKServerSocketImpl::DPDKServerSocketImpl( - Protocol& proto, uint16_t port, const SocketOptions &opt) - : _listener(proto.listen(port)) {} - -template -int DPDKServerSocketImpl::accept(ConnectedSocket *s, const SocketOptions &options, entity_addr_t *out, Worker *w) { - if (_listener.get_errno() < 0) - return _listener.get_errno(); - auto c = _listener.accept(); - if (!c) - return -EAGAIN; - - if (out) - *out = c->remote_addr(); - std::unique_ptr> csi( - new NativeConnectedSocketImpl(std::move(*c))); - *s = ConnectedSocket(std::move(csi)); - return 0; -} - -template -void DPDKServerSocketImpl::abort_accept() { - _listener.abort_accept(); -} - -class DPDKWorker : public Worker { - struct Impl { - unsigned id; - interface _netif; - std::shared_ptr _dev; - ipv4 _inet; - Impl(CephContext *cct, unsigned i, EventCenter *c, std::shared_ptr dev); - ~Impl() { - _dev->unset_local_queue(id); - } - }; - std::unique_ptr _impl; - - virtual void initialize(); - void set_ipv4_packet_filter(ip_packet_filter* filter) { - _impl->_inet.set_packet_filter(filter); - } - using tcp4 = tcp; - - public: - explicit DPDKWorker(CephContext *c, unsigned i): Worker(c, i) {} - virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override; - virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; - void arp_learn(ethernet_address l2, ipv4_address l3) { - _impl->_inet.learn(l2, l3); - } - virtual void destroy() override { - _impl.reset(); - } - - friend class DPDKServerSocketImpl; -}; - -class DPDKStack : public NetworkStack { - vector > funcs; - public: - explicit DPDKStack(CephContext *cct, const string &t): NetworkStack(cct, t) { - funcs.resize(cct->_conf->ms_async_max_op_threads); - } - virtual bool support_zero_copy_read() const override { return true; } - virtual bool support_local_listen_table() const { return true; } - - virtual void spawn_worker(unsigned i, std::function &&func) override; - virtual void join_worker(unsigned i) override { - dpdk::eal::execute_on_master([&]() { - rte_eal_wait_lcore(i+1); - }); - } -}; - -#endif