+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "test/librados_test_stub/TestWatchNotify.h"
-#include "include/Context.h"
-#include "include/stringify.h"
-#include "common/Finisher.h"
-#include "test/librados_test_stub/TestRadosClient.h"
-#include <boost/bind.hpp>
-#include <boost/function.hpp>
-#include "include/assert.h"
-
-#define dout_subsys ceph_subsys_rados
-#undef dout_prefix
-#define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": "
-
-namespace librados {
-
-std::ostream& operator<<(std::ostream& out,
- const TestWatchNotify::WatcherID &watcher_id) {
- out << "(" << watcher_id.first << "," << watcher_id.second << ")";
- return out;
-}
-
-TestWatchNotify::TestWatchNotify()
- : m_lock("librados::TestWatchNotify::m_lock") {
-}
-
-void TestWatchNotify::flush(TestRadosClient *rados_client) {
- CephContext *cct = rados_client->cct();
-
- ldout(cct, 20) << "enter" << dendl;
- // block until we know no additional async notify callbacks will occur
- Mutex::Locker locker(m_lock);
- while (m_pending_notifies > 0) {
- m_file_watcher_cond.Wait(m_lock);
- }
-}
-
-int TestWatchNotify::list_watchers(const std::string& o,
- std::list<obj_watch_t> *out_watchers) {
- Mutex::Locker lock(m_lock);
- SharedWatcher watcher = get_watcher(o);
-
- out_watchers->clear();
- for (TestWatchNotify::WatchHandles::iterator it =
- watcher->watch_handles.begin();
- it != watcher->watch_handles.end(); ++it) {
- obj_watch_t obj;
- strcpy(obj.addr, it->second.addr.c_str());
- obj.watcher_id = static_cast<int64_t>(it->second.gid);
- obj.cookie = it->second.handle;
- obj.timeout_seconds = 30;
- out_watchers->push_back(obj);
- }
- return 0;
-}
-
-void TestWatchNotify::aio_flush(TestRadosClient *rados_client,
- Context *on_finish) {
- rados_client->get_aio_finisher()->queue(on_finish);
-}
-
-void TestWatchNotify::aio_watch(TestRadosClient *rados_client,
- const std::string& o, uint64_t gid,
- uint64_t *handle,
- librados::WatchCtx2 *watch_ctx,
- Context *on_finish) {
- int r = watch(rados_client, o, gid, handle, nullptr, watch_ctx);
- rados_client->get_aio_finisher()->queue(on_finish, r);
-}
-
-void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client,
- uint64_t handle, Context *on_finish) {
- unwatch(rados_client, handle);
- rados_client->get_aio_finisher()->queue(on_finish);
-}
-
-void TestWatchNotify::aio_notify(TestRadosClient *rados_client,
- const std::string& oid, bufferlist& bl,
- uint64_t timeout_ms, bufferlist *pbl,
- Context *on_notify) {
- CephContext *cct = rados_client->cct();
-
- Mutex::Locker lock(m_lock);
- ++m_pending_notifies;
- uint64_t notify_id = ++m_notify_id;
-
- ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
-
- SharedWatcher watcher = get_watcher(oid);
-
- SharedNotifyHandle notify_handle(new NotifyHandle());
- notify_handle->rados_client = rados_client;
- notify_handle->pbl = pbl;
- notify_handle->on_notify = on_notify;
- for (auto &watch_handle_pair : watcher->watch_handles) {
- WatchHandle &watch_handle = watch_handle_pair.second;
- notify_handle->pending_watcher_ids.insert(std::make_pair(
- watch_handle.gid, watch_handle.handle));
- }
- watcher->notify_handles[notify_id] = notify_handle;
-
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&TestWatchNotify::execute_notify, this, rados_client, oid, bl,
- notify_id));
- rados_client->get_aio_finisher()->queue(ctx);
-}
-
-int TestWatchNotify::notify(TestRadosClient *rados_client,
- const std::string& oid, bufferlist& bl,
- uint64_t timeout_ms, bufferlist *pbl) {
- C_SaferCond cond;
- aio_notify(rados_client, oid, bl, timeout_ms, pbl, &cond);
- return cond.wait();
-}
-
-void TestWatchNotify::notify_ack(TestRadosClient *rados_client,
- const std::string& o, uint64_t notify_id,
- uint64_t handle, uint64_t gid,
- bufferlist& bl) {
- CephContext *cct = rados_client->cct();
- ldout(cct, 20) << "notify_id=" << notify_id << ", handle=" << handle
- << ", gid=" << gid << dendl;
- Mutex::Locker lock(m_lock);
- WatcherID watcher_id = std::make_pair(gid, handle);
- ack_notify(rados_client, o, notify_id, watcher_id, bl);
- finish_notify(rados_client, o, notify_id);
-}
-
-int TestWatchNotify::watch(TestRadosClient *rados_client,
- const std::string& o, uint64_t gid,
- uint64_t *handle, librados::WatchCtx *ctx,
- librados::WatchCtx2 *ctx2) {
- CephContext *cct = rados_client->cct();
-
- Mutex::Locker lock(m_lock);
- SharedWatcher watcher = get_watcher(o);
-
- WatchHandle watch_handle;
- watch_handle.rados_client = rados_client;
- watch_handle.addr = "127.0.0.1:0/" + stringify(rados_client->get_nonce());
- watch_handle.nonce = rados_client->get_nonce();
- watch_handle.gid = gid;
- watch_handle.handle = ++m_handle;
- watch_handle.watch_ctx = ctx;
- watch_handle.watch_ctx2 = ctx2;
- watcher->watch_handles[watch_handle.handle] = watch_handle;
-
- *handle = watch_handle.handle;
-
- ldout(cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle
- << dendl;
- return 0;
-}
-
-int TestWatchNotify::unwatch(TestRadosClient *rados_client,
- uint64_t handle) {
- CephContext *cct = rados_client->cct();
-
- ldout(cct, 20) << "handle=" << handle << dendl;
- Mutex::Locker locker(m_lock);
- for (FileWatchers::iterator it = m_file_watchers.begin();
- it != m_file_watchers.end(); ++it) {
- SharedWatcher watcher = it->second;
-
- WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
- if (w_it != watcher->watch_handles.end()) {
- watcher->watch_handles.erase(w_it);
- if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
- m_file_watchers.erase(it);
- }
- break;
- }
- }
- return 0;
-}
-
-TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
- const std::string& oid) {
- assert(m_lock.is_locked());
- SharedWatcher &watcher = m_file_watchers[oid];
- if (!watcher) {
- watcher.reset(new Watcher());
- }
- return watcher;
-}
-
-void TestWatchNotify::execute_notify(TestRadosClient *rados_client,
- const std::string &oid,
- bufferlist &bl, uint64_t notify_id) {
- CephContext *cct = rados_client->cct();
-
- ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
-
- Mutex::Locker lock(m_lock);
- SharedWatcher watcher = get_watcher(oid);
- WatchHandles &watch_handles = watcher->watch_handles;
-
- NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
- if (n_it == watcher->notify_handles.end()) {
- ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
- << ": not found" << dendl;
- return;
- }
-
- SharedNotifyHandle notify_handle = n_it->second;
- WatcherIDs watcher_ids(notify_handle->pending_watcher_ids);
- for (WatcherIDs::iterator w_id_it = watcher_ids.begin();
- w_id_it != watcher_ids.end(); ++w_id_it) {
- WatcherID watcher_id = *w_id_it;
- WatchHandles::iterator w_it = watch_handles.find(watcher_id.second);
- if (w_it == watch_handles.end()) {
- // client disconnected before notification processed
- notify_handle->pending_watcher_ids.erase(watcher_id);
- } else {
- WatchHandle watch_handle = w_it->second;
- assert(watch_handle.gid == watcher_id.first);
- assert(watch_handle.handle == watcher_id.second);
-
- uint64_t notifier_id = rados_client->get_instance_id();
- watch_handle.rados_client->get_aio_finisher()->queue(new FunctionContext(
- [this, oid, bl, notify_id, watch_handle, notifier_id](int r) {
- bufferlist notify_bl;
- notify_bl.append(bl);
-
- if (watch_handle.watch_ctx2 != NULL) {
- watch_handle.watch_ctx2->handle_notify(notify_id,
- watch_handle.handle,
- notifier_id, notify_bl);
- } else if (watch_handle.watch_ctx != NULL) {
- watch_handle.watch_ctx->notify(0, 0, notify_bl);
-
- // auto ack old-style watch/notify clients
- ack_notify(watch_handle.rados_client, oid, notify_id,
- {watch_handle.gid, watch_handle.handle}, bufferlist());
- }
- }));
- }
- }
-
- finish_notify(rados_client, oid, notify_id);
-
- if (--m_pending_notifies == 0) {
- m_file_watcher_cond.Signal();
- }
-}
-
-void TestWatchNotify::ack_notify(TestRadosClient *rados_client,
- const std::string &oid,
- uint64_t notify_id,
- const WatcherID &watcher_id,
- const bufferlist &bl) {
- CephContext *cct = rados_client->cct();
-
- ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
- << ", WatcherID=" << watcher_id << dendl;
-
- assert(m_lock.is_locked());
- SharedWatcher watcher = get_watcher(oid);
-
- NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
- if (it == watcher->notify_handles.end()) {
- ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
- << ", WatcherID=" << watcher_id << ": not found" << dendl;
- return;
- }
-
- bufferlist response;
- response.append(bl);
-
- SharedNotifyHandle notify_handle = it->second;
- notify_handle->notify_responses[watcher_id] = response;
- notify_handle->pending_watcher_ids.erase(watcher_id);
-}
-
-void TestWatchNotify::finish_notify(TestRadosClient *rados_client,
- const std::string &oid,
- uint64_t notify_id) {
- CephContext *cct = rados_client->cct();
-
- ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
-
- assert(m_lock.is_locked());
- SharedWatcher watcher = get_watcher(oid);
-
- NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
- if (it == watcher->notify_handles.end()) {
- ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
- << ": not found" << dendl;
- return;
- }
-
- SharedNotifyHandle notify_handle = it->second;
- if (!notify_handle->pending_watcher_ids.empty()) {
- ldout(cct, 10) << "oid=" << oid << ", notify_id=" << notify_id
- << ": pending watchers, returning" << dendl;
- return;
- }
-
- ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
- << ": completing" << dendl;
-
- if (notify_handle->pbl != NULL) {
- ::encode(notify_handle->notify_responses, *notify_handle->pbl);
- ::encode(notify_handle->pending_watcher_ids, *notify_handle->pbl);
- }
-
- notify_handle->rados_client->get_aio_finisher()->queue(
- notify_handle->on_notify, 0);
- watcher->notify_handles.erase(notify_id);
- if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
- m_file_watchers.erase(oid);
- }
-}
-
-void TestWatchNotify::blacklist(uint32_t nonce) {
- Mutex::Locker locker(m_lock);
-
- for (auto file_it = m_file_watchers.begin();
- file_it != m_file_watchers.end(); ) {
- auto &watcher = file_it->second;
- for (auto w_it = watcher->watch_handles.begin();
- w_it != watcher->watch_handles.end();) {
- if (w_it->second.nonce == nonce) {
- w_it = watcher->watch_handles.erase(w_it);
- } else {
- ++w_it;
- }
- }
- if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
- file_it = m_file_watchers.erase(file_it);
- } else {
- ++file_it;
- }
- }
-}
-
-} // namespace librados