Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / librados_test_stub / TestWatchNotify.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "test/librados_test_stub/TestWatchNotify.h"
5 #include "include/Context.h"
6 #include "include/stringify.h"
7 #include "common/Finisher.h"
8 #include "test/librados_test_stub/TestRadosClient.h"
9 #include <boost/bind.hpp>
10 #include <boost/function.hpp>
11 #include "include/assert.h"
12
13 #define dout_subsys ceph_subsys_rados
14 #undef dout_prefix
15 #define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": "
16
17 namespace librados {
18
19 std::ostream& operator<<(std::ostream& out,
20                          const TestWatchNotify::WatcherID &watcher_id) {
21   out << "(" << watcher_id.first << "," << watcher_id.second << ")";
22   return out;
23 }
24
25 TestWatchNotify::TestWatchNotify()
26   : m_lock("librados::TestWatchNotify::m_lock") {
27 }
28
29 void TestWatchNotify::flush(TestRadosClient *rados_client) {
30   CephContext *cct = rados_client->cct();
31
32   ldout(cct, 20) << "enter" << dendl;
33   // block until we know no additional async notify callbacks will occur
34   Mutex::Locker locker(m_lock);
35   while (m_pending_notifies > 0) {
36     m_file_watcher_cond.Wait(m_lock);
37   }
38 }
39
40 int TestWatchNotify::list_watchers(const std::string& o,
41                                    std::list<obj_watch_t> *out_watchers) {
42   Mutex::Locker lock(m_lock);
43   SharedWatcher watcher = get_watcher(o);
44
45   out_watchers->clear();
46   for (TestWatchNotify::WatchHandles::iterator it =
47          watcher->watch_handles.begin();
48        it != watcher->watch_handles.end(); ++it) {
49     obj_watch_t obj;
50     strcpy(obj.addr, it->second.addr.c_str());
51     obj.watcher_id = static_cast<int64_t>(it->second.gid);
52     obj.cookie = it->second.handle;
53     obj.timeout_seconds = 30;
54     out_watchers->push_back(obj);
55   }
56   return 0;
57 }
58
59 void TestWatchNotify::aio_flush(TestRadosClient *rados_client,
60                                 Context *on_finish) {
61   rados_client->get_aio_finisher()->queue(on_finish);
62 }
63
64 void TestWatchNotify::aio_watch(TestRadosClient *rados_client,
65                                 const std::string& o, uint64_t gid,
66                                 uint64_t *handle,
67                                 librados::WatchCtx2 *watch_ctx,
68                                 Context *on_finish) {
69   int r = watch(rados_client, o, gid, handle, nullptr, watch_ctx);
70   rados_client->get_aio_finisher()->queue(on_finish, r);
71 }
72
73 void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client,
74                                   uint64_t handle, Context *on_finish) {
75   unwatch(rados_client, handle);
76   rados_client->get_aio_finisher()->queue(on_finish);
77 }
78
79 void TestWatchNotify::aio_notify(TestRadosClient *rados_client,
80                                  const std::string& oid, bufferlist& bl,
81                                  uint64_t timeout_ms, bufferlist *pbl,
82                                  Context *on_notify) {
83   CephContext *cct = rados_client->cct();
84
85   Mutex::Locker lock(m_lock);
86   ++m_pending_notifies;
87   uint64_t notify_id = ++m_notify_id;
88
89   ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
90
91   SharedWatcher watcher = get_watcher(oid);
92
93   SharedNotifyHandle notify_handle(new NotifyHandle());
94   notify_handle->rados_client = rados_client;
95   notify_handle->pbl = pbl;
96   notify_handle->on_notify = on_notify;
97   for (auto &watch_handle_pair : watcher->watch_handles) {
98     WatchHandle &watch_handle = watch_handle_pair.second;
99     notify_handle->pending_watcher_ids.insert(std::make_pair(
100       watch_handle.gid, watch_handle.handle));
101   }
102   watcher->notify_handles[notify_id] = notify_handle;
103
104   FunctionContext *ctx = new FunctionContext(
105     boost::bind(&TestWatchNotify::execute_notify, this, rados_client, oid, bl,
106                 notify_id));
107   rados_client->get_aio_finisher()->queue(ctx);
108 }
109
110 int TestWatchNotify::notify(TestRadosClient *rados_client,
111                             const std::string& oid, bufferlist& bl,
112                             uint64_t timeout_ms, bufferlist *pbl) {
113   C_SaferCond cond;
114   aio_notify(rados_client, oid, bl, timeout_ms, pbl, &cond);
115   return cond.wait();
116 }
117
118 void TestWatchNotify::notify_ack(TestRadosClient *rados_client,
119                                  const std::string& o, uint64_t notify_id,
120                                  uint64_t handle, uint64_t gid,
121                                  bufferlist& bl) {
122   CephContext *cct = rados_client->cct();
123   ldout(cct, 20) << "notify_id=" << notify_id << ", handle=" << handle
124                  << ", gid=" << gid << dendl;
125   Mutex::Locker lock(m_lock);
126   WatcherID watcher_id = std::make_pair(gid, handle);
127   ack_notify(rados_client, o, notify_id, watcher_id, bl);
128   finish_notify(rados_client, o, notify_id);
129 }
130
131 int TestWatchNotify::watch(TestRadosClient *rados_client,
132                            const std::string& o, uint64_t gid,
133                            uint64_t *handle, librados::WatchCtx *ctx,
134                            librados::WatchCtx2 *ctx2) {
135   CephContext *cct = rados_client->cct();
136
137   Mutex::Locker lock(m_lock);
138   SharedWatcher watcher = get_watcher(o);
139
140   WatchHandle watch_handle;
141   watch_handle.rados_client = rados_client;
142   watch_handle.addr = "127.0.0.1:0/" + stringify(rados_client->get_nonce());
143   watch_handle.nonce = rados_client->get_nonce();
144   watch_handle.gid = gid;
145   watch_handle.handle = ++m_handle;
146   watch_handle.watch_ctx = ctx;
147   watch_handle.watch_ctx2 = ctx2;
148   watcher->watch_handles[watch_handle.handle] = watch_handle;
149
150   *handle = watch_handle.handle;
151
152   ldout(cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle
153                  << dendl;
154   return 0;
155 }
156
157 int TestWatchNotify::unwatch(TestRadosClient *rados_client,
158                              uint64_t handle) {
159   CephContext *cct = rados_client->cct();
160
161   ldout(cct, 20) << "handle=" << handle << dendl;
162   Mutex::Locker locker(m_lock);
163   for (FileWatchers::iterator it = m_file_watchers.begin();
164        it != m_file_watchers.end(); ++it) {
165     SharedWatcher watcher = it->second;
166
167     WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
168     if (w_it != watcher->watch_handles.end()) {
169       watcher->watch_handles.erase(w_it);
170       if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
171         m_file_watchers.erase(it);
172       }
173       break;
174     }
175   }
176   return 0;
177 }
178
179 TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
180     const std::string& oid) {
181   assert(m_lock.is_locked());
182   SharedWatcher &watcher = m_file_watchers[oid];
183   if (!watcher) {
184     watcher.reset(new Watcher());
185   }
186   return watcher;
187 }
188
189 void TestWatchNotify::execute_notify(TestRadosClient *rados_client,
190                                      const std::string &oid,
191                                      bufferlist &bl, uint64_t notify_id) {
192   CephContext *cct = rados_client->cct();
193
194   ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
195
196   Mutex::Locker lock(m_lock);
197   SharedWatcher watcher = get_watcher(oid);
198   WatchHandles &watch_handles = watcher->watch_handles;
199
200   NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
201   if (n_it == watcher->notify_handles.end()) {
202     ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
203                   << ": not found" << dendl;
204     return;
205   }
206
207   SharedNotifyHandle notify_handle = n_it->second;
208   WatcherIDs watcher_ids(notify_handle->pending_watcher_ids);
209   for (WatcherIDs::iterator w_id_it = watcher_ids.begin();
210        w_id_it != watcher_ids.end(); ++w_id_it) {
211     WatcherID watcher_id = *w_id_it;
212     WatchHandles::iterator w_it = watch_handles.find(watcher_id.second);
213     if (w_it == watch_handles.end()) {
214       // client disconnected before notification processed
215       notify_handle->pending_watcher_ids.erase(watcher_id);
216     } else {
217       WatchHandle watch_handle = w_it->second;
218       assert(watch_handle.gid == watcher_id.first);
219       assert(watch_handle.handle == watcher_id.second);
220
221       uint64_t notifier_id = rados_client->get_instance_id();
222       watch_handle.rados_client->get_aio_finisher()->queue(new FunctionContext(
223         [this, oid, bl, notify_id, watch_handle, notifier_id](int r) {
224           bufferlist notify_bl;
225           notify_bl.append(bl);
226
227           if (watch_handle.watch_ctx2 != NULL) {
228             watch_handle.watch_ctx2->handle_notify(notify_id,
229                                                    watch_handle.handle,
230                                                    notifier_id, notify_bl);
231           } else if (watch_handle.watch_ctx != NULL) {
232             watch_handle.watch_ctx->notify(0, 0, notify_bl);
233
234             // auto ack old-style watch/notify clients
235             ack_notify(watch_handle.rados_client, oid, notify_id,
236                        {watch_handle.gid, watch_handle.handle}, bufferlist());
237           }
238         }));
239     }
240   }
241
242   finish_notify(rados_client, oid, notify_id);
243
244   if (--m_pending_notifies == 0) {
245     m_file_watcher_cond.Signal();
246   }
247 }
248
249 void TestWatchNotify::ack_notify(TestRadosClient *rados_client,
250                                  const std::string &oid,
251                                  uint64_t notify_id,
252                                  const WatcherID &watcher_id,
253                                  const bufferlist &bl) {
254   CephContext *cct = rados_client->cct();
255
256   ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
257                  << ", WatcherID=" << watcher_id << dendl;
258
259   assert(m_lock.is_locked());
260   SharedWatcher watcher = get_watcher(oid);
261
262   NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
263   if (it == watcher->notify_handles.end()) {
264     ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
265                   << ", WatcherID=" << watcher_id << ": not found" << dendl;
266     return;
267   }
268
269   bufferlist response;
270   response.append(bl);
271
272   SharedNotifyHandle notify_handle = it->second;
273   notify_handle->notify_responses[watcher_id] = response;
274   notify_handle->pending_watcher_ids.erase(watcher_id);
275 }
276
277 void TestWatchNotify::finish_notify(TestRadosClient *rados_client,
278                                     const std::string &oid,
279                                     uint64_t notify_id) {
280   CephContext *cct = rados_client->cct();
281
282   ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
283
284   assert(m_lock.is_locked());
285   SharedWatcher watcher = get_watcher(oid);
286
287   NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
288   if (it == watcher->notify_handles.end()) {
289     ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
290                   << ": not found" << dendl;
291     return;
292   }
293
294   SharedNotifyHandle notify_handle = it->second;
295   if (!notify_handle->pending_watcher_ids.empty()) {
296     ldout(cct, 10) << "oid=" << oid << ", notify_id=" << notify_id
297                    << ": pending watchers, returning" << dendl;
298     return;
299   }
300
301   ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
302                  << ": completing" << dendl;
303
304   if (notify_handle->pbl != NULL) {
305     ::encode(notify_handle->notify_responses, *notify_handle->pbl);
306     ::encode(notify_handle->pending_watcher_ids, *notify_handle->pbl);
307   }
308
309   notify_handle->rados_client->get_aio_finisher()->queue(
310     notify_handle->on_notify, 0);
311   watcher->notify_handles.erase(notify_id);
312   if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
313     m_file_watchers.erase(oid);
314   }
315 }
316
317 void TestWatchNotify::blacklist(uint32_t nonce) {
318   Mutex::Locker locker(m_lock);
319
320   for (auto file_it = m_file_watchers.begin();
321        file_it != m_file_watchers.end(); ) {
322     auto &watcher = file_it->second;
323     for (auto w_it = watcher->watch_handles.begin();
324          w_it != watcher->watch_handles.end();) {
325       if (w_it->second.nonce == nonce) {
326         w_it = watcher->watch_handles.erase(w_it);
327       } else {
328         ++w_it;
329       }
330     }
331     if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
332         file_it = m_file_watchers.erase(file_it);
333     } else {
334       ++file_it;
335     }
336   }
337 }
338
339 } // namespace librados