// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #ifndef CEPH_TEST_WATCH_NOTIFY_H #define CEPH_TEST_WATCH_NOTIFY_H #include "include/rados/librados.hpp" #include "common/Cond.h" #include "common/Mutex.h" #include #include #include #include class Cond; class Finisher; namespace librados { class TestRadosClient; class TestWatchNotify : boost::noncopyable { public: typedef std::pair WatcherID; typedef std::set WatcherIDs; typedef std::map, bufferlist> NotifyResponses; struct NotifyHandle { TestRadosClient *rados_client = nullptr; WatcherIDs pending_watcher_ids; NotifyResponses notify_responses; bufferlist *pbl = nullptr; Context *on_notify = nullptr; }; typedef boost::shared_ptr SharedNotifyHandle; typedef std::map NotifyHandles; struct WatchHandle { TestRadosClient *rados_client = nullptr; std::string addr; uint32_t nonce; uint64_t gid; uint64_t handle; librados::WatchCtx* watch_ctx; librados::WatchCtx2* watch_ctx2; }; typedef std::map WatchHandles; struct Watcher { WatchHandles watch_handles; NotifyHandles notify_handles; }; typedef boost::shared_ptr SharedWatcher; TestWatchNotify(); int list_watchers(const std::string& o, std::list *out_watchers); void aio_flush(TestRadosClient *rados_client, Context *on_finish); void aio_watch(TestRadosClient *rados_client, const std::string& o, uint64_t gid, uint64_t *handle, librados::WatchCtx2 *watch_ctx, Context *on_finish); void aio_unwatch(TestRadosClient *rados_client, uint64_t handle, Context *on_finish); void aio_notify(TestRadosClient *rados_client, const std::string& oid, bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl, Context *on_notify); void flush(TestRadosClient *rados_client); int notify(TestRadosClient *rados_client, const std::string& o, bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl); void notify_ack(TestRadosClient *rados_client, const std::string& o, uint64_t notify_id, uint64_t handle, uint64_t gid, bufferlist& bl); int watch(TestRadosClient *rados_client, const std::string& o, uint64_t gid, uint64_t *handle, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2); int unwatch(TestRadosClient *rados_client, uint64_t handle); void blacklist(uint32_t nonce); private: typedef std::map FileWatchers; uint64_t m_handle = 0; uint64_t m_notify_id = 0; Mutex m_lock; uint64_t m_pending_notifies = 0; Cond m_file_watcher_cond; FileWatchers m_file_watchers; SharedWatcher get_watcher(const std::string& oid); void execute_notify(TestRadosClient *rados_client, const std::string &oid, bufferlist &bl, uint64_t notify_id); void ack_notify(TestRadosClient *rados_client, const std::string &oid, uint64_t notify_id, const WatcherID &watcher_id, const bufferlist &bl); void finish_notify(TestRadosClient *rados_client, const std::string &oid, uint64_t notify_id); }; } // namespace librados #endif // CEPH_TEST_WATCH_NOTIFY_H