Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / dpdk / DPDKStack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 /*
3  * This file is open source software, licensed to you under the terms
4  * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
5  * distributed with this work for additional information regarding copyright
6  * ownership.  You may not use this file except in compliance with the License.
7  *
8  * You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 /*
20  * Copyright (C) 2014 Cloudius Systems, Ltd.
21  */
22 /*
23  * Ceph - scalable distributed file system
24  *
25  * Copyright (C) 2015 XSky <haomai@xsky.com>
26  *
27  * Author: Haomai Wang <haomaiwang@gmail.com>
28  *
29  * This is free software; you can redistribute it and/or
30  * modify it under the terms of the GNU Lesser General Public
31  * License version 2.1, as published by the Free Software
32  * Foundation.  See file COPYING.
33  *
34  */
35
36 #include <memory>
37 #include <sys/types.h>
38 #include <sys/stat.h>
39 #include <unistd.h>
40
41 #include <tuple>
42
43 #include "common/ceph_argparse.h"
44 #include "DPDKStack.h"
45 #include "DPDK.h"
46 #include "IP.h"
47 #include "TCP-Stack.h"
48
49 #include "common/dout.h"
50 #include "include/assert.h"
51
52 #define dout_subsys ceph_subsys_dpdk
53 #undef dout_prefix
54 #define dout_prefix *_dout << "dpdkstack "
55
56 static int dpdk_thread_adaptor(void* f)
57 {
58   (*static_cast<std::function<void ()>*>(f))();
59   return 0;
60 }
61
62 void DPDKWorker::initialize()
63 {
64   static enum {
65     WAIT_DEVICE_STAGE,
66     WAIT_PORT_FIN_STAGE,
67     DONE
68   } create_stage = WAIT_DEVICE_STAGE;
69   static Mutex lock("DPDKStack::lock");
70   static Cond cond;
71   static unsigned queue_init_done = 0;
72   static unsigned cores = 0;
73   static std::shared_ptr<DPDKDevice> sdev;
74
75   unsigned i = center.get_id();
76   if (i == 0) {
77     // Hardcoded port index 0.
78     // TODO: Inherit it from the opts
79     cores = cct->_conf->ms_async_op_threads;
80     std::unique_ptr<DPDKDevice> dev = create_dpdk_net_device(
81         cct, cores, cct->_conf->ms_dpdk_port_id,
82         cct->_conf->ms_dpdk_lro,
83         cct->_conf->ms_dpdk_hw_flow_control);
84     sdev = std::shared_ptr<DPDKDevice>(dev.release());
85     sdev->workers.resize(cores);
86     ldout(cct, 1) << __func__ << " using " << cores << " cores " << dendl;
87
88     Mutex::Locker l(lock);
89     create_stage = WAIT_PORT_FIN_STAGE;
90     cond.Signal();
91   } else {
92     Mutex::Locker l(lock);
93     while (create_stage <= WAIT_DEVICE_STAGE)
94       cond.Wait(lock);
95   }
96   assert(sdev);
97   if (i < sdev->hw_queues_count()) {
98     auto qp = sdev->init_local_queue(cct, &center, cct->_conf->ms_dpdk_hugepages, i);
99     std::map<unsigned, float> cpu_weights;
100     for (unsigned j = sdev->hw_queues_count() + i % sdev->hw_queues_count();
101          j < cores; j+= sdev->hw_queues_count())
102       cpu_weights[i] = 1;
103     cpu_weights[i] = cct->_conf->ms_dpdk_hw_queue_weight;
104     qp->configure_proxies(cpu_weights);
105     sdev->set_local_queue(i, std::move(qp));
106     Mutex::Locker l(lock);
107     ++queue_init_done;
108     cond.Signal();
109   } else {
110     // auto master = qid % sdev->hw_queues_count();
111     // sdev->set_local_queue(create_proxy_net_device(master, sdev.get()));
112     ceph_abort();
113   }
114   if (i == 0) {
115     {
116       Mutex::Locker l(lock);
117       while (queue_init_done < cores)
118         cond.Wait(lock);
119     }
120
121     if (sdev->init_port_fini() < 0) {
122       lderr(cct) << __func__ << " init_port_fini failed " << dendl;
123       ceph_abort();
124     }
125     Mutex::Locker l(lock);
126     create_stage = DONE;
127     cond.Signal();
128   } else {
129     Mutex::Locker l(lock);
130     while (create_stage <= WAIT_PORT_FIN_STAGE)
131       cond.Wait(lock);
132   }
133
134   sdev->workers[i] = this;
135   _impl = std::unique_ptr<DPDKWorker::Impl>(
136           new DPDKWorker::Impl(cct, i, &center, sdev));
137   {
138     Mutex::Locker l(lock);
139     if (!--queue_init_done) {
140       create_stage = WAIT_DEVICE_STAGE;
141       sdev.reset();
142     }
143   }
144 }
145
146 using AvailableIPAddress = std::tuple<string, string, string>;
147 static bool parse_available_address(
148         const string &ips, const string &gates, const string &masks, vector<AvailableIPAddress> &res)
149 {
150   vector<string> ip_vec, gate_vec, mask_vec;
151   string_to_vec(ip_vec, ips);
152   string_to_vec(gate_vec, gates);
153   string_to_vec(mask_vec, masks);
154   if (ip_vec.empty() || ip_vec.size() != gate_vec.size() || ip_vec.size() != mask_vec.size())
155     return false;
156
157   for (size_t i = 0; i < ip_vec.size(); ++i) {
158     res.push_back(AvailableIPAddress{ip_vec[i], gate_vec[i], mask_vec[i]});
159   }
160   return true;
161 }
162
163 static bool match_available_address(const vector<AvailableIPAddress> &avails,
164                                     const entity_addr_t &ip, int &res)
165 {
166   for (size_t i = 0; i < avails.size(); ++i) {
167     entity_addr_t addr;
168     auto a = std::get<0>(avails[i]).c_str();
169     if (!addr.parse(a))
170       continue;
171     if (addr.is_same_host(ip)) {
172       res = i;
173       return true;
174     }
175   }
176   return false;
177 }
178
179 DPDKWorker::Impl::Impl(CephContext *cct, unsigned i, EventCenter *c, std::shared_ptr<DPDKDevice> dev)
180     : id(i), _netif(cct, dev, c), _dev(dev), _inet(cct, c, &_netif)
181 {
182   vector<AvailableIPAddress> tuples;
183   bool parsed = parse_available_address(cct->_conf->get_val<std::string>("ms_dpdk_host_ipv4_addr"),
184                                         cct->_conf->get_val<std::string>("ms_dpdk_gateway_ipv4_addr"),
185                                         cct->_conf->get_val<std::string>("ms_dpdk_netmask_ipv4_addr"), tuples);
186   if (!parsed) {
187     lderr(cct) << __func__ << " no available address "
188                << cct->_conf->get_val<std::string>("ms_dpdk_host_ipv4_addr") << ", "
189                << cct->_conf->get_val<std::string>("ms_dpdk_gateway_ipv4_addr") << ", "
190                << cct->_conf->get_val<std::string>("ms_dpdk_netmask_ipv4_addr") << ", "
191                << dendl;
192     ceph_abort();
193   }
194   _inet.set_host_address(ipv4_address(std::get<0>(tuples[0])));
195   _inet.set_gw_address(ipv4_address(std::get<1>(tuples[0])));
196   _inet.set_netmask_address(ipv4_address(std::get<2>(tuples[0])));
197 }
198
199 int DPDKWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
200                        ServerSocket *sock)
201 {
202   assert(sa.get_family() == AF_INET);
203   assert(sock);
204
205   ldout(cct, 10) << __func__ << " addr " << sa << dendl;
206   // vector<AvailableIPAddress> tuples;
207   // bool parsed = parse_available_address(cct->_conf->ms_dpdk_host_ipv4_addr,
208   //                                       cct->_conf->ms_dpdk_gateway_ipv4_addr,
209   //                                       cct->_conf->ms_dpdk_netmask_ipv4_addr, tuples);
210   // if (!parsed) {
211   //   lderr(cct) << __func__ << " no available address "
212   //              << cct->_conf->ms_dpdk_host_ipv4_addr << ", "
213   //              << cct->_conf->ms_dpdk_gateway_ipv4_addr << ", "
214   //              << cct->_conf->ms_dpdk_netmask_ipv4_addr << ", "
215   //              << dendl;
216   //   return -EINVAL;
217   // }
218   // int idx;
219   // parsed = match_available_address(tuples, sa, idx);
220   // if (!parsed) {
221   //   lderr(cct) << __func__ << " no matched address for " << sa << dendl;
222   //   return -EINVAL;
223   // }
224   // _inet.set_host_address(ipv4_address(std::get<0>(tuples[idx])));
225   // _inet.set_gw_address(ipv4_address(std::get<1>(tuples[idx])));
226   // _inet.set_netmask_address(ipv4_address(std::get<2>(tuples[idx])));
227   return tcpv4_listen(_impl->_inet.get_tcp(), sa.get_port(), opt, sock);
228 }
229
230 int DPDKWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
231 {
232   // assert(addr.get_family() == AF_INET);
233   int r =  tcpv4_connect(_impl->_inet.get_tcp(), addr, socket);
234   ldout(cct, 10) << __func__ << " addr " << addr << dendl;
235   return r;
236 }
237
238 void DPDKStack::spawn_worker(unsigned i, std::function<void ()> &&func)
239 {
240   // create a extra master thread
241   //
242   funcs[i] = std::move(func);
243   int r = 0;
244   r = dpdk::eal::init(cct);
245   if (r < 0) {
246     lderr(cct) << __func__ << " init dpdk rte failed, r=" << r << dendl;
247     ceph_abort();
248   }
249   // if dpdk::eal::init already called by NVMEDevice, we will select 1..n
250   // cores
251   assert(rte_lcore_count() >= i + 1);
252   dpdk::eal::execute_on_master([&]() {
253     r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&funcs[i]), i+1);
254     if (r < 0) {
255       lderr(cct) << __func__ << " remote launch failed, r=" << r << dendl;
256       ceph_abort();
257     }
258   });
259 }