X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftest%2Flibrados_test_stub%2FTestWatchNotify.cc;fp=src%2Fceph%2Fsrc%2Ftest%2Flibrados_test_stub%2FTestWatchNotify.cc;h=6f69092fe4e3ffb6a89edda7c5330d2bd2059150;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/test/librados_test_stub/TestWatchNotify.cc b/src/ceph/src/test/librados_test_stub/TestWatchNotify.cc new file mode 100644 index 0000000..6f69092 --- /dev/null +++ b/src/ceph/src/test/librados_test_stub/TestWatchNotify.cc @@ -0,0 +1,339 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/librados_test_stub/TestWatchNotify.h" +#include "include/Context.h" +#include "include/stringify.h" +#include "common/Finisher.h" +#include "test/librados_test_stub/TestRadosClient.h" +#include +#include +#include "include/assert.h" + +#define dout_subsys ceph_subsys_rados +#undef dout_prefix +#define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": " + +namespace librados { + +std::ostream& operator<<(std::ostream& out, + const TestWatchNotify::WatcherID &watcher_id) { + out << "(" << watcher_id.first << "," << watcher_id.second << ")"; + return out; +} + +TestWatchNotify::TestWatchNotify() + : m_lock("librados::TestWatchNotify::m_lock") { +} + +void TestWatchNotify::flush(TestRadosClient *rados_client) { + CephContext *cct = rados_client->cct(); + + ldout(cct, 20) << "enter" << dendl; + // block until we know no additional async notify callbacks will occur + Mutex::Locker locker(m_lock); + while (m_pending_notifies > 0) { + m_file_watcher_cond.Wait(m_lock); + } +} + +int TestWatchNotify::list_watchers(const std::string& o, + std::list *out_watchers) { + Mutex::Locker lock(m_lock); + SharedWatcher watcher = get_watcher(o); + + out_watchers->clear(); + for (TestWatchNotify::WatchHandles::iterator it = + watcher->watch_handles.begin(); + it != watcher->watch_handles.end(); ++it) { + obj_watch_t obj; + strcpy(obj.addr, it->second.addr.c_str()); + obj.watcher_id = static_cast(it->second.gid); + obj.cookie = it->second.handle; + obj.timeout_seconds = 30; + out_watchers->push_back(obj); + } + return 0; +} + +void TestWatchNotify::aio_flush(TestRadosClient *rados_client, + Context *on_finish) { + rados_client->get_aio_finisher()->queue(on_finish); +} + +void TestWatchNotify::aio_watch(TestRadosClient *rados_client, + const std::string& o, uint64_t gid, + uint64_t *handle, + librados::WatchCtx2 *watch_ctx, + Context *on_finish) { + int r = watch(rados_client, o, gid, handle, nullptr, watch_ctx); + rados_client->get_aio_finisher()->queue(on_finish, r); +} + +void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client, + uint64_t handle, Context *on_finish) { + unwatch(rados_client, handle); + rados_client->get_aio_finisher()->queue(on_finish); +} + +void TestWatchNotify::aio_notify(TestRadosClient *rados_client, + const std::string& oid, bufferlist& bl, + uint64_t timeout_ms, bufferlist *pbl, + Context *on_notify) { + CephContext *cct = rados_client->cct(); + + Mutex::Locker lock(m_lock); + ++m_pending_notifies; + uint64_t notify_id = ++m_notify_id; + + ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl; + + SharedWatcher watcher = get_watcher(oid); + + SharedNotifyHandle notify_handle(new NotifyHandle()); + notify_handle->rados_client = rados_client; + notify_handle->pbl = pbl; + notify_handle->on_notify = on_notify; + for (auto &watch_handle_pair : watcher->watch_handles) { + WatchHandle &watch_handle = watch_handle_pair.second; + notify_handle->pending_watcher_ids.insert(std::make_pair( + watch_handle.gid, watch_handle.handle)); + } + watcher->notify_handles[notify_id] = notify_handle; + + FunctionContext *ctx = new FunctionContext( + boost::bind(&TestWatchNotify::execute_notify, this, rados_client, oid, bl, + notify_id)); + rados_client->get_aio_finisher()->queue(ctx); +} + +int TestWatchNotify::notify(TestRadosClient *rados_client, + const std::string& oid, bufferlist& bl, + uint64_t timeout_ms, bufferlist *pbl) { + C_SaferCond cond; + aio_notify(rados_client, oid, bl, timeout_ms, pbl, &cond); + return cond.wait(); +} + +void TestWatchNotify::notify_ack(TestRadosClient *rados_client, + const std::string& o, uint64_t notify_id, + uint64_t handle, uint64_t gid, + bufferlist& bl) { + CephContext *cct = rados_client->cct(); + ldout(cct, 20) << "notify_id=" << notify_id << ", handle=" << handle + << ", gid=" << gid << dendl; + Mutex::Locker lock(m_lock); + WatcherID watcher_id = std::make_pair(gid, handle); + ack_notify(rados_client, o, notify_id, watcher_id, bl); + finish_notify(rados_client, o, notify_id); +} + +int TestWatchNotify::watch(TestRadosClient *rados_client, + const std::string& o, uint64_t gid, + uint64_t *handle, librados::WatchCtx *ctx, + librados::WatchCtx2 *ctx2) { + CephContext *cct = rados_client->cct(); + + Mutex::Locker lock(m_lock); + SharedWatcher watcher = get_watcher(o); + + WatchHandle watch_handle; + watch_handle.rados_client = rados_client; + watch_handle.addr = "127.0.0.1:0/" + stringify(rados_client->get_nonce()); + watch_handle.nonce = rados_client->get_nonce(); + watch_handle.gid = gid; + watch_handle.handle = ++m_handle; + watch_handle.watch_ctx = ctx; + watch_handle.watch_ctx2 = ctx2; + watcher->watch_handles[watch_handle.handle] = watch_handle; + + *handle = watch_handle.handle; + + ldout(cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle + << dendl; + return 0; +} + +int TestWatchNotify::unwatch(TestRadosClient *rados_client, + uint64_t handle) { + CephContext *cct = rados_client->cct(); + + ldout(cct, 20) << "handle=" << handle << dendl; + Mutex::Locker locker(m_lock); + for (FileWatchers::iterator it = m_file_watchers.begin(); + it != m_file_watchers.end(); ++it) { + SharedWatcher watcher = it->second; + + WatchHandles::iterator w_it = watcher->watch_handles.find(handle); + if (w_it != watcher->watch_handles.end()) { + watcher->watch_handles.erase(w_it); + if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) { + m_file_watchers.erase(it); + } + break; + } + } + return 0; +} + +TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher( + const std::string& oid) { + assert(m_lock.is_locked()); + SharedWatcher &watcher = m_file_watchers[oid]; + if (!watcher) { + watcher.reset(new Watcher()); + } + return watcher; +} + +void TestWatchNotify::execute_notify(TestRadosClient *rados_client, + const std::string &oid, + bufferlist &bl, uint64_t notify_id) { + CephContext *cct = rados_client->cct(); + + ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl; + + Mutex::Locker lock(m_lock); + SharedWatcher watcher = get_watcher(oid); + WatchHandles &watch_handles = watcher->watch_handles; + + NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id); + if (n_it == watcher->notify_handles.end()) { + ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id + << ": not found" << dendl; + return; + } + + SharedNotifyHandle notify_handle = n_it->second; + WatcherIDs watcher_ids(notify_handle->pending_watcher_ids); + for (WatcherIDs::iterator w_id_it = watcher_ids.begin(); + w_id_it != watcher_ids.end(); ++w_id_it) { + WatcherID watcher_id = *w_id_it; + WatchHandles::iterator w_it = watch_handles.find(watcher_id.second); + if (w_it == watch_handles.end()) { + // client disconnected before notification processed + notify_handle->pending_watcher_ids.erase(watcher_id); + } else { + WatchHandle watch_handle = w_it->second; + assert(watch_handle.gid == watcher_id.first); + assert(watch_handle.handle == watcher_id.second); + + uint64_t notifier_id = rados_client->get_instance_id(); + watch_handle.rados_client->get_aio_finisher()->queue(new FunctionContext( + [this, oid, bl, notify_id, watch_handle, notifier_id](int r) { + bufferlist notify_bl; + notify_bl.append(bl); + + if (watch_handle.watch_ctx2 != NULL) { + watch_handle.watch_ctx2->handle_notify(notify_id, + watch_handle.handle, + notifier_id, notify_bl); + } else if (watch_handle.watch_ctx != NULL) { + watch_handle.watch_ctx->notify(0, 0, notify_bl); + + // auto ack old-style watch/notify clients + ack_notify(watch_handle.rados_client, oid, notify_id, + {watch_handle.gid, watch_handle.handle}, bufferlist()); + } + })); + } + } + + finish_notify(rados_client, oid, notify_id); + + if (--m_pending_notifies == 0) { + m_file_watcher_cond.Signal(); + } +} + +void TestWatchNotify::ack_notify(TestRadosClient *rados_client, + const std::string &oid, + uint64_t notify_id, + const WatcherID &watcher_id, + const bufferlist &bl) { + CephContext *cct = rados_client->cct(); + + ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id + << ", WatcherID=" << watcher_id << dendl; + + assert(m_lock.is_locked()); + SharedWatcher watcher = get_watcher(oid); + + NotifyHandles::iterator it = watcher->notify_handles.find(notify_id); + if (it == watcher->notify_handles.end()) { + ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id + << ", WatcherID=" << watcher_id << ": not found" << dendl; + return; + } + + bufferlist response; + response.append(bl); + + SharedNotifyHandle notify_handle = it->second; + notify_handle->notify_responses[watcher_id] = response; + notify_handle->pending_watcher_ids.erase(watcher_id); +} + +void TestWatchNotify::finish_notify(TestRadosClient *rados_client, + const std::string &oid, + uint64_t notify_id) { + CephContext *cct = rados_client->cct(); + + ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl; + + assert(m_lock.is_locked()); + SharedWatcher watcher = get_watcher(oid); + + NotifyHandles::iterator it = watcher->notify_handles.find(notify_id); + if (it == watcher->notify_handles.end()) { + ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id + << ": not found" << dendl; + return; + } + + SharedNotifyHandle notify_handle = it->second; + if (!notify_handle->pending_watcher_ids.empty()) { + ldout(cct, 10) << "oid=" << oid << ", notify_id=" << notify_id + << ": pending watchers, returning" << dendl; + return; + } + + ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id + << ": completing" << dendl; + + if (notify_handle->pbl != NULL) { + ::encode(notify_handle->notify_responses, *notify_handle->pbl); + ::encode(notify_handle->pending_watcher_ids, *notify_handle->pbl); + } + + notify_handle->rados_client->get_aio_finisher()->queue( + notify_handle->on_notify, 0); + watcher->notify_handles.erase(notify_id); + if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) { + m_file_watchers.erase(oid); + } +} + +void TestWatchNotify::blacklist(uint32_t nonce) { + Mutex::Locker locker(m_lock); + + for (auto file_it = m_file_watchers.begin(); + file_it != m_file_watchers.end(); ) { + auto &watcher = file_it->second; + for (auto w_it = watcher->watch_handles.begin(); + w_it != watcher->watch_handles.end();) { + if (w_it->second.nonce == nonce) { + w_it = watcher->watch_handles.erase(w_it); + } else { + ++w_it; + } + } + if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) { + file_it = m_file_watchers.erase(file_it); + } else { + ++file_it; + } + } +} + +} // namespace librados