1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
23 * Ceph - scalable distributed file system
25 * Copyright (C) 2015 XSky <haomai@xsky.com>
27 * Author: Haomai Wang <haomaiwang@gmail.com>
29 * This is free software; you can redistribute it and/or
30 * modify it under the terms of the GNU Lesser General Public
31 * License version 2.1, as published by the Free Software
32 * Foundation. See file COPYING.
37 #include <sys/types.h>
43 #include "common/ceph_argparse.h"
44 #include "DPDKStack.h"
47 #include "TCP-Stack.h"
49 #include "common/dout.h"
50 #include "include/assert.h"
52 #define dout_subsys ceph_subsys_dpdk
54 #define dout_prefix *_dout << "dpdkstack "
56 static int dpdk_thread_adaptor(void* f)
58 (*static_cast<std::function<void ()>*>(f))();
62 void DPDKWorker::initialize()
68 } create_stage = WAIT_DEVICE_STAGE;
69 static Mutex lock("DPDKStack::lock");
71 static unsigned queue_init_done = 0;
72 static unsigned cores = 0;
73 static std::shared_ptr<DPDKDevice> sdev;
75 unsigned i = center.get_id();
77 // Hardcoded port index 0.
78 // TODO: Inherit it from the opts
79 cores = cct->_conf->ms_async_op_threads;
80 std::unique_ptr<DPDKDevice> dev = create_dpdk_net_device(
81 cct, cores, cct->_conf->ms_dpdk_port_id,
82 cct->_conf->ms_dpdk_lro,
83 cct->_conf->ms_dpdk_hw_flow_control);
84 sdev = std::shared_ptr<DPDKDevice>(dev.release());
85 sdev->workers.resize(cores);
86 ldout(cct, 1) << __func__ << " using " << cores << " cores " << dendl;
88 Mutex::Locker l(lock);
89 create_stage = WAIT_PORT_FIN_STAGE;
92 Mutex::Locker l(lock);
93 while (create_stage <= WAIT_DEVICE_STAGE)
97 if (i < sdev->hw_queues_count()) {
98 auto qp = sdev->init_local_queue(cct, ¢er, cct->_conf->ms_dpdk_hugepages, i);
99 std::map<unsigned, float> cpu_weights;
100 for (unsigned j = sdev->hw_queues_count() + i % sdev->hw_queues_count();
101 j < cores; j+= sdev->hw_queues_count())
103 cpu_weights[i] = cct->_conf->ms_dpdk_hw_queue_weight;
104 qp->configure_proxies(cpu_weights);
105 sdev->set_local_queue(i, std::move(qp));
106 Mutex::Locker l(lock);
110 // auto master = qid % sdev->hw_queues_count();
111 // sdev->set_local_queue(create_proxy_net_device(master, sdev.get()));
116 Mutex::Locker l(lock);
117 while (queue_init_done < cores)
121 if (sdev->init_port_fini() < 0) {
122 lderr(cct) << __func__ << " init_port_fini failed " << dendl;
125 Mutex::Locker l(lock);
129 Mutex::Locker l(lock);
130 while (create_stage <= WAIT_PORT_FIN_STAGE)
134 sdev->workers[i] = this;
135 _impl = std::unique_ptr<DPDKWorker::Impl>(
136 new DPDKWorker::Impl(cct, i, ¢er, sdev));
138 Mutex::Locker l(lock);
139 if (!--queue_init_done) {
140 create_stage = WAIT_DEVICE_STAGE;
146 using AvailableIPAddress = std::tuple<string, string, string>;
147 static bool parse_available_address(
148 const string &ips, const string &gates, const string &masks, vector<AvailableIPAddress> &res)
150 vector<string> ip_vec, gate_vec, mask_vec;
151 string_to_vec(ip_vec, ips);
152 string_to_vec(gate_vec, gates);
153 string_to_vec(mask_vec, masks);
154 if (ip_vec.empty() || ip_vec.size() != gate_vec.size() || ip_vec.size() != mask_vec.size())
157 for (size_t i = 0; i < ip_vec.size(); ++i) {
158 res.push_back(AvailableIPAddress{ip_vec[i], gate_vec[i], mask_vec[i]});
163 static bool match_available_address(const vector<AvailableIPAddress> &avails,
164 const entity_addr_t &ip, int &res)
166 for (size_t i = 0; i < avails.size(); ++i) {
168 auto a = std::get<0>(avails[i]).c_str();
171 if (addr.is_same_host(ip)) {
179 DPDKWorker::Impl::Impl(CephContext *cct, unsigned i, EventCenter *c, std::shared_ptr<DPDKDevice> dev)
180 : id(i), _netif(cct, dev, c), _dev(dev), _inet(cct, c, &_netif)
182 vector<AvailableIPAddress> tuples;
183 bool parsed = parse_available_address(cct->_conf->get_val<std::string>("ms_dpdk_host_ipv4_addr"),
184 cct->_conf->get_val<std::string>("ms_dpdk_gateway_ipv4_addr"),
185 cct->_conf->get_val<std::string>("ms_dpdk_netmask_ipv4_addr"), tuples);
187 lderr(cct) << __func__ << " no available address "
188 << cct->_conf->get_val<std::string>("ms_dpdk_host_ipv4_addr") << ", "
189 << cct->_conf->get_val<std::string>("ms_dpdk_gateway_ipv4_addr") << ", "
190 << cct->_conf->get_val<std::string>("ms_dpdk_netmask_ipv4_addr") << ", "
194 _inet.set_host_address(ipv4_address(std::get<0>(tuples[0])));
195 _inet.set_gw_address(ipv4_address(std::get<1>(tuples[0])));
196 _inet.set_netmask_address(ipv4_address(std::get<2>(tuples[0])));
199 int DPDKWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
202 assert(sa.get_family() == AF_INET);
205 ldout(cct, 10) << __func__ << " addr " << sa << dendl;
206 // vector<AvailableIPAddress> tuples;
207 // bool parsed = parse_available_address(cct->_conf->ms_dpdk_host_ipv4_addr,
208 // cct->_conf->ms_dpdk_gateway_ipv4_addr,
209 // cct->_conf->ms_dpdk_netmask_ipv4_addr, tuples);
211 // lderr(cct) << __func__ << " no available address "
212 // << cct->_conf->ms_dpdk_host_ipv4_addr << ", "
213 // << cct->_conf->ms_dpdk_gateway_ipv4_addr << ", "
214 // << cct->_conf->ms_dpdk_netmask_ipv4_addr << ", "
219 // parsed = match_available_address(tuples, sa, idx);
221 // lderr(cct) << __func__ << " no matched address for " << sa << dendl;
224 // _inet.set_host_address(ipv4_address(std::get<0>(tuples[idx])));
225 // _inet.set_gw_address(ipv4_address(std::get<1>(tuples[idx])));
226 // _inet.set_netmask_address(ipv4_address(std::get<2>(tuples[idx])));
227 return tcpv4_listen(_impl->_inet.get_tcp(), sa.get_port(), opt, sock);
230 int DPDKWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
232 // assert(addr.get_family() == AF_INET);
233 int r = tcpv4_connect(_impl->_inet.get_tcp(), addr, socket);
234 ldout(cct, 10) << __func__ << " addr " << addr << dendl;
238 void DPDKStack::spawn_worker(unsigned i, std::function<void ()> &&func)
240 // create a extra master thread
242 funcs[i] = std::move(func);
244 r = dpdk::eal::init(cct);
246 lderr(cct) << __func__ << " init dpdk rte failed, r=" << r << dendl;
249 // if dpdk::eal::init already called by NVMEDevice, we will select 1..n
251 assert(rte_lcore_count() >= i + 1);
252 dpdk::eal::execute_on_master([&]() {
253 r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&funcs[i]), i+1);
255 lderr(cct) << __func__ << " remote launch failed, r=" << r << dendl;