Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / msgr / perf_msgr_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2015 Haomai Wang
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #include <stdlib.h>
18 #include <stdint.h>
19 #include <string>
20 #include <unistd.h>
21 #include <iostream>
22
23 using namespace std;
24
25 #include "common/ceph_argparse.h"
26 #include "common/debug.h"
27 #include "common/Cycles.h"
28 #include "global/global_init.h"
29 #include "msg/Messenger.h"
30 #include "messages/MOSDOp.h"
31
32 #include <atomic>
33
34 class MessengerClient {
35   class ClientThread;
36   class ClientDispatcher : public Dispatcher {
37     uint64_t think_time;
38     ClientThread *thread;
39
40    public:
41     ClientDispatcher(uint64_t delay, ClientThread *t): Dispatcher(g_ceph_context), think_time(delay), thread(t) {}
42     bool ms_can_fast_dispatch_any() const override { return true; }
43     bool ms_can_fast_dispatch(const Message *m) const override {
44       switch (m->get_type()) {
45       case CEPH_MSG_OSD_OPREPLY:
46         return true;
47       default:
48         return false;
49       }
50     }
51
52     void ms_handle_fast_connect(Connection *con) override {}
53     void ms_handle_fast_accept(Connection *con) override {}
54     bool ms_dispatch(Message *m) override { return true; }
55     void ms_fast_dispatch(Message *m) override;
56     bool ms_handle_reset(Connection *con) override { return true; }
57     void ms_handle_remote_reset(Connection *con) override {}
58     bool ms_handle_refused(Connection *con) override { return false; }
59     bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
60                               bufferlist& authorizer, bufferlist& authorizer_reply,
61                               bool& isvalid, CryptoKey& session_key) override {
62       isvalid = true;
63       return true;
64     }
65   };
66
67   class ClientThread : public Thread {
68     Messenger *msgr;
69     int concurrent;
70     ConnectionRef conn;
71     std::atomic<unsigned> client_inc = { 0 };
72     object_t oid;
73     object_locator_t oloc;
74     pg_t pgid;
75     int msg_len;
76     bufferlist data;
77     int ops;
78     ClientDispatcher dispatcher;
79
80    public:
81     Mutex lock;
82     Cond cond;
83     uint64_t inflight;
84
85     ClientThread(Messenger *m, int c, ConnectionRef con, int len, int ops, int think_time_us):
86         msgr(m), concurrent(c), conn(con), oid("object-name"), oloc(1, 1), msg_len(len), ops(ops),
87         dispatcher(think_time_us, this), lock("MessengerBenchmark::ClientThread::lock"), inflight(0) {
88       m->add_dispatcher_head(&dispatcher);
89       bufferptr ptr(msg_len);
90       memset(ptr.c_str(), 0, msg_len);
91       data.append(ptr);
92     }
93     void *entry() override {
94       lock.Lock();
95       for (int i = 0; i < ops; ++i) {
96         if (inflight > uint64_t(concurrent)) {
97           cond.Wait(lock);
98         }
99         hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(),
100                        oloc.nspace);
101         spg_t spgid(pgid);
102         MOSDOp *m = new MOSDOp(client_inc, 0, hobj, spgid, 0, 0, 0);
103         m->write(0, msg_len, data);
104         inflight++;
105         conn->send_message(m);
106         //cerr << __func__ << " send m=" << m << std::endl;
107       }
108       lock.Unlock();
109       msgr->shutdown();
110       return 0;
111     }
112   };
113
114   string type;
115   string serveraddr;
116   int think_time_us;
117   vector<Messenger*> msgrs;
118   vector<ClientThread*> clients;
119
120  public:
121   MessengerClient(string t, string addr, int delay):
122       type(t), serveraddr(addr), think_time_us(delay) {
123   }
124   ~MessengerClient() {
125     for (uint64_t i = 0; i < clients.size(); ++i)
126       delete clients[i];
127     for (uint64_t i = 0; i < msgrs.size(); ++i) {
128       msgrs[i]->shutdown();
129       msgrs[i]->wait();
130     }
131   }
132   void ready(int c, int jobs, int ops, int msg_len) {
133     entity_addr_t addr;
134     addr.parse(serveraddr.c_str());
135     addr.set_nonce(0);
136     for (int i = 0; i < jobs; ++i) {
137       Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i, 0);
138       msgr->set_default_policy(Messenger::Policy::lossless_client(0));
139       entity_inst_t inst(entity_name_t::OSD(0), addr);
140       ConnectionRef conn = msgr->get_connection(inst);
141       ClientThread *t = new ClientThread(msgr, c, conn, msg_len, ops, think_time_us);
142       msgrs.push_back(msgr);
143       clients.push_back(t);
144       msgr->start();
145     }
146     usleep(1000*1000);
147   }
148   void start() {
149     for (uint64_t i = 0; i < clients.size(); ++i)
150       clients[i]->create("client");
151     for (uint64_t i = 0; i < msgrs.size(); ++i)
152       msgrs[i]->wait();
153   }
154 };
155
156 void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message *m) {
157   usleep(think_time);
158   m->put();
159   Mutex::Locker l(thread->lock);
160   thread->inflight--;
161   thread->cond.Signal();
162 }
163
164
165 void usage(const string &name) {
166   cerr << "Usage: " << name << " [server ip:port] [numjobs] [concurrency] [ios] [thinktime us] [msg length]" << std::endl;
167   cerr << "       [server ip:port]: connect to the ip:port pair" << std::endl;
168   cerr << "       [numjobs]: how much client threads spawned and do benchmark" << std::endl;
169   cerr << "       [concurrency]: the max inflight messages(like iodepth in fio)" << std::endl;
170   cerr << "       [ios]: how much messages sent for each client" << std::endl;
171   cerr << "       [thinktime]: sleep time when do fast dispatching(match client logic)" << std::endl;
172   cerr << "       [msg length]: message data bytes" << std::endl;
173 }
174
175 int main(int argc, char **argv)
176 {
177   vector<const char*> args;
178   argv_to_vec(argc, (const char **)argv, args);
179
180   auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
181                          CODE_ENVIRONMENT_UTILITY, 0);
182   common_init_finish(g_ceph_context);
183   g_ceph_context->_conf->apply_changes(NULL);
184
185   if (args.size() < 6) {
186     usage(argv[0]);
187     return 1;
188   }
189
190   int numjobs = atoi(args[1]);
191   int concurrent = atoi(args[2]);
192   int ios = atoi(args[3]);
193   int think_time = atoi(args[4]);
194   int len = atoi(args[5]);
195
196   std::string public_msgr_type = g_ceph_context->_conf->ms_public_type.empty() ? g_ceph_context->_conf->get_val<std::string>("ms_type") : g_ceph_context->_conf->ms_public_type;
197
198   cerr << " using ms-public-type " << public_msgr_type << std::endl;
199   cerr << "       server ip:port " << args[0] << std::endl;
200   cerr << "       numjobs " << numjobs << std::endl;
201   cerr << "       concurrency " << concurrent << std::endl;
202   cerr << "       ios " << ios << std::endl;
203   cerr << "       thinktime(us) " << think_time << std::endl;
204   cerr << "       message data bytes " << len << std::endl;
205
206   MessengerClient client(public_msgr_type, args[0], think_time);
207
208   client.ready(concurrent, numjobs, ios, len);
209   Cycles::init();
210   uint64_t start = Cycles::rdtsc();
211   client.start();
212   uint64_t stop = Cycles::rdtsc();
213   cerr << " Total op " << ios << " run time " << Cycles::to_microseconds(stop - start) << "us." << std::endl;
214
215   return 0;
216 }