// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "test/librados_test_stub/TestWatchNotify.h" #include "include/Context.h" #include "include/stringify.h" #include "common/Finisher.h" #include "test/librados_test_stub/TestRadosClient.h" #include #include #include "include/assert.h" #define dout_subsys ceph_subsys_rados #undef dout_prefix #define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": " namespace librados { std::ostream& operator<<(std::ostream& out, const TestWatchNotify::WatcherID &watcher_id) { out << "(" << watcher_id.first << "," << watcher_id.second << ")"; return out; } TestWatchNotify::TestWatchNotify() : m_lock("librados::TestWatchNotify::m_lock") { } void TestWatchNotify::flush(TestRadosClient *rados_client) { CephContext *cct = rados_client->cct(); ldout(cct, 20) << "enter" << dendl; // block until we know no additional async notify callbacks will occur Mutex::Locker locker(m_lock); while (m_pending_notifies > 0) { m_file_watcher_cond.Wait(m_lock); } } int TestWatchNotify::list_watchers(const std::string& o, std::list *out_watchers) { Mutex::Locker lock(m_lock); SharedWatcher watcher = get_watcher(o); out_watchers->clear(); for (TestWatchNotify::WatchHandles::iterator it = watcher->watch_handles.begin(); it != watcher->watch_handles.end(); ++it) { obj_watch_t obj; strcpy(obj.addr, it->second.addr.c_str()); obj.watcher_id = static_cast(it->second.gid); obj.cookie = it->second.handle; obj.timeout_seconds = 30; out_watchers->push_back(obj); } return 0; } void TestWatchNotify::aio_flush(TestRadosClient *rados_client, Context *on_finish) { rados_client->get_aio_finisher()->queue(on_finish); } void TestWatchNotify::aio_watch(TestRadosClient *rados_client, const std::string& o, uint64_t gid, uint64_t *handle, librados::WatchCtx2 *watch_ctx, Context *on_finish) { int r = watch(rados_client, o, gid, handle, nullptr, watch_ctx); rados_client->get_aio_finisher()->queue(on_finish, r); } void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client, uint64_t handle, Context *on_finish) { unwatch(rados_client, handle); rados_client->get_aio_finisher()->queue(on_finish); } void TestWatchNotify::aio_notify(TestRadosClient *rados_client, const std::string& oid, bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl, Context *on_notify) { CephContext *cct = rados_client->cct(); Mutex::Locker lock(m_lock); ++m_pending_notifies; uint64_t notify_id = ++m_notify_id; ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl; SharedWatcher watcher = get_watcher(oid); SharedNotifyHandle notify_handle(new NotifyHandle()); notify_handle->rados_client = rados_client; notify_handle->pbl = pbl; notify_handle->on_notify = on_notify; for (auto &watch_handle_pair : watcher->watch_handles) { WatchHandle &watch_handle = watch_handle_pair.second; notify_handle->pending_watcher_ids.insert(std::make_pair( watch_handle.gid, watch_handle.handle)); } watcher->notify_handles[notify_id] = notify_handle; FunctionContext *ctx = new FunctionContext( boost::bind(&TestWatchNotify::execute_notify, this, rados_client, oid, bl, notify_id)); rados_client->get_aio_finisher()->queue(ctx); } int TestWatchNotify::notify(TestRadosClient *rados_client, const std::string& oid, bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) { C_SaferCond cond; aio_notify(rados_client, oid, bl, timeout_ms, pbl, &cond); return cond.wait(); } void TestWatchNotify::notify_ack(TestRadosClient *rados_client, const std::string& o, uint64_t notify_id, uint64_t handle, uint64_t gid, bufferlist& bl) { CephContext *cct = rados_client->cct(); ldout(cct, 20) << "notify_id=" << notify_id << ", handle=" << handle << ", gid=" << gid << dendl; Mutex::Locker lock(m_lock); WatcherID watcher_id = std::make_pair(gid, handle); ack_notify(rados_client, o, notify_id, watcher_id, bl); finish_notify(rados_client, o, notify_id); } int TestWatchNotify::watch(TestRadosClient *rados_client, const std::string& o, uint64_t gid, uint64_t *handle, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2) { CephContext *cct = rados_client->cct(); Mutex::Locker lock(m_lock); SharedWatcher watcher = get_watcher(o); WatchHandle watch_handle; watch_handle.rados_client = rados_client; watch_handle.addr = "127.0.0.1:0/" + stringify(rados_client->get_nonce()); watch_handle.nonce = rados_client->get_nonce(); watch_handle.gid = gid; watch_handle.handle = ++m_handle; watch_handle.watch_ctx = ctx; watch_handle.watch_ctx2 = ctx2; watcher->watch_handles[watch_handle.handle] = watch_handle; *handle = watch_handle.handle; ldout(cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle << dendl; return 0; } int TestWatchNotify::unwatch(TestRadosClient *rados_client, uint64_t handle) { CephContext *cct = rados_client->cct(); ldout(cct, 20) << "handle=" << handle << dendl; Mutex::Locker locker(m_lock); for (FileWatchers::iterator it = m_file_watchers.begin(); it != m_file_watchers.end(); ++it) { SharedWatcher watcher = it->second; WatchHandles::iterator w_it = watcher->watch_handles.find(handle); if (w_it != watcher->watch_handles.end()) { watcher->watch_handles.erase(w_it); if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) { m_file_watchers.erase(it); } break; } } return 0; } TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher( const std::string& oid) { assert(m_lock.is_locked()); SharedWatcher &watcher = m_file_watchers[oid]; if (!watcher) { watcher.reset(new Watcher()); } return watcher; } void TestWatchNotify::execute_notify(TestRadosClient *rados_client, const std::string &oid, bufferlist &bl, uint64_t notify_id) { CephContext *cct = rados_client->cct(); ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl; Mutex::Locker lock(m_lock); SharedWatcher watcher = get_watcher(oid); WatchHandles &watch_handles = watcher->watch_handles; NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id); if (n_it == watcher->notify_handles.end()) { ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id << ": not found" << dendl; return; } SharedNotifyHandle notify_handle = n_it->second; WatcherIDs watcher_ids(notify_handle->pending_watcher_ids); for (WatcherIDs::iterator w_id_it = watcher_ids.begin(); w_id_it != watcher_ids.end(); ++w_id_it) { WatcherID watcher_id = *w_id_it; WatchHandles::iterator w_it = watch_handles.find(watcher_id.second); if (w_it == watch_handles.end()) { // client disconnected before notification processed notify_handle->pending_watcher_ids.erase(watcher_id); } else { WatchHandle watch_handle = w_it->second; assert(watch_handle.gid == watcher_id.first); assert(watch_handle.handle == watcher_id.second); uint64_t notifier_id = rados_client->get_instance_id(); watch_handle.rados_client->get_aio_finisher()->queue(new FunctionContext( [this, oid, bl, notify_id, watch_handle, notifier_id](int r) { bufferlist notify_bl; notify_bl.append(bl); if (watch_handle.watch_ctx2 != NULL) { watch_handle.watch_ctx2->handle_notify(notify_id, watch_handle.handle, notifier_id, notify_bl); } else if (watch_handle.watch_ctx != NULL) { watch_handle.watch_ctx->notify(0, 0, notify_bl); // auto ack old-style watch/notify clients ack_notify(watch_handle.rados_client, oid, notify_id, {watch_handle.gid, watch_handle.handle}, bufferlist()); } })); } } finish_notify(rados_client, oid, notify_id); if (--m_pending_notifies == 0) { m_file_watcher_cond.Signal(); } } void TestWatchNotify::ack_notify(TestRadosClient *rados_client, const std::string &oid, uint64_t notify_id, const WatcherID &watcher_id, const bufferlist &bl) { CephContext *cct = rados_client->cct(); ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << ", WatcherID=" << watcher_id << dendl; assert(m_lock.is_locked()); SharedWatcher watcher = get_watcher(oid); NotifyHandles::iterator it = watcher->notify_handles.find(notify_id); if (it == watcher->notify_handles.end()) { ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id << ", WatcherID=" << watcher_id << ": not found" << dendl; return; } bufferlist response; response.append(bl); SharedNotifyHandle notify_handle = it->second; notify_handle->notify_responses[watcher_id] = response; notify_handle->pending_watcher_ids.erase(watcher_id); } void TestWatchNotify::finish_notify(TestRadosClient *rados_client, const std::string &oid, uint64_t notify_id) { CephContext *cct = rados_client->cct(); ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl; assert(m_lock.is_locked()); SharedWatcher watcher = get_watcher(oid); NotifyHandles::iterator it = watcher->notify_handles.find(notify_id); if (it == watcher->notify_handles.end()) { ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id << ": not found" << dendl; return; } SharedNotifyHandle notify_handle = it->second; if (!notify_handle->pending_watcher_ids.empty()) { ldout(cct, 10) << "oid=" << oid << ", notify_id=" << notify_id << ": pending watchers, returning" << dendl; return; } ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << ": completing" << dendl; if (notify_handle->pbl != NULL) { ::encode(notify_handle->notify_responses, *notify_handle->pbl); ::encode(notify_handle->pending_watcher_ids, *notify_handle->pbl); } notify_handle->rados_client->get_aio_finisher()->queue( notify_handle->on_notify, 0); watcher->notify_handles.erase(notify_id); if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) { m_file_watchers.erase(oid); } } void TestWatchNotify::blacklist(uint32_t nonce) { Mutex::Locker locker(m_lock); for (auto file_it = m_file_watchers.begin(); file_it != m_file_watchers.end(); ) { auto &watcher = file_it->second; for (auto w_it = watcher->watch_handles.begin(); w_it != watcher->watch_handles.end();) { if (w_it->second.nonce == nonce) { w_it = watcher->watch_handles.erase(w_it); } else { ++w_it; } } if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) { file_it = m_file_watchers.erase(file_it); } else { ++file_it; } } } } // namespace librados