+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#ifndef DUMBBACKEND
-#define DUMBBACKEND
-
-#include "backend.h"
-#include "include/Context.h"
-#include "os/ObjectStore.h"
-#include "common/WorkQueue.h"
-#include "common/Semaphore.h"
-
-#include <deque>
-
-class DumbBackend : public Backend {
- const string path;
-
- struct write_item {
- const string oid;
- bufferlist bl;
- uint64_t offset;
- Context *on_applied;
- Context *on_commit;
- write_item(
- const string &oid,
- const bufferlist &bl,
- uint64_t offset,
- Context *on_applied,
- Context *on_commit) :
- oid(oid), bl(bl), offset(offset), on_applied(on_applied),
- on_commit(on_commit) {}
- };
-
- Semaphore sem;
-
- bool do_fsync;
- bool do_sync_file_range;
- bool do_fadvise;
- unsigned sync_interval;
- int sync_fd;
- ThreadPool tp;
-
- class SyncThread : public Thread {
- DumbBackend *backend;
- public:
- explicit SyncThread(DumbBackend *backend) : backend(backend) {}
- void *entry() override {
- backend->sync_loop();
- return 0;
- }
- } thread;
- friend class SyncThread;
-
- Mutex sync_loop_mutex;
- Cond sync_loop_cond;
- int sync_loop_stop; // 0 for running, 1 for stopping, 2 for stopped
- void sync_loop();
-
- Mutex pending_commit_mutex;
- set<Context*> pending_commits;
-
- class WriteQueue : public ThreadPool::WorkQueue<write_item> {
- deque<write_item*> item_queue;
- DumbBackend *backend;
-
- public:
- WriteQueue(
- DumbBackend *_backend,
- time_t ti,
- ThreadPool *tp) :
- ThreadPool::WorkQueue<write_item>("DumbBackend::queue", ti, ti*10, tp),
- backend(_backend) {}
- bool _enqueue(write_item *item) override {
- item_queue.push_back(item);
- return true;
- }
- void _dequeue(write_item*) override { ceph_abort(); }
- write_item *_dequeue() override {
- if (item_queue.empty())
- return 0;
- write_item *retval = item_queue.front();
- item_queue.pop_front();
- return retval;
- }
- bool _empty() override {
- return item_queue.empty();
- }
- void _process(write_item *item, ThreadPool::TPHandle &) override {
- return backend->_write(
- item->oid,
- item->offset,
- item->bl,
- item->on_applied,
- item->on_commit);
- }
- void _clear() override {
- return item_queue.clear();
- }
- } queue;
- friend class WriteQueue;
-
- string get_full_path(const string &oid);
-
- void _write(
- const string &oid,
- uint64_t offset,
- const bufferlist &bl,
- Context *on_applied,
- Context *on_commit);
-
-public:
- DumbBackend(
- const string &path,
- bool do_fsync,
- bool do_sync_file_range,
- bool do_fadvise,
- unsigned sync_interval,
- int sync_fd,
- unsigned worker_threads,
- CephContext *cct)
- : path(path), do_fsync(do_fsync),
- do_sync_file_range(do_sync_file_range),
- do_fadvise(do_fadvise),
- sync_interval(sync_interval),
- sync_fd(sync_fd),
- tp(cct, "DumbBackend::tp", "tp_dumb_backend", worker_threads),
- thread(this),
- sync_loop_mutex("DumbBackend::sync_loop_mutex"),
- sync_loop_stop(0),
- pending_commit_mutex("DumbBackend::pending_commit_mutex"),
- queue(this, 20, &tp) {
- thread.create("thread");
- tp.start();
- for (unsigned i = 0; i < 10*worker_threads; ++i) {
- sem.Put();
- }
- }
- ~DumbBackend() override {
- {
- Mutex::Locker l(sync_loop_mutex);
- if (sync_loop_stop == 0)
- sync_loop_stop = 1;
- while (sync_loop_stop < 2)
- sync_loop_cond.Wait(sync_loop_mutex);
- }
- tp.stop();
- thread.join();
- }
- void write(
- const string &oid,
- uint64_t offset,
- const bufferlist &bl,
- Context *on_applied,
- Context *on_commit) override {
- sem.Get();
- queue.queue(
- new write_item(
- oid, bl, offset, on_applied, on_commit));
- }
-
- void read(
- const string &oid,
- uint64_t offset,
- uint64_t length,
- bufferlist *bl,
- Context *on_complete) override;
-};
-
-#endif