Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / mon / test-mon-msg.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13 #include <stdio.h>
14 #include <string.h>
15 #include <iostream>
16 #include <sstream>
17 #include <time.h>
18 #include <stdlib.h>
19 #include <map>
20
21 #include "global/global_init.h"
22 #include "global/global_context.h"
23 #include "common/ceph_argparse.h"
24 #include "common/dout.h"
25 #include "common/debug.h"
26 #include "common/Cond.h"
27 #include "common/Mutex.h"
28 #include "common/Timer.h"
29 #include "common/errno.h"
30 #include "mon/MonClient.h"
31 #include "msg/Dispatcher.h"
32 #include "include/err.h"
33 #include <boost/scoped_ptr.hpp>
34
35 #include "gtest/gtest.h"
36
37 #include "common/config.h"
38 #include "include/assert.h"
39
40 #include "messages/MMonProbe.h"
41 #include "messages/MRoute.h"
42 #include "messages/MGenericMessage.h"
43 #include "messages/MMonJoin.h"
44
45 #define dout_context g_ceph_context
46 #define dout_subsys ceph_subsys_
47 #undef dout_prefix
48 #define dout_prefix *_dout << "test-mon-msg "
49
50 class MonClientHelper : public Dispatcher
51 {
52 protected:
53   CephContext *cct;
54   Messenger *msg;
55   MonClient monc;
56
57   Mutex lock;
58
59   set<int> wanted;
60
61 public:
62
63   explicit MonClientHelper(CephContext *cct_)
64     : Dispatcher(cct_),
65       cct(cct_),
66       msg(NULL),
67       monc(cct_),
68       lock("mon-msg-test::lock")
69   { }
70
71
72   int post_init() {
73     dout(1) << __func__ << dendl;
74     if (!msg)
75       return -EINVAL;
76     msg->add_dispatcher_tail(this);
77     return 0;
78   }
79
80   int init_messenger() {
81     dout(1) << __func__ << dendl;
82
83     std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf->get_val<std::string>("ms_type") : cct->_conf->ms_public_type;
84     msg = Messenger::create(cct, public_msgr_type, entity_name_t::CLIENT(-1),
85                             "test-mon-msg", 0, 0);
86     assert(msg != NULL);
87     msg->set_default_policy(Messenger::Policy::lossy_client(0));
88     dout(0) << __func__ << " starting messenger at "
89             << msg->get_myaddr() << dendl;
90     msg->start();
91     return 0;
92   }
93
94   int init_monc() {
95     dout(1) << __func__ << dendl;
96     assert(msg != NULL);
97     int err = monc.build_initial_monmap();
98     if (err < 0) {
99       derr << __func__ << " error building monmap: "
100            << cpp_strerror(err) << dendl;
101       return err;
102     }
103
104     monc.set_messenger(msg);
105     msg->add_dispatcher_head(&monc);
106
107     monc.set_want_keys(CEPH_ENTITY_TYPE_MON);
108     err = monc.init();
109     if (err < 0) {
110       derr << __func__ << " monc init failed: "
111            << cpp_strerror(err) << dendl;
112       goto fail;
113     }
114
115     err = monc.authenticate();
116     if (err < 0) {
117       derr << __func__ << " monc auth failed: "
118            << cpp_strerror(err) << dendl;
119       goto fail_monc;
120     }
121     monc.wait_auth_rotating(30.0);
122     monc.renew_subs();
123     dout(0) << __func__ << " finished" << dendl;
124     return 0;
125
126 fail_monc:
127     derr << __func__ << " failing monc" << dendl;
128     monc.shutdown();
129 fail:
130     return err;
131   }
132
133   void shutdown_messenger() {
134     dout(0) << __func__ << dendl;
135     msg->shutdown();
136     msg->wait();
137   }
138
139   void shutdown_monc() {
140     dout(0) << __func__ << dendl;
141     monc.shutdown();
142   }
143
144   void shutdown() {
145     dout(0) << __func__ << dendl;
146     shutdown_monc();
147     shutdown_messenger();
148   }
149
150   MonMap *get_monmap() {
151     return &monc.monmap;
152   }
153
154   int init() {
155     int err = init_messenger();
156     if (err < 0)
157       goto fail;
158     err = init_monc();
159     if (err < 0)
160       goto fail_msgr;
161     err = post_init();
162     if (err < 0)
163       goto fail_monc;
164     return 0;
165 fail_monc:
166     shutdown_monc();
167 fail_msgr:
168     shutdown_messenger();
169 fail:
170     return err;
171   }
172
173   virtual void handle_wanted(Message *m) { }
174
175   bool handle_message(Message *m) {
176     dout(1) << __func__ << " " << *m << dendl;
177     if (!is_wanted(m)) {
178       dout(10) << __func__ << " not wanted" << dendl;
179       return false;
180     }
181     handle_wanted(m);
182     m->put();
183
184     return true;
185   }
186
187   bool ms_dispatch(Message *m) override {
188     return handle_message(m);  
189   }
190   void ms_handle_connect(Connection *con) override { }
191   void ms_handle_remote_reset(Connection *con) override { }
192   bool ms_handle_reset(Connection *con) override { return false; }
193   bool ms_handle_refused(Connection *con) override { return false; }
194
195   bool is_wanted(Message *m) {
196     dout(20) << __func__ << " " << *m << " type " << m->get_type() << dendl;
197     return (wanted.find(m->get_type()) != wanted.end());
198   }
199
200   void add_wanted(int t) {
201     dout(20) << __func__ << " type " << t << dendl;
202     wanted.insert(t);
203   }
204
205   void rm_wanted(int t) {
206     dout(20) << __func__ << " type " << t << dendl;
207     wanted.erase(t);
208   }
209
210   void send_message(Message *m) {
211     dout(15) << __func__ << " " << *m << dendl;
212     monc.send_mon_message(m);
213   }
214
215   void wait() { msg->wait(); }
216 };
217
218 class MonMsgTest : public MonClientHelper,
219                    public ::testing::Test
220 {
221 protected:
222   int reply_type;
223   Message *reply_msg = nullptr;
224   Mutex lock;
225   Cond cond;
226
227   MonMsgTest() :
228     MonClientHelper(g_ceph_context),
229     lock("lock") { }
230
231 public:
232   void SetUp() override {
233     reply_type = -1;
234     if (reply_msg) {
235       reply_msg->put();
236       reply_msg = nullptr;
237     }
238     ASSERT_EQ(init(), 0);
239   }
240
241   void TearDown() override {
242     shutdown();
243     if (reply_msg) {
244       reply_msg->put();
245       reply_msg = nullptr;
246     }
247   }
248
249   void handle_wanted(Message *m) override {
250     lock.Lock();
251     // caller will put() after they call us, so hold on to a ref
252     m->get();
253     reply_msg = m;
254     cond.Signal();
255     lock.Unlock();
256   }
257
258   Message *send_wait_reply(Message *m, int t, double timeout=30.0) {
259     lock.Lock();
260     reply_type = t;
261     add_wanted(t);
262     send_message(m);
263
264     int err = 0;
265     if (timeout > 0) {
266       utime_t cond_timeout;
267       cond_timeout.set_from_double(timeout);
268       utime_t s = ceph_clock_now();
269       err = cond.WaitInterval(lock, cond_timeout);
270       utime_t e = ceph_clock_now();
271       dout(20) << __func__ << " took " << (e-s) << " seconds" << dendl;
272     } else {
273       err = cond.Wait(lock);
274     }
275     rm_wanted(t);
276     lock.Unlock();
277     if (err > 0) {
278       dout(20) << __func__ << " error: " << cpp_strerror(err) << dendl;
279       return (Message*)((long)-err);
280     }
281
282     if (!reply_msg)
283       dout(20) << __func__ << " reply_msg is nullptr" << dendl;
284     else
285       dout(20) << __func__ << " reply_msg " << *reply_msg << dendl;
286     return reply_msg;
287   }
288 };
289
290 TEST_F(MonMsgTest, MMonProbeTest)
291 {
292   Message *m = new MMonProbe(get_monmap()->fsid,
293                         MMonProbe::OP_PROBE, "b", false);
294   Message *r = send_wait_reply(m, MSG_MON_PROBE);
295   ASSERT_NE(IS_ERR(r), 0);
296   ASSERT_EQ(PTR_ERR(r), -ETIMEDOUT);
297 }
298
299 TEST_F(MonMsgTest, MRouteTest)
300 {
301   Message *payload = new MGenericMessage(CEPH_MSG_SHUTDOWN);
302   MRoute *m = new MRoute;
303   m->msg = payload;
304   m->dest = msg->get_myinst();
305   Message *r = send_wait_reply(m, CEPH_MSG_SHUTDOWN);
306   // we want an error
307   ASSERT_NE(IS_ERR(r), 0);
308   ASSERT_EQ(PTR_ERR(r), -ETIMEDOUT);
309 }
310
311 /* MMonScrub and MMonSync have other safeguards in place that prevent
312  * us from actually receiving a reply even if the message is handled
313  * by the monitor due to lack of cap checking.
314  */
315 TEST_F(MonMsgTest, MMonJoin)
316 {
317   Message *m = new MMonJoin(get_monmap()->fsid, string("client"),
318                             msg->get_myaddr());
319   send_wait_reply(m, MSG_MON_PAXOS, 10.0);
320
321   int r = monc.get_monmap();
322   ASSERT_EQ(r, 0);
323   ASSERT_FALSE(monc.monmap.contains("client"));
324 }
325
326 int main(int argc, char *argv[])
327 {
328   vector<const char*> args;
329   argv_to_vec(argc, (const char **)argv, args);
330
331   auto cct = global_init(nullptr, args,
332                          CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY,
333                          0);
334   common_init_finish(g_ceph_context);
335   g_ceph_context->_conf->apply_changes(NULL);
336   ::testing::InitGoogleTest(&argc, argv);
337
338   return RUN_ALL_TESTS();
339 }
340