Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / messenger / xio_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2013 CohortFS, LLC
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <sys/types.h>
16
17 #include <iostream>
18 #include <string>
19
20 using namespace std;
21
22 #include "common/config.h"
23 #include "msg/msg_types.h"
24 #include "msg/xio/XioMessenger.h"
25 #include "msg/xio/FastStrategy.h"
26 #include "msg/xio/QueueStrategy.h"
27 #include "msg/xio/XioMsg.h"
28 #include "messages/MPing.h"
29 #include "common/Timer.h"
30 #include "common/ceph_argparse.h"
31 #include "global/global_init.h"
32 #include "perfglue/heap_profiler.h"
33 #include "common/address_helper.h"
34 #include "message_helper.h"
35 #include "xio_dispatcher.h"
36 #include "msg/xio/XioConnection.h"
37
38 #define dout_subsys ceph_subsys_xio_client
39
40 void usage(ostream& out)
41 {
42   out << "usage: xio_client [options]\n"
43 "options:\n"
44 "  --addr X\n"
45 "  --port X\n"
46 "  --msgs X\n"
47 "  --dsize X\n"
48 "  --nfrags X\n"
49 "  --dfast\n"
50     ;
51 }
52
53 int main(int argc, const char **argv)
54 {
55         vector<const char*> args;
56         Messenger* messenger;
57         XioDispatcher *dispatcher;
58         std::vector<const char*>::iterator arg_iter;
59         std::string val;
60         entity_addr_t dest_addr;
61         ConnectionRef conn;
62         int r = 0;
63
64         std::string addr = "localhost";
65         std::string port = "1234";
66         int n_msgs = 50;
67         int n_dsize = 0;
68         int n_nfrags = 1;
69         bool dfast = false;
70
71         struct timespec ts;
72         ts.tv_sec = 5;
73         ts.tv_nsec = 0;
74
75         argv_to_vec(argc, argv, args);
76         env_to_vec(args);
77
78         auto cct = global_init(NULL, args,
79                                CEPH_ENTITY_TYPE_ANY,
80                                CODE_ENVIRONMENT_UTILITY, 0);
81
82         for (arg_iter = args.begin(); arg_iter != args.end();) {
83           if (ceph_argparse_witharg(args, arg_iter, &val, "--addr",
84                                     (char*) NULL)) {
85             addr = val;
86           } else if (ceph_argparse_witharg(args, arg_iter, &val, "--port",
87                                     (char*) NULL)) {
88             port = val;
89           } else if (ceph_argparse_witharg(args, arg_iter, &val, "--msgs",
90                                     (char*) NULL)) {
91             n_msgs = atoi(val.c_str());
92           } else if (ceph_argparse_witharg(args, arg_iter, &val, "--dsize",
93                                     (char*) NULL)) {
94             n_dsize = atoi(val.c_str());
95           } else if (ceph_argparse_witharg(args, arg_iter, &val, "--nfrags",
96                                     (char*) NULL)) {
97             n_nfrags = atoi(val.c_str());
98           } else if (ceph_argparse_flag(args, arg_iter, "--dfast",
99                                            (char*) NULL)) {
100             dfast = true;
101           } else {
102             ++arg_iter;
103           }
104         };
105
106         if (!args.empty()) {
107           cerr << "What is this? -- " << args[0] << std::endl;
108           usage(cerr);
109           exit(1);
110         }
111
112         DispatchStrategy* dstrategy;
113         if (dfast)
114           dstrategy = new FastStrategy();
115         else
116           dstrategy = new QueueStrategy(2);
117
118         messenger = new XioMessenger(g_ceph_context,
119                                      entity_name_t::MON(-1),
120                                      "xio_client",
121                                      0 /* nonce */,
122                                      0 /* cflags */,
123                                      dstrategy);
124
125         // enable timing prints
126         static_cast<XioMessenger*>(messenger)->set_magic(
127           MSG_MAGIC_REDUPE /* resubmit messages on delivery (REQUIRED) */ |
128           MSG_MAGIC_TRACE_CTR /* timing prints */);
129
130         // ensure we have a pool of sizeof(payload data)
131         if (n_dsize)
132           (void) static_cast<XioMessenger*>(messenger)->pool_hint(n_dsize);
133
134         messenger->set_default_policy(Messenger::Policy::lossy_client(0));
135
136         string dest_str = "tcp://";
137         dest_str += addr;
138         dest_str += ":";
139         dest_str += port;
140         entity_addr_from_url(&dest_addr, dest_str.c_str());
141         entity_inst_t dest_server(entity_name_t::MON(-1), dest_addr);
142
143         dispatcher = new XioDispatcher(messenger);
144         messenger->add_dispatcher_head(dispatcher);
145
146         dispatcher->set_active(); // this side is the pinger
147
148         r = messenger->start();
149         if (r < 0)
150                 goto out;
151
152         conn = messenger->get_connection(dest_server);
153
154         // do stuff
155         time_t t1, t2;
156         t1 = time(NULL);
157
158         int msg_ix;
159         for (msg_ix = 0; msg_ix < n_msgs; ++msg_ix) {
160           /* add a data payload if asked */
161           if (! n_dsize) {
162             conn->send_message(new MPing());
163           } else {
164             conn->send_message(new_simple_ping_with_data("xio_client", n_dsize, n_nfrags));
165           }
166         }
167
168         // do stuff
169         while (conn->is_connected()) {
170           nanosleep(&ts, NULL);
171         }
172
173         t2 = time(NULL);
174         cout << "Processed "
175              << static_cast<XioConnection*>(conn->get())->get_scount()
176              << " one-way messages in " << t2-t1 << "s"
177              << std::endl;
178
179         conn->put();
180
181         // wait a bit for cleanup to finalize
182         ts.tv_sec = 5;
183         nanosleep(&ts, NULL);
184
185         messenger->shutdown();
186
187 out:
188         return r;
189 }