Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / librados_test_stub / TestRadosClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "test/librados_test_stub/TestRadosClient.h"
5 #include "test/librados_test_stub/TestIoCtxImpl.h"
6 #include "librados/AioCompletionImpl.h"
7 #include "include/assert.h"
8 #include "common/ceph_json.h"
9 #include "common/Finisher.h"
10 #include <boost/bind.hpp>
11 #include <boost/thread.hpp>
12 #include <errno.h>
13
14 #include <atomic>
15
16 static int get_concurrency() {
17   int concurrency = 0;
18   char *env = getenv("LIBRADOS_CONCURRENCY");
19   if (env != NULL) {
20     concurrency = atoi(env);
21   }
22   if (concurrency == 0) {
23     concurrency = boost::thread::thread::hardware_concurrency();
24   }
25   if (concurrency == 0) {
26     concurrency = 1;
27   }
28   return concurrency;
29 }
30
31 namespace librados {
32
33 static void finish_aio_completion(AioCompletionImpl *c, int r) {
34   c->lock.Lock();
35   c->complete = true;
36   c->rval = r;
37   c->lock.Unlock();
38
39   rados_callback_t cb_complete = c->callback_complete;
40   void *cb_complete_arg = c->callback_complete_arg;
41   if (cb_complete) {
42     cb_complete(c, cb_complete_arg);
43   }
44
45   rados_callback_t cb_safe = c->callback_safe;
46   void *cb_safe_arg = c->callback_safe_arg;
47   if (cb_safe) {
48     cb_safe(c, cb_safe_arg);
49   }
50
51   c->lock.Lock();
52   c->callback_complete = NULL;
53   c->callback_safe = NULL;
54   c->cond.Signal();
55   c->put_unlock();
56 }
57
58 class AioFunctionContext : public Context {
59 public:
60   AioFunctionContext(const TestRadosClient::AioFunction &callback,
61                      Finisher *finisher, AioCompletionImpl *c)
62     : m_callback(callback), m_finisher(finisher), m_comp(c)
63   {
64     if (m_comp != NULL) {
65       m_comp->get();
66     }
67   }
68
69   void finish(int r) override {
70     int ret = m_callback();
71     if (m_comp != NULL) {
72       if (m_finisher != NULL) {
73         m_finisher->queue(new FunctionContext(boost::bind(
74           &finish_aio_completion, m_comp, ret)));
75       } else {
76         finish_aio_completion(m_comp, ret);
77       }
78     }
79   }
80 private:
81   TestRadosClient::AioFunction m_callback;
82   Finisher *m_finisher;
83   AioCompletionImpl *m_comp;
84 };
85
86 TestRadosClient::TestRadosClient(CephContext *cct,
87                                  TestWatchNotify *watch_notify)
88   : m_cct(cct->get()), m_watch_notify(watch_notify),
89     m_aio_finisher(new Finisher(m_cct))
90 {
91   get();
92
93   // simulate multiple OSDs
94   int concurrency = get_concurrency();
95   for (int i = 0; i < concurrency; ++i) {
96     m_finishers.push_back(new Finisher(m_cct));
97     m_finishers.back()->start();
98   }
99
100   // replicate AIO callback processing
101   m_aio_finisher->start();
102 }
103
104 TestRadosClient::~TestRadosClient() {
105   flush_aio_operations();
106
107   for (size_t i = 0; i < m_finishers.size(); ++i) {
108     m_finishers[i]->stop();
109     delete m_finishers[i];
110   }
111   m_aio_finisher->stop();
112   delete m_aio_finisher;
113
114   m_cct->put();
115   m_cct = NULL;
116 }
117
118 void TestRadosClient::get() {
119   m_refcount++;
120 }
121
122 void TestRadosClient::put() {
123   if (--m_refcount == 0) {
124     shutdown();
125     delete this;
126   }
127 }
128
129 CephContext *TestRadosClient::cct() {
130   return m_cct;
131 }
132
133 int TestRadosClient::connect() {
134   return 0;
135 }
136
137 void TestRadosClient::shutdown() {
138 }
139
140 int TestRadosClient::wait_for_latest_osdmap() {
141   return 0;
142 }
143
144 int TestRadosClient::mon_command(const std::vector<std::string>& cmd,
145                                  const bufferlist &inbl,
146                                  bufferlist *outbl, std::string *outs) {
147   for (std::vector<std::string>::const_iterator it = cmd.begin();
148        it != cmd.end(); ++it) {
149     JSONParser parser;
150     if (!parser.parse(it->c_str(), it->length())) {
151       return -EINVAL;
152     }
153
154     JSONObjIter j_it = parser.find("prefix");
155     if (j_it.end()) {
156       return -EINVAL;
157     }
158
159     if ((*j_it)->get_data() == "osd tier add") {
160       return 0;
161     } else if ((*j_it)->get_data() == "osd tier cache-mode") {
162       return 0;
163     } else if ((*j_it)->get_data() == "osd tier set-overlay") {
164       return 0;
165     } else if ((*j_it)->get_data() == "osd tier remove-overlay") {
166       return 0;
167     } else if ((*j_it)->get_data() == "osd tier remove") {
168       return 0;
169     }
170   }
171   return -ENOSYS;
172 }
173
174 void TestRadosClient::add_aio_operation(const std::string& oid,
175                                         bool queue_callback,
176                                         const AioFunction &aio_function,
177                                         AioCompletionImpl *c) {
178   AioFunctionContext *ctx = new AioFunctionContext(
179     aio_function, queue_callback ? m_aio_finisher : NULL, c);
180   get_finisher(oid)->queue(ctx);
181 }
182
183 struct WaitForFlush {
184   int flushed() {
185     if (--count == 0) {
186       aio_finisher->queue(new FunctionContext(boost::bind(
187         &finish_aio_completion, c, 0)));
188       delete this;
189     }
190     return 0;
191   }
192
193   std::atomic<int64_t> count = { 0 };
194   Finisher *aio_finisher;
195   AioCompletionImpl *c;
196 };
197
198 void TestRadosClient::flush_aio_operations() {
199   AioCompletionImpl *comp = new AioCompletionImpl();
200   flush_aio_operations(comp);
201   comp->wait_for_safe();
202   comp->put();
203 }
204
205 void TestRadosClient::flush_aio_operations(AioCompletionImpl *c) {
206   c->get();
207
208   WaitForFlush *wait_for_flush = new WaitForFlush();
209   wait_for_flush->count = m_finishers.size();
210   wait_for_flush->aio_finisher = m_aio_finisher;
211   wait_for_flush->c = c;
212
213   for (size_t i = 0; i < m_finishers.size(); ++i) {
214     AioFunctionContext *ctx = new AioFunctionContext(
215       boost::bind(&WaitForFlush::flushed, wait_for_flush),
216       nullptr, nullptr);
217     m_finishers[i]->queue(ctx);
218   }
219 }
220
221 int TestRadosClient::aio_watch_flush(AioCompletionImpl *c) {
222   c->get();
223   Context *ctx = new FunctionContext(boost::bind(
224     &TestRadosClient::finish_aio_completion, this, c, _1));
225   get_watch_notify()->aio_flush(this, ctx);
226   return 0;
227 }
228
229 void TestRadosClient::finish_aio_completion(AioCompletionImpl *c, int r) {
230   librados::finish_aio_completion(c, r);
231 }
232
233 Finisher *TestRadosClient::get_finisher(const std::string &oid) {
234   std::size_t h = m_hash(oid);
235   return m_finishers[h % m_finishers.size()];
236 }
237
238 } // namespace librados