Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / librados / watch_notify.cc
1 #include "include/rados/librados.h"
2 #include "include/rados/librados.hpp"
3 #include "include/rados/rados_types.h"
4 #include "test/librados/test.h"
5 #include "test/librados/TestCase.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <semaphore.h>
10 #include "gtest/gtest.h"
11 #include "include/encoding.h"
12 #include <set>
13 #include <map>
14
15 using namespace librados;
16
17 typedef RadosTestEC LibRadosWatchNotifyEC;
18 typedef RadosTestECPP LibRadosWatchNotifyECPP;
19
20 int notify_sleep = 0;
21
22 // notify
23 static sem_t *sem;
24
25 static void watch_notify_test_cb(uint8_t opcode, uint64_t ver, void *arg)
26 {
27   std::cout << __func__ << std::endl;
28   sem_post(sem);
29 }
30
31 class WatchNotifyTestCtx : public WatchCtx
32 {
33 public:
34     void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) override
35     {
36       std::cout << __func__ << std::endl;
37       sem_post(sem);
38     }
39 };
40
41 class LibRadosWatchNotify : public RadosTest
42 {
43 protected:
44   // notify 2
45   bufferlist notify_bl;
46   std::set<uint64_t> notify_cookies;
47   rados_ioctx_t notify_io;
48   const char *notify_oid = nullptr;
49   int notify_err = 0;
50
51   static void watch_notify2_test_cb(void *arg,
52                                     uint64_t notify_id,
53                                     uint64_t cookie,
54                                     uint64_t notifier_gid,
55                                     void *data,
56                                     size_t data_len);
57   static void watch_notify2_test_errcb(void *arg, uint64_t cookie, int err);
58 };
59
60
61 void LibRadosWatchNotify::watch_notify2_test_cb(void *arg,
62                                   uint64_t notify_id,
63                                   uint64_t cookie,
64                                   uint64_t notifier_gid,
65                                   void *data,
66                                   size_t data_len)
67 {
68   std::cout << __func__ << " from " << notifier_gid << " notify_id " << notify_id
69             << " cookie " << cookie << std::endl;
70   assert(notifier_gid > 0);
71   auto thiz = reinterpret_cast<LibRadosWatchNotify*>(arg);
72   assert(thiz);
73   thiz->notify_cookies.insert(cookie);
74   thiz->notify_bl.clear();
75   thiz->notify_bl.append((char*)data, data_len);
76   if (notify_sleep)
77     sleep(notify_sleep);
78   rados_notify_ack(thiz->notify_io, thiz->notify_oid, notify_id, cookie,
79                    "reply", 5);
80 }
81
82 void LibRadosWatchNotify::watch_notify2_test_errcb(void *arg,
83                                                    uint64_t cookie,
84                                                    int err)
85 {
86   std::cout << __func__ << " cookie " << cookie << " err " << err << std::endl;
87   assert(cookie > 1000);
88   auto thiz = reinterpret_cast<LibRadosWatchNotify*>(arg);
89   assert(thiz);
90   thiz->notify_err = err;
91 }
92
93 class WatchNotifyTestCtx2;
94 class LibRadosWatchNotifyPP : public RadosTestParamPP
95 {
96 protected:
97   bufferlist notify_bl;
98   std::set<uint64_t> notify_cookies;
99   rados_ioctx_t notify_io;
100   const char *notify_oid = nullptr;
101   int notify_err = 0;
102
103   friend class WatchNotifyTestCtx2;
104 };
105
106 IoCtx *notify_ioctx;
107
108 class WatchNotifyTestCtx2 : public WatchCtx2
109 {
110   LibRadosWatchNotifyPP *notify;
111
112 public:
113   WatchNotifyTestCtx2(LibRadosWatchNotifyPP *notify)
114     : notify(notify)
115   {}
116
117   void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_gid,
118                      bufferlist& bl) override {
119     std::cout << __func__ << " cookie " << cookie << " notify_id " << notify_id
120               << " notifier_gid " << notifier_gid << std::endl;
121     notify->notify_bl = bl;
122     notify->notify_cookies.insert(cookie);
123     bufferlist reply;
124     reply.append("reply", 5);
125     if (notify_sleep)
126       sleep(notify_sleep);
127     notify_ioctx->notify_ack(notify->notify_oid, notify_id, cookie, reply);
128   }
129
130   void handle_error(uint64_t cookie, int err) override {
131     std::cout << __func__ << " cookie " << cookie
132               << " err " << err << std::endl;
133     assert(cookie > 1000);
134     notify->notify_err = err;
135   }
136 };
137
138 // --
139
140 #pragma GCC diagnostic ignored "-Wpragmas"
141 #pragma GCC diagnostic push
142 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
143
144 TEST_F(LibRadosWatchNotify, WatchNotify) {
145   ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
146   char buf[128];
147   memset(buf, 0xcc, sizeof(buf));
148   ASSERT_EQ(0, rados_write(ioctx, "foo", buf, sizeof(buf), 0));
149   uint64_t handle;
150   ASSERT_EQ(0,
151       rados_watch(ioctx, "foo", 0, &handle, watch_notify_test_cb, NULL));
152   ASSERT_EQ(0, rados_notify(ioctx, "foo", 0, NULL, 0));
153   TestAlarm alarm;
154   sem_wait(sem);
155   rados_unwatch(ioctx, "foo", handle);
156
157   // when dne ...
158   ASSERT_EQ(-ENOENT,
159       rados_watch(ioctx, "dne", 0, &handle, watch_notify_test_cb, NULL));
160
161   sem_close(sem);
162 }
163
164 TEST_P(LibRadosWatchNotifyPP, WatchNotify) {
165   ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
166   char buf[128];
167   memset(buf, 0xcc, sizeof(buf));
168   bufferlist bl1;
169   bl1.append(buf, sizeof(buf));
170   ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
171   uint64_t handle;
172   WatchNotifyTestCtx ctx;
173   ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
174   std::list<obj_watch_t> watches;
175   ASSERT_EQ(0, ioctx.list_watchers("foo", &watches));
176   ASSERT_EQ(watches.size(), 1u);
177   bufferlist bl2;
178   ASSERT_EQ(0, ioctx.notify("foo", 0, bl2));
179   TestAlarm alarm;
180   sem_wait(sem);
181   ioctx.unwatch("foo", handle);
182   sem_close(sem);
183 }
184
185 TEST_F(LibRadosWatchNotifyEC, WatchNotify) {
186   ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
187   char buf[128];
188   memset(buf, 0xcc, sizeof(buf));
189   ASSERT_EQ(0, rados_write(ioctx, "foo", buf, sizeof(buf), 0));
190   uint64_t handle;
191   ASSERT_EQ(0,
192       rados_watch(ioctx, "foo", 0, &handle, watch_notify_test_cb, NULL));
193   ASSERT_EQ(0, rados_notify(ioctx, "foo", 0, NULL, 0));
194   TestAlarm alarm;
195   sem_wait(sem);
196   rados_unwatch(ioctx, "foo", handle);
197   sem_close(sem);
198 }
199
200 TEST_F(LibRadosWatchNotifyECPP, WatchNotify) {
201   ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
202   char buf[128];
203   memset(buf, 0xcc, sizeof(buf));
204   bufferlist bl1;
205   bl1.append(buf, sizeof(buf));
206   ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
207   uint64_t handle;
208   WatchNotifyTestCtx ctx;
209   ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
210   std::list<obj_watch_t> watches;
211   ASSERT_EQ(0, ioctx.list_watchers("foo", &watches));
212   ASSERT_EQ(watches.size(), 1u);
213   bufferlist bl2;
214   ASSERT_EQ(0, ioctx.notify("foo", 0, bl2));
215   TestAlarm alarm;
216   sem_wait(sem);
217   ioctx.unwatch("foo", handle);
218   sem_close(sem);
219 }
220
221 // --
222
223 TEST_P(LibRadosWatchNotifyPP, WatchNotifyTimeout) {
224   ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
225   ioctx.set_notify_timeout(1);
226   uint64_t handle;
227   WatchNotifyTestCtx ctx;
228
229   char buf[128];
230   memset(buf, 0xcc, sizeof(buf));
231   bufferlist bl1;
232   bl1.append(buf, sizeof(buf));
233   ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
234
235   ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
236   sem_close(sem);
237   ASSERT_EQ(0, ioctx.unwatch("foo", handle));
238 }
239
240 TEST_F(LibRadosWatchNotifyECPP, WatchNotifyTimeout) {
241   ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
242   ioctx.set_notify_timeout(1);
243   uint64_t handle;
244   WatchNotifyTestCtx ctx;
245
246   char buf[128];
247   memset(buf, 0xcc, sizeof(buf));
248   bufferlist bl1;
249   bl1.append(buf, sizeof(buf));
250   ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
251
252   ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
253   sem_close(sem);
254   ASSERT_EQ(0, ioctx.unwatch("foo", handle));
255 }
256
257 #pragma GCC diagnostic pop
258 #pragma GCC diagnostic warning "-Wpragmas"
259
260
261 // --
262
263 TEST_F(LibRadosWatchNotify, Watch2Delete) {
264   notify_io = ioctx;
265   notify_oid = "foo";
266   notify_err = 0;
267   char buf[128];
268   memset(buf, 0xcc, sizeof(buf));
269   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
270   uint64_t handle;
271   ASSERT_EQ(0,
272             rados_watch2(ioctx, notify_oid, &handle,
273                          watch_notify2_test_cb,
274                          watch_notify2_test_errcb, this));
275   ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
276   int left = 300;
277   std::cout << "waiting up to " << left << " for disconnect notification ..."
278             << std::endl;
279   while (notify_err == 0 && --left) {
280     sleep(1);
281   }
282   ASSERT_TRUE(left > 0);
283   ASSERT_EQ(-ENOTCONN, notify_err);
284   ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
285   rados_unwatch2(ioctx, handle);
286   rados_watch_flush(cluster);
287 }
288
289 TEST_F(LibRadosWatchNotify, AioWatchDelete) {
290   notify_io = ioctx;
291   notify_oid = "foo";
292   notify_err = 0;
293   char buf[128];
294   memset(buf, 0xcc, sizeof(buf));
295   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
296
297
298   rados_completion_t comp;
299   uint64_t handle;
300   ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
301   rados_aio_watch(ioctx, notify_oid, comp, &handle,
302                   watch_notify2_test_cb, watch_notify2_test_errcb, this);
303   ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
304   ASSERT_EQ(0, rados_aio_get_return_value(comp));
305   rados_aio_release(comp);
306   ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
307   int left = 300;
308   std::cout << "waiting up to " << left << " for disconnect notification ..."
309             << std::endl;
310   while (notify_err == 0 && --left) {
311     sleep(1);
312   }
313   ASSERT_TRUE(left > 0);
314   ASSERT_EQ(-ENOTCONN, notify_err);
315   ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
316   ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
317   rados_aio_unwatch(ioctx, handle, comp);
318   ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
319   ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
320   rados_aio_release(comp);
321 }
322
323 // --
324
325 TEST_F(LibRadosWatchNotify, WatchNotify2) {
326   notify_io = ioctx;
327   notify_oid = "foo";
328   notify_cookies.clear();
329   char buf[128];
330   memset(buf, 0xcc, sizeof(buf));
331   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
332   uint64_t handle;
333   ASSERT_EQ(0,
334       rados_watch2(ioctx, notify_oid, &handle,
335                    watch_notify2_test_cb,
336                    watch_notify2_test_errcb, this));
337   ASSERT_GT(rados_watch_check(ioctx, handle), 0);
338   char *reply_buf = 0;
339   size_t reply_buf_len;
340   ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
341                              "notify", 6, 300000,
342                              &reply_buf, &reply_buf_len));
343   bufferlist reply;
344   reply.append(reply_buf, reply_buf_len);
345   std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
346   std::set<std::pair<uint64_t,uint64_t> > missed_map;
347   bufferlist::iterator reply_p = reply.begin();
348   ::decode(reply_map, reply_p);
349   ::decode(missed_map, reply_p);
350   ASSERT_EQ(1u, reply_map.size());
351   ASSERT_EQ(0u, missed_map.size());
352   ASSERT_EQ(1u, notify_cookies.size());
353   ASSERT_EQ(1u, notify_cookies.count(handle));
354   ASSERT_EQ(5u, reply_map.begin()->second.length());
355   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
356   ASSERT_GT(rados_watch_check(ioctx, handle), 0);
357   rados_buffer_free(reply_buf);
358
359   // try it on a non-existent object ... our buffer pointers
360   // should get zeroed.
361   ASSERT_EQ(-ENOENT, rados_notify2(ioctx, "doesnotexist",
362                                    "notify", 6, 300000,
363                                    &reply_buf, &reply_buf_len));
364   ASSERT_EQ((char*)0, reply_buf);
365   ASSERT_EQ(0u, reply_buf_len);
366
367   rados_unwatch2(ioctx, handle);
368   rados_watch_flush(cluster);
369 }
370
371 TEST_F(LibRadosWatchNotify, AioWatchNotify2) {
372   notify_io = ioctx;
373   notify_oid = "foo";
374   notify_cookies.clear();
375   char buf[128];
376   memset(buf, 0xcc, sizeof(buf));
377   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
378
379   rados_completion_t comp;
380   uint64_t handle;
381   ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
382   rados_aio_watch(ioctx, notify_oid, comp, &handle,
383                   watch_notify2_test_cb, watch_notify2_test_errcb, this);
384   ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
385   ASSERT_EQ(0, rados_aio_get_return_value(comp));
386   rados_aio_release(comp);
387
388   ASSERT_GT(rados_watch_check(ioctx, handle), 0);
389   char *reply_buf = 0;
390   size_t reply_buf_len;
391   ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
392                              "notify", 6, 300000,
393                              &reply_buf, &reply_buf_len));
394   bufferlist reply;
395   reply.append(reply_buf, reply_buf_len);
396   std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
397   std::set<std::pair<uint64_t,uint64_t> > missed_map;
398   bufferlist::iterator reply_p = reply.begin();
399   ::decode(reply_map, reply_p);
400   ::decode(missed_map, reply_p);
401   ASSERT_EQ(1u, reply_map.size());
402   ASSERT_EQ(0u, missed_map.size());
403   ASSERT_EQ(1u, notify_cookies.size());
404   ASSERT_EQ(1u, notify_cookies.count(handle));
405   ASSERT_EQ(5u, reply_map.begin()->second.length());
406   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
407   ASSERT_GT(rados_watch_check(ioctx, handle), 0);
408   rados_buffer_free(reply_buf);
409
410   // try it on a non-existent object ... our buffer pointers
411   // should get zeroed.
412   ASSERT_EQ(-ENOENT, rados_notify2(ioctx, "doesnotexist",
413                                    "notify", 6, 300000,
414                                    &reply_buf, &reply_buf_len));
415   ASSERT_EQ((char*)0, reply_buf);
416   ASSERT_EQ(0u, reply_buf_len);
417
418   ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
419   rados_aio_unwatch(ioctx, handle, comp);
420   ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
421   ASSERT_EQ(0, rados_aio_get_return_value(comp));
422   rados_aio_release(comp);
423 }
424
425 TEST_F(LibRadosWatchNotify, AioNotify) {
426   notify_io = ioctx;
427   notify_oid = "foo";
428   notify_cookies.clear();
429   char buf[128];
430   memset(buf, 0xcc, sizeof(buf));
431   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
432   uint64_t handle;
433   ASSERT_EQ(0,
434       rados_watch2(ioctx, notify_oid, &handle,
435                    watch_notify2_test_cb,
436                    watch_notify2_test_errcb, this));
437   ASSERT_GT(rados_watch_check(ioctx, handle), 0);
438   char *reply_buf = 0;
439   size_t reply_buf_len;
440   rados_completion_t comp;
441   ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
442   ASSERT_EQ(0, rados_aio_notify(ioctx, "foo", comp, "notify", 6, 300000,
443                                 &reply_buf, &reply_buf_len));
444   ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
445   ASSERT_EQ(0, rados_aio_get_return_value(comp));
446   rados_aio_release(comp);
447
448   bufferlist reply;
449   reply.append(reply_buf, reply_buf_len);
450   std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
451   std::set<std::pair<uint64_t,uint64_t> > missed_map;
452   bufferlist::iterator reply_p = reply.begin();
453   ::decode(reply_map, reply_p);
454   ::decode(missed_map, reply_p);
455   ASSERT_EQ(1u, reply_map.size());
456   ASSERT_EQ(0u, missed_map.size());
457   ASSERT_EQ(1u, notify_cookies.size());
458   ASSERT_EQ(1u, notify_cookies.count(handle));
459   ASSERT_EQ(5u, reply_map.begin()->second.length());
460   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
461   ASSERT_GT(rados_watch_check(ioctx, handle), 0);
462   rados_buffer_free(reply_buf);
463
464   // try it on a non-existent object ... our buffer pointers
465   // should get zeroed.
466   ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
467   ASSERT_EQ(0, rados_aio_notify(ioctx, "doesnotexist", comp, "notify", 6,
468                                 300000, &reply_buf, &reply_buf_len));
469   ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
470   ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
471   rados_aio_release(comp);
472   ASSERT_EQ((char*)0, reply_buf);
473   ASSERT_EQ(0u, reply_buf_len);
474
475   rados_unwatch2(ioctx, handle);
476   rados_watch_flush(cluster);
477 }
478
479 TEST_P(LibRadosWatchNotifyPP, WatchNotify2) {
480   notify_oid = "foo";
481   notify_ioctx = &ioctx;
482   notify_cookies.clear();
483   char buf[128];
484   memset(buf, 0xcc, sizeof(buf));
485   bufferlist bl1;
486   bl1.append(buf, sizeof(buf));
487   ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
488   uint64_t handle;
489   WatchNotifyTestCtx2 ctx(this);
490   ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
491   ASSERT_GT(ioctx.watch_check(handle), 0);
492   std::list<obj_watch_t> watches;
493   ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
494   ASSERT_EQ(watches.size(), 1u);
495   bufferlist bl2, bl_reply;
496   ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
497   bufferlist::iterator p = bl_reply.begin();
498   std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
499   std::set<std::pair<uint64_t,uint64_t> > missed_map;
500   ::decode(reply_map, p);
501   ::decode(missed_map, p);
502   ASSERT_EQ(1u, notify_cookies.size());
503   ASSERT_EQ(1u, notify_cookies.count(handle));
504   ASSERT_EQ(1u, reply_map.size());
505   ASSERT_EQ(5u, reply_map.begin()->second.length());
506   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
507   ASSERT_EQ(0u, missed_map.size());
508   ASSERT_GT(ioctx.watch_check(handle), 0);
509   ioctx.unwatch2(handle);
510 }
511
512 TEST_P(LibRadosWatchNotifyPP, AioWatchNotify2) {
513   notify_oid = "foo";
514   notify_ioctx = &ioctx;
515   notify_cookies.clear();
516   char buf[128];
517   memset(buf, 0xcc, sizeof(buf));
518   bufferlist bl1;
519   bl1.append(buf, sizeof(buf));
520   ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
521
522   uint64_t handle;
523   WatchNotifyTestCtx2 ctx(this);
524   librados::AioCompletion *comp = cluster.aio_create_completion();
525   ASSERT_EQ(0, ioctx.aio_watch(notify_oid, comp, &handle, &ctx));
526   ASSERT_EQ(0, comp->wait_for_complete());
527   ASSERT_EQ(0, comp->get_return_value());
528   comp->release();
529
530   ASSERT_GT(ioctx.watch_check(handle), 0);
531   std::list<obj_watch_t> watches;
532   ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
533   ASSERT_EQ(watches.size(), 1u);
534   bufferlist bl2, bl_reply;
535   ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
536   bufferlist::iterator p = bl_reply.begin();
537   std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
538   std::set<std::pair<uint64_t,uint64_t> > missed_map;
539   ::decode(reply_map, p);
540   ::decode(missed_map, p);
541   ASSERT_EQ(1u, notify_cookies.size());
542   ASSERT_EQ(1u, notify_cookies.count(handle));
543   ASSERT_EQ(1u, reply_map.size());
544   ASSERT_EQ(5u, reply_map.begin()->second.length());
545   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
546   ASSERT_EQ(0u, missed_map.size());
547   ASSERT_GT(ioctx.watch_check(handle), 0);
548
549   comp = cluster.aio_create_completion();
550   ioctx.aio_unwatch(handle, comp);
551   ASSERT_EQ(0, comp->wait_for_complete());
552   comp->release();
553 }
554
555
556 TEST_P(LibRadosWatchNotifyPP, AioNotify) {
557   notify_oid = "foo";
558   notify_ioctx = &ioctx;
559   notify_cookies.clear();
560   char buf[128];
561   memset(buf, 0xcc, sizeof(buf));
562   bufferlist bl1;
563   bl1.append(buf, sizeof(buf));
564   ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
565   uint64_t handle;
566   WatchNotifyTestCtx2 ctx(this);
567   ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
568   ASSERT_GT(ioctx.watch_check(handle), 0);
569   std::list<obj_watch_t> watches;
570   ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
571   ASSERT_EQ(watches.size(), 1u);
572   bufferlist bl2, bl_reply;
573   librados::AioCompletion *comp = cluster.aio_create_completion();
574   ASSERT_EQ(0, ioctx.aio_notify(notify_oid, comp, bl2, 300000, &bl_reply));
575   ASSERT_EQ(0, comp->wait_for_complete());
576   ASSERT_EQ(0, comp->get_return_value());
577   comp->release();
578   bufferlist::iterator p = bl_reply.begin();
579   std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
580   std::set<std::pair<uint64_t,uint64_t> > missed_map;
581   ::decode(reply_map, p);
582   ::decode(missed_map, p);
583   ASSERT_EQ(1u, notify_cookies.size());
584   ASSERT_EQ(1u, notify_cookies.count(handle));
585   ASSERT_EQ(1u, reply_map.size());
586   ASSERT_EQ(5u, reply_map.begin()->second.length());
587   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
588   ASSERT_EQ(0u, missed_map.size());
589   ASSERT_GT(ioctx.watch_check(handle), 0);
590   ioctx.unwatch2(handle);
591   cluster.watch_flush();
592 }
593
594 // --
595
596 TEST_F(LibRadosWatchNotify, WatchNotify2Multi) {
597   notify_io = ioctx;
598   notify_oid = "foo";
599   notify_cookies.clear();
600   char buf[128];
601   memset(buf, 0xcc, sizeof(buf));
602   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
603   uint64_t handle1, handle2;
604   ASSERT_EQ(0,
605       rados_watch2(ioctx, notify_oid, &handle1,
606                    watch_notify2_test_cb,
607                    watch_notify2_test_errcb, this));
608   ASSERT_EQ(0,
609       rados_watch2(ioctx, notify_oid, &handle2,
610                    watch_notify2_test_cb,
611                    watch_notify2_test_errcb, this));
612   ASSERT_GT(rados_watch_check(ioctx, handle1), 0);
613   ASSERT_GT(rados_watch_check(ioctx, handle2), 0);
614   ASSERT_NE(handle1, handle2);
615   char *reply_buf = 0;
616   size_t reply_buf_len;
617   ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
618                              "notify", 6, 300000,
619                              &reply_buf, &reply_buf_len));
620   bufferlist reply;
621   reply.append(reply_buf, reply_buf_len);
622   std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
623   std::set<std::pair<uint64_t,uint64_t> > missed_map;
624   bufferlist::iterator reply_p = reply.begin();
625   ::decode(reply_map, reply_p);
626   ::decode(missed_map, reply_p);
627   ASSERT_EQ(2u, reply_map.size());
628   ASSERT_EQ(5u, reply_map.begin()->second.length());
629   ASSERT_EQ(0u, missed_map.size());
630   ASSERT_EQ(2u, notify_cookies.size());
631   ASSERT_EQ(1u, notify_cookies.count(handle1));
632   ASSERT_EQ(1u, notify_cookies.count(handle2));
633   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
634   ASSERT_GT(rados_watch_check(ioctx, handle1), 0);
635   ASSERT_GT(rados_watch_check(ioctx, handle2), 0);
636   rados_buffer_free(reply_buf);
637   rados_unwatch2(ioctx, handle1);
638   rados_unwatch2(ioctx, handle2);
639   rados_watch_flush(cluster);
640 }
641
642 // --
643
644 TEST_F(LibRadosWatchNotify, WatchNotify2Timeout) {
645   notify_io = ioctx;
646   notify_oid = "foo";
647   notify_sleep = 3; // 3s
648   notify_cookies.clear();
649   char buf[128];
650   memset(buf, 0xcc, sizeof(buf));
651   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
652   uint64_t handle;
653   ASSERT_EQ(0,
654       rados_watch2(ioctx, notify_oid, &handle,
655                    watch_notify2_test_cb,
656                    watch_notify2_test_errcb, this));
657   ASSERT_GT(rados_watch_check(ioctx, handle), 0);
658   char *reply_buf = 0;
659   size_t reply_buf_len;
660   ASSERT_EQ(-ETIMEDOUT, rados_notify2(ioctx, notify_oid,
661                                       "notify", 6, 1000, // 1s
662                                       &reply_buf, &reply_buf_len));
663   ASSERT_EQ(1u, notify_cookies.size());
664   {
665     bufferlist reply;
666     reply.append(reply_buf, reply_buf_len);
667     std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
668     std::set<std::pair<uint64_t,uint64_t> > missed_map;
669     bufferlist::iterator reply_p = reply.begin();
670     ::decode(reply_map, reply_p);
671     ::decode(missed_map, reply_p);
672     ASSERT_EQ(0u, reply_map.size());
673     ASSERT_EQ(1u, missed_map.size());
674   }
675   rados_buffer_free(reply_buf);
676
677   // we should get the next notify, though!
678   notify_sleep = 0;
679   notify_cookies.clear();
680   ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
681                              "notify", 6, 300000, // 300s
682                              &reply_buf, &reply_buf_len));
683   ASSERT_EQ(1u, notify_cookies.size());
684   ASSERT_GT(rados_watch_check(ioctx, handle), 0);
685
686   rados_unwatch2(ioctx, handle);
687
688   rados_completion_t comp;
689   ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
690   rados_aio_watch_flush(cluster, comp);
691   ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
692   ASSERT_EQ(0, rados_aio_get_return_value(comp));
693   rados_aio_release(comp);
694   rados_buffer_free(reply_buf);
695
696 }
697
698 TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) {
699   notify_oid = "foo";
700   notify_ioctx = &ioctx;
701   notify_sleep = 3;  // 3s
702   notify_cookies.clear();
703   char buf[128];
704   memset(buf, 0xcc, sizeof(buf));
705   bufferlist bl1;
706   bl1.append(buf, sizeof(buf));
707   ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
708   uint64_t handle;
709   WatchNotifyTestCtx2 ctx(this);
710   ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
711   ASSERT_GT(ioctx.watch_check(handle), 0);
712   std::list<obj_watch_t> watches;
713   ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
714   ASSERT_EQ(watches.size(), 1u);
715   ASSERT_EQ(0u, notify_cookies.size());
716   bufferlist bl2, bl_reply;
717   std::cout << " trying..." << std::endl;
718   ASSERT_EQ(-ETIMEDOUT, ioctx.notify2(notify_oid, bl2, 1000 /* 1s */,
719                                       &bl_reply));
720   std::cout << " timed out" << std::endl;
721   ASSERT_GT(ioctx.watch_check(handle), 0);
722   ioctx.unwatch2(handle);
723
724   std::cout << " flushing" << std::endl;
725   librados::AioCompletion *comp = cluster.aio_create_completion();
726   cluster.aio_watch_flush(comp);
727   ASSERT_EQ(0, comp->wait_for_complete());
728   ASSERT_EQ(0, comp->get_return_value());
729   std::cout << " flushed" << std::endl;
730   comp->release();
731 }
732
733 TEST_P(LibRadosWatchNotifyPP, WatchNotify3) {
734   notify_oid = "foo";
735   notify_ioctx = &ioctx;
736   notify_cookies.clear();
737   uint32_t timeout = 12; // configured timeout
738   char buf[128];
739   memset(buf, 0xcc, sizeof(buf));
740   bufferlist bl1;
741   bl1.append(buf, sizeof(buf));
742   ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
743   uint64_t handle;
744   WatchNotifyTestCtx2 ctx(this);
745   ASSERT_EQ(0, ioctx.watch3(notify_oid, &handle, &ctx, timeout));
746   ASSERT_GT(ioctx.watch_check(handle), 0);
747   std::list<obj_watch_t> watches;
748   ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
749   ASSERT_EQ(watches.size(), 1u);
750   std::cout << "List watches" << std::endl;
751   for (std::list<obj_watch_t>::iterator it = watches.begin();
752     it != watches.end(); ++it) {
753     ASSERT_EQ(it->timeout_seconds, timeout);
754   }
755   bufferlist bl2, bl_reply;
756   std::cout << "notify2" << std::endl;
757   ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
758   std::cout << "notify2 done" << std::endl;
759   bufferlist::iterator p = bl_reply.begin();
760   std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
761   std::set<std::pair<uint64_t,uint64_t> > missed_map;
762   ::decode(reply_map, p);
763   ::decode(missed_map, p);
764   ASSERT_EQ(1u, notify_cookies.size());
765   ASSERT_EQ(1u, notify_cookies.count(handle));
766   ASSERT_EQ(1u, reply_map.size());
767   ASSERT_EQ(5u, reply_map.begin()->second.length());
768   ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
769   ASSERT_EQ(0u, missed_map.size());
770   std::cout << "watch_check" << std::endl;
771   ASSERT_GT(ioctx.watch_check(handle), 0);
772   std::cout << "unwatch2" << std::endl;
773   ioctx.unwatch2(handle);
774
775   std::cout << " flushing" << std::endl;
776   cluster.watch_flush();
777   std::cout << "done" << std::endl;
778 }
779
780 TEST_F(LibRadosWatchNotify, Watch3Timeout) {
781   notify_io = ioctx;
782   notify_oid = "foo";
783   notify_cookies.clear();
784   notify_err = 0;
785   char buf[128];
786   memset(buf, 0xcc, sizeof(buf));
787   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
788   uint64_t handle;
789   time_t start = time(0);
790   const uint32_t timeout = 4;
791   {
792     // make sure i timeout before the messenger reconnects to the OSD,
793     // it will resend a watch request on behalf of the client, and the
794     // timer of timeout on OSD side will be reset by the new request.
795     char conf[128];
796     ASSERT_EQ(0, rados_conf_get(cluster,
797                                 "ms_tcp_read_timeout",
798                                 conf, sizeof(conf)));
799     auto tcp_read_timeout = std::stoll(conf);
800     ASSERT_LT(timeout, tcp_read_timeout);
801   }
802   ASSERT_EQ(0,
803             rados_watch3(ioctx, notify_oid, &handle,
804                      watch_notify2_test_cb, watch_notify2_test_errcb,
805                      timeout, this));
806   int age = rados_watch_check(ioctx, handle);
807   time_t age_bound = time(0) + 1 - start;
808   ASSERT_LT(age, age_bound * 1000);
809   ASSERT_GT(age, 0);
810   rados_conf_set(cluster, "objecter_inject_no_watch_ping", "true");
811   // allow a long time here since an osd peering event will renew our
812   // watch.
813   int left = 16 * timeout;
814   std::cout << "waiting up to " << left << " for osd to time us out ..."
815             << std::endl;
816   while (notify_err == 0 && --left) {
817     sleep(1);
818   }
819   ASSERT_GT(left, 0);
820   rados_conf_set(cluster, "objecter_inject_no_watch_ping", "false");
821   ASSERT_EQ(-ENOTCONN, notify_err);
822   ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
823
824   // a subsequent notify should not reach us
825   char *reply_buf = nullptr;
826   size_t reply_buf_len;
827   ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
828                              "notify", 6, 300000,
829                              &reply_buf, &reply_buf_len));
830   {
831     bufferlist reply;
832     reply.append(reply_buf, reply_buf_len);
833     std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
834     std::set<std::pair<uint64_t,uint64_t> > missed_map;
835     bufferlist::iterator reply_p = reply.begin();
836     ::decode(reply_map, reply_p);
837     ::decode(missed_map, reply_p);
838     ASSERT_EQ(0u, reply_map.size());
839     ASSERT_EQ(0u, missed_map.size());
840   }
841   ASSERT_EQ(0u, notify_cookies.size());
842   ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
843   rados_buffer_free(reply_buf);
844
845   // re-watch
846   rados_unwatch2(ioctx, handle);
847   rados_watch_flush(cluster);
848
849   handle = 0;
850   ASSERT_EQ(0,
851             rados_watch2(ioctx, notify_oid, &handle,
852                          watch_notify2_test_cb,
853                          watch_notify2_test_errcb, this));
854   ASSERT_GT(rados_watch_check(ioctx, handle), 0);
855
856   // and now a notify will work.
857   ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
858                              "notify", 6, 300000,
859                              &reply_buf, &reply_buf_len));
860   {
861     bufferlist reply;
862     reply.append(reply_buf, reply_buf_len);
863     std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
864     std::set<std::pair<uint64_t,uint64_t> > missed_map;
865     bufferlist::iterator reply_p = reply.begin();
866     ::decode(reply_map, reply_p);
867     ::decode(missed_map, reply_p);
868     ASSERT_EQ(1u, reply_map.size());
869     ASSERT_EQ(0u, missed_map.size());
870     ASSERT_EQ(1u, notify_cookies.count(handle));
871     ASSERT_EQ(5u, reply_map.begin()->second.length());
872     ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
873   }
874   ASSERT_EQ(1u, notify_cookies.size());
875   ASSERT_GT(rados_watch_check(ioctx, handle), 0);
876
877   rados_buffer_free(reply_buf);
878   rados_unwatch2(ioctx, handle);
879   rados_watch_flush(cluster);
880 }
881
882 TEST_F(LibRadosWatchNotify, AioWatchDelete2) {
883   notify_io = ioctx;
884   notify_oid = "foo";
885   notify_err = 0;
886   char buf[128];
887   uint32_t timeout = 3;
888   memset(buf, 0xcc, sizeof(buf));
889   ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
890
891
892   rados_completion_t comp;
893   uint64_t handle;
894   ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
895   rados_aio_watch2(ioctx, notify_oid, comp, &handle,
896                   watch_notify2_test_cb, watch_notify2_test_errcb, timeout, this);
897   ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
898   ASSERT_EQ(0, rados_aio_get_return_value(comp));
899   rados_aio_release(comp);
900   ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
901   int left = 30;
902   std::cout << "waiting up to " << left << " for disconnect notification ..."
903       << std::endl;
904   while (notify_err == 0 && --left) {
905     sleep(1);
906   }
907   ASSERT_TRUE(left > 0);
908   ASSERT_EQ(-ENOTCONN, notify_err);
909   ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
910   ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
911   rados_aio_unwatch(ioctx, handle, comp);
912   ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
913   ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
914   rados_aio_release(comp);
915 }
916 // --
917
918 INSTANTIATE_TEST_CASE_P(LibRadosWatchNotifyPPTests, LibRadosWatchNotifyPP,
919                         ::testing::Values("", "cache"));