// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- #ifndef DUMBBACKEND #define DUMBBACKEND #include "backend.h" #include "include/Context.h" #include "os/ObjectStore.h" #include "common/WorkQueue.h" #include "common/Semaphore.h" #include class DumbBackend : public Backend { const string path; struct write_item { const string oid; bufferlist bl; uint64_t offset; Context *on_applied; Context *on_commit; write_item( const string &oid, const bufferlist &bl, uint64_t offset, Context *on_applied, Context *on_commit) : oid(oid), bl(bl), offset(offset), on_applied(on_applied), on_commit(on_commit) {} }; Semaphore sem; bool do_fsync; bool do_sync_file_range; bool do_fadvise; unsigned sync_interval; int sync_fd; ThreadPool tp; class SyncThread : public Thread { DumbBackend *backend; public: explicit SyncThread(DumbBackend *backend) : backend(backend) {} void *entry() override { backend->sync_loop(); return 0; } } thread; friend class SyncThread; Mutex sync_loop_mutex; Cond sync_loop_cond; int sync_loop_stop; // 0 for running, 1 for stopping, 2 for stopped void sync_loop(); Mutex pending_commit_mutex; set pending_commits; class WriteQueue : public ThreadPool::WorkQueue { deque item_queue; DumbBackend *backend; public: WriteQueue( DumbBackend *_backend, time_t ti, ThreadPool *tp) : ThreadPool::WorkQueue("DumbBackend::queue", ti, ti*10, tp), backend(_backend) {} bool _enqueue(write_item *item) override { item_queue.push_back(item); return true; } void _dequeue(write_item*) override { ceph_abort(); } write_item *_dequeue() override { if (item_queue.empty()) return 0; write_item *retval = item_queue.front(); item_queue.pop_front(); return retval; } bool _empty() override { return item_queue.empty(); } void _process(write_item *item, ThreadPool::TPHandle &) override { return backend->_write( item->oid, item->offset, item->bl, item->on_applied, item->on_commit); } void _clear() override { return item_queue.clear(); } } queue; friend class WriteQueue; string get_full_path(const string &oid); void _write( const string &oid, uint64_t offset, const bufferlist &bl, Context *on_applied, Context *on_commit); public: DumbBackend( const string &path, bool do_fsync, bool do_sync_file_range, bool do_fadvise, unsigned sync_interval, int sync_fd, unsigned worker_threads, CephContext *cct) : path(path), do_fsync(do_fsync), do_sync_file_range(do_sync_file_range), do_fadvise(do_fadvise), sync_interval(sync_interval), sync_fd(sync_fd), tp(cct, "DumbBackend::tp", "tp_dumb_backend", worker_threads), thread(this), sync_loop_mutex("DumbBackend::sync_loop_mutex"), sync_loop_stop(0), pending_commit_mutex("DumbBackend::pending_commit_mutex"), queue(this, 20, &tp) { thread.create("thread"); tp.start(); for (unsigned i = 0; i < 10*worker_threads; ++i) { sem.Put(); } } ~DumbBackend() override { { Mutex::Locker l(sync_loop_mutex); if (sync_loop_stop == 0) sync_loop_stop = 1; while (sync_loop_stop < 2) sync_loop_cond.Wait(sync_loop_mutex); } tp.stop(); thread.join(); } void write( const string &oid, uint64_t offset, const bufferlist &bl, Context *on_applied, Context *on_commit) override { sem.Get(); queue.queue( new write_item( oid, bl, offset, on_applied, on_commit)); } void read( const string &oid, uint64_t offset, uint64_t length, bufferlist *bl, Context *on_complete) override; }; #endif