1 #include "include/rados/librados.h"
2 #include "include/rados/librados.hpp"
3 #include "include/rados/rados_types.h"
4 #include "test/librados/test.h"
5 #include "test/librados/TestCase.h"
10 #include "gtest/gtest.h"
11 #include "include/encoding.h"
15 using namespace librados;
17 typedef RadosTestEC LibRadosWatchNotifyEC;
18 typedef RadosTestECPP LibRadosWatchNotifyECPP;
25 static void watch_notify_test_cb(uint8_t opcode, uint64_t ver, void *arg)
27 std::cout << __func__ << std::endl;
31 class WatchNotifyTestCtx : public WatchCtx
34 void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) override
36 std::cout << __func__ << std::endl;
41 class LibRadosWatchNotify : public RadosTest
46 std::set<uint64_t> notify_cookies;
47 rados_ioctx_t notify_io;
48 const char *notify_oid = nullptr;
51 static void watch_notify2_test_cb(void *arg,
54 uint64_t notifier_gid,
57 static void watch_notify2_test_errcb(void *arg, uint64_t cookie, int err);
61 void LibRadosWatchNotify::watch_notify2_test_cb(void *arg,
64 uint64_t notifier_gid,
68 std::cout << __func__ << " from " << notifier_gid << " notify_id " << notify_id
69 << " cookie " << cookie << std::endl;
70 assert(notifier_gid > 0);
71 auto thiz = reinterpret_cast<LibRadosWatchNotify*>(arg);
73 thiz->notify_cookies.insert(cookie);
74 thiz->notify_bl.clear();
75 thiz->notify_bl.append((char*)data, data_len);
78 rados_notify_ack(thiz->notify_io, thiz->notify_oid, notify_id, cookie,
82 void LibRadosWatchNotify::watch_notify2_test_errcb(void *arg,
86 std::cout << __func__ << " cookie " << cookie << " err " << err << std::endl;
87 assert(cookie > 1000);
88 auto thiz = reinterpret_cast<LibRadosWatchNotify*>(arg);
90 thiz->notify_err = err;
93 class WatchNotifyTestCtx2;
94 class LibRadosWatchNotifyPP : public RadosTestParamPP
98 std::set<uint64_t> notify_cookies;
99 rados_ioctx_t notify_io;
100 const char *notify_oid = nullptr;
103 friend class WatchNotifyTestCtx2;
108 class WatchNotifyTestCtx2 : public WatchCtx2
110 LibRadosWatchNotifyPP *notify;
113 WatchNotifyTestCtx2(LibRadosWatchNotifyPP *notify)
117 void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_gid,
118 bufferlist& bl) override {
119 std::cout << __func__ << " cookie " << cookie << " notify_id " << notify_id
120 << " notifier_gid " << notifier_gid << std::endl;
121 notify->notify_bl = bl;
122 notify->notify_cookies.insert(cookie);
124 reply.append("reply", 5);
127 notify_ioctx->notify_ack(notify->notify_oid, notify_id, cookie, reply);
130 void handle_error(uint64_t cookie, int err) override {
131 std::cout << __func__ << " cookie " << cookie
132 << " err " << err << std::endl;
133 assert(cookie > 1000);
134 notify->notify_err = err;
140 #pragma GCC diagnostic ignored "-Wpragmas"
141 #pragma GCC diagnostic push
142 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
144 TEST_F(LibRadosWatchNotify, WatchNotify) {
145 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
147 memset(buf, 0xcc, sizeof(buf));
148 ASSERT_EQ(0, rados_write(ioctx, "foo", buf, sizeof(buf), 0));
151 rados_watch(ioctx, "foo", 0, &handle, watch_notify_test_cb, NULL));
152 ASSERT_EQ(0, rados_notify(ioctx, "foo", 0, NULL, 0));
155 rados_unwatch(ioctx, "foo", handle);
159 rados_watch(ioctx, "dne", 0, &handle, watch_notify_test_cb, NULL));
164 TEST_P(LibRadosWatchNotifyPP, WatchNotify) {
165 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
167 memset(buf, 0xcc, sizeof(buf));
169 bl1.append(buf, sizeof(buf));
170 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
172 WatchNotifyTestCtx ctx;
173 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
174 std::list<obj_watch_t> watches;
175 ASSERT_EQ(0, ioctx.list_watchers("foo", &watches));
176 ASSERT_EQ(watches.size(), 1u);
178 ASSERT_EQ(0, ioctx.notify("foo", 0, bl2));
181 ioctx.unwatch("foo", handle);
185 TEST_F(LibRadosWatchNotifyEC, WatchNotify) {
186 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
188 memset(buf, 0xcc, sizeof(buf));
189 ASSERT_EQ(0, rados_write(ioctx, "foo", buf, sizeof(buf), 0));
192 rados_watch(ioctx, "foo", 0, &handle, watch_notify_test_cb, NULL));
193 ASSERT_EQ(0, rados_notify(ioctx, "foo", 0, NULL, 0));
196 rados_unwatch(ioctx, "foo", handle);
200 TEST_F(LibRadosWatchNotifyECPP, WatchNotify) {
201 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
203 memset(buf, 0xcc, sizeof(buf));
205 bl1.append(buf, sizeof(buf));
206 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
208 WatchNotifyTestCtx ctx;
209 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
210 std::list<obj_watch_t> watches;
211 ASSERT_EQ(0, ioctx.list_watchers("foo", &watches));
212 ASSERT_EQ(watches.size(), 1u);
214 ASSERT_EQ(0, ioctx.notify("foo", 0, bl2));
217 ioctx.unwatch("foo", handle);
223 TEST_P(LibRadosWatchNotifyPP, WatchNotifyTimeout) {
224 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
225 ioctx.set_notify_timeout(1);
227 WatchNotifyTestCtx ctx;
230 memset(buf, 0xcc, sizeof(buf));
232 bl1.append(buf, sizeof(buf));
233 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
235 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
237 ASSERT_EQ(0, ioctx.unwatch("foo", handle));
240 TEST_F(LibRadosWatchNotifyECPP, WatchNotifyTimeout) {
241 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
242 ioctx.set_notify_timeout(1);
244 WatchNotifyTestCtx ctx;
247 memset(buf, 0xcc, sizeof(buf));
249 bl1.append(buf, sizeof(buf));
250 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
252 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
254 ASSERT_EQ(0, ioctx.unwatch("foo", handle));
257 #pragma GCC diagnostic pop
258 #pragma GCC diagnostic warning "-Wpragmas"
263 TEST_F(LibRadosWatchNotify, Watch2Delete) {
268 memset(buf, 0xcc, sizeof(buf));
269 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
272 rados_watch2(ioctx, notify_oid, &handle,
273 watch_notify2_test_cb,
274 watch_notify2_test_errcb, this));
275 ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
277 std::cout << "waiting up to " << left << " for disconnect notification ..."
279 while (notify_err == 0 && --left) {
282 ASSERT_TRUE(left > 0);
283 ASSERT_EQ(-ENOTCONN, notify_err);
284 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
285 rados_unwatch2(ioctx, handle);
286 rados_watch_flush(cluster);
289 TEST_F(LibRadosWatchNotify, AioWatchDelete) {
294 memset(buf, 0xcc, sizeof(buf));
295 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
298 rados_completion_t comp;
300 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
301 rados_aio_watch(ioctx, notify_oid, comp, &handle,
302 watch_notify2_test_cb, watch_notify2_test_errcb, this);
303 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
304 ASSERT_EQ(0, rados_aio_get_return_value(comp));
305 rados_aio_release(comp);
306 ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
308 std::cout << "waiting up to " << left << " for disconnect notification ..."
310 while (notify_err == 0 && --left) {
313 ASSERT_TRUE(left > 0);
314 ASSERT_EQ(-ENOTCONN, notify_err);
315 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
316 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
317 rados_aio_unwatch(ioctx, handle, comp);
318 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
319 ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
320 rados_aio_release(comp);
325 TEST_F(LibRadosWatchNotify, WatchNotify2) {
328 notify_cookies.clear();
330 memset(buf, 0xcc, sizeof(buf));
331 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
334 rados_watch2(ioctx, notify_oid, &handle,
335 watch_notify2_test_cb,
336 watch_notify2_test_errcb, this));
337 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
339 size_t reply_buf_len;
340 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
342 &reply_buf, &reply_buf_len));
344 reply.append(reply_buf, reply_buf_len);
345 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
346 std::set<std::pair<uint64_t,uint64_t> > missed_map;
347 bufferlist::iterator reply_p = reply.begin();
348 ::decode(reply_map, reply_p);
349 ::decode(missed_map, reply_p);
350 ASSERT_EQ(1u, reply_map.size());
351 ASSERT_EQ(0u, missed_map.size());
352 ASSERT_EQ(1u, notify_cookies.size());
353 ASSERT_EQ(1u, notify_cookies.count(handle));
354 ASSERT_EQ(5u, reply_map.begin()->second.length());
355 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
356 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
357 rados_buffer_free(reply_buf);
359 // try it on a non-existent object ... our buffer pointers
360 // should get zeroed.
361 ASSERT_EQ(-ENOENT, rados_notify2(ioctx, "doesnotexist",
363 &reply_buf, &reply_buf_len));
364 ASSERT_EQ((char*)0, reply_buf);
365 ASSERT_EQ(0u, reply_buf_len);
367 rados_unwatch2(ioctx, handle);
368 rados_watch_flush(cluster);
371 TEST_F(LibRadosWatchNotify, AioWatchNotify2) {
374 notify_cookies.clear();
376 memset(buf, 0xcc, sizeof(buf));
377 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
379 rados_completion_t comp;
381 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
382 rados_aio_watch(ioctx, notify_oid, comp, &handle,
383 watch_notify2_test_cb, watch_notify2_test_errcb, this);
384 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
385 ASSERT_EQ(0, rados_aio_get_return_value(comp));
386 rados_aio_release(comp);
388 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
390 size_t reply_buf_len;
391 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
393 &reply_buf, &reply_buf_len));
395 reply.append(reply_buf, reply_buf_len);
396 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
397 std::set<std::pair<uint64_t,uint64_t> > missed_map;
398 bufferlist::iterator reply_p = reply.begin();
399 ::decode(reply_map, reply_p);
400 ::decode(missed_map, reply_p);
401 ASSERT_EQ(1u, reply_map.size());
402 ASSERT_EQ(0u, missed_map.size());
403 ASSERT_EQ(1u, notify_cookies.size());
404 ASSERT_EQ(1u, notify_cookies.count(handle));
405 ASSERT_EQ(5u, reply_map.begin()->second.length());
406 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
407 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
408 rados_buffer_free(reply_buf);
410 // try it on a non-existent object ... our buffer pointers
411 // should get zeroed.
412 ASSERT_EQ(-ENOENT, rados_notify2(ioctx, "doesnotexist",
414 &reply_buf, &reply_buf_len));
415 ASSERT_EQ((char*)0, reply_buf);
416 ASSERT_EQ(0u, reply_buf_len);
418 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
419 rados_aio_unwatch(ioctx, handle, comp);
420 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
421 ASSERT_EQ(0, rados_aio_get_return_value(comp));
422 rados_aio_release(comp);
425 TEST_F(LibRadosWatchNotify, AioNotify) {
428 notify_cookies.clear();
430 memset(buf, 0xcc, sizeof(buf));
431 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
434 rados_watch2(ioctx, notify_oid, &handle,
435 watch_notify2_test_cb,
436 watch_notify2_test_errcb, this));
437 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
439 size_t reply_buf_len;
440 rados_completion_t comp;
441 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
442 ASSERT_EQ(0, rados_aio_notify(ioctx, "foo", comp, "notify", 6, 300000,
443 &reply_buf, &reply_buf_len));
444 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
445 ASSERT_EQ(0, rados_aio_get_return_value(comp));
446 rados_aio_release(comp);
449 reply.append(reply_buf, reply_buf_len);
450 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
451 std::set<std::pair<uint64_t,uint64_t> > missed_map;
452 bufferlist::iterator reply_p = reply.begin();
453 ::decode(reply_map, reply_p);
454 ::decode(missed_map, reply_p);
455 ASSERT_EQ(1u, reply_map.size());
456 ASSERT_EQ(0u, missed_map.size());
457 ASSERT_EQ(1u, notify_cookies.size());
458 ASSERT_EQ(1u, notify_cookies.count(handle));
459 ASSERT_EQ(5u, reply_map.begin()->second.length());
460 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
461 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
462 rados_buffer_free(reply_buf);
464 // try it on a non-existent object ... our buffer pointers
465 // should get zeroed.
466 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
467 ASSERT_EQ(0, rados_aio_notify(ioctx, "doesnotexist", comp, "notify", 6,
468 300000, &reply_buf, &reply_buf_len));
469 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
470 ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
471 rados_aio_release(comp);
472 ASSERT_EQ((char*)0, reply_buf);
473 ASSERT_EQ(0u, reply_buf_len);
475 rados_unwatch2(ioctx, handle);
476 rados_watch_flush(cluster);
479 TEST_P(LibRadosWatchNotifyPP, WatchNotify2) {
481 notify_ioctx = &ioctx;
482 notify_cookies.clear();
484 memset(buf, 0xcc, sizeof(buf));
486 bl1.append(buf, sizeof(buf));
487 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
489 WatchNotifyTestCtx2 ctx(this);
490 ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
491 ASSERT_GT(ioctx.watch_check(handle), 0);
492 std::list<obj_watch_t> watches;
493 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
494 ASSERT_EQ(watches.size(), 1u);
495 bufferlist bl2, bl_reply;
496 ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
497 bufferlist::iterator p = bl_reply.begin();
498 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
499 std::set<std::pair<uint64_t,uint64_t> > missed_map;
500 ::decode(reply_map, p);
501 ::decode(missed_map, p);
502 ASSERT_EQ(1u, notify_cookies.size());
503 ASSERT_EQ(1u, notify_cookies.count(handle));
504 ASSERT_EQ(1u, reply_map.size());
505 ASSERT_EQ(5u, reply_map.begin()->second.length());
506 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
507 ASSERT_EQ(0u, missed_map.size());
508 ASSERT_GT(ioctx.watch_check(handle), 0);
509 ioctx.unwatch2(handle);
512 TEST_P(LibRadosWatchNotifyPP, AioWatchNotify2) {
514 notify_ioctx = &ioctx;
515 notify_cookies.clear();
517 memset(buf, 0xcc, sizeof(buf));
519 bl1.append(buf, sizeof(buf));
520 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
523 WatchNotifyTestCtx2 ctx(this);
524 librados::AioCompletion *comp = cluster.aio_create_completion();
525 ASSERT_EQ(0, ioctx.aio_watch(notify_oid, comp, &handle, &ctx));
526 ASSERT_EQ(0, comp->wait_for_complete());
527 ASSERT_EQ(0, comp->get_return_value());
530 ASSERT_GT(ioctx.watch_check(handle), 0);
531 std::list<obj_watch_t> watches;
532 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
533 ASSERT_EQ(watches.size(), 1u);
534 bufferlist bl2, bl_reply;
535 ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
536 bufferlist::iterator p = bl_reply.begin();
537 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
538 std::set<std::pair<uint64_t,uint64_t> > missed_map;
539 ::decode(reply_map, p);
540 ::decode(missed_map, p);
541 ASSERT_EQ(1u, notify_cookies.size());
542 ASSERT_EQ(1u, notify_cookies.count(handle));
543 ASSERT_EQ(1u, reply_map.size());
544 ASSERT_EQ(5u, reply_map.begin()->second.length());
545 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
546 ASSERT_EQ(0u, missed_map.size());
547 ASSERT_GT(ioctx.watch_check(handle), 0);
549 comp = cluster.aio_create_completion();
550 ioctx.aio_unwatch(handle, comp);
551 ASSERT_EQ(0, comp->wait_for_complete());
556 TEST_P(LibRadosWatchNotifyPP, AioNotify) {
558 notify_ioctx = &ioctx;
559 notify_cookies.clear();
561 memset(buf, 0xcc, sizeof(buf));
563 bl1.append(buf, sizeof(buf));
564 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
566 WatchNotifyTestCtx2 ctx(this);
567 ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
568 ASSERT_GT(ioctx.watch_check(handle), 0);
569 std::list<obj_watch_t> watches;
570 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
571 ASSERT_EQ(watches.size(), 1u);
572 bufferlist bl2, bl_reply;
573 librados::AioCompletion *comp = cluster.aio_create_completion();
574 ASSERT_EQ(0, ioctx.aio_notify(notify_oid, comp, bl2, 300000, &bl_reply));
575 ASSERT_EQ(0, comp->wait_for_complete());
576 ASSERT_EQ(0, comp->get_return_value());
578 bufferlist::iterator p = bl_reply.begin();
579 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
580 std::set<std::pair<uint64_t,uint64_t> > missed_map;
581 ::decode(reply_map, p);
582 ::decode(missed_map, p);
583 ASSERT_EQ(1u, notify_cookies.size());
584 ASSERT_EQ(1u, notify_cookies.count(handle));
585 ASSERT_EQ(1u, reply_map.size());
586 ASSERT_EQ(5u, reply_map.begin()->second.length());
587 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
588 ASSERT_EQ(0u, missed_map.size());
589 ASSERT_GT(ioctx.watch_check(handle), 0);
590 ioctx.unwatch2(handle);
591 cluster.watch_flush();
596 TEST_F(LibRadosWatchNotify, WatchNotify2Multi) {
599 notify_cookies.clear();
601 memset(buf, 0xcc, sizeof(buf));
602 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
603 uint64_t handle1, handle2;
605 rados_watch2(ioctx, notify_oid, &handle1,
606 watch_notify2_test_cb,
607 watch_notify2_test_errcb, this));
609 rados_watch2(ioctx, notify_oid, &handle2,
610 watch_notify2_test_cb,
611 watch_notify2_test_errcb, this));
612 ASSERT_GT(rados_watch_check(ioctx, handle1), 0);
613 ASSERT_GT(rados_watch_check(ioctx, handle2), 0);
614 ASSERT_NE(handle1, handle2);
616 size_t reply_buf_len;
617 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
619 &reply_buf, &reply_buf_len));
621 reply.append(reply_buf, reply_buf_len);
622 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
623 std::set<std::pair<uint64_t,uint64_t> > missed_map;
624 bufferlist::iterator reply_p = reply.begin();
625 ::decode(reply_map, reply_p);
626 ::decode(missed_map, reply_p);
627 ASSERT_EQ(2u, reply_map.size());
628 ASSERT_EQ(5u, reply_map.begin()->second.length());
629 ASSERT_EQ(0u, missed_map.size());
630 ASSERT_EQ(2u, notify_cookies.size());
631 ASSERT_EQ(1u, notify_cookies.count(handle1));
632 ASSERT_EQ(1u, notify_cookies.count(handle2));
633 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
634 ASSERT_GT(rados_watch_check(ioctx, handle1), 0);
635 ASSERT_GT(rados_watch_check(ioctx, handle2), 0);
636 rados_buffer_free(reply_buf);
637 rados_unwatch2(ioctx, handle1);
638 rados_unwatch2(ioctx, handle2);
639 rados_watch_flush(cluster);
644 TEST_F(LibRadosWatchNotify, WatchNotify2Timeout) {
647 notify_sleep = 3; // 3s
648 notify_cookies.clear();
650 memset(buf, 0xcc, sizeof(buf));
651 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
654 rados_watch2(ioctx, notify_oid, &handle,
655 watch_notify2_test_cb,
656 watch_notify2_test_errcb, this));
657 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
659 size_t reply_buf_len;
660 ASSERT_EQ(-ETIMEDOUT, rados_notify2(ioctx, notify_oid,
661 "notify", 6, 1000, // 1s
662 &reply_buf, &reply_buf_len));
663 ASSERT_EQ(1u, notify_cookies.size());
666 reply.append(reply_buf, reply_buf_len);
667 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
668 std::set<std::pair<uint64_t,uint64_t> > missed_map;
669 bufferlist::iterator reply_p = reply.begin();
670 ::decode(reply_map, reply_p);
671 ::decode(missed_map, reply_p);
672 ASSERT_EQ(0u, reply_map.size());
673 ASSERT_EQ(1u, missed_map.size());
675 rados_buffer_free(reply_buf);
677 // we should get the next notify, though!
679 notify_cookies.clear();
680 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
681 "notify", 6, 300000, // 300s
682 &reply_buf, &reply_buf_len));
683 ASSERT_EQ(1u, notify_cookies.size());
684 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
686 rados_unwatch2(ioctx, handle);
688 rados_completion_t comp;
689 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
690 rados_aio_watch_flush(cluster, comp);
691 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
692 ASSERT_EQ(0, rados_aio_get_return_value(comp));
693 rados_aio_release(comp);
694 rados_buffer_free(reply_buf);
698 TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) {
700 notify_ioctx = &ioctx;
701 notify_sleep = 3; // 3s
702 notify_cookies.clear();
704 memset(buf, 0xcc, sizeof(buf));
706 bl1.append(buf, sizeof(buf));
707 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
709 WatchNotifyTestCtx2 ctx(this);
710 ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
711 ASSERT_GT(ioctx.watch_check(handle), 0);
712 std::list<obj_watch_t> watches;
713 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
714 ASSERT_EQ(watches.size(), 1u);
715 ASSERT_EQ(0u, notify_cookies.size());
716 bufferlist bl2, bl_reply;
717 std::cout << " trying..." << std::endl;
718 ASSERT_EQ(-ETIMEDOUT, ioctx.notify2(notify_oid, bl2, 1000 /* 1s */,
720 std::cout << " timed out" << std::endl;
721 ASSERT_GT(ioctx.watch_check(handle), 0);
722 ioctx.unwatch2(handle);
724 std::cout << " flushing" << std::endl;
725 librados::AioCompletion *comp = cluster.aio_create_completion();
726 cluster.aio_watch_flush(comp);
727 ASSERT_EQ(0, comp->wait_for_complete());
728 ASSERT_EQ(0, comp->get_return_value());
729 std::cout << " flushed" << std::endl;
733 TEST_P(LibRadosWatchNotifyPP, WatchNotify3) {
735 notify_ioctx = &ioctx;
736 notify_cookies.clear();
737 uint32_t timeout = 12; // configured timeout
739 memset(buf, 0xcc, sizeof(buf));
741 bl1.append(buf, sizeof(buf));
742 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
744 WatchNotifyTestCtx2 ctx(this);
745 ASSERT_EQ(0, ioctx.watch3(notify_oid, &handle, &ctx, timeout));
746 ASSERT_GT(ioctx.watch_check(handle), 0);
747 std::list<obj_watch_t> watches;
748 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
749 ASSERT_EQ(watches.size(), 1u);
750 std::cout << "List watches" << std::endl;
751 for (std::list<obj_watch_t>::iterator it = watches.begin();
752 it != watches.end(); ++it) {
753 ASSERT_EQ(it->timeout_seconds, timeout);
755 bufferlist bl2, bl_reply;
756 std::cout << "notify2" << std::endl;
757 ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
758 std::cout << "notify2 done" << std::endl;
759 bufferlist::iterator p = bl_reply.begin();
760 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
761 std::set<std::pair<uint64_t,uint64_t> > missed_map;
762 ::decode(reply_map, p);
763 ::decode(missed_map, p);
764 ASSERT_EQ(1u, notify_cookies.size());
765 ASSERT_EQ(1u, notify_cookies.count(handle));
766 ASSERT_EQ(1u, reply_map.size());
767 ASSERT_EQ(5u, reply_map.begin()->second.length());
768 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
769 ASSERT_EQ(0u, missed_map.size());
770 std::cout << "watch_check" << std::endl;
771 ASSERT_GT(ioctx.watch_check(handle), 0);
772 std::cout << "unwatch2" << std::endl;
773 ioctx.unwatch2(handle);
775 std::cout << " flushing" << std::endl;
776 cluster.watch_flush();
777 std::cout << "done" << std::endl;
780 TEST_F(LibRadosWatchNotify, Watch3Timeout) {
783 notify_cookies.clear();
786 memset(buf, 0xcc, sizeof(buf));
787 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
789 time_t start = time(0);
790 const uint32_t timeout = 4;
792 // make sure i timeout before the messenger reconnects to the OSD,
793 // it will resend a watch request on behalf of the client, and the
794 // timer of timeout on OSD side will be reset by the new request.
796 ASSERT_EQ(0, rados_conf_get(cluster,
797 "ms_tcp_read_timeout",
798 conf, sizeof(conf)));
799 auto tcp_read_timeout = std::stoll(conf);
800 ASSERT_LT(timeout, tcp_read_timeout);
803 rados_watch3(ioctx, notify_oid, &handle,
804 watch_notify2_test_cb, watch_notify2_test_errcb,
806 int age = rados_watch_check(ioctx, handle);
807 time_t age_bound = time(0) + 1 - start;
808 ASSERT_LT(age, age_bound * 1000);
810 rados_conf_set(cluster, "objecter_inject_no_watch_ping", "true");
811 // allow a long time here since an osd peering event will renew our
813 int left = 16 * timeout;
814 std::cout << "waiting up to " << left << " for osd to time us out ..."
816 while (notify_err == 0 && --left) {
820 rados_conf_set(cluster, "objecter_inject_no_watch_ping", "false");
821 ASSERT_EQ(-ENOTCONN, notify_err);
822 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
824 // a subsequent notify should not reach us
825 char *reply_buf = nullptr;
826 size_t reply_buf_len;
827 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
829 &reply_buf, &reply_buf_len));
832 reply.append(reply_buf, reply_buf_len);
833 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
834 std::set<std::pair<uint64_t,uint64_t> > missed_map;
835 bufferlist::iterator reply_p = reply.begin();
836 ::decode(reply_map, reply_p);
837 ::decode(missed_map, reply_p);
838 ASSERT_EQ(0u, reply_map.size());
839 ASSERT_EQ(0u, missed_map.size());
841 ASSERT_EQ(0u, notify_cookies.size());
842 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
843 rados_buffer_free(reply_buf);
846 rados_unwatch2(ioctx, handle);
847 rados_watch_flush(cluster);
851 rados_watch2(ioctx, notify_oid, &handle,
852 watch_notify2_test_cb,
853 watch_notify2_test_errcb, this));
854 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
856 // and now a notify will work.
857 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
859 &reply_buf, &reply_buf_len));
862 reply.append(reply_buf, reply_buf_len);
863 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
864 std::set<std::pair<uint64_t,uint64_t> > missed_map;
865 bufferlist::iterator reply_p = reply.begin();
866 ::decode(reply_map, reply_p);
867 ::decode(missed_map, reply_p);
868 ASSERT_EQ(1u, reply_map.size());
869 ASSERT_EQ(0u, missed_map.size());
870 ASSERT_EQ(1u, notify_cookies.count(handle));
871 ASSERT_EQ(5u, reply_map.begin()->second.length());
872 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
874 ASSERT_EQ(1u, notify_cookies.size());
875 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
877 rados_buffer_free(reply_buf);
878 rados_unwatch2(ioctx, handle);
879 rados_watch_flush(cluster);
882 TEST_F(LibRadosWatchNotify, AioWatchDelete2) {
887 uint32_t timeout = 3;
888 memset(buf, 0xcc, sizeof(buf));
889 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
892 rados_completion_t comp;
894 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
895 rados_aio_watch2(ioctx, notify_oid, comp, &handle,
896 watch_notify2_test_cb, watch_notify2_test_errcb, timeout, this);
897 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
898 ASSERT_EQ(0, rados_aio_get_return_value(comp));
899 rados_aio_release(comp);
900 ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
902 std::cout << "waiting up to " << left << " for disconnect notification ..."
904 while (notify_err == 0 && --left) {
907 ASSERT_TRUE(left > 0);
908 ASSERT_EQ(-ENOTCONN, notify_err);
909 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
910 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
911 rados_aio_unwatch(ioctx, handle, comp);
912 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
913 ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
914 rados_aio_release(comp);
918 INSTANTIATE_TEST_CASE_P(LibRadosWatchNotifyPPTests, LibRadosWatchNotifyPP,
919 ::testing::Values("", "cache"));