Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / testmsgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include <sys/stat.h>
16 #include <iostream>
17 #include <string>
18 using namespace std;
19
20 #include "common/config.h"
21
22 #include "mon/MonMap.h"
23 #include "mon/MonClient.h"
24 #include "msg/Messenger.h"
25 #include "messages/MPing.h"
26
27 #include "common/Timer.h"
28 #include "global/global_init.h"
29 #include "common/ceph_argparse.h"
30
31 #include <sys/types.h>
32 #include <fcntl.h>
33
34 #define dout_subsys ceph_subsys_ms
35
36 Messenger *messenger = 0;
37
38 Mutex test_lock("mylock");
39 Cond cond;
40
41 uint64_t received = 0;
42
43 class Admin : public Dispatcher {
44 public:
45   Admin() 
46     : Dispatcher(g_ceph_context)
47   {
48   }
49 private:
50   bool ms_dispatch(Message *m) {
51
52     //cerr << "got ping from " << m->get_source() << std::endl;
53     dout(0) << "got ping from " << m->get_source() << dendl;
54     test_lock.Lock();
55     ++received;
56     cond.Signal();
57     test_lock.Unlock();
58
59     m->put();
60     return true;
61   }
62
63   bool ms_handle_reset(Connection *con) { return false; }
64   void ms_handle_remote_reset(Connection *con) {}
65   bool ms_handle_refused(Connection *con) { return false; }
66
67 } dispatcher;
68
69
70 int main(int argc, const char **argv, const char *envp[]) {
71
72   vector<const char*> args;
73   argv_to_vec(argc, argv, args);
74   env_to_vec(args);
75
76   auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
77                          CODE_ENVIRONMENT_UTILITY, 0);
78   common_init_finish(g_ceph_context);
79
80   dout(0) << "i am mon " << args[0] << dendl;
81
82   // get monmap
83   MonClient mc(g_ceph_context);
84   if (mc.build_initial_monmap() < 0)
85     return -1;
86   
87   // start up network
88   int whoami = mc.monmap.get_rank(args[0]);
89   assert(whoami >= 0);
90   ostringstream ss;
91   ss << mc.monmap.get_addr(whoami);
92   std::string sss(ss.str());
93   g_ceph_context->_conf->set_val("public_addr", sss.c_str());
94   g_ceph_context->_conf->apply_changes(NULL);
95   std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
96   Messenger *rank = Messenger::create(g_ceph_context,
97                                       public_msgr_type,
98                                       entity_name_t::MON(whoami), "tester",
99                                       getpid());
100   int err = rank->bind(g_ceph_context->_conf->public_addr);
101   if (err < 0)
102     return 1;
103
104   // start monitor
105   messenger = rank;
106   messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
107   messenger->add_dispatcher_head(&dispatcher);
108
109   rank->start();
110   
111   int isend = 0;
112   if (whoami == 0)
113     isend = 100;
114
115   test_lock.Lock();
116   uint64_t sent = 0;
117   while (1) {
118     while (received + isend <= sent) {
119       //cerr << "wait r " << received << " s " << sent << " is " << isend << std::endl;
120       dout(0) << "wait r " << received << " s " << sent << " is " << isend << dendl;
121       cond.Wait(test_lock);
122     }
123
124     int t = rand() % mc.get_num_mon();
125     if (t == whoami)
126       continue;
127     
128     if (rand() % 10 == 0) {
129       //cerr << "mark_down " << t << std::endl;
130       dout(0) << "mark_down " << t << dendl;
131       messenger->mark_down(mc.get_mon_addr(t));
132     } 
133     //cerr << "pinging " << t << std::endl;
134     dout(0) << "pinging " << t << dendl;
135     messenger->send_message(new MPing, mc.get_mon_inst(t));
136     cerr << isend << "\t" << ++sent << "\t" << received << "\r";
137   }
138   test_lock.Unlock();
139
140   // wait for messenger to finish
141   rank->wait();
142   
143   return 0;
144 }
145